goProject/trunk/goutil/coroutine-timer/timer-mgr.go
皮蛋13361098506 1b77f62820 初始化项目
2025-01-06 16:01:02 +08:00

410 lines
8.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package coroutine_timer
import (
"fmt"
"math"
"time"
"goutil/logUtil"
"goutil/stringUtil"
)
const (
// 启动暂停时间
con_STAR_SLEEP_NUM = 3
// 秒级定时器卡槽数量
con_SECOND_SLOT_NUM = 60
//分钟级定时器卡槽数量
con_MINUTES_SLOT_NUM = 60
)
var (
// 秒级定时器下标
secIndex = 0
// 秒级定时器当前开始时间
secondStarTime int64
// 秒级定时器槽
secondsTimers [con_SECOND_SLOT_NUM]*timersModel
// 分钟级定时器下标
minIndex = 0
// 分钟级定时器当前开始时间
minStarTime int64
// 分钟级定时器槽
minutesTimers [con_MINUTES_SLOT_NUM]*timersModel
// 其他定时器存放槽
otherTimers *timersModel
// 操作通道
cmdChan chan *cmdModel
)
func init() {
for i := 0; i < con_SECOND_SLOT_NUM; i++ {
secondsTimers[i] = newTimersModel()
}
for i := 0; i < con_MINUTES_SLOT_NUM; i++ {
minutesTimers[i] = newTimersModel()
}
otherTimers = newTimersModel()
cmdChan = make(chan *cmdModel, 1000)
secondStarTime = time.Now().Unix()
minStarTime = secondStarTime + con_SECOND_SLOT_NUM
go chanHandler()
}
// AddTimer
// @description: 添加定时回调
// parameter:
//
// @afterSecond:延后多少时间执行
// @exfun:执行方法
// @obj:执行传入的参数
//
// return:
//
// @string:
func AddTimer(afterSecond int, exfun func(interface{}), obj interface{}) string {
tick := time.Now().Unix() + int64(afterSecond)
return AddTimer3(tick, exfun, obj)
}
// AddTimer2
// @description: 添加定时回调
// parameter:
//
// @t:执行时间点
// @exfun:执行方法
// @obj:执行传入的参数
//
// return:
//
// @string:
func AddTimer2(t time.Time, exfun func(interface{}), obj interface{}) string {
tick := t.Unix()
return AddTimer3(tick, exfun, obj)
}
// AddTimer3
// @description: 添加定时回调
// parameter:
//
// @tick:执行时间点
// @exfun:执行方法
// @obj:执行传入的参数
//
// return:
//
// @newId:
func AddTimer3(tick int64, exfun func(interface{}), obj interface{}) (newId string) {
newId = stringUtil.GetNewUUID()
newObj := newTimerObj(newId, tick, exfun, obj)
cnm := newCmdModel(cmd_add, newObj)
cmdChan <- cnm
return
}
// AddTimer4
// @description: 添加定时回调(此方法会在内部校验id所以性能会比其他AddTimer方法低)
// parameter:
//
// @id:定时id(外部需要自行保证id唯一)
// @tick:执行时间点
// @exfun:执行方法
// @obj:执行传入的参数
//
// return:
//
// @err:
func AddTimer4(id string, tick int64, exfun func(interface{}), obj interface{}) (err error) {
newObj := newTimerObj(id, tick, exfun, obj)
newObj.needCheckId = true
// 加入处理队列
cnm := newCmdModel(cmd_add, newObj)
cmdChan <- cnm
// 等待处理结束
<-cnm.waitChan
// 返回处理结果
err = cnm.err
return
}
// DeleteTimer
// @description: 删除定时器
// parameter:
//
// @id:
//
// return:
func DeleteTimer(id string) {
cnm := newCmdModel(cmd_del, id)
cmdChan <- cnm
}
// chanHandler
// @description: channel处理
// parameter:
// return:
func chanHandler() {
defer func() {
if err := recover(); err != nil {
logUtil.ErrorLog("coroutine-timer.excute err:%s", err)
}
}()
// 暂停一下再处理,避免启动立即处理,其他数据还没准备好
time.Sleep(con_STAR_SLEEP_NUM * time.Second)
at := time.After(time.Second * 1)
for {
select {
case cm := <-cmdChan:
switch cm.cmd {
case cmd_add:
cmdAdd(cm)
case cmd_del:
cmdDel(cm)
}
case <-at:
// byron:需要处理时间后调导致跳时间的问题:调整后应该马上执行的
// 计算需要执行的次数
n := time.Now().Unix() - secondStarTime - int64(secIndex)
if n > 0 {
// 执行对应次数的方法 --- 正常应该只执行1此调时间后此处会追时间
var i int64
for i = 0; i < n; i++ {
cmdRun()
}
}
at = time.After(time.Second * 1)
}
}
}
// cmdAdd
// @description: 添加定时器
// parameter:
//
// @cm:
//
// return:
func cmdAdd(cm *cmdModel) {
newObj := cm.paramObj.(*timerObj)
if newObj.needCheckId && checkTimerExist(newObj.id) {
cm.err = fmt.Errorf("已经存在id=%s的timer", newObj.id)
cm.waitChan <- struct{}{}
return
}
// 如果执行时间比当前时间小,则放入最近的调度卡槽,以便尽快执行
tick := newObj.tick
if tick <= (secondStarTime + int64(secIndex)) {
tick = (secondStarTime + int64(secIndex)) + 1
}
// 落在秒钟级别定时器上
if tick < (secondStarTime + con_SECOND_SLOT_NUM) {
index := (int)(tick - secondStarTime)
secondsTimers[index].addTimer(newObj)
cm.waitChan <- struct{}{}
return
}
// 落在分钟级别定时器上
if tick < (minStarTime + con_MINUTES_SLOT_NUM*con_SECOND_SLOT_NUM) {
index := (int)(tick-minStarTime) / con_SECOND_SLOT_NUM
minutesTimers[index].addTimer(newObj)
cm.waitChan <- struct{}{}
return
}
//落在小时级别定时器上
otherTimers.addTimer(newObj)
// 返回操作完成
cm.waitChan <- struct{}{}
}
// cmdDel
// @description: 删除timer
// parameter:
//
// @cm:
//
// return:
func cmdDel(cm *cmdModel) {
id := cm.paramObj.(string)
// 移除秒级别定时器
for _, item := range secondsTimers {
item.delTimer(id)
}
// 移除分种级定时器
for _, item := range minutesTimers {
item.delTimer(id)
}
// 移除时钟级定时器
otherTimers.delTimer(id)
// 返回操作完成
cm.waitChan <- struct{}{}
}
// cmdRun
// @description: 运行定时器
// parameter:
// return:
func cmdRun() {
defer func() {
if err := recover(); err != nil {
logUtil.ErrorLog("coroutine-timer.inExcute err:%s", err)
}
}()
// 执行秒级定时器
timers := getSencondTimers()
if len(timers) == 0 {
return
}
for _, t := range timers {
go safeRun(t)
}
}
// checkTimerExist
// @description: 校验timer是否存在
// parameter:
//
// @id:id
//
// return:
//
// @bool:
func checkTimerExist(id string) bool {
// 秒级别定时器检测
for _, item := range secondsTimers {
if item.exist(id) {
return true
}
}
// 分种级定时器检测
for _, item := range minutesTimers {
if item.exist(id) {
return true
}
}
// 时钟级定时器检测
return otherTimers.exist(id)
}
// getSencondTimers
// @description: 获取秒级定时器
// parameter:
// return:
//
// @result:
func getSencondTimers() (result []*timerObj) {
// 获取对应slot里面的定时对象
result = secondsTimers[secIndex].getAllTimers2()
secondsTimers[secIndex] = newTimersModel()
secIndex++
// 如果达到最大,则重新填装新的调度对象
if secIndex == con_SECOND_SLOT_NUM {
secIndex = 0
secondStarTime = secondStarTime + con_SECOND_SLOT_NUM
minTaskList := getMinutesTasks()
for _, t := range minTaskList {
index := t.tick - secondStarTime
secondsTimers[index].addTimer(t)
}
}
return
}
// getMinutesTasks
// @description: 获取分钟级定时器
// parameter:
// return:
//
// @result:
func getMinutesTasks() (result []*timerObj) {
// 获取对应slot里面的定时对象
result = minutesTimers[minIndex].getAllTimers2()
minutesTimers[minIndex] = newTimersModel()
minIndex++
// 如果达到最大,则重新填装新的调度对象
if minIndex == con_MINUTES_SLOT_NUM {
reInputMin()
}
return
}
// reInputMin
// @description: 重新填入分钟级定时器
// parameter:
// return:
func reInputMin() {
minIndex = 0
minStarTime = minStarTime + con_MINUTES_SLOT_NUM*con_SECOND_SLOT_NUM
delMap := make(map[string]struct{})
for _, t := range otherTimers.getAllTimers() {
index := (t.tick - minStarTime) / con_SECOND_SLOT_NUM
if index > math.MaxInt || index >= con_MINUTES_SLOT_NUM {
continue
}
minutesTimers[index].addTimer(t)
delMap[t.id] = struct{}{}
}
if len(delMap) > 0 {
for k := range delMap {
otherTimers.delTimer(k)
}
}
}
// safeRun
// @description: 安全运行定时器回调
// parameter:
//
// @t:
//
// return:
func safeRun(t *timerObj) {
defer func() {
if err := recover(); err != nil {
logUtil.ErrorLog("coroutine-timer.safeRun id:%s err:%s", t.id, err)
}
}()
t.excuteAction(t.paramObj)
}