56 lines
1.4 KiB
Go
56 lines
1.4 KiB
Go
package rabbitmq
|
||
|
||
import (
|
||
configYaml "common/configsYaml"
|
||
"goutil/logUtilPlus"
|
||
"strconv"
|
||
)
|
||
|
||
// ConsumeData 消费mq数据
|
||
func ConsumeData(handler func(data string) error) {
|
||
|
||
// 确保通道启用了 Publisher Confirms
|
||
if err := RabbitMQChannel.Confirm(false); err != nil {
|
||
logUtilPlus.ErrorLog("channel could not be put into confirm mode: %w", err)
|
||
}
|
||
|
||
//循环数据库数量
|
||
for _, index := range configYaml.GetDBNum() {
|
||
|
||
rabbitMQName := configYaml.GetRabbitMQName() + ":" + strconv.Itoa(index)
|
||
|
||
// 注册一个消费者
|
||
msgs, err := RabbitMQChannel.Consume(
|
||
rabbitMQName, // 队列名称
|
||
"", // 消费者名称
|
||
false, // 是否自动确认
|
||
false, // 是否排他
|
||
false, // 是否本地
|
||
false, // 是否阻塞
|
||
nil, // 其他参数
|
||
)
|
||
if err != nil {
|
||
logUtilPlus.ErrorLog("Failed to register a consumer,err:%s", err.Error())
|
||
}
|
||
|
||
// 启动一个 goroutine 来处理消息
|
||
go func() {
|
||
for d := range msgs {
|
||
|
||
// 消息处理
|
||
if err = handler(string(d.Body)); err != nil {
|
||
|
||
// 消息处理失败,记录错误日志
|
||
|
||
logUtilPlus.ErrorLog("Failed to handle message, err:%s", err.Error())
|
||
}
|
||
|
||
// 消息处理完成后手动确认
|
||
if err := d.Ack(false); err != nil {
|
||
logUtilPlus.ErrorLog("Failed to acknowledge message, err:%s", err.Error())
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
}
|