package sqlAsyncMgr

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"

	"github.com/jinzhu/gorm"
	_ "github.com/jinzhu/gorm/dialects/mysql"
	"goutil/logUtil"
	"goutil/stringUtil"
)

// SqlAsyncUtil hands SQL items to a worker pool and keeps a per-identity
// count of the items that are still pending. Items that are still queued when
// the utility is stopped can be saved to a file and re-queued on the next Start.
type SqlAsyncUtil struct {
	// filePath is the file pending SQL items are saved to and restored from.
	filePath string

	// name identifies this instance in log messages.
	name string

	// max_Worker_Nums is the maximum number of workers.
	max_Worker_Nums int32

	// workerPool is the pool the SQL items are dispatched to.
	workerPool *SqlAsyncWorkerPool

	// sqlStatistics holds the pending-item counters, keyed by identity id.
	sqlStatistics *SqlAsyncIdentityStatistics

	// logAction is the logging callback.
	logAction func(logUtil.LogType, string)

	// db is the gorm database handle passed to the worker pool.
	db *gorm.DB
}

// NewSqlAsyncUtil creates a SqlAsyncUtil that opens its own MySQL connection
// from dbConnectionStr.
//
// It panics when _name, _filePath or dbConnectionStr is empty, when maxWorker
// is not positive, when _logAction is nil, or when the database connection
// cannot be opened.
func NewSqlAsyncUtil(_name, _filePath string, maxWorker int32, dbConnectionStr string, _logAction func(logUtil.LogType, string)) *SqlAsyncUtil {
	if stringUtil.IsEmpty(_name) || stringUtil.IsEmpty(_filePath) || stringUtil.IsEmpty(dbConnectionStr) {
		panic("NewSqlAsyncUtil方法_name/_filePath/dbConnectionStr参数不能为空")
	}
	checkSqlAsyncUtilArgs("NewSqlAsyncUtil", maxWorker, _logAction)

	db := newGormDb(dbConnectionStr, maxWorker)
	return buildSqlAsyncUtil(_name, _filePath, maxWorker, db, _logAction)
}

// NewSqlAsyncUtil2 creates a SqlAsyncUtil on top of an existing gorm handle.
//
// It panics when _name or _filePath is empty, when db is nil, when maxWorker
// is not positive, or when _logAction is nil.
func NewSqlAsyncUtil2(_name, _filePath string, maxWorker int32, db *gorm.DB, _logAction func(logUtil.LogType, string)) *SqlAsyncUtil {
	if stringUtil.IsEmpty(_name) || stringUtil.IsEmpty(_filePath) || db == nil {
		panic("NewSqlAsyncUtil2方法_name/_filePath/db参数不能为空")
	}
	checkSqlAsyncUtilArgs("NewSqlAsyncUtil2", maxWorker, _logAction)

	return buildSqlAsyncUtil(_name, _filePath, maxWorker, db, _logAction)
}

// checkSqlAsyncUtilArgs panics when maxWorker or logAction is invalid; caller
// is the constructor name used in the panic message.
func checkSqlAsyncUtilArgs(caller string, maxWorker int32, logAction func(logUtil.LogType, string)) {
	if maxWorker <= 0 {
		panic(fmt.Sprintf("%s方法maxWorker参数必须>0", caller))
	}
	if logAction == nil {
		panic(fmt.Sprintf("%s方法_logAction参数不能为nil", caller))
	}
}

// buildSqlAsyncUtil assembles a SqlAsyncUtil from already validated arguments.
func buildSqlAsyncUtil(name, filePath string, maxWorker int32, db *gorm.DB, logAction func(logUtil.LogType, string)) *SqlAsyncUtil {
	result := &SqlAsyncUtil{
		name:            name,
		filePath:        filePath,
		max_Worker_Nums: maxWorker,
		logAction:       logAction,
		db:              db,
	}

	// Create the statistics first so the reduceStatistics callback handed to
	// the pool never sees a nil sqlStatistics.
	result.sqlStatistics = newSqlAsyncIdentityStatistics()
	result.workerPool = newSqlAsyncWorkerPool(maxWorker, result.reduceStatistics, logAction, func() string {
		return result.name
	}, result.db)

	return result
}

// Start starts the worker pool and then re-queues every SQL item found in the
// save file, if that file exists.
func (this *SqlAsyncUtil) Start() {
	// Start the workers.
	this.workerPool.Start()

	// Re-queue the items left over from a previous run; nil entries are skipped.
	for _, item := range this.readWaitSql() {
		if item == nil {
			continue
		}

		this.Write(item)
	}
}

// Stop stops the worker pool. When save is true and the pool returned
// items, those items are written to the save file.
func (this *SqlAsyncUtil) Stop(save bool) {
	saveList := this.workerPool.Stop()
	if len(saveList) == 0 || !save {
		return
	}

	this.waitSqlFlushFile(saveList)
}

// WaitSqlSyncDone blocks until GetAllCount reports no remaining SQL, polling
// every 100ms and logging the remaining count on each pass, then deletes the
// save file.
func (this *SqlAsyncUtil) WaitSqlSyncDone() {
	for {
		num := this.GetAllCount()
		this.logAction(logUtil.Debug, fmt.Sprintf("SqlAsyncUtil(%s)当前剩余sql数量为:%v", this.name, num))
		if num <= 0 {
			break
		}

		time.Sleep(time.Millisecond * 100)
	}

	// Nothing is pending any more, so the saved file is obsolete.
	this.delFile()
}

// Write1 builds a SQL item from tableName, identityId and sql and queues it.
func (this *SqlAsyncUtil) Write1(tableName, identityId, sql string) {
	newItem := newSqlAsyncItemModel(tableName, identityId, sql)
	this.Write(newItem)
}

// Write queues item on the worker selected by its table name and increments
// the pending counter of its identity id. A nil item, or a failure to obtain a
// worker, is logged and the item is dropped.
func (this *SqlAsyncUtil) Write(item *SqlAsyncItemModel) {
	if item == nil {
		this.logAction(logUtil.Error, fmt.Sprintf("SqlAsyncUtil(%s)写入的sql项为nil", this.name))
		return
	}

	worker, err := this.workerPool.GetWork(item.TableName)
	if err != nil {
		this.logAction(logUtil.Error, err.Error())
		return
	}

	// Count before enqueueing: the pool was given reduceStatistics as a
	// callback and presumably invokes it from a worker once the item has been
	// handled (TODO confirm). Counting first guarantees the increment is never
	// applied after its matching decrement.
	this.sqlStatistics.AddCount(item.IdentityId, 1)
	worker.Add(item)
}

// GetCount returns the pending SQL count recorded for identityId.
func (this *SqlAsyncUtil) GetCount(identityId string) int32 {
	return this.sqlStatistics.GetCount(identityId)
}

// GetAllCount returns the total number of SQL items the worker pool reports
// as waiting to be synced.
func (this *SqlAsyncUtil) GetAllCount() int32 {
	return this.workerPool.GetWaitSyncCount()
}

// reduceStatistics decrements the pending counter of item's identity id by
// one. It is the callback handed to the worker pool.
func (this *SqlAsyncUtil) reduceStatistics(item *SqlAsyncItemModel) {
	this.sqlStatistics.Reduce(item.IdentityId, 1)
}

// readWaitSql loads the SQL items stored in the save file. It returns nil when
// the file cannot be stat'ed (typically because it does not exist) and panics
// when the file cannot be read or does not contain valid JSON.
func (this *SqlAsyncUtil) readWaitSql() []*SqlAsyncItemModel {
	if _, err := os.Stat(this.filePath); err != nil {
		return nil
	}

	content, err := ioutil.ReadFile(this.filePath)
	if err != nil {
		panic(fmt.Sprintf("readWaitSql错误 file:%s err:%s", this.filePath, err.Error()))
	}

	result := make([]*SqlAsyncItemModel, 0)
	if err = json.Unmarshal(content, &result); err != nil {
		panic(fmt.Sprintf("readWaitSql json反序列化错误 file:%s err:%s", this.filePath, err.Error()))
	}

	return result
}

// waitSqlFlushFile writes waitSqlList to the save file as JSON, creating the
// parent directory when needed. Failures are logged, not returned.
func (this *SqlAsyncUtil) waitSqlFlushFile(waitSqlList []*SqlAsyncItemModel) {
	// MkdirAll is a no-op for an existing directory, so no prior Stat is needed.
	dir := filepath.Dir(this.filePath)
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		this.logAction(logUtil.Error, fmt.Sprintf("SqlAsyncUtil保存sql时,创建目录出错,name:%s err:%s", this.name, err.Error()))
		return
	}

	data, err := json.Marshal(waitSqlList)
	if err != nil {
		this.logAction(logUtil.Error, fmt.Sprintf("SqlAsyncUtil保存sql时,json出错,name:%s err:%s", this.name, err.Error()))
		return
	}

	// NOTE(review): os.ModePerm requests 0777 (before umask) for a data file;
	// kept as in the original, consider a narrower mode.
	if err = ioutil.WriteFile(this.filePath, data, os.ModePerm); err != nil {
		this.logAction(logUtil.Error, fmt.Sprintf("SqlAsyncUtil保存sql时,写入出错,name:%s err:%s", this.name, err.Error()))
	}
}

// delFile removes the save file. A missing file is not an error; any other
// failure is logged because a leftover file is read again by the next Start.
func (this *SqlAsyncUtil) delFile() {
	if err := os.Remove(this.filePath); err != nil && !os.IsNotExist(err) {
		this.logAction(logUtil.Error, fmt.Sprintf("SqlAsyncUtil删除文件出错,name:%s file:%s err:%s", this.name, this.filePath, err.Error()))
	}
}

// newGormDb opens a MySQL gorm handle for connectionString and sets both the
// maximum open and maximum idle connection counts to maxOpenCount, with a
// connection lifetime of four minutes. It panics when the connection cannot
// be opened.
func newGormDb(connectionString string, maxOpenCount int32) *gorm.DB {
	dbObj, err := gorm.Open("mysql", connectionString)
	if err != nil {
		// NOTE(review): this message embeds the full connection string, which
		// may contain credentials; kept unchanged, consider redacting it.
		panic(fmt.Sprintf("连接mysql出错 connectionString:%s err:%s", connectionString, err))
	}

	sqlDb := dbObj.DB()
	sqlDb.SetMaxOpenConns(int(maxOpenCount))
	sqlDb.SetMaxIdleConns(int(maxOpenCount))
	sqlDb.SetConnMaxLifetime(time.Minute * 4)

	return dbObj
}