goProject/trunk/framework/dataSyncMgr/mysqlSync/syncObject.go
皮蛋13361098506 1b77f62820 初始化项目
2025-01-06 16:01:02 +08:00

211 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package mysqlSync
import (
"database/sql"
"fmt"
"os"
"path/filepath"
"time"
"Framework/dataSyncMgr/mysqlSync/sqlSync"
"goutil/debugUtil"
"goutil/logUtil"
)
// 同步对象定义
type SyncObject struct {
// 同步数据的存储路径
dirPath string
// 同步数据对象的唯一标识,用于进行重复判断
identifier string
// 数据库对象
dbObj *sql.DB
// 同步信息对象
syncingInfoObj *syncingInfo
// 错误处理对象
errorHandleObj *errorFile
// 同步对象
syncObj *sqlSync.SyncObject
}
// 进行同步对象初始化
// maxFileSize:每个大文件的最大写入值单位Byte
func (this *SyncObject) Init(baseObj *sqlSync.SyncObject) {
this.syncObj = baseObj
// 创建同步信息记录对象
syncingInfoObj, err := newSyncingInfoObject(this.identifier, this.dbObj)
if err != nil {
panic(err)
}
this.syncingInfoObj = syncingInfoObj
// 启动时同步所有数据(然后才能从数据库中查询数据,以免数据丢失)
this.syncOldData()
}
// 同步完成之前未同步完的数据
func (this *SyncObject) syncOldData() {
// 获取文件列表(有序的列表)
fileList := sqlSync.GetDataFileList(this.dirPath)
filePath, _ := this.syncingInfoObj.GetSyncingInfo()
// 判断是否有文件
if len(fileList) == 0 {
return
}
// 判断当前文件是否为空,如果为空则将第一个文件赋给它
if filePath == "" {
this.syncingInfoObj.Update(fileList[0], 0, nil)
}
// 开始同步数据
this.syncObj.Sync()
return
}
// 获取正在同步的信息
// filePath:文件路径
// offset:文件偏移量
func (this *SyncObject) GetSyncingInfo() (filePath string, offset int64) {
return this.syncingInfoObj.GetSyncingInfo()
}
// 更新
// filePath:文件路径
// offset:文件偏移量
// tran:事务对象
// 返回值:
// error:错误对象
func (this *SyncObject) Update(filePath string, offset int64, tran *sql.Tx) error {
return this.syncingInfoObj.Update(filePath, offset, tran)
}
// 同步一条sql语句
// command:待执行的命令
// filePath:保存路径
// offset:文件偏移量
// 返回值:
// error:错误信息
func (this *SyncObject) SyncOneSql(command string, filePath string, offset int64) {
err := this.syncOneSqlDetail(command, filePath, offset)
if err == nil {
return
}
// 发送监控报警
this.handleError(command, filePath, offset, err)
return
}
// 同步一条sql语句的具体逻辑
// command:待执行的命令
// filePath:保存路径
// offset:文件偏移量
// 返回值:
// error:错误信息
func (this *SyncObject) syncOneSqlDetail(command string, filePath string, offset int64) error {
return sqlSync.ExecuteByTran(this.dbObj, func(tx *sql.Tx) (isCommit bool, err error) {
// 保存sql到数据库
err = this.syncToMysql(command, tx)
if err != nil {
return false, err
}
// 保存进度信息到数据库
err = this.syncingInfoObj.Update(filePath, offset, tx)
if err != nil {
return false, err
}
return true, nil
})
}
// 同步数据到mysql中
// command:待执行的命令
// tx:事务对象
// 返回值:
// error:错误信息
func (this *SyncObject) syncToMysql(command string, tx *sql.Tx) error {
_, err := tx.Exec(command)
if err != nil {
prefix := fmt.Sprintf("%s-%s", this.identifier, "SyncObject.syncToMysql")
err = fmt.Errorf("%s-%s Update to mysql failed:%s", prefix, command, err)
logUtil.ErrorLog(err.Error())
debugUtil.Printf("fatal Error:%v", err.Error())
return err
}
return nil
}
// 进行错误处理
// command:存在异常的数据
// filePath:文件路径
// offset:文件偏移量
// err:错误信息
func (this *SyncObject) handleError(command string, filePath string, offset int64, err error) {
defer this.errorHandleObj.Delete()
// 保存当前sql命令
this.errorHandleObj.SaveCommand(command)
// 循环处理当前命令,直到没有错误
beginTime := time.Now().Unix()
for {
// 每隔5分钟发送警报
if time.Now().Unix()-beginTime > 5*60 {
beginTime = time.Now().Unix()
}
// 每次循环休眠20秒
time.Sleep(5 * time.Second)
command = this.errorHandleObj.ReadCommand()
err = this.syncOneSqlDetail(command, filePath, offset)
if err != nil {
continue
}
break
}
}
// 创新新的mysql同步对象
// dirPath:存放数据的目录
// identifier:当前数据的唯一标识(可以使用数据库表名)
// dbObj:数据库对象
// syncingInfoObj:同步信息记录对象
// errorHandleObj:错误处理对象
// 返回值:
// mysql同步对象
func newSyncObject(dirPath, identifier string, dbObj *sql.DB) *SyncObject {
dirPath = filepath.Join(dirPath, identifier)
// 创建更新目录
err := os.MkdirAll(dirPath, os.ModePerm|os.ModeTemporary)
if err != nil {
err = fmt.Errorf("%s-%s-make dir failed:%s", identifier, "SyncObject.newSyncObject.os.MkdirAll", err)
logUtil.ErrorLog(err.Error())
panic(err)
}
// 构造同步信息对象
result := &SyncObject{
dirPath: dirPath,
identifier: identifier,
dbObj: dbObj,
errorHandleObj: newErrorFile(dirPath, identifier),
}
return result
}