0%

IM即时通讯系统业务流程

上篇文件讲述了IM系统的整体架构(查看),对细节的业务流程没有详细的说明。这篇文章主要梳理消息的流转过程,也就是用户A发送消息给客户B,消息是如何到达用户B的。主要流程有消息发送,消息推送,心跳消息

首先对上文中没提到的概念进行说明:

  • Session 用户在一定时间的有效会话服务。
  • Room 用户对用户发送的消息(个人消息、群消息、讨论组消息),其实就是一个发布订阅过程,发布者将消息发送到room,订阅者从room中取走消息,对于这样的上下文叫room,有些IM系统中将个人消息的上下文叫channel,这里我统一叫room,只不过个人消息是一个订阅者的room。
  • Msg Queue 未读消息队列
  • hearbeat 心跳服务器,之前包括在了logic中。
  • caller 消息推送触发者,可能是hearbeat,可能是logic。
  • sequence 消息序列号生成器

消息发送流程

用户A发送消息hello给用户B为例。

  1. 用户A使用终端登录客户端,用户A登录需要验证用户身份(用户权限体系,这里不多说)。登录成功服务器会返回用户唯一标识uid给客户端,同时返回用户A的好友列表(用户B当然就在这个列表中了),之后开始发送心跳消息(这个流程下面讲)
  2. 用户A使用客户端发送hello消息。客户端组装消息报文msg request,消息体包括:uid、消息内容、发送消息时间、发送设备ID、Room ID等。将msg request发送到transfer接入服务,然后等待ack响应。
  3. tranfer接收到客户端发送的信息后,使用token(uid)去logic中得到连接描述key。
  4. logic服务收到消息会做几件事情,A. 返回token(uid)的key(uid+roomid) B. 通知router更新用户状态(uid),C. 通知router更新用户接入信息,uid、transferid、更新时间
  5. tranfer接收到logic返回key,以key保存连接,带上transferid将消息分发到logic服务,等待logic服务处理。
  6. logic服务收到消息会做几件事情,D. 排重,根据uid、消息发送时间、设备ID在缓存中查下是否存在。
  7. logic服务收到消息会做几件事情,E. 生成msgid,以uid + transferid + roomid取个最新的msgid,自增+1或者用snake算法得到自增的msgid。
  8. logic服务收到消息会做几件事情,F. 保存消息 G. 响应ack ack报文: uid、Room ID、msgid。
  9. transfer收到ack后,把ack响应返回给客户端。
  10. 客户端收到ack报文,更新本地信箱,信箱中有msgid的消息即为消息发送成功

Alt text

到此用户A的消息就发送出去了,这好像个用户B没有上面关系呀,接下来说消息推送流程。
注意 上面流程中没有说明Room ID是如何生成和分配的,通常是按照具体业务场景进行生成和分配,如按用户A的好友列表进行维护。

消息推送流程

用户B如何收到消息呢,首先用户B已经登录客户端(这里不讨论离线消息)。

  1. logic服务获取Room ID下的所有用户。(维护用户A好友列表的服务可以提供)
  2. logic服务去router中用户是否在线,并获取在线用户的transferid。
  3. logic服务更新transferid将消息推送到对应的tranfer。
  4. logic服务将此消息加入到未读消息队列Msg Queue。
  5. tranfer收到消息根据uid找到对应连接将其发送到客户端。
  6. 客户端接收到消息,将消息保存到本地信箱,按msgid排序展示给用户B。

Alt text

客户B终于收到了消息了,真的一定收到了吗? transfer推送了消息,没有确认机制保证消息一定到达,这过程可能出现网络问题,也可能客户B异常。所以我们还需要一种机制保证客户的正确的在线状态和消息拉取确认机制。

心跳消息流程

心跳消息是客户端登录成功后发起的,心跳时长根据不同的网络环境设置不同值。

  1. 用户A使用客户端发送心跳消息。客户端组装消息报文hearbeat request,消息体包括:uid、网络类型、发送消息时间、发送设备ID、所有Room ID的最后的msgid(确保机制)。将hearbeat request发送到transfer接入服务,然后等待ack响应。
  2. tranfer接收到客户端发送的心跳,带上transferid将消息分发到hearbeat服务,等待hearbeat服务处理。
  3. hearbeat服务收到消息会做几件事情,A. 通知router更新用户状态(uid),B. 通知router更新用户接入信息,uid、transferid、更新时间
  4. hearbeat服务收到消息会做几件事情,C. Room ID在sequence查询最新的msgid D. 查询的msgid不等于上报的msgid,发送消息通知logic服务启动消息推送流程(uid、Room ID、上报的msgid)
  5. logic服务收到消息,去未读消息队列中取所有大于上报msgid的消息,在到数据存储层取得消息的全部内容,将消息发送到transfer。
  6. tranfer收到消息根据uid找到对应连接将其发送到客户端。
  7. 客户端接收到消息,将消息保存到本地信箱,按msgid排序展示给用户B。
  8. hearbeat服务收到消息会做几件事情,E. 响应hearbeat ack,如果需要调整心跳时长,可以将心跳时长加入到ack响应中。
  9. tranfer收到消息根据uid找到对应连接将其发送到客户端。
  10. 客户端接收到消息,更新自己的心跳时长。

hearbeat心跳时间间隔根据不同的网络环境,心跳发送成功次数,信息发送次数等因素进行动态调整。

长连接维护

长连接的TCP服务器与客户端通讯: client向server发起连接,server接受client连接,双方建立连接。Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。
下面是用Golang编写的tcp服务端与客户端,客户端每隔一段时间发送消息到服务器,服务器响应回复。代码conn, err = lis.AcceptTCP();中的conn是需要维护的连接,transfer中确保uid与conn的关系,可以使用map[string]*net.TCPConn

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main

import (
"bufio"
"log"
"net"
"time"
)

var (
Debug = true
maxInt = 1<<31 - 1
)

func main() {
InitTCP([]string{"0.0.0.0:9999"})
select {}
}

func InitTCP(addrs []string) (err error) {
var (
bind string
listener *net.TCPListener
addr *net.TCPAddr
)
for _, bind = range addrs {
if addr, err = net.ResolveTCPAddr("tcp4", bind); err != nil {
log.Printf("net.ResolveTCPAddr(\"tcp4\", \"%s\") error(%v)", bind, err)
return
}
if listener, err = net.ListenTCP("tcp4", addr); err != nil {
log.Printf("net.ListenTCP(\"tcp4\", \"%s\") error(%v)", bind, err)
return
}
if Debug {
log.Printf("start tcp listen: \"%s\"", bind)
}

go acceptTCP(listener)
}
return
}

func acceptTCP(lis *net.TCPListener) {
var (
conn *net.TCPConn
err error
r int
)
for {
if conn, err = lis.AcceptTCP(); err != nil {
// if listener close then return
log.Printf("listener.Accept(\"%s\") error(%v)", lis.Addr().String(), err)
return
}
if err = conn.SetKeepAlive(false); err != nil {
log.Printf("conn.SetKeepAlive() error(%v)", err)
return
}
if err = conn.SetReadBuffer(256); err != nil {
log.Printf("conn.SetReadBuffer() error(%v)", err)
return
}
if err = conn.SetWriteBuffer(2048); err != nil {
log.Printf("conn.SetWriteBuffer() error(%v)", err)
return
}
go serveTCP(conn, r)
if r++; r == maxInt {
r = 0
}
}
}

func serveTCP(conn *net.TCPConn, r int) {
var (
// ip addr
lAddr = conn.LocalAddr().String()
rAddr = conn.RemoteAddr().String()
rbuf = make([]byte, 1024)
)
if Debug {
log.Printf("start tcp serve \"%s\" with \"%s\"", lAddr, rAddr)
}

reader := bufio.NewReader(conn)
for {
n, err := reader.Read(rbuf)
//message, err := reader.ReadString('\n')
if err != nil {
return
}
message := rbuf[0:n]
log.Printf("read message (%s)", string(message))

reply := time.Now().String()
conn.Write([]byte(reply))
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"bufio"
"fmt"
"net"
"time"
)

var quitSemaphore chan bool

func main() {
var tcpAddr *net.TCPAddr
tcpAddr, _ = net.ResolveTCPAddr("tcp", "192.168.0.9:9999")

conn, _ := net.DialTCP("tcp", nil, tcpAddr)
defer conn.Close()
fmt.Println("connected!")

go onMessageRecived(conn)

b := []byte("time")
conn.Write(b)

<-quitSemaphore
}

func onMessageRecived(conn *net.TCPConn) {

var rbuf = make([]byte, 1024)
reader := bufio.NewReader(conn)
value := 5
incr := 10

for {
n, err := reader.Read(rbuf)
if err != nil {
quitSemaphore <- true
break
}
msg := rbuf[0:n]
fmt.Println(string(msg))

time.Sleep(time.Duration(value) * time.Second)
conn.Write([]byte(msg))
value += incr
}
}

总结

重新梳理了消息流转流程,基本清楚了消息的轮转,对Room ID的生成和分配还需进行研究。