【Go语言实战】基于 WebSocket + MongoDB 的IM即时聊天Demo

网友投稿 978 2022-05-30

文章目录

写在前面

1. WebSocket原理

2. 具体流程

2.1 定义类型

2.2 进行连接

2.2.1 服务器监听连接

2.2.2 服务器监听断开连接

2.2.3 用户连接服务器

2.3 写入

2.3.1 定义类型

2.3.2 读取数据

2.3.3 接受消息

2.3.3 获取历史消息

2.4 读取

2.5 插入与查询

2.5.1 插入数据

2.5.2 查询数据

2.6 对方不在线

3. 演示

4. 源码地址

写在前面

这个项目是基于WebSocket + MongoDB + MySQL + Redis。

业务逻辑很简单,只是两人的聊天。

MySQL 用来存储用户基本信息

MongoDB 用来存放用户聊天信息

Redis 用来存储处理过期信息

github地址

https://github.com/CocaineCong/gin-chat-demo

1. WebSocket原理

WebSocket是应用层第七层上的一个应用层协议,它必须依赖 HTTP 协议进行一次握手。

握手成功后,数据就直接从TCP通道传输,与HTTP无关了。即:WebSocket分为握手和数据传输阶段。

即进行了HTTP握手 + 双工的TCP连接。

WebSocket 是一种在单个TCP连接上进行全双工通信的协议。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

如果只是想左图这样的不断发送http请求,轮询的效率是非常低,非常浪费资源,所以就有了websocket协议了,建立在 TCP 协议之上,服务器端的实现比较容易。

WebSocket协议一旦建立之后,互相沟通所消耗的请求头是很小的,服务器向客户端推送消息的功耗就小了。

2. 具体流程

2.1 定义类型

发送消息的结构体

type SendMsg struct { Type int `json:"type"` Content string `json:"content"` }

1

2

3

4

回复消息的结构体

type ReplyMsg struct { From string `json:"from"` Code int `json:"code"` Content string `json:"content"` }

1

2

3

4

5

用户结构体

type Client struct { ID string SendID string Socket *websocket.Conn Send chan []byte }

1

2

3

4

5

6

广播类(包括广播内容和源用户)

type Broadcast struct { Client *Client Message []byte Type int }

1

2

3

4

5

用户管理

type ClientManager struct { Clients map[string]*Client Broadcast chan *Broadcast Reply chan *Client Register chan *Client Unregister chan *Client }

1

2

3

4

5

6

7

信息转JSON (包括:发送者、接收者、内容)

type Message struct { Sender string `json:"sender,omitempty"` Recipient string `json:"recipient,omitempty"` Content string `json:"content,omitempty"` }

1

2

3

4

5

2.2 进行连接

定义一个管理Manager

var Manager = ClientManager{ Clients : make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数 Broadcast: make(chan *Broadcast), Register : make(chan *Client), Reply : make(chan *Client), Unregister: make(chan *Client), }

1

2

3

4

5

6

7

2.2.1 服务器监听连接

用 for 不断进行监听查看哪个用户进入通道通信,对用户一旦有用户进来,就 Register 进行注册

for { case conn := <- Manager.Register: log.Printf("建立新连接: %v", conn.ID) Manager.Clients[conn.ID] = conn replyMsg := &ReplyMsg{ Code: e.WebsocketSuccess, Content: "已连接至服务器", } msg , _ := json.Marshal(replyMsg) _ = conn.Socket.WriteMessage(websocket.TextMessage, msg) }

1

2

3

4

5

6

7

8

9

10

11

2.2.2 服务器监听断开连接

同样的,也可以用来对服务器和用户之间连接的断开。

case conn := <-Manager.Unregister: // 断开连接 log.Printf("连接失败:%v", conn.ID) if _, ok := Manager.Clients[conn.ID]; ok { replyMsg := &ReplyMsg{ Code: e.WebsocketEnd, Content: "连接已断开", } msg , _ := json.Marshal(replyMsg) _ = conn.Socket.WriteMessage(websocket.TextMessage, msg) close(conn.Send) delete(Manager.Clients, conn.ID) }

1

2

3

4

5

6

7

8

9

10

11

12

2.2.3 用户连接服务器

我们采用的是gin框架,所以这里我们可以先引入路由

r := gin.Default() r.Use(gin.Recovery(),gin.Logger()) v1 := r.Group("/") { v1.GET("ping", func(c *gin.Context) { c.JSON(200,"SUCCESS") }) v1.GET("ws",service.WsHandler) }

1

2

3

4

5

6

7

8

9

再在service层创建一个handler处理

读取两人的id

uid:=c.Query("uid") // 自己的id toUid:=c.Query("toUid") // 对方的id

1

2

升级ws协议

conn, err := (&websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // CheckOrigin解决跨域问题 return true }}).Upgrade(c.Writer, c.Request, nil) // 升级成ws协议

1

2

3

4

创建用户实例

client := &Client{ ID : createId(uid,toUid), SendID : createId(toUid,uid), Socket : conn, Send : make(chan []byte), }

1

2

3

4

5

6

用户注册到用户管理上面

Manager.Register <- client

1

开通两个协程, 一个读,一个写

go client.Read() go client.Write()

1

2

2.3 写入

2.3.1 定义类型

我们定义的接受类型是json形式,结构体如下

我们这里设计了几个type

type = 1 接受消息

type = 2 获取历史消息

type SendMsg struct { Type int `json:"type"` Content string `json:"content"` }

1

2

3

4

2.3.2 读取数据

先用 PongHandler 返回当前的 socket 对象

c.Socket.PongHandler() sendMsg := new(SendMsg) // _,msg,_:=c.Socket.ReadMessage() // 不是json格式用这个 err := c.Socket.ReadJSON(&sendMsg) // json格式就用这个

1

2

3

4

2.3.3 接受消息

如果传过来的type=1的话,那么我们就可以先去redis上面查询一下当前有多少人进行了连接。

r1 ,_ := cache.RedisClient.Get(c.ID).Result() r2 ,_ := cache.RedisClient.Get(c.SendID).Result()

1

2

如果有三个人在线上,并且没有接受消息的话,就拒绝访问。

replyMsg := &ReplyMsg{ Code: e.WebsocketLimit, Content: "达到限制", } msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage, msg)

1

2

3

4

5

6

如果没有的话,就先记录到redis中进行缓存

cache.RedisClient.Incr(c.ID) _ , _ =cache.RedisClient.Expire(c.ID,time.Hour*24*30*3).Result()

1

2

之后,我们再进行广播消息

Manager.Broadcast <- &Broadcast{ Client:c, Message:[]byte(sendMsg.Content), }

1

2

3

4

2.3.3 获取历史消息

那这个时候我们传来的 type 就等于 2,Content就是时间戳了

我们设置的话,是只保存三个月的,三个月过后我们就可以删除了。

timeT, err := strconv.Atoi(sendMsg.Content) // 传送来时间 if err != nil { timeT = 9999999 } results, _ := FindManyMsg(conf.MongoDBName,c.SendID,c.ID,int64(timeT),10)

1

2

3

4

5

这个FindManyMsg后面再说

返回前十条

if len(results) > 10 { results = results[:10] }else if len(results) == 0{ replyMsg := &ReplyMsg{ Code:e.WebsocketEnd, Content:"到底了", }

1

2

3

4

5

6

7

写入返回

msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage,msg)

1

2

2.4 读取

我们用一个for循环进行消息的读取。

如果有消息的话,就WriteMessage写下来。发送出去。

for{ select { case message, ok := <-c.Send : if !ok { _=c.Socket.WriteMessage(websocket.CloseMessage,[]byte{}) return } log.Println(c.ID,"接受消息:",string(message)) replyMsg := &ReplyMsg{ Code:e.WebsocketSuccessMessage, Content:fmt.Sprintf("%s",string(message)), } msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage, msg) } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

2.5 插入与查询

2.5.1 插入数据

我们使用的是mongoDB进行消息的存储,MongoDB的插入非常简单,文档数据库,插入json格式即可。

定义一个存储的数据类型

type Trainer struct { Content string `bson:"content"` // 内容 StartTime int64 `bson:"startTime"` // 创建时间 EndTime int64 `bson:"endTime"` // 过期时间 Read uint `bson:"read"` // 已读 }

1

2

3

4

5

6

传入数据库,用户ID,内容,是否已读,过期时间

func InsertMsg(database string, id string, content string, read uint, expire int64) (err error) { collection := conf.MongoDBClient.Database(database).Collection(id) comment := ws.Trainer{ Content: content, StartTime: time.Now().Unix(), EndTime: time.Now().Unix() + expire, Read: read, } _, err = collection.InsertOne(context.TODO(),comment) return }

1

2

3

4

5

6

7

8

9

10

11

2.5.2 查询数据

MongoDB的查询也非常容易,按照json格式进行查询。

定义一个存储对象的切片

var resultsMe []ws.Trainer

1

通过用户id查询所有的用户消息

idCollection := conf.MongoDBClient.Database(database).Collection(id)

1

根据传入的time 定义一个过滤器,进行这个时间内的查询。

filter := bson.M{"startTime": bson.M{"$lt": time}}

1

根据filter进行查询,然后再通过时间进行倒序排序,并且限定页数。

sendIdTimeCursor, err := sendIdCollection.Find(context.TODO(), filter, options.Find().SetSort(bson.D{{"StartTime", -1}}), options.Find(). SetLimit(int64(pageSize)))

1

2

3

把数据查询数据传入到resultsMe中

err = idTimeCurcor.All(context.TODO(), &resultsMe)

1

2.6 对方不在线

广播信息

case broadcast := <-Manager.Broadcast: message := broadcast.Message sendId := broadcast.Client.SendID flag := false // 默认对方不在线

1

2

3

4

如果没有这个人的话就一直找就可以了

for id, conn := range Manager.Clients { if id != sendId { continue } select { case conn.Send <- message: flag = true default: close(conn.Send) delete(Manager.Clients, conn.ID) } }

1

2

3

4

5

6

7

8

9

10

11

12

还是找到的话

就可以当作已读信息,存储

if flag { log.Println("对方在线应答") replyMsg := &ReplyMsg{ Code: e.WebsocketOnlineReply, Content: "对方在线应答", } msg , err := json.Marshal(replyMsg) _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg) err = InsertMsg(conf.MongoDBName, id, string(message), 1, int64(3*month)) if err != nil { fmt.Println("InsertOneMsg Err", err) } }

1

2

3

4

5

6

7

8

9

10

11

12

13

如果没有找到的话,就是未读消息了。

else { log.Println("对方不在线") replyMsg := ReplyMsg{ Code: e.WebsocketOfflineReply, Content: "对方不在线应答", } msg , err := json.Marshal(replyMsg) _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg) err = InsertMsg(conf.MongoDBName, id, string(message), 0, int64(3*month)) if err != nil { fmt.Println("InsertOneMsg Err", err) } }

1

2

3

4

5

6

7

8

9

10

11

12

13

3. 演示

【Go语言实战】基于 WebSocket + MongoDB 的IM即时聊天Demo

测试http连接

进行ws连接,连接服务器

当id=1上线,但是id=2没上线的时候发送消息

当id=2上线之后

再次发消息,就是在线应答了

这边就实时接受到消息了

获取历史信息

4. 源码地址

github地址

https://github.com/CocaineCong/gin-chat-demo

Go MongoDB websocket

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
下一篇:Bluetooth Profile Specification之1.1 A2DP 之Audio Codec-SBC
相关文章