goProject/trunk/framework/sqlAsyncMgr/sqlAsyncWorker.go

144 lines
3.0 KiB
Go
Raw Normal View History

2025-01-06 16:01:02 +08:00
package sqlAsyncMgr
import (
"fmt"
"time"
"framework/goroutineMgr"
"github.com/jinzhu/gorm"
"goutil/logUtil"
)
// LogSyncCount is the number of flushed statements after which the worker
// logs how many SQL statements are still waiting to be synced.
const LogSyncCount = 1000
// SqlAsyncWorker is a worker that syncs queued SQL statements to the
// database. Start runs its flush loop in a separate goroutine.
type SqlAsyncWorker struct {
	// Id is the worker id.
	Id int32
	// IsTop reports whether the worker has been told to stop working.
	// Stop sets it; Add and the flush loop read it.
	// NOTE(review): Stop writes this field while the flush-loop goroutine
	// reads it, with no synchronization visible in this file — confirm.
	IsTop bool
	// CurSyncCount is the number of statements synced since the last
	// progress log; flushItem resets it to 0 every LogSyncCount statements.
	CurSyncCount int32
	// cachModel holds the items waiting to be executed.
	cachModel *SqlAsyncListModel
	// flushDbCallBack is called with each item once it has been flushed
	// and removed from the cache.
	flushDbCallBack func(*SqlAsyncItemModel)
	// logAction is the logging hook.
	logAction func(logUtil.LogType, string)
	// getSyncName returns the sync identifier used in the goroutine name
	// and in log messages.
	getSyncName func() string
	// db is the database handle used to execute the statements.
	db *gorm.DB
	// tempCount is incremented once for every item flushItem picks up.
	tempCount int
}
// newSqlAsyncWorker creates a SqlAsyncWorker whose pending-item cache comes
// from newSqlAsyncListModel. The worker does nothing until Start is called.
//
//	_id:            worker id
//	_flushCallBack: called with each item after it has been flushed
//	_logAction:     logging hook
//	_getSyncName:   returns the sync identifier
//	_db:            database handle used to execute statements
func newSqlAsyncWorker(_id int32, _flushCallBack func(*SqlAsyncItemModel), _logAction func(logUtil.LogType, string), _getSyncName func() string, _db *gorm.DB) *SqlAsyncWorker {
	worker := new(SqlAsyncWorker)
	worker.Id = _id
	worker.cachModel = newSqlAsyncListModel()
	worker.flushDbCallBack = _flushCallBack
	worker.logAction = _logAction
	worker.getSyncName = _getSyncName
	worker.db = _db
	return worker
}
// Start launches the worker's flush loop in a new goroutine and returns
// immediately.
func (w *SqlAsyncWorker) Start() {
	go w.handler()
}
// Stop flags the worker as stopped and returns the result of the cache's
// GetAllSqlModel. Once the flag is set, Add ignores new items and the flush
// loop exits the next time it checks the flag.
func (w *SqlAsyncWorker) Stop() []*SqlAsyncItemModel {
	w.IsTop = true
	remaining := w.cachModel.GetAllSqlModel()
	return remaining
}
// Add queues a SQL item for execution. Items added after the worker has been
// stopped are silently ignored.
func (w *SqlAsyncWorker) Add(item *SqlAsyncItemModel) {
	if !w.IsTop {
		w.cachModel.Add(item)
	}
}
// WaitSqlCount returns the number of SQL statements waiting to be synced, as
// reported by the cache's SqlCount.
func (w *SqlAsyncWorker) WaitSqlCount() int32 {
	pending := w.cachModel.SqlCount()
	return pending
}
// handler is the worker's flush loop. It calls flushItem repeatedly until the
// worker is flagged as stopped, sleeping 100ms whenever flushItem returns
// false, and writes a debug log entry just before returning.
func (w *SqlAsyncWorker) handler() {
	// Track this goroutine with goroutineMgr for as long as the loop runs.
	name := fmt.Sprintf("%s-%v", w.getSyncName(), w.Id)
	goroutineMgr.Monitor(name)
	defer goroutineMgr.ReleaseMonitor(name)

	for !w.IsTop {
		if w.flushItem() {
			continue
		}
		// Nothing was flushed this round; pause instead of spinning.
		time.Sleep(100 * time.Millisecond)
	}

	w.logAction(logUtil.Debug, fmt.Sprintf("sql异步线程停止工作 name=%s", name))
}
// flushItem takes the next pending item from the cache, executes its SQL
// against the database, removes the item from the cache and notifies
// flushDbCallBack.
//
// It returns true when an item was processed and false when the cache had no
// item. A panic raised while flushing is recovered and logged; in that case
// the function also returns false (the zero value of its unnamed result).
func (w *SqlAsyncWorker) flushItem() bool {
	defer func() {
		if r := recover(); r != nil {
			// Use %v rather than %s: a recovered value can be of any type,
			// and %s renders values that are not strings, errors or
			// Stringers as "%!s(T=...)".
			w.logAction(logUtil.Error, fmt.Sprintf("flushItem err:%v", r))
		}
	}()

	// Fetch the next item to sync.
	item, exists := w.cachModel.GetItem()
	if !exists {
		return false
	}
	w.tempCount++

	// Write to the database, trying up to maxAttempts times and logging every
	// failure. The item is removed below whether or not an attempt succeeded.
	const maxAttempts = 3
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err := w.db.Exec(item.Sql).Error
		if err == nil {
			break
		}
		w.logAction(logUtil.Error, fmt.Sprintf("写入数据库出现错误 sql:%s err:%s", item.Sql, err.Error()))
	}

	// Drop the item from the cache, then let the owner update its statistics.
	w.cachModel.RemoveFirst()
	w.flushDbCallBack(item)

	// Every LogSyncCount processed items, log how many are still pending and
	// restart the counter.
	w.CurSyncCount++
	if w.CurSyncCount%LogSyncCount == 0 {
		w.logAction(logUtil.Debug, fmt.Sprintf("当前待同步的sql数量为:%v Name:%s id:%v", w.cachModel.SqlCount(), w.getSyncName(), w.Id))
		w.CurSyncCount = 0
	}
	return true
}