goProject/trunk/goutil/redisUtil/pubsub.go

132 lines
2.6 KiB
Go
Raw Permalink Normal View History

2025-01-06 16:01:02 +08:00
package redisUtil
import (
"context"
"fmt"
"time"
"github.com/gomodule/redigo/redis"
)
// SubscribeCallback
// @description: callback invoked by Subscriber.Subscribe once for every
// message received on a subscribed channel. It takes no arguments, so the
// channel name and payload of the message are not passed to it. Returning a
// non-nil error ends the subscription; that error is what Subscribe returns.
type SubscribeCallback func() error
// Subscriber
// @description: subscriber side of redis pub/sub; pairs a connection pool
// with the callback to run for each received message.
type Subscriber struct {
// pool is the connection pool from which Subscribe obtains its connection
pool *RedisPool
// callBack is the function called for every message received by Subscribe
callBack SubscribeCallback
}
// NewSubscriber
// @description: builds a subscriber from a connection pool and a message callback
// parameter:
// @pool: connection pool the subscriber takes its connection from
// @callBack: function to run for each received message; stored as is, no nil check is done here
// return:
// @*Subscriber: the newly allocated subscriber
func NewSubscriber(pool *RedisPool, callBack SubscribeCallback) *Subscriber {
	subscriber := new(Subscriber)
	subscriber.pool = pool
	subscriber.callBack = callBack
	return subscriber
}
// Promulgator
// @description: publisher side of redis pub/sub
type Promulgator struct {
// pool is the connection pool from which Publish obtains a connection
pool *RedisPool
}
// NewPromulgator
// @description: builds a publisher bound to the given connection pool
// parameter:
// @pool: connection pool the publisher takes its connections from
// return:
// @*Promulgator: the newly allocated publisher
func NewPromulgator(pool *RedisPool) *Promulgator {
	publisher := new(Promulgator)
	publisher.pool = pool
	return publisher
}
// Publish
// @description: sends a PUBLISH command for message on channel, using a
// connection obtained from the pool. The connection is closed before the
// method returns and the command reply is discarded.
// parameter:
// @receiver p:
// @channel: channel to publish on
// @message: payload to publish
// return:
// @error: nil on success; otherwise the command error, wrapped so callers can
// still inspect the underlying cause with errors.Is / errors.As
func (p *Promulgator) Publish(channel, message string) error {
	conn := p.pool.GetConnection()
	defer conn.Close()

	if _, err := conn.Do("PUBLISH", channel, message); err != nil {
		// %w keeps the original error in the chain; the message text is
		// the same as it would be with %v.
		return fmt.Errorf("redis publish %s %s, err: %w", channel, message, err)
	}
	return nil
}
// Subscribe
// @description: subscribes to the given channels and blocks, running the
// subscriber callback once per received message, until the context is
// cancelled, the callback or the connection reports an error, all
// subscriptions are gone, or the once-a-minute health-check ping fails.
// Messages are received on a separate goroutine which owns closing the
// connection. On context cancellation the method returns as soon as the
// unsubscribe request has been sent, so that goroutine (and possibly one more
// callback invocation) may finish shortly after the method has returned.
// parameter:
// @receiver s:
// @ctx: cancelling it unsubscribes from all channels and ends the call
// @channel: channels to subscribe to
// return:
// @error: nil after a clean unsubscribe; otherwise the subscribe, receive,
// callback, unsubscribe or ping error
func (s *Subscriber) Subscribe(ctx context.Context, channel ...string) error {
	sub := redis.PubSubConn{Conn: s.pool.GetConnection()}
	if err := sub.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		// The receive goroutine that normally closes the connection has not
		// been started yet, so close it here instead of leaking it. The
		// close error is dropped: the subscribe error is the one to report.
		_ = sub.Close()
		return err
	}

	// Buffered with capacity 1: the goroutine sends exactly one result and
	// must be able to do so even after this method has already returned.
	done := make(chan error, 1)

	// Receive messages on a dedicated goroutine.
	go func() {
		defer sub.Close()
		for {
			switch msg := sub.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %w", msg)
				return
			case redis.Message:
				if err := s.callBack(); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// No subscriptions remain on this connection.
					done <- nil
					return
				}
			}
		}
	}()

	// Health check: ping the server once a minute.
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			// Unsubscribing from everything makes the receive goroutine see
			// a subscription count of zero and exit.
			if err := sub.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %w", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := sub.Ping(""); err != nil {
				return err
			}
		}
	}
}