goProject/trunk/framework/mqMgr/queue.go

379 lines
9.1 KiB
Go
Raw Normal View History

2025-01-06 16:01:02 +08:00
/*
我们使用腾讯云的CMQ作为公司的消息队列基础服务
产品的说明文档https://cloud.tencent.com/document/product/406
*/
package mqMgr
import (
"encoding/json"
"errors"
"fmt"
"strings"
. "Framework/mqMgr/model"
"goutil/webUtil"
)
const (
EMPTY_MESSAGE_ERROR = "消息为空"
EMPTY_HANDLE_ERROR = "句柄为空"
MIN_DELAY_SECONDS = 0
MIN_POLLING_WAIT_SECONDS = 0
MAX_POLLING_WAIT_SECONDS = 30
MAX_BATCH_COUNT = 16
EMPTY_BATCH_LIST_ERROR = "批量消息为空"
EXCEED_MAX_BATCH_COUNT_ERROR = "批量消息数量超过上限最多为16条"
)
// Queue对象
type Queue struct {
// 地域
region string
// 网络类型:内网、外网
network string
// 队列名称
queueName string
// API密钥Id
secretId string
// API密钥key
secretKey string
}
// 执行操作
func (this *Queue) action(requestObj IRequest, responseObj interface{}) error {
requestObj.SetCommonRequest(NewCommonRequest(requestObj.GetActionName(), this.region, this.secretId, this.queueName))
// 组装请求url
paramMap := requestObj.AssembleParamMap()
url, signature, err := AssembleUrl(this.region, this.network, MQ_TYPE_QUEUE, this.secretKey, paramMap)
if err != nil {
return err
}
paramMap["Signature"] = signature
//请求url,请求头
header := webUtil.GetFormHeader()
transport := webUtil.NewTransport()
transport.DisableKeepAlives = true
transport = webUtil.GetTimeoutTransport(transport, 10)
statusCode, result, err := webUtil.PostMapData(url, paramMap, header, transport)
//statusCode, result, err := webUtil.PostMapData(url, paramMap, header, nil)
if err != nil {
return err
}
if statusCode != 200 {
return fmt.Errorf("Wrong status from server:%d", statusCode)
}
// 解析请求结果
err = json.Unmarshal(result, responseObj)
return err
}
// 处理发送消息延迟的时间
func (this *Queue) handleDelaySeconds(delaySeconds int) int {
if delaySeconds < MIN_DELAY_SECONDS {
delaySeconds = MIN_DELAY_SECONDS
}
return delaySeconds
}
// 处理当没有消息是轮询等待的时间
func (this *Queue) handlePollingWaitSeconds(pollingWaitSeconds int) int {
if pollingWaitSeconds < MIN_POLLING_WAIT_SECONDS {
pollingWaitSeconds = MIN_POLLING_WAIT_SECONDS
} else if pollingWaitSeconds > MAX_POLLING_WAIT_SECONDS {
pollingWaitSeconds = MAX_POLLING_WAIT_SECONDS
}
return pollingWaitSeconds
}
// 验证批量处理的列表
func (this *Queue) validBatchList(list []string) error {
if list == nil || len(list) == 0 {
return errors.New(EMPTY_BATCH_LIST_ERROR)
}
if len(list) > MAX_BATCH_COUNT {
return errors.New(EXCEED_MAX_BATCH_COUNT_ERROR)
}
return nil
}
// error:错误对象
func (this *Queue) GetQueueAttributes() (err error) {
// 逻辑处理
requestObj := NewGetQueueAttributesRequest()
responseObj := NewGetQueueAttributesResponse()
// 发送请求
err = this.action(requestObj, &responseObj)
if err != nil {
return
}
if responseObj.IsFailure() {
err = errors.New(responseObj.Message)
return
}
return
}
// SendMessage 发送单条消息
// 参数
// message:消息内容
// delaySeconds:单位为秒,表示该消息发送到队列后,需要延时多久用户才可见该消息。
// 返回值
// error:错误对象
func (this *Queue) SendMessage(message string, delaySeconds int) (err error) {
// 参数验证和处理
if message == "" {
err = errors.New(EMPTY_MESSAGE_ERROR)
return
}
delaySeconds = this.handleDelaySeconds(delaySeconds)
// 逻辑处理
requestObj := NewSendMessageRequest(message, delaySeconds)
responseObj := NewSendMessageResponse()
// 发送请求
err = this.action(requestObj, &responseObj)
if err != nil {
return
}
if responseObj.IsFailure() {
err = errors.New(responseObj.Message)
return
}
return
}
// BatchSendMessage 批量发送消息
// 参数
// messageList:消息内容列表
// delaySeconds:单位为秒,表示该消息发送到队列后,需要延时多久用户才可见该消息。
// 返回值
// error:错误对象
func (this *Queue) BatchSendMessage(messageList []string, delaySeconds int) (err error) {
// 参数验证和处理
err = this.validBatchList(messageList)
if err != nil {
return
}
delaySeconds = this.handleDelaySeconds(delaySeconds)
// 逻辑处理
requestObj := NewBatchSendMessageRequest(messageList, delaySeconds)
responseObj := NewBatchSendMessageResponse()
// 发送请求
err = this.action(requestObj, &responseObj)
if err != nil {
return
}
if responseObj.IsFailure() {
err = errors.New(responseObj.Message)
return
}
return
}
// Receive 消费单条消息
// pollingWaitSeconds:本次请求的长轮询等待时间。取值范围0 - 30秒如果不设置该参数则默认使用队列属性中的 pollingWaitSeconds 值。
// 返回值
// receiptHandle:消息句柄
// message:消息内容
// exist:是否存在数据
// err:错误对象
func (this *Queue) ReceiveMessage(pollingWaitSeconds int) (receiptHandle, message string, exist bool, err error) {
// 参数验证和处理
pollingWaitSeconds = this.handlePollingWaitSeconds(pollingWaitSeconds)
// 逻辑处理
requestObj := NewReceiveMessageRequest(pollingWaitSeconds)
responseObj := NewReceiveMessageResponse()
// 发送请求
err = this.action(requestObj, &responseObj)
if err != nil {
return
}
// 忽略掉没有消息的错误
if responseObj.HaveNoMessage() {
return
}
if responseObj.IsFailure() {
err = errors.New(responseObj.Message)
return
}
receiptHandle = responseObj.ReceiptHandle
message = responseObj.MsgBody
exist = true
return
}
// BatchReceiveMessage 批量消费消息
// 参数
// numOfMsg:本次消费的消息数量
// pollingWaitSeconds:本次请求的长轮询等待时间。取值范围0 - 30秒如果不设置该参数则默认使用队列属性中的 pollingWaitSeconds 值。
// 返回值
// receiptHandleList:消息句柄列表
// messageList:消息内容列表
// exist:是否存在数据
// err:错误对象
func (this *Queue) BatchReceiveMessage(numOfMsg, pollingWaitSeconds int) (receiptHandleList, messageList []string, exist bool, err error) {
// 参数验证和处理
if numOfMsg > MAX_BATCH_COUNT {
err = errors.New(EXCEED_MAX_BATCH_COUNT_ERROR)
return
}
pollingWaitSeconds = this.handlePollingWaitSeconds(pollingWaitSeconds)
// 逻辑处理
requestObj := NewBatchReceiveMessageRequest(numOfMsg, pollingWaitSeconds)
responseObj := NewBatchReceiveMessageResponse()
// 发送请求
err = this.action(requestObj, &responseObj)
if err != nil {
return
}
// 忽略掉没有消息的错误
if responseObj.HaveNoMessage() {
return
}
if responseObj.IsFailure() {
err = errors.New(responseObj.Message)
return
}
// 组装返回
receiptHandleList = make([]string, 0, numOfMsg)
messageList = make([]string, 0, numOfMsg)
for _, msgInfo := range responseObj.MsgInfoList {
receiptHandleList = append(receiptHandleList, msgInfo.ReceiptHandle)
messageList = append(messageList, msgInfo.MsgBody)
}
exist = true
return
}
// DeleteMessage 删除单条消息
// 参数
// receiptHandle:消息句柄
// 返回值
// error:错误对象
func (this *Queue) DeleteMessage(receiptHandle string) (err error) {
// 参数验证和处理
if receiptHandle == "" {
err = errors.New(EMPTY_HANDLE_ERROR)
return
}
// 逻辑处理
requestObj := NewDeleteMessageRequest(receiptHandle)
responseObj := NewDeleteMessageResponse()
// 发送请求
err = this.action(requestObj, &responseObj)
if err != nil {
return err
}
if responseObj.IsFailure() {
err = errors.New(responseObj.Message)
return
}
return
}
// BatchDeleteMessage 批量删除消息
// 参数
// receiptHandleList:消息句柄列表
// 返回值
// errorMap:删除错误的字典(key:删除失败的消息句柄;value:删除失败的原因)
// err:错误对象
func (this *Queue) BatchDeleteMessage(receiptHandleList []string) (errorMap map[string]string, err error) {
// 参数验证和处理
err = this.validBatchList(receiptHandleList)
if err != nil {
return
}
// 逻辑处理
requestObj := NewBatchDeleteMessageRequest(receiptHandleList)
responseObj := NewBatchDeleteMessageResponse()
// 发送请求
err = this.action(requestObj, &responseObj)
if err != nil {
return
}
if responseObj.IsFailure() {
err = errors.New(responseObj.Message)
// 组装返回
errorMap = make(map[string]string)
for _, errInfo := range responseObj.ErrorList {
errorMap[errInfo.ReceiptHandle] = errInfo.Message
}
return
}
return
}
// 创建新的Queue对象
func NewQueue(region, queueName, secretId, secretKey string) *Queue {
queueConfigObj := &QueueConfig{
Region: region,
QueueName: queueName,
SecretId: secretId,
SecretKey: secretKey,
}
return NewQueueByConfig(queueConfigObj)
}
// 通过队列配置对象创建新的Queue对象
func NewQueueByConfig(queueConfigObj *QueueConfig) *Queue {
queueObj := &Queue{
region: queueConfigObj.Region,
network: MQ_NETWORK_INTERNAL,
queueName: queueConfigObj.QueueName,
secretId: queueConfigObj.SecretId,
secretKey: queueConfigObj.SecretKey,
}
err := queueObj.GetQueueAttributes()
if err != nil {
if strings.Contains(err.Error(), MQ_NETWORK_INTERNAL) {
queueObj.network = MQ_NETWORK_PUBLIC
}
}
return queueObj
}