goProject/.svn/pristine/ca/cac300511b92e329a4c7f42a1a4482984549452e.svn-base

237 lines
5.3 KiB
Plaintext
Raw Permalink Normal View History

2025-01-06 16:21:36 +08:00
package impl_es
import (
"bytes"
"encoding/json"
"fmt"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"golang.org/x/net/context"
)
// con_WRITE_LOG_THRESHOLD_NUMBER is the backlog size above which the sender
// loop prints a notice about how many cached logs are still unsent.
const con_WRITE_LOG_THRESHOLD_NUMBER = 1000
// Logger
// @description: Elasticsearch log handler. It buffers log entries in cache and
// pushes them to Elasticsearch through a bulk indexer whose target index name
// is "<indexNamePrefix>_<yyyymmdd>".
//
// NOTE(review): blukIndexer and curIndexName are reassigned by the
// indexNameCheck goroutine while the logHandlerStart goroutine reads
// blukIndexer; no synchronisation is visible in this file — confirm whether
// that is intentional.
type Logger struct {
	esClient        *elasticsearch.Client // esClient is the Elasticsearch client
	blukIndexer     esutil.BulkIndexer    // blukIndexer is the bulk indexer currently used for writes
	indexNamePrefix string                // indexNamePrefix is the prefix of every index name
	curIndexName    string                // curIndexName is the name of the index currently in use
	innerId         string                // innerId is the unique identifier of this system
	extendCb        func() string         // extendCb returns extra information attached to each log
	cache           *logCache             // cache holds the logs that are waiting to be sent
}
// NewLogger
// @description: Builds an Elasticsearch logger, creates its first bulk indexer
// and starts its background goroutines.
// parameter:
//
//	@urls: addresses of the Elasticsearch nodes
//	@username: user name used to authenticate
//	@pwd: password used to authenticate
//	@esIndexName: prefix of the index name; the current date is appended to it
//	@_innerId: unique identifier of this system, stored with each log
//	@_extendCb: callback returning extra information; nil is replaced by a
//	            callback that returns an empty string
//
// return:
//
//	@*Logger: the ready-to-use logger, nil on failure
//	@error: error from creating the client or the bulk indexer
func NewLogger(urls []string, username, pwd string, esIndexName, _innerId string, _extendCb func() string) (*Logger, error) {
	// Guarantee a callable callback so later code never has to nil-check it.
	if _extendCb == nil {
		_extendCb = func() string { return "" }
	}

	clientCfg := elasticsearch.Config{
		Addresses: urls,
		Username:  username,
		Password:  pwd,
	}
	client, err := elasticsearch.NewClient(clientCfg)
	if err != nil {
		return nil, err
	}

	logger := new(Logger)
	logger.esClient = client
	logger.indexNamePrefix = esIndexName
	logger.innerId = _innerId
	logger.extendCb = _extendCb
	logger.cache = newlogCache()

	// The index name depends on indexNamePrefix, so it is derived only after
	// the prefix has been stored.
	logger.curIndexName = logger.getCurIndexName()
	if logger.blukIndexer, err = logger.newBlukIndexer(); err != nil {
		return nil, err
	}

	// Start the background log processing.
	logger.start()
	return logger, nil
}
// start
// @description: Launches the background goroutines of the logger: the loop
// that sends cached logs and the loop that watches for index name changes.
// parameter:
//
//	@receiver l:
//
// return:
func (l *Logger) start() {
	workers := []func(){
		l.logHandlerStart,
		l.indexNameCheck,
	}
	for _, worker := range workers {
		go worker()
	}
}
// getCurIndexName
// @description: Builds the index name for the current local date.
// parameter:
//
//	@receiver l:
//
// return:
//
//	@string: "<indexNamePrefix>_<yyyymmdd>"
func (l *Logger) getCurIndexName() string {
	// Today's date in compact yyyymmdd form.
	day := time.Now().Format("20060102")
	return fmt.Sprint(l.indexNamePrefix, "_", day)
}
// newBlukIndexer
// @description: Creates a bulk indexer bound to the logger's client whose
// default index is the index name for the current date.
// parameter:
//
//	@receiver l:
//
// return:
//
//	@bulkIndexer: the indexer returned by esutil.NewBulkIndexer
//	@err: creation error; it is also printed before being returned
func (l *Logger) newBlukIndexer() (bulkIndexer esutil.BulkIndexer, err error) {
	cfg := esutil.BulkIndexerConfig{
		Index:         l.getCurIndexName(), // default index name
		Client:        l.esClient,          // Elasticsearch client
		FlushInterval: time.Second,         // periodic flush interval
	}

	if bulkIndexer, err = esutil.NewBulkIndexer(cfg); err != nil {
		fmt.Println("[es log]: Creating the indexer err: ", err)
	}
	return bulkIndexer, err
}
// logHandlerStart
// @description: Background loop that drains the log cache and hands the logs
// to the bulk indexer. It never returns on its own; if the loop panics, the
// panic is printed and the loop is restarted in a new goroutine.
// parameter:
//
//	@receiver l:
//
// return:
func (l *Logger) logHandlerStart() {
	defer func() {
		if r := recover(); r != nil {
			// Report the panic under this function's own name so it is not
			// mistaken for an indexer-creation failure.
			fmt.Println("[es log]: logHandlerStart err: ", r)
			go l.logHandlerStart()
		}
	}()

	// Initial delay; after a panic it also throttles the restart.
	time.Sleep(time.Second)

	for {
		logCount := l.cache.getCacheLogCount()

		// Nothing to send, or no client to send with: wait and poll again.
		if logCount == 0 || l.esClient == nil {
			time.Sleep(time.Second)
			continue
		}

		// Report the number of logs still waiting to be sent. The trailing
		// newline keeps this message from running into the next output line.
		if logCount > con_WRITE_LOG_THRESHOLD_NUMBER {
			fmt.Printf("[es log]: 当前缓存中共有%d条未发送到在线日志系统的日志\n", logCount)
		}

		// Push the cached logs.
		// NOTE(review): presumably getCacheLog removes the returned entries
		// from the cache — confirm against logCache.
		l.bulkPushToOnlineLogSystem(l.cache.getCacheLog(logCount))
	}
}
// indexNameCheck
// @description: Background loop that checks once per second whether the index
// name for the current date differs from the one in use. When it does, a new
// bulk indexer is installed and the previous one is closed. If the loop
// panics, the panic is printed and the loop is restarted in a new goroutine.
// parameter:
//
//	@receiver l:
//
// return:
func (l *Logger) indexNameCheck() {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		fmt.Println("[es log]: indexNameCheck err: ", r)
		go l.indexNameCheck()
	}()

	time.Sleep(time.Second)

	for {
		time.Sleep(time.Second)

		wantedName := l.getCurIndexName()
		if l.curIndexName == wantedName {
			continue
		}

		// The index name changed: build the replacement indexer first and
		// retry on the next tick if that fails.
		freshIndexer, err := l.newBlukIndexer()
		if err != nil {
			continue
		}

		staleIndexer := l.blukIndexer
		l.blukIndexer, l.curIndexName = freshIndexer, wantedName

		// Wait 3s so writes already in progress on the old indexer can finish
		// before it is closed.
		time.Sleep(3 * time.Second)
		if closeErr := staleIndexer.Close(context.Background()); closeErr != nil {
			fmt.Println("[es log]:BlukIndexer close err", closeErr.Error())
		}
	}
}
// bulkPushToOnlineLogSystem
// @description: Serialises each log to JSON and adds it to the bulk indexer
// as an "index" action. A log that fails to marshal or to be added is
// reported on stdout and skipped; the remaining logs are still processed.
// parameter:
//
//	@receiver l:
//	@logList: logs to send
//
// return:
func (l *Logger) bulkPushToOnlineLogSystem(logList []*EsLogModel) {
	ctx := context.Background()

	for i := range logList {
		payload, marshalErr := json.Marshal(logList[i])
		if marshalErr != nil {
			fmt.Println("[es log]: Marshal failed. Err:", marshalErr)
			continue
		}

		item := esutil.BulkIndexerItem{
			Action: "index",
			Body:   bytes.NewReader(payload),
		}
		if addErr := l.blukIndexer.Add(ctx, item); addErr != nil {
			fmt.Println("[es log]: Add data err", addErr.Error())
		}
	}
}
// buildEsLog
// @description: Assembles an ES log object from a message and the logger's
// innerId and extendCb.
// parameter:
//
//	@receiver l:
//	@logType: type of the log, passed through to newEsLogModel
//	@format: message text, or a format string when args are given
//	@args: optional format arguments; when none are supplied, format is used
//	       verbatim and is not run through fmt.Sprintf
//
// return:
//
//	@newLogObj: the newly built log object
func (l *Logger) buildEsLog(logType, format string, args ...interface{}) (newLogObj *EsLogModel) {
	// Without arguments the text is taken as-is, so '%' characters in a plain
	// message are left untouched.
	if len(args) == 0 {
		return newEsLogModel(logType, format, l.innerId, l.extendCb)
	}

	text := fmt.Sprintf(format, args...)
	return newEsLogModel(logType, text, l.innerId, l.extendCb)
}