/* 我们使用腾讯云的CMQ作为公司的消息队列基础服务 产品的说明文档:https://cloud.tencent.com/document/product/406 */ package mqMgr import ( "encoding/json" "errors" "fmt" "strings" . "Framework/mqMgr/model" "goutil/webUtil" ) const ( EMPTY_MESSAGE_ERROR = "消息为空" EMPTY_HANDLE_ERROR = "句柄为空" MIN_DELAY_SECONDS = 0 MIN_POLLING_WAIT_SECONDS = 0 MAX_POLLING_WAIT_SECONDS = 30 MAX_BATCH_COUNT = 16 EMPTY_BATCH_LIST_ERROR = "批量消息为空" EXCEED_MAX_BATCH_COUNT_ERROR = "批量消息数量超过上限,最多为16条" ) // Queue对象 type Queue struct { // 地域 region string // 网络类型:内网、外网 network string // 队列名称 queueName string // API密钥Id secretId string // API密钥key secretKey string } // 执行操作 func (this *Queue) action(requestObj IRequest, responseObj interface{}) error { requestObj.SetCommonRequest(NewCommonRequest(requestObj.GetActionName(), this.region, this.secretId, this.queueName)) // 组装请求url paramMap := requestObj.AssembleParamMap() url, signature, err := AssembleUrl(this.region, this.network, MQ_TYPE_QUEUE, this.secretKey, paramMap) if err != nil { return err } paramMap["Signature"] = signature //请求url,请求头 header := webUtil.GetFormHeader() transport := webUtil.NewTransport() transport.DisableKeepAlives = true transport = webUtil.GetTimeoutTransport(transport, 10) statusCode, result, err := webUtil.PostMapData(url, paramMap, header, transport) //statusCode, result, err := webUtil.PostMapData(url, paramMap, header, nil) if err != nil { return err } if statusCode != 200 { return fmt.Errorf("Wrong status from server:%d", statusCode) } // 解析请求结果 err = json.Unmarshal(result, responseObj) return err } // 处理发送消息延迟的时间 func (this *Queue) handleDelaySeconds(delaySeconds int) int { if delaySeconds < MIN_DELAY_SECONDS { delaySeconds = MIN_DELAY_SECONDS } return delaySeconds } // 处理当没有消息是轮询等待的时间 func (this *Queue) handlePollingWaitSeconds(pollingWaitSeconds int) int { if pollingWaitSeconds < MIN_POLLING_WAIT_SECONDS { pollingWaitSeconds = MIN_POLLING_WAIT_SECONDS } else if pollingWaitSeconds > MAX_POLLING_WAIT_SECONDS { pollingWaitSeconds = MAX_POLLING_WAIT_SECONDS } return pollingWaitSeconds } // 验证批量处理的列表 func (this *Queue) validBatchList(list []string) error { if list == nil || len(list) == 0 { return errors.New(EMPTY_BATCH_LIST_ERROR) } if len(list) > MAX_BATCH_COUNT { return errors.New(EXCEED_MAX_BATCH_COUNT_ERROR) } return nil } // error:错误对象 func (this *Queue) GetQueueAttributes() (err error) { // 逻辑处理 requestObj := NewGetQueueAttributesRequest() responseObj := NewGetQueueAttributesResponse() // 发送请求 err = this.action(requestObj, &responseObj) if err != nil { return } if responseObj.IsFailure() { err = errors.New(responseObj.Message) return } return } // SendMessage 发送单条消息 // 参数 // message:消息内容 // delaySeconds:单位为秒,表示该消息发送到队列后,需要延时多久用户才可见该消息。 // 返回值 // error:错误对象 func (this *Queue) SendMessage(message string, delaySeconds int) (err error) { // 参数验证和处理 if message == "" { err = errors.New(EMPTY_MESSAGE_ERROR) return } delaySeconds = this.handleDelaySeconds(delaySeconds) // 逻辑处理 requestObj := NewSendMessageRequest(message, delaySeconds) responseObj := NewSendMessageResponse() // 发送请求 err = this.action(requestObj, &responseObj) if err != nil { return } if responseObj.IsFailure() { err = errors.New(responseObj.Message) return } return } // BatchSendMessage 批量发送消息 // 参数 // messageList:消息内容列表 // delaySeconds:单位为秒,表示该消息发送到队列后,需要延时多久用户才可见该消息。 // 返回值 // error:错误对象 func (this *Queue) BatchSendMessage(messageList []string, delaySeconds int) (err error) { // 参数验证和处理 err = this.validBatchList(messageList) if err != nil { return } delaySeconds = this.handleDelaySeconds(delaySeconds) // 逻辑处理 requestObj := NewBatchSendMessageRequest(messageList, delaySeconds) responseObj := NewBatchSendMessageResponse() // 发送请求 err = this.action(requestObj, &responseObj) if err != nil { return } if responseObj.IsFailure() { err = errors.New(responseObj.Message) return } return } // Receive 消费单条消息 // pollingWaitSeconds:本次请求的长轮询等待时间。取值范围0 - 30秒,如果不设置该参数,则默认使用队列属性中的 pollingWaitSeconds 值。 // 返回值 // receiptHandle:消息句柄 // message:消息内容 // exist:是否存在数据 // err:错误对象 func (this *Queue) ReceiveMessage(pollingWaitSeconds int) (receiptHandle, message string, exist bool, err error) { // 参数验证和处理 pollingWaitSeconds = this.handlePollingWaitSeconds(pollingWaitSeconds) // 逻辑处理 requestObj := NewReceiveMessageRequest(pollingWaitSeconds) responseObj := NewReceiveMessageResponse() // 发送请求 err = this.action(requestObj, &responseObj) if err != nil { return } // 忽略掉没有消息的错误 if responseObj.HaveNoMessage() { return } if responseObj.IsFailure() { err = errors.New(responseObj.Message) return } receiptHandle = responseObj.ReceiptHandle message = responseObj.MsgBody exist = true return } // BatchReceiveMessage 批量消费消息 // 参数 // numOfMsg:本次消费的消息数量 // pollingWaitSeconds:本次请求的长轮询等待时间。取值范围0 - 30秒,如果不设置该参数,则默认使用队列属性中的 pollingWaitSeconds 值。 // 返回值 // receiptHandleList:消息句柄列表 // messageList:消息内容列表 // exist:是否存在数据 // err:错误对象 func (this *Queue) BatchReceiveMessage(numOfMsg, pollingWaitSeconds int) (receiptHandleList, messageList []string, exist bool, err error) { // 参数验证和处理 if numOfMsg > MAX_BATCH_COUNT { err = errors.New(EXCEED_MAX_BATCH_COUNT_ERROR) return } pollingWaitSeconds = this.handlePollingWaitSeconds(pollingWaitSeconds) // 逻辑处理 requestObj := NewBatchReceiveMessageRequest(numOfMsg, pollingWaitSeconds) responseObj := NewBatchReceiveMessageResponse() // 发送请求 err = this.action(requestObj, &responseObj) if err != nil { return } // 忽略掉没有消息的错误 if responseObj.HaveNoMessage() { return } if responseObj.IsFailure() { err = errors.New(responseObj.Message) return } // 组装返回 receiptHandleList = make([]string, 0, numOfMsg) messageList = make([]string, 0, numOfMsg) for _, msgInfo := range responseObj.MsgInfoList { receiptHandleList = append(receiptHandleList, msgInfo.ReceiptHandle) messageList = append(messageList, msgInfo.MsgBody) } exist = true return } // DeleteMessage 删除单条消息 // 参数 // receiptHandle:消息句柄 // 返回值 // error:错误对象 func (this *Queue) DeleteMessage(receiptHandle string) (err error) { // 参数验证和处理 if receiptHandle == "" { err = errors.New(EMPTY_HANDLE_ERROR) return } // 逻辑处理 requestObj := NewDeleteMessageRequest(receiptHandle) responseObj := NewDeleteMessageResponse() // 发送请求 err = this.action(requestObj, &responseObj) if err != nil { return err } if responseObj.IsFailure() { err = errors.New(responseObj.Message) return } return } // BatchDeleteMessage 批量删除消息 // 参数 // receiptHandleList:消息句柄列表 // 返回值 // errorMap:删除错误的字典(key:删除失败的消息句柄;value:删除失败的原因) // err:错误对象 func (this *Queue) BatchDeleteMessage(receiptHandleList []string) (errorMap map[string]string, err error) { // 参数验证和处理 err = this.validBatchList(receiptHandleList) if err != nil { return } // 逻辑处理 requestObj := NewBatchDeleteMessageRequest(receiptHandleList) responseObj := NewBatchDeleteMessageResponse() // 发送请求 err = this.action(requestObj, &responseObj) if err != nil { return } if responseObj.IsFailure() { err = errors.New(responseObj.Message) // 组装返回 errorMap = make(map[string]string) for _, errInfo := range responseObj.ErrorList { errorMap[errInfo.ReceiptHandle] = errInfo.Message } return } return } // 创建新的Queue对象 func NewQueue(region, queueName, secretId, secretKey string) *Queue { queueConfigObj := &QueueConfig{ Region: region, QueueName: queueName, SecretId: secretId, SecretKey: secretKey, } return NewQueueByConfig(queueConfigObj) } // 通过队列配置对象创建新的Queue对象 func NewQueueByConfig(queueConfigObj *QueueConfig) *Queue { queueObj := &Queue{ region: queueConfigObj.Region, network: MQ_NETWORK_INTERNAL, queueName: queueConfigObj.QueueName, secretId: queueConfigObj.SecretId, secretKey: queueConfigObj.SecretKey, } err := queueObj.GetQueueAttributes() if err != nil { if strings.Contains(err.Error(), MQ_NETWORK_INTERNAL) { queueObj.network = MQ_NETWORK_PUBLIC } } return queueObj }