goProject/.svn/pristine/b9/b96f577140a96cce4d3b17e69da36c41ccec5d21.svn-base

272 lines
7.0 KiB
Plaintext
Raw Normal View History

2025-01-06 16:21:36 +08:00
package sqlSync
import (
"database/sql"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"goutil/debugUtil"
"goutil/fileUtil"
"goutil/logUtil"
)
// 同步对象定义
type SyncObject struct {
// 同步数据的存储路径
dirPath string
// 同步数据对象的唯一标识,用于进行重复判断
identifier string
// 数据库对象
dbObj *sql.DB
// 处理数据写入的文件
sqlFileObj *SqlFile
// 同步处理对象
syncHandleObj SyncHandler
}
// 进行同步对象初始化
// maxFileSize:每个大文件的最大写入值单位Byte
func (this *SyncObject) Init(maxFileSize int) {
// 启动时同步所有数据(然后才能从数据库中查询数据,以免数据丢失)
this.syncHandleObj.Init(this)
// 构造同步数据对象
fileName, _ := this.syncHandleObj.GetSyncingInfo()
this.sqlFileObj = NewSqlFile(this.dirPath, this.identifier, fileName, maxFileSize)
// 当前没有正在同步的文件,则指向当前正在写的文件
if len(fileName) <= 0 {
this.syncHandleObj.Update(this.sqlFileObj.FileFullName(), 0, nil)
}
// 启动一个新goroutine来负责同步数据
go func() {
/* 此处不使用goroutineMgr.Monitor/ReleaseMonitor因为此处不能捕获panic需要让外部进程终止执行
因为此模块的文件读写为核心逻辑,一旦出现问题必须停止进程,否则会造成脏数据
*/
this.Sync()
}()
}
// 保存数据到本地文件
// command:待保存的指令
func (this *SyncObject) Save(command string) {
this.sqlFileObj.Write(command)
}
// 循环同步多个文件
func (this *SyncObject) Sync() {
// 开始循环同步
for {
// 同步当前文件
this.syncOneFile()
// 当前文件同步完成,记录同步日志
nowFilePath, _ := this.syncHandleObj.GetSyncingInfo()
// 删除已同步完成的文件
WaitForOk(func() bool {
fileExist, err := fileUtil.IsFileExists(nowFilePath)
if err != nil {
logUtil.ErrorLog(fmt.Sprintf("mysqlSync/syncObject IsFileExists error:%s", err.Error()))
return false
}
if fileExist == false {
return true
}
err = fileUtil.DeleteFile(nowFilePath)
if err != nil {
logUtil.ErrorLog(fmt.Sprintf("mysqlSync/syncObject delete file error:%s", err.Error()))
return false
}
return true
}, 10*time.Second)
// 当前文件同步完成,获取下个文件
nextFileName := NewFileName("", nowFilePath)
filePath := filepath.Join(this.dirPath, nextFileName)
exist, err := fileUtil.IsFileExists(filePath)
if err != nil {
logUtil.ErrorLog(fmt.Sprintf("mysqlSync/syncObject IsFileExists error:%s", err.Error()))
panic(err)
}
// 如果文件不存在,退出
if !exist {
// fmt.Println("协程退出了")
return
}
// 更新同步的位置信息 此处忽略错误是因为,哪怕是出错了,也不会影响整体逻辑
this.syncHandleObj.Update(filePath, 0, nil)
}
}
// 同步单个文件
func (this *SyncObject) syncOneFile() {
// 获取信息同步项对象
filePath, offset := this.syncHandleObj.GetSyncingInfo()
// 打开待读取的文件
f, exist := this.openFile(filePath)
if exist == false {
// logUtil.WarnLog(fmt.Sprintf("待同步的文件不存在,跳过此文件:%s", filePath),"")
return
}
defer f.Close()
for {
// 移动到需要读取的位置
if _, err := f.Seek(offset, io.SeekStart); err != nil {
prefix := fmt.Sprintf("%s-%s", this.identifier, "SyncObject.Seek")
err = fmt.Errorf("%s-Seek offset for header failed:%s", prefix, err)
logUtil.ErrorLog(err.Error())
panic(err)
}
command, readLen, err := Read(f)
if err != nil {
// 如果读取到文件末尾,判断是否等待
if err == io.EOF {
if this.sqlFileObj != nil && strings.Contains(filePath, this.sqlFileObj.FileName()) {
time.Sleep(20 * time.Millisecond)
continue
}
// 如果该文件是空文件,同步更新信息
return
}
prefix := fmt.Sprintf("%s-%s", this.identifier, "SyncObject.syncOneFile.f.Read")
err = fmt.Errorf("%s-Read header failed:%s", prefix, err)
logUtil.ErrorLog(err.Error())
panic(err)
}
// 3. 同步到mysql中,并更新同步位置
this.syncHandleObj.SyncOneSql(command, filePath, offset+readLen)
// 4. 更新内存中的同步位置
offset += readLen
}
}
// 打开待读取的文件
// filePath:待打开的文件
// 返回值:
// *os.File:文件句柄,
func (this *SyncObject) openFile(filePath string) (f *os.File, exist bool) {
var err error
for {
exist, err = fileUtil.IsFileExists(filePath)
if err != nil {
err = fmt.Errorf("check file error,filePath:%v error:%v", filePath, err.Error())
logUtil.ErrorLog(err.Error())
time.Sleep(time.Second * 5)
continue
}
if exist == false {
// 如果文件不存在,则跳过此文件
logUtil.WarnLog(fmt.Sprintf("file no exist, skip file:%v", filePath))
exist = false
return
}
exist = true
// 打开当前处理文件
f, err = os.OpenFile(filePath, os.O_RDONLY, os.ModePerm|os.ModeTemporary)
if err != nil {
prefix := fmt.Sprintf("%s-%s", this.identifier, "SyncObject.syncOneFile.os.OpenFile")
err = fmt.Errorf("%s-Open file:%s failed:%s", prefix, filePath, err)
logUtil.ErrorLog(err.Error())
time.Sleep(time.Second * 5)
continue
}
return
}
}
// 同步数据到mysql中
// command:sql语句
// tx:事务处理对象
// 返回值:
// error:错误信息
func (this *SyncObject) syncToMysql(command string, tx *sql.Tx) error {
_, err := tx.Exec(command)
if err != nil {
prefix := fmt.Sprintf("%s-%s", this.identifier, "SyncObject.syncToMysql")
err = fmt.Errorf("%s-%s Update to mysql failed:%s", prefix, command, err)
logUtil.ErrorLog(err.Error())
debugUtil.Printf("fatal Error:%v", err.Error())
return err
}
return nil
}
// 创新新的mysql同步对象
// dirPath:存放数据的目录
// identifier:当前数据的唯一标识(可以使用数据库表名)
// dbObj:数据库对象
// _syncHandleObj:同步处理对象
// 返回值:
// mysql同步对象
func NewSyncObject(dirPath, identifier string, dbObj *sql.DB, _syncHandleObj SyncHandler) *SyncObject {
dirPath = filepath.Join(dirPath, identifier)
// 创建更新目录
err := os.MkdirAll(dirPath, os.ModePerm|os.ModeTemporary)
if err != nil {
err = fmt.Errorf("%s-%s-make dir failed:%s", identifier, "SyncObject.newSyncObject.os.MkdirAll", err)
logUtil.ErrorLog(err.Error())
panic(err)
}
// 构造同步信息对象
result := &SyncObject{
dirPath: dirPath,
identifier: identifier,
dbObj: dbObj,
syncHandleObj: _syncHandleObj,
}
return result
}
// 同步处理接口
type SyncHandler interface {
// 初始化
Init(baseObj *SyncObject)
// 获取正在同步的信息
// filePath:文件路径
// offset:文件偏移量
GetSyncingInfo() (filePath string, offset int64)
// 更新
// filePath:文件路径
// offset:文件偏移量
// tran:事务对象
// 返回值:
// error:错误对象
Update(filePath string, offset int64, tran *sql.Tx) error
// 同步一条sql
// command:指令数据
// filePath:文件路径
// offset:文件偏移量
SyncOneSql(command string, filePath string, offset int64)
}