goProject/trunk/center/common/rabbitmq/consume_data.go
2025-01-23 16:12:49 +08:00

56 lines
1.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rabbitmq
import (
configYaml "common/configsYaml"
"goutil/logUtilPlus"
"strconv"
)
// ConsumeData starts consuming messages from every configured RabbitMQ queue.
//
// One queue is consumed per entry returned by configYaml.GetDBNum(); the queue
// name is configYaml.GetRabbitMQName() + ":" + <entry>. For every queue whose
// consumer is registered successfully, a goroutine is started that passes each
// message body (converted to a string) to handler and then acknowledges the
// message manually.
//
// The function returns as soon as the goroutines have been started; each
// goroutine runs until its delivery channel is closed. Errors are reported
// through logUtilPlus.ErrorLog and are never returned to the caller.
//
// Note that a message is acknowledged even when handler returns an error, so
// a failed message is not redelivered.
func ConsumeData(handler func(data string) error) {
	// Put the channel into confirm mode. A failure is only logged and
	// consumption is still attempted.
	// NOTE(review): confirm mode concerns publishing rather than consuming;
	// presumably it is enabled here because the channel is shared with
	// publishers — confirm.
	if err := RabbitMQChannel.Confirm(false); err != nil {
		// %v instead of %w: the wrapping verb is only meaningful to fmt.Errorf.
		logUtilPlus.ErrorLog("channel could not be put into confirm mode: %v", err)
	}

	// One queue per configured database number.
	for _, index := range configYaml.GetDBNum() {
		rabbitMQName := configYaml.GetRabbitMQName() + ":" + strconv.Itoa(index)

		// Register a consumer on the queue.
		msgs, err := RabbitMQChannel.Consume(
			rabbitMQName, // queue name
			"",           // consumer name
			false,        // auto-ack
			false,        // exclusive
			false,        // no-local
			false,        // no-wait
			nil,          // extra arguments
		)
		if err != nil {
			logUtilPlus.ErrorLog("Failed to register a consumer, queue:%s, err:%s", rabbitMQName, err.Error())
			// Without a delivery channel there is nothing to read; ranging
			// over the nil channel would block the goroutine forever.
			continue
		}

		// Process the deliveries of this queue in their own goroutine.
		go func() {
			for d := range msgs {
				// Handle the message; a failure is logged only.
				if err := handler(string(d.Body)); err != nil {
					logUtilPlus.ErrorLog("Failed to handle message, err:%s", err.Error())
				}
				// Manually acknowledge once handling has finished.
				if err := d.Ack(false); err != nil {
					logUtilPlus.ErrorLog("Failed to acknowledge message, err:%s", err.Error())
				}
			}
		}()
	}
}