goProject/trunk/goutil/websocketUtil/model.go

157 lines
2.9 KiB
Go
Raw Permalink Normal View History

2025-01-06 16:01:02 +08:00
package websocketUtil
import (
"fmt"
"time"
"github.com/gorilla/websocket"
)
const (
// 最大重试次数
send_max_retry_num int = 60
// 发送错误计数次数
send_error_num int32 = 3
)
type model struct {
wsurl string
msgChan chan *sendModel
con *websocket.Conn
isclose bool
}
type sendModel struct {
data []byte
sendType int
}
// 构造请求对象
func newModel(wsurl string) (*model, error) {
result := &model{}
result.wsurl = wsurl
result.msgChan = make(chan *sendModel, 1000)
// 创建连接
err := result.createCon()
if err != nil {
return nil, err
}
go result.sendhandler()
return result, nil
}
// 发送消息
func (this *model) send(st int, d []byte) {
sm := &sendModel{
data: d,
sendType: st,
}
this.msgChan <- sm
}
// 发送处理器
func (this *model) sendhandler() {
defer func() {
if rerr := recover(); rerr != nil {
errLog("url:%s sendhandler recover:%s", this.wsurl, rerr)
}
}()
// 下面逻辑需要考虑这些情况:
// 1.对端重启 -> 需要尽可能保证消息投递成功
// 2.对端永久关闭 -> 需要尽快释放channel里面的消息以免堵塞channel增加内占用
var err error
var errCount int32
for {
if this.isclose {
return
}
sm := <-this.msgChan
isOk := false
for i := 0; i < send_max_retry_num; i++ {
if this.isclose {
return
}
if this.checkcon() == false {
time.Sleep(time.Second * 1)
debugLog("url:%s channel msgchen 数量达到%v", this.wsurl, len(this.msgChan))
continue
}
err = this.con.WriteMessage(sm.sendType, sm.data)
if err == nil {
isOk = true
break
}
// 记录错误日志
errLog(fmt.Sprintf("wsurl:%s 发送数据错误 err:%s", this.wsurl, err))
this.con = nil
// 如果错误计数到达闸值,则不再重试
if errCount >= send_error_num {
break
} else {
time.Sleep(time.Second)
continue
}
}
// 修改错误计数
if isOk == false {
errCount = errCount + 1
} else {
errCount = 0
}
}
}
func (this *model) closeHandler(code int, text string) error {
debugLog("websocketUtil.model 连接关闭 wsurl:%s code:%v text:%s", this.wsurl, code, text)
this.con = nil
return nil
}
// 创建ws连接
func (this *model) createCon() error {
dd := &websocket.Dialer{
Proxy: websocket.DefaultDialer.Proxy,
HandshakeTimeout: 3 * time.Second,
}
ws, _, err := dd.Dial(this.wsurl, nil)
if err != nil {
return err
}
ws.SetCloseHandler(this.closeHandler)
this.con = ws
return nil
}
// 校验con是否可用
func (this *model) checkcon() bool {
if this.con == nil {
if err := this.createCon(); err != nil {
errLog("wsurl:%s checkcon err:%s", this.wsurl, err)
return false
}
}
return true
}
// close
// @Description: 关闭连接
// @receiver this *model
func (this *model) close() {
close(this.msgChan)
this.isclose = true
debugLog("websocketUtil.model close wsurl:%s", this.wsurl)
}