0%

GraphQL查询语言实践-实现BoltMQ的Console

上篇文章《GraphQL查询语言学习笔记》学习了GraphQL的语法和介绍Golang的库。最近再使用Golang编写一款分布式消息队列BoltMQ,我将GraphQL用到BoltMQ的web管理UI上,这样更进一步了解GraphQL。

GraphQL 设计篇

有了将GraphQL应用到BoltMQ的web UI上的需求,那如何来做呢。我总结了下:

  1. 业务梳理
  2. GraphQL API设计
  3. GraphQL Schema设计
  4. GraphQL 第三方库的选择
  5. GraphQL 客户端与服务端的实现

GraphQL API设计

API的设计比较重要,需要开发人员充分理解业务,在业务的基础抽象有查询Graph。例如console是BolotMQ的集群管理UI。需求:

  • 管理多个BoltMQ集群
  • 查询集群的节点信息
  • 查询集群的统计信息
  • 查询集群的topic信息
  • 查询集群的消息信息
  • 查询集群的订阅组信息
  • 查询集群的消费进度
  • 查询集群的在线消费进程
    意思列举了几个功能点,首先将集群这个概念抽象出来,用户在选定集群的情况下才会做下面的查询操作,所有集群就可以作为第一层,然后再往下梳理。graphql就是将业务抽象成图(树)的形式的。

以下是对console的查询操作的API设计,当然你需要理解BoltMQ的一些知识。你可查看BoltMQ了解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
query clusters($name: String, $like: String, $group: String, $msgId: String!) {
clusters(name: $name) {
name
stats {
producerNums
consumerNums
brokerNums
namesrvNums
topicNums
outTotalTodayNums
outTotalYestNums
inTotalTodayNums
inTotalYestNums
}
nodes {
namesrvAddrs
brokerNodes {
role
addr
version
desc
outTps
inTps
outTotalTodayNums
outTotalYestNums
inTotalTodayNums
inTotalYestNums
}
}
topics(like: $like) {
topic
type
isSystem
store {
brokerName
queueId
maxOffset
minOffset
lastUpdateTime
}
route {
queues {
brokerName
writeQueueNums
readQueueNums
perm
sysFlag
}
brokers {
brokerName
brokerAddrs {
brokerId
addr
}
}
}
groups
consumeConn {
describe
conns {
consumeGroup
clientId
clientAddr
language
version
consumeTps
consumeFromWhere
consumeType
diff
messageModel
}
}
consumeProgress(group: $group) {
consumeGroup
tps
diff
total
progress {
brokerOffset
consumeOffset
diff
brokerName
queueId
}
}
}
}
msg(msgId: $msgId) {
info {
msgId
topic
flag
body
queueId
storeSize
queueOffset
sysFlag
bornTimestamp
bornHost
storeTimestamp
storeHost
commitLogOffset
bodyCRC
reconsumeTimes
preparedTransactionOffset
properties {
key
val
}
}
tracks {
code
type
consumeGroup
desc
}
}
}

GraphQL Schema 设计

结合GraphQL API设计类型系统Schema,这里的设计偏后端一些,和API是相辅相成的。在API基础上再次确定返回值的类型以及结构的优化。当然API和Schema的设计可以同时做,也可以分开进行。

console的类型系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# boltmq contole graphql schema
schema {
query: Query
mutation: Mutation
}

# The query type, represents all of the entry points into our object graph
type Query {
clusters(name: String): [Cluster]!
msg(name: String, msgId: String!): Message
}

# A Cluster from the boltmq server
type Cluster {
# The name of cluster
name: String!
# The stats info of cluster
stats: ClusterStats!
# The node info of cluster
nodes: ClusterNode!
# The topics of cluster
topics(like: String): [Topic]!
}

# A ClusterStats info of boltmq cluster
type ClusterStats {
# The producer nums of cluster
producerNums: Int!
# The consumer nums of cluster
consumerNums: Int!
# The broker nums of cluster
brokerNums: Int!
# The name server nums of cluster
namesrvNums: Int!
# The topic nums of cluster
topicNums: Int!
# The cluster consumer msg total number today
outTotalTodayNums: Int!
# The cluster consumer msg total number yest
outTotalYestNums: Int!
# The cluster producer msg total number yest
inTotalTodayNums: Int!
# The cluster producer msg total number today
inTotalYestNums: Int!
}

# A Cluster node info of boltmq cluster
type ClusterNode {
# The namesrv addr list fo cluster
namesrvAddrs: [String!]!
# The broker node list fo cluster
brokerNodes: [BrokerNode]!
}

# A Boker node info of boltmq cluster
type BrokerNode {
# The borker role
role: Int!
# The borker addr
addr: String!
# The borker server version
version: String!
# The borker server describe
desc: String!
# The borker server current out tps
outTps: Float!
# The borker server current in tps
inTps: Float!
# The cluster consumer msg total number today
outTotalTodayNums: Int!
# The cluster consumer msg total number yest
outTotalYestNums: Int!
# The cluster producer msg total number yest
inTotalTodayNums: Int!
# The cluster producer msg total number today
inTotalYestNums: Int!
}

# A topic info of boltmq cluster
type Topic {
# The topic name
topic: String!
# The topic type
type: Int!
# The topic type
isSystem: Boolean!
# The topic store
store: TopicStore!
# The topic route
route: TopicRoute!
# The consume group
groups: [String!]!
# The consume connection
consumeConn: ConsumeConn!
consumeProgress(group: String): [ConsumeProgress]!
}

# topic type
enum TopicType {
# normal topic
NORMAL_TOPIC
# retry topic
RETRY_TOPIC
# deadline queue topic
DLQ_TOPIC
}

# A topic stroe info of boltmq cluster
type TopicStore {
# The broker name
brokerName: String!
# The queue id
queueId: Int!
# The max offset
maxOffset: Int!
# The min offset
minOffset: Int!
# The last update time
lastUpdateTime: String!
}

# A topic route info of boltmq cluster
type TopicRoute {
# The route data of queue
queues: [QueueData]!
# The route data of broker
brokers: [BrokerData]!
}

# A queue route data of topic
type QueueData {
# The broker name
brokerName: String!
# The write queue nums
writeQueueNums: Int!
# The read queue nums
readQueueNums: Int!
# The permissions of topic on broker
perm: Int!
# The permissions of topic on broker
sysFlag: Int!
}

# A broker route data of topic
type BrokerData {
# The broker name
brokerName: String!
# The broker addrs
brokerAddrs: [BrokerAddr]!
}

# A broker addr of topic route
type BrokerAddr {
# The broker id
brokerId: Int!
# The broker addr
addr: String!
}

# consume connection
type ConsumeConn {
# The describe
describe: String!
# The connection
conns: [Connection]!
}

# connection info
type Connection {
# The consume group name
consumeGroup: String!
# The client id
clientId: String!
# The client addr
clientAddr: String!
# The language
language: String!
# The version
version: String!
# The consume tps
consumeTps: Float!
# The consume from where
consumeFromWhere: String!
# The consume type
consumeType: Int!
# The message diff total
diff: Int!
# The message model
messageModel: Int!
}

# consume type
enum ConsumeType {
# actively consume
CONSUME_ACTIVELY
# passively consume
CONSUME_PASSIVELY
}

# message model
enum MessageModel {
# broadcasting
BROADCASTING
# clustering
CLUSTERING
}

# consume progress
type ConsumeProgress {
# The consume group name
consumeGroup: String!
# The consume tps
tps: Float!
# The consume diff
diff: Int!
# The total
total: Int!
# The progress data list
progress: [ConsumeProgressData]!
}

# consume progress data
type ConsumeProgressData {
# The broker offset
brokerOffset: Int!
# The broker offset
consumeOffset: Int!
# The consume diff
diff: Int!
# The broker name
brokerName: String!
# The queue id
queueId: Int!
}

# message
type Message {
# The message base info
info: MessageInfo!
# The message track list
tracks: [MessageTrack]!
}

# message info
type MessageInfo {
# The message id
msgId: String!
# The topic name
topic: String!
# The message flag
flag: Int!
# The message body
body: String!
# The queue id
queueId: Int!
# The store size
storeSize: Int!
# The queue offset
queueOffset: Int!
# The message sys flag
sysFlag: Int!
# The born timestamp
bornTimestamp: String!
# The born host
bornHost: String!
# The store timestamp
storeTimestamp: String!
# The store host
storeHost: String!
# The commitlog offset
commitLogOffset: Int!
# The message body crc
bodyCRC: Int!
# The reconsume times
reconsumeTimes: Int!
# The reconsume times
preparedTransactionOffset: Int!
# The properties
properties: [Property!]!
}

# property, replace map
type Property {
key: String!
val: String!
}

# message track
type MessageTrack {
# The track code, 0: success, non-0: failed
code: Int!
# track type
type: Int!
# consume group name
consumeGroup: String!
# error describe
desc: String!
}

# track type
enum TrackType {
# subscribed and consumed
SUBSCRIBEDANDCONSUMED
# subscribed but filterd
SUBSCRIBEDBUTFILTERD
# subscribed but pull
SUBSCRIBEDBUTPULL
# subscribed and not consume yet
SUBSCRIBEDBUTNOTCONSUMEYET
# unknow exeption
UNKNOWEXEPTION
# not subscribed and not consumed
NOTSUBSCRIBEDANDNOTCONSUMED
# consume groupId not online
CONSUMEGROUPIDNOTONLINE
}

# The mutation type, represents all updates we can make to our data
type Mutation {
create2UpdateTopic(name: String!, topic: TopicInput!): TopicResponse
deleteTopic(name: String!, topic: String!): TopicResponse
}

# The input object sent when cluster is creating a new topic
input TopicInput {
# topic
topic: String!
# The read queue nums, optional
readQueueNums: Int!
# The write queue nums, optional
writeQueueNums: Int!
# The order topic, optional
order: Boolean!
# The unit topic, optional
unit: Boolean!
}

# Represents a topic for a cluster
interface Response {
code: Int!
desc: String!
}

type TopicResponse implements Response {
code: Int!
desc: String!
}

GraphQL 代码实现

console使用的neelance/graphql-go库,代码查看。使用的缺点和注意事项:

  • neelance使用反射实现
  • graphql类型匹配严格,并缺少对于的int,int64等基础类型。
  • 接口的实现不优雅,我提了issue,等待改进。
  • context无法向下传递

封装统一认证

请求header中取得jwtToken进行验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type userClaims struct {
user
jwt.StandardClaims
}

type authenticator struct {
}

func (auth *authenticator) Chain(w http.ResponseWriter, r *http.Request, ctx *Context) bool {
// extract jwt
jwtToken := r.Header.Get("Authorization")

// parse tokentoken
token, err := jwt.ParseWithClaims(jwtToken, &userClaims{}, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("Unexpected signing method")
}
return jwtSecret, nil
})
if err != nil {
http.Error(w, "not authorized", http.StatusUnauthorized)
return false
}

claims, ok := token.Claims.(*userClaims)
if !ok || !token.Valid {
http.Error(w, "not authorized", http.StatusUnauthorized)
return false
}

ctx.ctx = context.WithValue(r.Context(), userAuthKey, claims.user)
return true
}

登录后生成token并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
type loginHandler struct {
}

func (h *loginHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
userParam := struct {
Username string `json:"username"`
Password string `json:"password"`
}{}

err := decoder.Decode(&userParam)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer r.Body.Close()

if userParam.Username != "admin" || userParam.Password != "admin" {
http.Error(w, "invalid login", http.StatusUnauthorized)
return
}

//generate token
expire := time.Now().Add(time.Hour * 1).Unix()
// Create the Claims
claims := userClaims{
user: user{
UserID: 1,
UserName: userParam.Username,
IsAdmin: true,
},
StandardClaims: jwt.StandardClaims{
ExpiresAt: expire,
Issuer: "login",
},
}

token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
signedToken, _ := token.SignedString(jwtSecret)

//output token
tokenResponse := struct {
Token string `json:"token"`
}{signedToken}
json.NewEncoder(w).Encode(tokenResponse)
}

GraphQL接口接入认证

1
srv.mux.Handle(pattern, join(&relay.Handler{Schema: schema}, &authenticator{}))

登录接入

1
srv.mux.Handle(pattern, &loginHandler{})

以上4点就完成认证功能,详细代码查看console-server

还需功能

  • dataloader

对比graphql-go库,graphql-go更佳灵活,当抽象程度差一些。根据自己情况自行选择。