package impl_es import ( "bytes" "encoding/json" "fmt" "time" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esutil" "golang.org/x/net/context" ) const ( // 如果缓存中的日志数量超过阈值,则记录日志 con_WRITE_LOG_THRESHOLD_NUMBER = 1000 ) // Logger // @description: es日志处理对象 type Logger struct { esClient *elasticsearch.Client // esClient es客户端 blukIndexer esutil.BulkIndexer // blukIndexer es批量索引 indexNamePrefix string // indexNamePrefix 索引前缀 curIndexName string // curIndexName 当前索引名字 innerId string // innerId 系统唯一标识 extendCb func() string // extendCb 扩展信息获取方法 cache *logCache // cache 日志缓存 } // NewLogger // @description: 构造es日志对象 // parameter: // @urls: // @username: // @pwd: // @esIndexName: // @_innerId: // @_extendCb: // return: // @*Logger: // @error: func NewLogger(urls []string, username, pwd string, esIndexName, _innerId string, _extendCb func() string) (*Logger, error) { if _extendCb == nil { _extendCb = func() string { return "" } } esClient, err := elasticsearch.NewClient(elasticsearch.Config{ Addresses: urls, Username: username, Password: pwd, }) if err != nil { return nil, err } l := &Logger{ esClient: esClient, blukIndexer: nil, indexNamePrefix: esIndexName, curIndexName: "", innerId: _innerId, extendCb: _extendCb, cache: newlogCache(), } l.curIndexName = l.getCurIndexName() l.blukIndexer, err = l.newBlukIndexer() if err != nil { return nil, err } // 启动日志处理 l.start() return l, nil } // start // @description: 启动日志处理 // parameter: // @receiver l: // return: func (l *Logger) start() { go l.logHandlerStart() go l.indexNameCheck() } // getCurIndexName // @description: 获取索引名字 // parameter: // @receiver l: // return: // @string: func (l *Logger) getCurIndexName() string { //获取当天日期 return fmt.Sprintf("%s_%s", l.indexNamePrefix, time.Now().Format("20060102")) } // newBlukIndexer // @description: 创建新的BlukIndexer // parameter: // @receiver l: // return: // @bulkIndexer: // @err: func (l *Logger) newBlukIndexer() (bulkIndexer esutil.BulkIndexer, err error) { bulkIndexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Index: l.getCurIndexName(), // The default index name Client: l.esClient, // The Elasticsearch client FlushInterval: time.Second, // The periodic flush interval }) if err != nil { fmt.Println("[es log]: Creating the indexer err: ", err) } return } // 日志处理 func (l *Logger) logHandlerStart() { defer func() { if r := recover(); r != nil { fmt.Println("[es log]: Creating the indexer err: ", r) go l.logHandlerStart() } }() time.Sleep(1 * time.Second) for { // 达到指定的时间或者指定的日志数量,则保存到消息队列中去 logCount := l.cache.getCacheLogCount() if logCount == 0 || l.esClient == nil { time.Sleep(time.Second * 1) continue } // 记录在线日志待发送数量 if logCount > con_WRITE_LOG_THRESHOLD_NUMBER { fmt.Printf("[es log]: 当前缓存中共有%d条未发送到在线日志系统的日志", logCount) } // 执行刷新 l.bulkPushToOnlineLogSystem(l.cache.getCacheLog(logCount)) } } // indexNameCheck // @description: index名字校验 // parameter: // @receiver l: // return: func (l *Logger) indexNameCheck() { defer func() { if r := recover(); r != nil { fmt.Println("[es log]: indexNameCheck err: ", r) go l.indexNameCheck() } }() time.Sleep(1 * time.Second) for { time.Sleep(time.Second * 1) tempIndexName := l.getCurIndexName() if tempIndexName == l.curIndexName { continue } // 关闭老的blukIndexer+建立新的blukIndexer newBlukIndexer, err := l.newBlukIndexer() if err != nil { continue } tempBlukIndexer := l.blukIndexer l.blukIndexer = newBlukIndexer l.curIndexName = tempIndexName // 暂停3s,等待正在写入的数据写入完成 time.Sleep(3 * time.Second) err = tempBlukIndexer.Close(context.Background()) if err != nil { fmt.Println("[es log]:BlukIndexer close err:", err.Error()) } } } // bulkPushToOnlineLogSystem // @description: 批量保存到在线日志系统 // parameter: // @receiver l: // @logList: // return: func (l *Logger) bulkPushToOnlineLogSystem(logList []*EsLogModel) { for _, logObj := range logList { message, err := json.Marshal(logObj) if err != nil { fmt.Println("[es log]: Marshal failed. Err:", err) continue } err = l.blukIndexer.Add( context.Background(), esutil.BulkIndexerItem{ Action: "index", Body: bytes.NewReader(message), }) if err != nil { fmt.Println("[es log]: Add data err:", err.Error()) } } } // buildOnlineLog // @description: 组装es日志对象 // parameter: // @receiver l: // @logType: // @format: // @args: // return: // @newLogObj: func (l *Logger) buildEsLog(logType, format string, args ...interface{}) (newLogObj *EsLogModel) { msg := format if len(args) > 0 { msg = fmt.Sprintf(format, args...) } // 构造新的日志对象 newLogObj = newEsLogModel(logType, msg, l.innerId, l.extendCb) return }