// ************************************ // @package: websocketServer // @description: websocket管理 // @author: // @revision history: // @create date: 2022-02-18 15:38:17 // ************************************ package websocketServer import ( "errors" "github.com/gorilla/websocket" routineCtrlUtil "goutil/routineCtrlUtil" "sync" "time" ) const ( // 默认广播并发数 con_DEFAULT_BROADCAST_CONCURRENT = 10 ) // connManager // @description: websocket连接管理 type connManager struct { // 是否禁止新连接 disableNewConn bool // 广播并发数 broadcastConcurrent int // websocket服务端配置结构 upgrader *websocket.Upgrader // 广播锁-限制 消息广播/关闭所有连接 并发访问 muBroadcast sync.Mutex // 连接池map写锁(增加/删除) muAllConns sync.Mutex // 连接池-所有已连接的websocket allConns map[*websocket.Conn]*Context //------------------- // 心跳控制 // 接收到Ping消息时,是否自动回复Pong autuPong bool // 心跳周期 heartbeatCycle time.Duration // 断连周期数(超过几个心跳周期即自动关闭连接);设置为0即关闭心跳检测功能 heartbeatCloseCount int // 是否已开启心跳检测协程 isHeartbeatDetectStart bool } // heartbeatDetect // @description: 开启心跳检测协程 // parameter: // // @receiver connMgr: // // return: func (connMgr *connManager) heartbeatDetect() { // 限制每个websocket连接管理只开启一个心跳检测协程 if !connMgr.isHeartbeatDetectStart { connMgr.isHeartbeatDetectStart = true // 开启心跳检测协程 go func() { for { if connMgr.heartbeatCloseCount <= 0 { // 心跳检测功能已关闭;每秒检测此标志 time.Sleep(time.Second) continue } // 心跳检测功能已开启 connMgr.muAllConns.Lock() // 连接池map锁 ctxs_timeout := make([]*Context, 0, len(connMgr.allConns)) // 存放心跳超时,需要关闭的websocket环境 for _, ctx := range connMgr.allConns { if time.Since(ctx.heartbeat) > (connMgr.heartbeatCycle*time.Duration(connMgr.heartbeatCloseCount) + 1) { // 心跳超时,需要关闭的websocket环境加入列表 ctxs_timeout = append(ctxs_timeout, ctx) } } connMgr.muAllConns.Unlock() // 连接池map及时解锁 // 关闭所有心跳超时的连接 func() { // 获取广播并发数 broadcastConcurrent := connMgr.broadcastConcurrent if broadcastConcurrent <= 0 { broadcastConcurrent = con_DEFAULT_BROADCAST_CONCURRENT } // 协程并发限制 rtCtrl := routineCtrlUtil.New(broadcastConcurrent) for _, ctx := range ctxs_timeout { ctxTemp := ctx rtCtrl.Run(func(arg interface{}) { // 执行受限并发函数 ctxTemp.Close() }, nil) } // 等待完成 rtCtrl.Wait() }() // 休眠半个心跳周期 slpTime := time.Duration(connMgr.heartbeatCycle / 2) if slpTime < time.Second { slpTime = time.Second } time.Sleep(slpTime) } }() } } // upgrade // @description: 升级为websocket // parameter: // // @receiver connMgr: // @ctx: websocket环境 // // return: // // @*websocket.Conn: 建立的连接对象 // @error: func (connMgr *connManager) upgrade(ctx *Context) (conn *websocket.Conn, err error) { if connMgr.disableNewConn { // 禁止新连接 return nil, errors.New("connManager(disableNewConn)") } // 建立websocket连接 conn, err = connMgr.upgrader.Upgrade(ctx.GetWebServerContext().GetResponseWriter(), ctx.GetWebServerContext().GetRequest(), nil) if err != nil { return } // 添加到连接池 ctx.conn = conn connMgr.addConn(conn, ctx) return } // addConn // @description: 添加到连接池 // parameter: // // @receiver connMgr: // @conn: // @ctx: // // return: func (connMgr *connManager) addConn(conn *websocket.Conn, ctx *Context) { connMgr.muAllConns.Lock() defer connMgr.muAllConns.Unlock() connMgr.allConns[conn] = ctx } // delConn // @description: 将连接从连接池删除 // parameter: // // @receiver connMgr: // @conn: // // return: func (connMgr *connManager) delConn(conn *websocket.Conn) { connMgr.muAllConns.Lock() defer connMgr.muAllConns.Unlock() delete(connMgr.allConns, conn) } // renewAllConnsMap // @description: 重新替换一个新allConns的map结构,以免被标记删除的冗余信息造成存储和性能问题 // parameter: // // @receiver connMgr: // // return: // // @map[*websocket.Conn]*Context: 返回原内部使用的连接池(现内部已不再使用) func (connMgr *connManager) renewAllConnsMap() map[*websocket.Conn]*Context { connMgr.muAllConns.Lock() defer connMgr.muAllConns.Unlock() // map拷贝 allConnsCopy := make(map[*websocket.Conn]*Context, len(connMgr.allConns)) for conn, ctx := range connMgr.allConns { allConnsCopy[conn] = ctx } // map替换;因map删除时只是标记,并非真正删除,使用一段时间后可能会出现大量未使用信息;这里顺便更新一下map connMgr.allConns, allConnsCopy = allConnsCopy, connMgr.allConns return allConnsCopy } // SetBroadcastConcurrent // @description: 设置广播并发数 // parameter: // // @receiver connMgr: // @n: 广播并发数 // // return: func (connMgr *connManager) SetBroadcastConcurrent(n int) { connMgr.broadcastConcurrent = n } // EnableNewConn // @description: 允许新连接 // parameter: // // @receiver connMgr: // // return: func (connMgr *connManager) EnableNewConn() { connMgr.disableNewConn = false } // DisableNewConn // @description: 禁用新连接 // parameter: // // @receiver connMgr: // // return: func (connMgr *connManager) DisableNewConn() { connMgr.disableNewConn = true } // MulticastMessage // @description: 多播消息(给指定多用户发送消息) // parameter: // // @receiver connMgr: // @ctxs: 指定多用户的*Context切片 // @messageType: websocket类型 // @data: 发送的数据 // // return: // // @err: 若有错误,则为最后一个错误 func (connMgr *connManager) MulticastMessage(ctxs []*Context, messageType int, data []byte) (err error) { // 广播锁,防重入 connMgr.muBroadcast.Lock() defer connMgr.muBroadcast.Unlock() // 获取广播并发数 broadcastConcurrent := connMgr.broadcastConcurrent if broadcastConcurrent <= 0 { broadcastConcurrent = con_DEFAULT_BROADCAST_CONCURRENT } // 协程并发限制 rtCtrl := routineCtrlUtil.New(broadcastConcurrent) var mu sync.Mutex for _, ctx := range ctxs { // 执行受限并发函数 // 注意:这里的ctx需要使用参数传入Run;否则随时变化的ctx在闭包内使用时,会出现不符合程序要求逻辑的结果 rtCtrl.Run(func(arg interface{}) { ctxTemp, _ := arg.(*Context) e := ctxTemp.SendMessage(messageType, data) if e != nil { mu.Lock() err = e mu.Unlock() } }, ctx) } // 等待完成 rtCtrl.Wait() return } // BroadcastMessage // @description: 消息广播 // parameter: // // @receiver connMgr: // @messageType: websocket类型 // @data: 发送的数据 // // return: // // @err: 若有错误,则为最后一个错误 func (connMgr *connManager) BroadcastMessage(messageType int, data []byte) (err error) { // 广播锁,防重入 connMgr.muBroadcast.Lock() defer connMgr.muBroadcast.Unlock() // 重新替换一个新allConns的map结构,以免被标记删除的冗余信息造成存储和性能问题 allConnsCopy := connMgr.renewAllConnsMap() // 获取广播并发数 broadcastConcurrent := connMgr.broadcastConcurrent if broadcastConcurrent <= 0 { broadcastConcurrent = con_DEFAULT_BROADCAST_CONCURRENT } // 协程并发限制 rtCtrl := routineCtrlUtil.New(broadcastConcurrent) var mu sync.Mutex for _, ctx := range allConnsCopy { // 执行受限并发函数 // 注意:这里的ctx需要使用参数传入Run;否则随时变化的ctx在闭包内使用时,会出现不符合程序要求逻辑的结果 rtCtrl.Run(func(arg interface{}) { ctxTemp, _ := arg.(*Context) e := ctxTemp.SendMessage(messageType, data) if e != nil { mu.Lock() err = e mu.Unlock() } }, ctx) } // 等待完成 rtCtrl.Wait() return } // CloseAll // @description: 关闭所有连接 // parameter: // // @receiver connMgr: // // return: func (connMgr *connManager) CloseAll() { // 广播锁,防重入 connMgr.muBroadcast.Lock() defer connMgr.muBroadcast.Unlock() // 重新替换一个新allConns的map结构,以免被标记删除的冗余信息造成存储和性能问题 allConnsCopy := connMgr.renewAllConnsMap() // 获取广播并发数 broadcastConcurrent := connMgr.broadcastConcurrent if broadcastConcurrent <= 0 { broadcastConcurrent = con_DEFAULT_BROADCAST_CONCURRENT } // 协程并发限制 rtCtrl := routineCtrlUtil.New(broadcastConcurrent) for _, ctx := range allConnsCopy { rtCtrl.Run(func(arg interface{}) { ctxTemp, _ := arg.(*Context) // 执行受限并发函数 ctxTemp.Close() }, ctx) } // 等待完成 rtCtrl.Wait() } // SetUpgrader // @description: 设置websocket参数结构 // parameter: // // @receiver connMgr: // @upgrader: websocket中的websocket.Upgrader结构体指针(可以设置握手超时/读写缓存/是否允许跨域等) // // return: func (connMgr *connManager) SetUpgrader(upgrader *websocket.Upgrader) { connMgr.upgrader = upgrader } // GetUpgrader // @description: 获取websocket参数结构 // parameter: // // @receiver connMgr: // // return: // // @*websocket.Upgrader: websocket中的websocket.Upgrader结构体指针(可以设置握手超时/读写缓存/是否允许跨域等) func (connMgr *connManager) GetUpgrader() *websocket.Upgrader { return connMgr.upgrader } // SetAutoPong // @description: 设置接收到Ping消息时,是否自动回复Pong信息 // parameter: // // @receiver connMgr: // @autuPong: // // return: func (connMgr *connManager) SetAutoPong(autuPong bool) { connMgr.autuPong = autuPong } // GetAutoPong // @description: 获取接收到Ping消息时,是否自动回复Pong信息 // parameter: // // @receiver connMgr: // // return: // // @bool: func (connMgr *connManager) GetAutoPong() bool { return connMgr.autuPong } // SetHeartbeatDetectInfo // @description: 设置心跳检测信息 // parameter: // // @receiver connMgr: // @heartbeatCloseCount: 断连周期数(超过几个心跳周期即自动关闭连接);设置为0即关闭心跳检测功能 // @heartbeatCycle: 心跳周期 // // return: func (connMgr *connManager) SetHeartbeatDetectInfo(heartbeatCloseCount int, heartbeatCycle time.Duration) { connMgr.heartbeatCycle = heartbeatCycle connMgr.heartbeatCloseCount = heartbeatCloseCount } // GetHeartbeatDetectInfo // @description: 获取心跳检测信息 // parameter: // // @receiver connMgr: // // return: // // @heartbeatCloseCount: 断连周期数(超过几个心跳周期即自动关闭连接) // @heartbeatCycle: 心跳周期 func (connMgr *connManager) GetHeartbeatDetectInfo() (heartbeatCloseCount int, heartbeatCycle time.Duration) { return connMgr.heartbeatCloseCount, connMgr.heartbeatCycle }