114 lines
2.1 KiB
Go
114 lines
2.1 KiB
Go
package esLogUtil
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/json"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/elastic/go-elasticsearch/v8/esutil"
|
||
"golang.org/x/net/context"
|
||
"goutil/logUtil"
|
||
)
|
||
|
||
var (
|
||
indexMutex sync.Mutex
|
||
indexer esutil.BulkIndexer
|
||
)
|
||
|
||
func timedReindex() {
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
logUtil.LogUnknownError(r)
|
||
|
||
timedReindex()
|
||
time.Sleep(time.Second * 1)
|
||
}
|
||
}()
|
||
|
||
currentIndexName := getIndexName()
|
||
for {
|
||
//设置休眠
|
||
time.Sleep(time.Second * 1)
|
||
|
||
newIndexName := getIndexName()
|
||
if currentIndexName != getIndexName() {
|
||
newIndexer(newIndexName)
|
||
currentIndexName = newIndexName
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
func newIndexer(newIndexName string) {
|
||
if indexer != nil {
|
||
closeIndex()
|
||
}
|
||
|
||
//wait until get the lock
|
||
indexMutex.Lock()
|
||
defer indexMutex.Unlock()
|
||
|
||
var err error
|
||
indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||
Index: getIndexName(), // The default index name
|
||
Client: esClient, // The Elasticsearch client
|
||
FlushInterval: time.Second, // The periodic flush interval
|
||
})
|
||
if err != nil {
|
||
logUtil.ErrorLog("[%s]: Creating the indexer err: %s", serverModuleName, err)
|
||
}
|
||
}
|
||
|
||
func closeIndex() {
|
||
//wait until get the lock
|
||
indexMutex.Lock()
|
||
defer indexMutex.Unlock()
|
||
|
||
if indexer == nil {
|
||
return
|
||
}
|
||
|
||
err := indexer.Close(context.Background())
|
||
if err != nil {
|
||
logUtil.ErrorLog("[%s]:Close err:%s", serverModuleName, err.Error())
|
||
}
|
||
|
||
indexer = nil
|
||
}
|
||
|
||
// 批量保存到在线日志系统
|
||
// 参数:
|
||
//
|
||
// 数量
|
||
//
|
||
// 返回值:
|
||
//
|
||
// 日志列表对象
|
||
func bulkSendHandler(logObj EsLog) {
|
||
if esClient == nil || indexer == nil {
|
||
return
|
||
}
|
||
|
||
//try to get the lock in 10000 milliseconds,if cant obtain it,return false
|
||
indexMutex.Lock()
|
||
defer indexMutex.Unlock()
|
||
|
||
message, err := json.Marshal(logObj)
|
||
if err != nil {
|
||
logUtil.ErrorLog("[%s]: Marshal failed. Err:%s", serverModuleName, err)
|
||
return
|
||
}
|
||
|
||
err = indexer.Add(
|
||
context.Background(),
|
||
esutil.BulkIndexerItem{
|
||
Action: "index",
|
||
Body: bytes.NewReader(message),
|
||
})
|
||
if err != nil {
|
||
logUtil.ErrorLog("[%s]: Add data err:%s", serverModuleName, err.Error())
|
||
}
|
||
}
|