package websocketUtil

import (
	"fmt"
	"time"

	"github.com/gorilla/websocket"
)

const (
	// send_max_retry_num is the maximum number of delivery attempts made for
	// a single message before it is dropped.
	send_max_retry_num int = 60

	// send_error_num is the number of consecutive undelivered messages after
	// which the sender stops retrying and gives each message one attempt only.
	send_error_num int32 = 3
)

// model is a queued sender bound to a single websocket URL. Messages passed
// to send are buffered in msgChan and written by the sendhandler goroutine,
// which re-dials the connection when it is missing.
//
// NOTE(review): isclose and con are plain fields accessed from more than one
// goroutine without synchronization; this matches the original design and is
// not changed here.
type model struct {
	wsurl   string
	msgChan chan *sendModel
	con     *websocket.Conn
	isclose bool
}

// sendModel is one queued outgoing message: the payload and the websocket
// message type handed to Conn.WriteMessage.
type sendModel struct {
	data     []byte
	sendType int
}

// newModel dials wsurl and starts the background send goroutine.
// It returns an error if the initial connection cannot be established.
func newModel(wsurl string) (*model, error) {
	result := &model{
		wsurl:   wsurl,
		msgChan: make(chan *sendModel, 1000),
	}

	// Establish the initial connection before accepting any message.
	if err := result.createCon(); err != nil {
		return nil, err
	}

	go result.sendhandler()

	return result, nil
}

// send queues a message of websocket type st with payload d for delivery.
// It blocks while the queue is full. After close has been called the message
// is dropped instead of panicking on the closed channel (best effort: a send
// racing with close can still panic).
func (m *model) send(st int, d []byte) {
	if m.isclose {
		debugLog("websocketUtil.model send after close, message dropped wsurl:%s", m.wsurl)
		return
	}

	m.msgChan <- &sendModel{
		data:     d,
		sendType: st,
	}
}

// sendhandler is the background loop that drains msgChan and writes each
// message to the connection. It exits when the model is closed, and closes
// the underlying connection on the way out.
//
// The retry policy has to cover two situations:
//  1. The peer restarts: keep retrying so the message is eventually delivered.
//  2. The peer is gone for good: drop queued messages quickly so the channel
//     does not fill up and hold memory.
func (m *model) sendhandler() {
	defer func() {
		if rerr := recover(); rerr != nil {
			errLog("url:%s sendhandler recover:%s", m.wsurl, rerr)
		}
	}()
	// This goroutine is the only writer, so it is responsible for releasing
	// the socket when it stops.
	defer m.discardCon()

	var errCount int32
	for {
		if m.isclose {
			return
		}

		sm, ok := <-m.msgChan
		if !ok {
			// Channel closed by close(): nothing more to deliver.
			return
		}

		if m.trySend(sm, errCount >= send_error_num) {
			errCount = 0
		} else {
			errCount++
		}
	}
}

// trySend attempts to deliver sm, reconnecting as needed, for at most
// send_max_retry_num attempts one second apart. When giveUpFast is true the
// first failure (reconnect or write) abandons the message immediately.
// It reports whether the message was written successfully.
func (m *model) trySend(sm *sendModel, giveUpFast bool) bool {
	for i := 0; i < send_max_retry_num; i++ {
		if m.isclose {
			return false
		}

		if !m.checkcon() {
			if giveUpFast {
				// The peer has been unreachable for several messages in a
				// row; drop this one instead of waiting out the full budget.
				return false
			}
			time.Sleep(time.Second)
			debugLog("url:%s channel msgchen 数量达到%v", m.wsurl, len(m.msgChan))
			continue
		}

		// Snapshot the connection: closeHandler may clear m.con concurrently.
		con := m.con
		if con == nil {
			continue
		}

		err := con.WriteMessage(sm.sendType, sm.data)
		if err == nil {
			return true
		}

		// Pass the already formatted text as an argument, not as the format
		// string, so a '%' in the URL or error text is not re-interpreted.
		errLog("%s", fmt.Sprintf("wsurl:%s 发送数据错误 err:%s", m.wsurl, err))

		// The connection is broken: close it so the socket is released, and
		// let the next attempt dial a fresh one.
		m.discardCon()

		if giveUpFast {
			return false
		}
		time.Sleep(time.Second)
	}

	return false
}

// discardCon closes the current connection, if any, and clears the field so
// that checkcon dials a new one on the next attempt.
func (m *model) discardCon() {
	con := m.con
	if con == nil {
		return
	}
	m.con = nil
	// Best effort: the connection is being thrown away regardless.
	_ = con.Close()
}

// closeHandler is installed on every connection by createCon as the
// websocket close handler. It logs the close code and text and clears the
// connection so that the next send re-dials. It always returns nil.
func (m *model) closeHandler(code int, text string) error {
	debugLog("websocketUtil.model 连接关闭 wsurl:%s code:%v text:%s", m.wsurl, code, text)
	m.con = nil

	return nil
}

// createCon dials wsurl with a 3 second handshake timeout and, on success,
// stores the new connection in m.con with closeHandler registered.
func (m *model) createCon() error {
	dd := &websocket.Dialer{
		Proxy:            websocket.DefaultDialer.Proxy,
		HandshakeTimeout: 3 * time.Second,
	}

	ws, _, err := dd.Dial(m.wsurl, nil)
	if err != nil {
		return err
	}

	ws.SetCloseHandler(m.closeHandler)
	m.con = ws

	return nil
}

// checkcon reports whether a connection is available, dialing a new one when
// none is set. A failed dial is logged and reported as false.
func (m *model) checkcon() bool {
	if m.con != nil {
		return true
	}

	if err := m.createCon(); err != nil {
		errLog("wsurl:%s checkcon err:%s", m.wsurl, err)
		return false
	}

	return true
}

// close shuts the sender down. The closed flag is set before the channel is
// closed so the send goroutine never treats the closed channel as a message;
// that goroutine then exits and closes the connection. Messages still queued
// are discarded. Calling close again is a no-op.
func (m *model) close() {
	if m.isclose {
		return
	}

	m.isclose = true
	close(m.msgChan)
	debugLog("websocketUtil.model close wsurl:%s", m.wsurl)
}