goProject/trunk/goutil/esLogUtil/esIndexHandler.go

114 lines
2.1 KiB
Go
Raw Permalink Normal View History

2025-01-06 16:01:02 +08:00
package esLogUtil
import (
"bytes"
"encoding/json"
"sync"
"time"
"github.com/elastic/go-elasticsearch/v8/esutil"
"golang.org/x/net/context"
"goutil/logUtil"
)
var (
// indexMutex guards indexer: newIndexer, closeIndex and bulkSendHandler
// lock it before replacing, closing, or adding items to the indexer.
indexMutex sync.Mutex
// indexer is the bulk indexer currently used to ship log entries. It is
// assigned by newIndexer and reset to nil by closeIndex.
indexer esutil.BulkIndexer
)
// timedReindex starts a background goroutine that polls getIndexName once per
// second and calls newIndexer whenever the returned name differs from the one
// last seen, so that log entries are written to the current index.
//
// The goroutine has no stop signal and keeps running until the process exits.
// If it panics, the panic is logged and, after a one-second back-off, a
// replacement goroutine is started.
func timedReindex() {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				logUtil.LogUnknownError(r)
				// Sleep BEFORE restarting: timedReindex only spawns a
				// goroutine and returns immediately, so sleeping afterwards
				// would not throttle a persistently panicking watcher.
				time.Sleep(time.Second)
				timedReindex()
			}
		}()

		currentIndexName := getIndexName()
		for {
			// Poll interval.
			time.Sleep(time.Second)

			// Read the name exactly once per iteration so the comparison and
			// the value handed to newIndexer can never disagree.
			newIndexName := getIndexName()
			if newIndexName == currentIndexName {
				continue
			}
			newIndexer(newIndexName)
			currentIndexName = newIndexName
		}
	}()
}
func newIndexer(newIndexName string) {
if indexer != nil {
closeIndex()
}
//wait until get the lock
indexMutex.Lock()
defer indexMutex.Unlock()
var err error
indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: getIndexName(), // The default index name
Client: esClient, // The Elasticsearch client
FlushInterval: time.Second, // The periodic flush interval
})
if err != nil {
logUtil.ErrorLog("[%s]: Creating the indexer err: %s", serverModuleName, err)
}
}
// closeIndex closes the current bulk indexer, if there is one, and resets the
// package-level indexer to nil. A failure reported by Close is logged; the
// indexer is cleared regardless.
func closeIndex() {
	// Blocks until the lock is available.
	indexMutex.Lock()
	defer indexMutex.Unlock()

	if indexer != nil {
		if closeErr := indexer.Close(context.Background()); closeErr != nil {
			logUtil.ErrorLog("[%s]:Close err%s", serverModuleName, closeErr.Error())
		}
		indexer = nil
	}
}
// bulkSendHandler queues one log entry on the bulk indexer for delivery to
// the online log system.
//
// Parameters:
//
//	logObj: the log entry; it is JSON-encoded and added with the "index" action.
//
// Returns nothing. The entry is silently dropped when esClient or the indexer
// is not available; marshal and add failures are logged.
func bulkSendHandler(logObj EsLog) {
	if esClient == nil {
		return
	}

	// Encode before taking the lock so the critical section stays short.
	message, err := json.Marshal(logObj)
	if err != nil {
		logUtil.ErrorLog("[%s]: Marshal failed. Err:%s", serverModuleName, err)
		return
	}

	// Lock blocks until the mutex is available (there is no timeout).
	indexMutex.Lock()
	defer indexMutex.Unlock()

	// The nil check must happen while the lock is held: closeIndex and
	// newIndexer may clear or replace indexer concurrently, and checking
	// beforehand could lead to calling Add on a nil indexer.
	if indexer == nil {
		return
	}

	err = indexer.Add(
		context.Background(),
		esutil.BulkIndexerItem{
			Action: "index",
			Body:   bytes.NewReader(message),
		})
	if err != nil {
		logUtil.ErrorLog("[%s]: Add data err%s", serverModuleName, err.Error())
	}
}