goProject/trunk/goutil/esLogUtil/esLogUtil.go

270 lines
4.4 KiB
Go
Raw Permalink Normal View History

2025-01-06 16:01:02 +08:00
package esLogUtil
import (
"fmt"
"sync"
"time"
"github.com/elastic/go-elasticsearch/v8"
"goutil/logUtil"
"goutil/stringUtil"
)
var (
serverModuleName = "esLog"
esClient *elasticsearch.Client
indexName string //Index名
strServerGroupId string //区服
isStop bool
logChan = make(chan EsLog, 2048)
warnCount = 2000
closedChan = make(chan struct{})
logPool = sync.Pool{
New: func() interface{} {
return &EsLog{}
},
}
)
// 启动ES日志系统
// 参数:
//
// esUrlsES地址(多个地址使用,分割)
// nameIndexName
// serverGroupId服务器组Id
//
// 返回值:
//
// 结果状态
func Start(esUrls string, name string, serverGroupId int32) {
if stringUtil.IsEmpty(esUrls) {
return
}
//构造Es客户端
var err error
esClient, err = elasticsearch.NewClient(elasticsearch.Config{
Addresses: stringUtil.Split(esUrls, []string{","}),
// Retry on 429 TooManyRequests statuses
//
RetryOnStatus: []int{502, 503, 504, 429},
// A simple incremental backoff function
//
RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },
// Retry up to 5 attempts
//
MaxRetries: 5,
})
if err != nil {
panic(fmt.Sprintf("构造es对象出错err:%s", err.Error()))
}
indexName = name
strServerGroupId = fmt.Sprintf("%d", serverGroupId)
//初始化ES
newIndexer(getIndexName())
timedReindex()
startSendProcessor()
guardProcessor()
return
}
// 停止服务
func Stop() {
//停止接受日志
isStop = true
if indexer == nil {
return
}
if len(logChan) == 0 {
close(logChan)
}
<-closedChan
}
//#region 内部方法
func guardProcessor() {
go func() {
defer func() {
if r := recover(); r != nil {
logUtil.LogUnknownError(r)
time.Sleep(1 * time.Second)
guardProcessor()
}
}()
for {
time.Sleep(5 * time.Second)
count := len(logChan)
if count < warnCount {
continue
}
logUtil.NormalLog(fmt.Sprintf("ES日志通道中当前有%d条消息待消费。", count), logUtil.Warn)
}
}()
}
func startSendProcessor() {
go func() {
defer func() {
if r := recover(); r != nil {
logUtil.LogUnknownError(r)
time.Sleep(1 * time.Second)
startSendProcessor()
}
}()
for {
select {
case logObj, ok := <-logChan:
if ok {
bulkSendHandler(logObj) // 执行刷新
}
if len(logChan) == 0 && isStop {
// is closed
closeIndex()
closedChan <- struct{}{}
return
}
}
}
}()
}
func getIndexName() string {
//获取当天日期
return fmt.Sprintf("%s_%s", indexName, time.Now().Format("20060102"))
}
// 写入在线日志
// 参数:
//
// 日志信息对象
//
// 返回值:
//
// 无
func writeLog(logObj *EsLog) {
if isStop || indexer == nil {
return
}
logChan <- *logObj
}
// 组装ES日志对象
// 参数:
//
// logType 日志类型
// format 日志格式
// args 参数列表
//
// 返回值:
//
// 结果状态
func buildLog(logType, format string, args ...interface{}) (newLogObj *EsLog) {
msg := format
if len(args) > 0 {
msg = fmt.Sprintf(format, args...)
}
//构造新的日志对象
newLogObj = new(logType, msg, strServerGroupId)
return
}
//#endregion
//#region 外部方法
// 日志记录
//
// format:日志格式
// logType:日志类型
// args:参数列表
//
// 返回值
//
// 无
func NormalLog(format string, logType logUtil.LogType, args ...interface{}) {
writeLog(buildLog(logType.String(), format, args...))
}
// 消息日志记录
//
// format:日志格式
// args:参数列表
//
// 返回值
//
// 无
func InfoLog(format string, args ...interface{}) {
writeLog(buildLog("Info", format, args...))
}
// 警告日志记录
//
// format:日志格式
// args:参数列表
//
// 返回值
//
// 无
func WarnLog(format string, args ...interface{}) {
writeLog(buildLog("Warn", format, args...))
}
// 调试日志记录
//
// format:日志格式
// args:参数列表
//
// 返回值
//
// 无
func DebugLog(format string, args ...interface{}) {
writeLog(buildLog("Debug", format, args...))
}
// 错误日志记录
//
// format:日志格式
// args:参数列表
//
// 返回值
//
// 无
func ErrorLog(format string, args ...interface{}) {
writeLog(buildLog("Error", format, args...))
}
// 致命错误日志记录
//
// format:日志格式
// args:参数列表
//
// 返回值
//
// 无
func FatalLog(format string, args ...interface{}) {
writeLog(buildLog("Fatal", format, args...))
}
//#endregion