X Tutup
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "MQClientAPIImpl.h"

// NOTE(review): the names of the four angle-bracket #include directives were
// lost when this file was extracted. The headers below are the ones the code
// in this file visibly relies on; confirm against the upstream include list.
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iterator>
#include <memory>
#include <string>
#include <vector>

#include <boost/filesystem.hpp>
#include <boost/system/error_code.hpp>

#include "CommunicationMode.h"
#include "Logging.h"
#include "MQDecoder.h"
#include "PullResultExt.h"

namespace rocketmq {

namespace {

// Returns the custom header of `response` cast to HeaderT.
// Throws MQClientException when the response carries no header, so callers
// never dereference a null pointer (processPullResponse performs the same
// check for its own header).
template <typename HeaderT>
HeaderT* requireResponseHeader(RemotingCommand* response, const char* caller) {
  HeaderT* header = static_cast<HeaderT*>(response->getCommandHeader());
  if (header == nullptr) {
    THROW_MQEXCEPTION(MQClientException, string(caller) + ":responseHeader is NULL", -1);
  }
  return header;
}

}  // namespace

// NOTE(review): the head of the MQClientAPIImpl constructor (its signature,
// initializer list and every statement up to the receiver of the first
// registerProcessor call) was lost when this file was extracted. Only the tail
// below survived. It is preserved verbatim as a comment instead of being
// guessed at; restore the complete definition from version control before
// building this file.
//
//   registerProcessor(CHECK_TRANSACTION_STATE, clientRemotingProcessor);
//   m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET, clientRemotingProcessor);
//   m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT, clientRemotingProcessor);
//   m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO, clientRemotingProcessor);
//   m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED, clientRemotingProcessor);
//   m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY, clientRemotingProcessor);
//   m_topAddressing.reset(new TopAddressing(unitName));
// }

// Drops the remoting client and the name-server addressing helper.
MQClientAPIImpl::~MQClientAPIImpl() {
  m_pRemotingClient = NULL;
  m_topAddressing = NULL;
}

// Forwards to the remoting client's stopAllTcpTransportThread().
void MQClientAPIImpl::stopAllTcpTransportThread() {
  m_pRemotingClient->stopAllTcpTransportThread();
}

// Writes `data` to `filename`, truncating any existing content.
//
// Returns false when `data` is empty, when the file cannot be opened, or when
// a write fails; true once every byte has been handed to the stream.
// When `isSync` is set the stream is flushed with fflush() before closing
// (the log line says "fsync", but no fsync() call is made here).
bool MQClientAPIImpl::writeDataToFile(string filename, string data, bool isSync) {
  if (data.empty()) {
    return false;
  }

  FILE* pFd = fopen(filename.c_str(), "w+");
  if (NULL == pFd) {
    LOG_ERROR("fopen failed, filename:%s", filename.c_str());
    return false;
  }

  const char* cursor = data.c_str();
  size_t bytesLeft = data.size();
  while (bytesLeft > 0) {
    const size_t written = fwrite(cursor, sizeof(char), bytesLeft, pFd);
    // A short write is the only situation in which fwrite can have failed.
    // Bail out on a stream error, and also when no progress at all was made,
    // otherwise this loop would spin forever.
    if (written < bytesLeft && (ferror(pFd) || written == 0)) {
      LOG_ERROR("write data fail, data len:" SIZET_FMT ", file:%s, msg:%s", data.size(), filename.c_str(),
                strerror(errno));
      fclose(pFd);
      return false;
    }
    bytesLeft -= written;
    cursor += written;
  }

  if (isSync) {
    LOG_INFO("fsync with filename:%s", filename.c_str());
    fflush(pFd);
  }
  fclose(pFd);
  return true;
}

// Resolves the name server address list for `NSDomain` and keeps a snapshot of
// it on disk under <home>/logs/rocketmq-cpp/snapshot.
//
// - If the remote lookup returns nothing and no address is cached yet, the
//   address list is loaded from the local snapshot file (when it exists).
// - If the remote lookup returns an address list, the remoting client is
//   updated when the list changed, and the snapshot file is rewritten when the
//   list changed or this is the first successful fetch.
// - If the snapshot file is missing afterwards it is recreated from the cached
//   address list.
//
// Returns the cached address list (m_nameSrvAddr), or "" when the snapshot
// directory cannot be created. Exceptions raised while doing so are logged and
// not propagated.
string MQClientAPIImpl::fetchNameServerAddr(const string& NSDomain) {
  try {
    const string storePath = UtilAll::getHomeDirectory() + "/logs/rocketmq-cpp/snapshot";
    const boost::filesystem::path dir(storePath);
    boost::system::error_code ec;
    if (!boost::filesystem::exists(dir, ec)) {
      // create_directories also creates missing parents and does not report an
      // error when another thread/process created the directory meanwhile.
      boost::filesystem::create_directories(dir, ec);
      if (ec) {
        LOG_ERROR("create data dir:%s error", storePath.c_str());
        return "";
      }
    }

    // The snapshot file name is derived from the part of the client id that
    // follows the '@' separator.
    string file(storePath);
    // NOTE(review): element type inferred from usage (append of ret_[i] to a
    // string); the template argument was lost in extraction.
    vector<string> ret_;
    const int retSize = UtilAll::Split(ret_, m_mqClientId, "@");
    if (retSize == 2) {
      file.append("/nameserver_addr-").append(ret_[retSize - 1]);
    } else {
      LOG_ERROR("split mqClientId:%s fail", m_mqClientId.c_str());
      file.append("/nameserver_addr-DEFAULT");
    }
    const boost::filesystem::path snapshotPath(file);
    const string fileBak = storePath + "/nameserver_addr.bak";

    // Writes `content` to the .bak file and then moves it over the snapshot.
    auto persistSnapshot = [&](const string& content) {
      if (writeDataToFile(fileBak, content, true)) {
        if (!UtilAll::ReplaceFile(fileBak, file)) {
          LOG_ERROR("could not rename bak file:%s", strerror(errno));
        }
      }
    };

    const string addrs = m_topAddressing->fetchNSAddr(NSDomain);
    if (addrs.empty()) {
      if (m_nameSrvAddr.empty()) {
        LOG_INFO("Load the name server snapshot local file:%s", file.c_str());
        if (boost::filesystem::exists(snapshotPath)) {
          ifstream snapshotStream(file, ios::binary);
          istreambuf_iterator<char> beg(snapshotStream), end;
          string filecontent(beg, end);
          updateNameServerAddr(filecontent);
          m_nameSrvAddr = filecontent;
        } else {
          LOG_WARN("The name server snapshot local file not exists");
        }
      }
    } else {
      // Remember whether this is the first successful fetch *before* clearing
      // the flag, so the snapshot is still written on that first fetch even if
      // the address list is unchanged.
      const bool firstFetch = m_firstFetchNameSrv;
      m_firstFetchNameSrv = false;

      if (addrs.compare(m_nameSrvAddr) != 0) {
        LOG_INFO("name server address changed, old: %s, new: %s", m_nameSrvAddr.c_str(), addrs.c_str());
        updateNameServerAddr(addrs);
        m_nameSrvAddr = addrs;
      } else if (!firstFetch) {
        return m_nameSrvAddr;
      }

      // update the snapshot local file if nameSrv changes or this was the
      // first fetch
      persistSnapshot(addrs);
    }

    if (!boost::filesystem::exists(snapshotPath)) {
      // the name server snapshot local file maybe deleted by force, create it
      persistSnapshot(m_nameSrvAddr);
    }
  } catch (const std::exception& e) {
    // Best effort: snapshot handling must not break address resolution.
    LOG_ERROR("fetchNameServerAddr failed, details:%s", e.what());
  } catch (...) {
    LOG_ERROR("fetchNameServerAddr failed with an unknown exception");
  }
  return m_nameSrvAddr;
}

// Pushes `addrs` to the remoting client, if there is one.
void MQClientAPIImpl::updateNameServerAddr(const string& addrs) {
  if (m_pRemotingClient != NULL) {
    m_pRemotingClient->updateNameServerAddressList(addrs);
  }
}

// Runs the ClientRPCHook "before request" step for `request` using the given
// credentials.
void MQClientAPIImpl::callSignatureBeforeRequest(const string& addr,
                                                 RemotingCommand& request,
                                                 const SessionCredentials& session_credentials) {
  ClientRPCHook rpcHook(session_credentials);
  rpcHook.doBeforeRequest(addr, request);
}

// Note: all request rules: throw exception if got broker error response,
// exclude getTopicRouteInfoFromNameServer and unregisterClient

// Sends UPDATE_AND_CREATE_TOPIC to `addr` for `topicConfig`.
// Throws MQBrokerException when the response is missing or its code is not
// SUCCESS_VALUE.
void MQClientAPIImpl::createTopic(const string& addr,
                                  const string& defaultTopic,
                                  TopicConfig topicConfig,
                                  const SessionCredentials& sessionCredentials) {
  // The header is handed to RemotingCommand as a raw pointer; presumably the
  // command takes ownership of it - confirm against RemotingCommand.
  CreateTopicRequestHeader* requestHeader = new CreateTopicRequestHeader();
  requestHeader->topic = topicConfig.getTopicName();
  requestHeader->defaultTopic = defaultTopic;
  requestHeader->readQueueNums = topicConfig.getReadQueueNums();
  requestHeader->writeQueueNums = topicConfig.getWriteQueueNums();
  requestHeader->perm = topicConfig.getPerm();
  requestHeader->topicFilterType = topicConfig.getTopicFilterType();

  RemotingCommand request(UPDATE_AND_CREATE_TOPIC, requestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Sends END_TRANSACTION to `addr` as a one-way request carrying `remark`.
void MQClientAPIImpl::endTransactionOneway(std::string addr,
                                           EndTransactionRequestHeader* requestHeader,
                                           std::string remark,
                                           const SessionCredentials& sessionCredentials) {
  RemotingCommand request(END_TRANSACTION, requestHeader);
  request.setRemark(remark);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();
  m_pRemotingClient->invokeOneway(addr, request);
}

// Builds a SEND_MESSAGE_V2 request for `msg` and dispatches it according to
// `communicationMode`:
//   ComMode_ONEWAY - fire and forget, returns a default SendResult;
//   ComMode_ASYNC  - hands off to sendMessageAsync, returns a default SendResult;
//   ComMode_SYNC   - returns the result of sendMessageSync.
// Any other mode sends nothing and returns a default SendResult.
SendResult MQClientAPIImpl::sendMessage(const string& addr,
                                        const string& brokerName,
                                        const MQMessage& msg,
                                        const SendMessageRequestHeader& requestHeader,
                                        int timeoutMillis,
                                        int maxRetrySendTimes,
                                        int communicationMode,
                                        SendCallback* pSendCallback,
                                        const SessionCredentials& sessionCredentials) {
  // RemotingCommand request(SEND_MESSAGE, pRequestHeader);
  // Using MQ V2 Protocol to send messages.
  SendMessageRequestHeaderV2* pRequestHeaderV2 = new SendMessageRequestHeaderV2(requestHeader);
  RemotingCommand request(SEND_MESSAGE_V2, pRequestHeaderV2);

  string body = msg.getBody();
  request.SetBody(body.c_str(), body.length());
  request.setMsgBody(body);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  switch (communicationMode) {
    case ComMode_ONEWAY:
      m_pRemotingClient->invokeOneway(addr, request);
      break;
    case ComMode_ASYNC:
      sendMessageAsync(addr, brokerName, msg, request, pSendCallback, timeoutMillis, maxRetrySendTimes, 1);
      break;
    case ComMode_SYNC:
      return sendMessageSync(addr, brokerName, msg, request, timeoutMillis);
    default:
      break;
  }
  return SendResult();
}

// Encodes `pHeartbeatData` as the body of a HEART_BEAT request and sends it to
// `addr`. The outcome is only logged.
void MQClientAPIImpl::sendHeartbeat(const string& addr,
                                    HeartbeatData* pHeartbeatData,
                                    const SessionCredentials& sessionCredentials) {
  RemotingCommand request(HEART_BEAT, NULL);

  string body;
  pHeartbeatData->Encode(body);
  request.SetBody(body.data(), body.length());
  request.setMsgBody(body);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  if (m_pRemotingClient->invokeHeartBeat(addr, request)) {
    LOG_DEBUG("sendHeartbeat to broker:%s success", addr.c_str());
  } else {
    LOG_WARN("sendHeartbeat to broker:%s failed", addr.c_str());
  }
}

// Sends UNREGISTER_CLIENT to `addr`. Never throws on a broker error: a
// non-success response is logged, a missing response is ignored.
void MQClientAPIImpl::unregisterClient(const string& addr,
                                       const string& clientID,
                                       const string& producerGroup,
                                       const string& consumerGroup,
                                       const SessionCredentials& sessionCredentials) {
  LOG_INFO("unregisterClient to broker:%s", addr.c_str());
  RemotingCommand request(UNREGISTER_CLIENT, new UnregisterClientRequestHeader(clientID, producerGroup, consumerGroup));
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request));
  if (!response) {
    return;
  }
  if (response->getCode() == SUCCESS_VALUE) {
    LOG_INFO("unregisterClient to:%s success", addr.c_str());
    return;
  }
  LOG_WARN("unregisterClient fail:%s,%d", response->getRemark().c_str(), response->getCode());
}

// Requests the route data of `topic` (GET_ROUTEINTO_BY_TOPIC, sent with an
// empty address).
// return NULL if got no response or error response
TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const string& topic,
                                                                 int timeoutMillis,
                                                                 const SessionCredentials& sessionCredentials) {
  RemotingCommand request(GET_ROUTEINTO_BY_TOPIC, new GetRouteInfoRequestHeader(topic));
  callSignatureBeforeRequest("", request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync("", request, timeoutMillis));
  if (pResponse != nullptr) {
    const MemoryBlock* pbody = pResponse->GetBody();
    // Accept an empty body, or a non-empty body that actually has data.
    if (pbody->getSize() == 0 || pbody->getData() != NULL) {
      switch (pResponse->getCode()) {
        case SUCCESS_VALUE:
          if (pbody->getSize()) {
            return TopicRouteData::Decode(pbody);
          }
          // fall through: a success response with an empty body is handled
          // exactly like TOPIC_NOT_EXIST (behaviour kept from the original).
        case TOPIC_NOT_EXIST:
          if (DEFAULT_TOPIC.compare(topic) != 0) {
            LOG_WARN("Get topic[%s] route failed [TOPIC_NOT_EXIST].", topic.c_str());
          }
          return NULL;
        default:
          break;
      }
      LOG_WARN("%s,%d", pResponse->getRemark().c_str(), pResponse->getCode());
      return NULL;
    }
  }

  if (DEFAULT_TOPIC.compare(topic) != 0) {
    LOG_WARN("Get topic[%s] route failed [Null Response].", topic.c_str());
  }
  return NULL;
}

// Requests the full topic list (GET_ALL_TOPIC_LIST_FROM_NAMESERVER, sent with
// an empty address).
// Returns the decoded list on success with a non-empty body, NULL when there
// is no response or the body is unusable, and throws MQClientException
// otherwise (including a success code with an empty body).
TopicList* MQClientAPIImpl::getTopicListFromNameServer(const SessionCredentials& sessionCredentials) {
  RemotingCommand request(GET_ALL_TOPIC_LIST_FROM_NAMESERVER, NULL);
  callSignatureBeforeRequest("", request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync("", request));
  if (pResponse == nullptr) {
    return NULL;
  }

  const MemoryBlock* pbody = pResponse->GetBody();
  if (pbody->getSize() != 0 && pbody->getData() == NULL) {
    return NULL;
  }
  if (pResponse->getCode() == SUCCESS_VALUE && pbody->getSize()) {
    return TopicList::Decode(pbody);
  }
  THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(), pResponse->getCode());
}

// Not implemented: always returns 0.
int MQClientAPIImpl::wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName, int timeoutMillis) {
  return 0;
}

// Not implemented: does nothing.
void MQClientAPIImpl::deleteTopicInBroker(const string& addr, const string& topic, int timeoutMillis) {}

// Not implemented: does nothing.
void MQClientAPIImpl::deleteTopicInNameServer(const string& addr, const string& topic, int timeoutMillis) {}

// Not implemented: does nothing.
void MQClientAPIImpl::deleteSubscriptionGroup(const string& addr, const string& groupName, int timeoutMillis) {}

// Not implemented: always returns an empty string.
string MQClientAPIImpl::getKVConfigByValue(const string& projectNamespace,
                                           const string& projectGroup,
                                           int timeoutMillis) {
  return "";
}

// Not implemented: always returns an empty KVTable.
KVTable MQClientAPIImpl::getKVListByNamespace(const string& projectNamespace, int timeoutMillis) {
  return KVTable();
}

// Not implemented: does nothing.
void MQClientAPIImpl::deleteKVConfigByValue(const string& projectNamespace,
                                            const string& projectGroup,
                                            int timeoutMillis) {}

// Sends `request` synchronously and converts the response into a SendResult.
// Throws MQClientException("response is null") when no response arrives.
// Exceptions derived from std::exception raised while processing the response
// are logged and rethrown unchanged.
SendResult MQClientAPIImpl::sendMessageSync(const string& addr,
                                            const string& brokerName,
                                            const MQMessage& msg,
                                            RemotingCommand& request,
                                            int timeoutMillis) {
  // NOTE(review): template argument restored from usage (pResponse.get() is
  // passed as RemotingCommand*); it was lost in extraction.
  unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (pResponse != nullptr) {
    try {
      SendResult result = processSendResponse(brokerName, msg, pResponse.get());
      LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(),
                addr.c_str(), brokerName.c_str(), (int)result.getSendStatus());
      return result;
    } catch (const std::exception& e) {
      LOG_ERROR("send new error, broker:%s, details:%s", brokerName.c_str(), e.what());
      // Rethrow the original object; `throw e;` would slice it down to
      // std::exception and hide the real exception type from callers.
      throw;
    } catch (...) {
      // NOTE(review): behaviour kept from the original - after logging, control
      // falls through to the "response is null" exception below, which
      // misreports the cause.
      LOG_ERROR("Unknown error, broker:%s", brokerName.c_str());
    }
  }
  THROW_MQEXCEPTION(MQClientException, "response is null", -1);
}

// Sends `request` asynchronously, retrying invokeAsync while attempts and the
// time budget remain.
// When every attempt fails, the callback wrapper's onException() is invoked if
// a send callback was supplied; otherwise MQClientException is thrown.
void MQClientAPIImpl::sendMessageAsync(const string& addr,
                                       const string& brokerName,
                                       const MQMessage& msg,
                                       RemotingCommand& request,
                                       SendCallback* pSendCallback,
                                       int64 timeoutMilliseconds,
                                       int maxRetryTimes,
                                       int retrySendTimes) {
  int64 begin_time = UtilAll::currentTimeMillis();
  // NOTE(review): both template arguments were lost in extraction. They are
  // restored by analogy with pullMessageAsync (AsyncCallbackWrap base,
  // PullCallbackWrap there) - confirm SendCallbackWrap is the intended type.
  std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<SendCallbackWrap>(brokerName, msg, pSendCallback, this);

  if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds, maxRetryTimes, retrySendTimes)) {
    return;
  }
  LOG_WARN("invokeAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", addr.c_str(),
           msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);

  // when getTcp return false, need consider retrySendTimes
  int retry_time = retrySendTimes + 1;
  int64 time_out = timeoutMilliseconds - (UtilAll::currentTimeMillis() - begin_time);
  while (retry_time < maxRetryTimes && time_out > 0) {
    begin_time = UtilAll::currentTimeMillis();
    if (m_pRemotingClient->invokeAsync(addr, request, cbw, time_out, maxRetryTimes, retry_time)) {
      return;  // invokeAsync success
    }
    retry_time += 1;
    time_out = time_out - (UtilAll::currentTimeMillis() - begin_time);
    LOG_WARN("invokeAsync retry failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
             addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retry_time);
  }

  LOG_ERROR("sendMessageAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
            addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
  if (cbw && pSendCallback != nullptr) {
    cbw->onException();
  } else {
    THROW_MQEXCEPTION(MQClientException, "sendMessageAsync failed", -1);
  }
}

// Builds a PULL_MESSAGE request and dispatches it according to
// `communicationMode`. Only ComMode_SYNC yields a result; ONEWAY sends
// nothing, ASYNC delivers through `pullCallback`, and both return NULL.
PullResult* MQClientAPIImpl::pullMessage(const string& addr,
                                         PullMessageRequestHeader* pRequestHeader,
                                         int timeoutMillis,
                                         int communicationMode,
                                         PullCallback* pullCallback,
                                         void* pArg,
                                         const SessionCredentials& sessionCredentials) {
  RemotingCommand request(PULL_MESSAGE, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  switch (communicationMode) {
    case ComMode_ONEWAY:
      break;
    case ComMode_ASYNC:
      pullMessageAsync(addr, request, timeoutMillis, pullCallback, pArg);
      break;
    case ComMode_SYNC:
      return pullMessageSync(addr, request, timeoutMillis);
    default:
      break;
  }
  return NULL;
}

// Issues `request` asynchronously with a PullCallbackWrap around
// `pullCallback`. Throws MQClientException when invokeAsync reports failure.
void MQClientAPIImpl::pullMessageAsync(const string& addr,
                                       RemotingCommand& request,
                                       int timeoutMillis,
                                       PullCallback* pullCallback,
                                       void* pArg) {
  // AsyncCallbackWrap* cbw = new PullCallbackWrap(pullCallback, this, pArg);
  std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<PullCallbackWrap>(pullCallback, this, pArg);
  if (!m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis)) {
    // NOTE(review): the cast target was lost in extraction; AsyncArg is assumed
    // from the `->mq` member access - confirm.
    LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
              static_cast<AsyncArg*>(pArg)->mq.toString().data());
    THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
  }
}

// Issues `request` synchronously and converts the response with
// processPullResponse. Returns NULL when there is no response, the body is
// unusable, or processPullResponse throws an MQException (which is logged).
PullResult* MQClientAPIImpl::pullMessageSync(const string& addr, RemotingCommand& request, int timeoutMillis) {
  unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (pResponse == nullptr) {
    return NULL;
  }

  const MemoryBlock* pbody = pResponse->GetBody();
  if (pbody->getSize() != 0 && pbody->getData() == NULL) {
    return NULL;
  }

  try {
    // pullMessage will handle exception from processPullResponse
    return processPullResponse(pResponse.get());
  } catch (MQException& e) {
    LOG_ERROR("%s", e.what());
    return NULL;
  }
}

// Maps a send response onto a SendResult.
// FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE and
// SUCCESS_VALUE produce a result with the matching status; any other code, or
// a response without a header, throws MQClientException.
SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
                                                const MQMessage& msg,
                                                RemotingCommand* pResponse) {
  SendStatus sendStatus = SEND_OK;
  switch (pResponse->getCode()) {
    case FLUSH_DISK_TIMEOUT:
      sendStatus = SEND_FLUSH_DISK_TIMEOUT;
      break;
    case FLUSH_SLAVE_TIMEOUT:
      sendStatus = SEND_FLUSH_SLAVE_TIMEOUT;
      break;
    case SLAVE_NOT_AVAILABLE:
      sendStatus = SEND_SLAVE_NOT_AVAILABLE;
      break;
    case SUCCESS_VALUE:
      sendStatus = SEND_OK;
      break;
    default:
      LOG_ERROR("processSendResponse error remark:%s, error code:%d", (pResponse->getRemark()).c_str(),
                pResponse->getCode());
      THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(), pResponse->getCode());
  }

  SendMessageResponseHeader* responseHeader =
      requireResponseHeader<SendMessageResponseHeader>(pResponse, "processSendResponse");

  // Tracing is on only when the response carries TRACE_ON == "true".
  auto extFields = pResponse->getExtFields();
  const bool traceOn =
      (extFields != nullptr && extFields->count("TRACE_ON") && extFields->at("TRACE_ON") == "true");

  MQMessageQueue messageQueue(msg.getTopic(), brokerName, responseHeader->queueId);
  string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
  return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset,
                    responseHeader->regionId, traceOn);
}

// Maps a pull response onto a newly allocated PullResultExt.
// Throws MQBrokerException for an unknown response code and MQClientException
// when the header is missing or a FOUND response has an empty body.
PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* pResponse) {
  PullStatus pullStatus = NO_NEW_MSG;
  switch (pResponse->getCode()) {
    case SUCCESS_VALUE:
      pullStatus = FOUND;
      break;
    case PULL_NOT_FOUND:
      pullStatus = NO_NEW_MSG;
      break;
    case PULL_RETRY_IMMEDIATELY:
      pullStatus = NO_MATCHED_MSG;
      break;
    case PULL_OFFSET_MOVED:
      pullStatus = OFFSET_ILLEGAL;
      break;
    default:
      THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode());
  }

  PullMessageResponseHeader* responseHeader = static_cast<PullMessageResponseHeader*>(pResponse->getCommandHeader());
  if (!responseHeader) {
    LOG_ERROR("processPullResponse:responseHeader is NULL");
    THROW_MQEXCEPTION(MQClientException, "processPullResponse:responseHeader is NULL", -1);
  }

  // NOTE(review): this declaration was lost in extraction (only "GetBody());"
  // survived); it is restored as a copy of the response body - confirm.
  MemoryBlock bodyFromResponse = *(pResponse->GetBody());
  // response data judgement had been done outside of processPullResponse
  if (bodyFromResponse.getSize() == 0) {
    if (pullStatus == FOUND) {
      THROW_MQEXCEPTION(MQClientException, "memoryBody size is 0, but pullStatus equals found", -1);
    }
    return new PullResultExt(pullStatus, responseHeader->nextBeginOffset, responseHeader->minOffset,
                             responseHeader->maxOffset, (int)responseHeader->suggestWhichBrokerId);
  }
  return new PullResultExt(pullStatus, responseHeader->nextBeginOffset, responseHeader->minOffset,
                           responseHeader->maxOffset, (int)responseHeader->suggestWhichBrokerId, bodyFromResponse);
}

// Queries the minimum offset of queue `queueId` of `topic` on broker `addr`.
// Throws MQBrokerException when the response is missing or not successful.
//
// NOTE(review): the signature and the first statements of this function were
// lost in extraction. They are restored here to mirror getMaxOffset, which the
// surviving body matches statement for statement - confirm against the header.
int64 MQClientAPIImpl::getMinOffset(const string& addr,
                                    const string& topic,
                                    int queueId,
                                    int timeoutMillis,
                                    const SessionCredentials& sessionCredentials) {
  GetMinOffsetRequestHeader* pRequestHeader = new GetMinOffsetRequestHeader();
  pRequestHeader->topic = topic;
  pRequestHeader->queueId = queueId;

  RemotingCommand request(GET_MIN_OFFSET, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return requireResponseHeader<GetMinOffsetResponseHeader>(response.get(), "getMinOffset")->offset;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Queries the maximum offset of queue `queueId` of `topic` on broker `addr`.
// Throws MQBrokerException when the response is missing or not successful.
int64 MQClientAPIImpl::getMaxOffset(const string& addr,
                                    const string& topic,
                                    int queueId,
                                    int timeoutMillis,
                                    const SessionCredentials& sessionCredentials) {
  GetMaxOffsetRequestHeader* pRequestHeader = new GetMaxOffsetRequestHeader();
  pRequestHeader->topic = topic;
  pRequestHeader->queueId = queueId;

  RemotingCommand request(GET_MAX_OFFSET, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return requireResponseHeader<GetMaxOffsetResponseHeader>(response.get(), "getMaxOffset")->offset;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Asks broker `addr` for the offset in queue `queueId` of `topic` that
// corresponds to `timestamp` (SEARCH_OFFSET_BY_TIMESTAMP).
// Throws MQBrokerException when the response is missing or not successful.
int64 MQClientAPIImpl::searchOffset(const string& addr,
                                    const string& topic,
                                    int queueId,
                                    uint64_t timestamp,
                                    int timeoutMillis,
                                    const SessionCredentials& sessionCredentials) {
  SearchOffsetRequestHeader* pRequestHeader = new SearchOffsetRequestHeader();
  pRequestHeader->topic = topic;
  pRequestHeader->queueId = queueId;
  pRequestHeader->timestamp = timestamp;

  RemotingCommand request(SEARCH_OFFSET_BY_TIMESTAMP, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return requireResponseHeader<SearchOffsetResponseHeader>(response.get(), "searchOffset")->offset;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Sends VIEW_MESSAGE_BY_ID for physical offset `phyoffset` to `addr`.
//
// NOTE(review): this is an unfinished stub kept as in the original - the
// success case decodes nothing, so every path ends in an MQBrokerException
// (carrying the success code when the broker answered successfully) and no
// message is ever returned.
MQMessageExt* MQClientAPIImpl::viewMessage(const string& addr,
                                           int64 phyoffset,
                                           int timeoutMillis,
                                           const SessionCredentials& sessionCredentials) {
  ViewMessageRequestHeader* pRequestHeader = new ViewMessageRequestHeader();
  pRequestHeader->offset = phyoffset;

  RemotingCommand request(VIEW_MESSAGE_BY_ID, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Queries the store time of the earliest message in queue `queueId` of
// `topic` on broker `addr`.
// Throws MQBrokerException when the response is missing or not successful.
int64 MQClientAPIImpl::getEarliestMsgStoretime(const string& addr,
                                               const string& topic,
                                               int queueId,
                                               int timeoutMillis,
                                               const SessionCredentials& sessionCredentials) {
  GetEarliestMsgStoretimeRequestHeader* pRequestHeader = new GetEarliestMsgStoretimeRequestHeader();
  pRequestHeader->topic = topic;
  pRequestHeader->queueId = queueId;

  RemotingCommand request(GET_EARLIEST_MSG_STORETIME, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return requireResponseHeader<GetEarliestMsgStoretimeResponseHeader>(response.get(), "getEarliestMsgStoretime")
        ->timestamp;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Fills `cids` with the consumer ids of `consumerGroup` as reported by `addr`.
// Throws MQBrokerException unless a successful response with a non-empty body
// was decoded.
//
// NOTE(review): the element type of `cids` was lost in extraction; string is
// assumed - confirm against the header.
void MQClientAPIImpl::getConsumerIdListByGroup(const string& addr,
                                               const string& consumerGroup,
                                               vector<string>& cids,
                                               int timeoutMillis,
                                               const SessionCredentials& sessionCredentials) {
  GetConsumerListByGroupRequestHeader* pRequestHeader = new GetConsumerListByGroupRequestHeader();
  pRequestHeader->consumerGroup = consumerGroup;

  RemotingCommand request(GET_CONSUMER_LIST_BY_GROUP, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (pResponse != nullptr) {
    const MemoryBlock* pbody = pResponse->GetBody();
    if (pbody->getSize() == 0 || pbody->getData() != NULL) {
      if (pResponse->getCode() == SUCCESS_VALUE && pbody->getSize()) {
        GetConsumerListByGroupResponseBody::Decode(pbody, cids);
        return;
      }
      // Also reached for a success code with an empty body.
      THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode());
    }
  }
  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
}

// Sends QUERY_CONSUMER_OFFSET to `addr` and returns the offset from the
// response header.
// Throws MQBrokerException when the response is missing or not successful.
int64 MQClientAPIImpl::queryConsumerOffset(const string& addr,
                                           QueryConsumerOffsetRequestHeader* pRequestHeader,
                                           int timeoutMillis,
                                           const SessionCredentials& sessionCredentials) {
  RemotingCommand request(QUERY_CONSUMER_OFFSET, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return requireResponseHeader<QueryConsumerOffsetResponseHeader>(response.get(), "queryConsumerOffset")->offset;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Sends UPDATE_CONSUMER_OFFSET to `addr` and waits for the reply.
// Throws MQBrokerException when the response is missing or not successful.
void MQClientAPIImpl::updateConsumerOffset(const string& addr,
                                           UpdateConsumerOffsetRequestHeader* pRequestHeader,
                                           int timeoutMillis,
                                           const SessionCredentials& sessionCredentials) {
  RemotingCommand request(UPDATE_CONSUMER_OFFSET, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Sends UPDATE_CONSUMER_OFFSET to `addr` as a one-way request.
// `timeoutMillis` is not used by this function.
void MQClientAPIImpl::updateConsumerOffsetOneway(const string& addr,
                                                 UpdateConsumerOffsetRequestHeader* pRequestHeader,
                                                 int timeoutMillis,
                                                 const SessionCredentials& sessionCredentials) {
  RemotingCommand request(UPDATE_CONSUMER_OFFSET, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();
  m_pRemotingClient->invokeOneway(addr, request);
}

// Sends `msg` back to broker `addr` (CONSUMER_SEND_MSG_BACK) on behalf of
// `consumerGroup`.
// Throws MQBrokerException when the response is missing or not successful.
void MQClientAPIImpl::consumerSendMessageBack(const string addr,
                                              MQMessageExt& msg,
                                              const string& consumerGroup,
                                              int delayLevel,
                                              int timeoutMillis,
                                              int maxReconsumeTimes,
                                              const SessionCredentials& sessionCredentials) {
  ConsumerSendMsgBackRequestHeader* pRequestHeader = new ConsumerSendMsgBackRequestHeader();
  pRequestHeader->group = consumerGroup;
  pRequestHeader->offset = msg.getCommitLogOffset();
  pRequestHeader->delayLevel = delayLevel;
  pRequestHeader->unitMode = false;
  pRequestHeader->originTopic = msg.getTopic();
  pRequestHeader->originMsgId = msg.getMsgId();
  pRequestHeader->maxReconsumeTimes = maxReconsumeTimes;

  // string addr = socketAddress2IPPort(msg.getStoreHost());
  RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> response(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (!response) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (response->getCode() == SUCCESS_VALUE) {
    return;
  }
  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
}

// Sends LOCK_BATCH_MQ with `requestBody` to `addr` and decodes the response
// body, when there is one, into `mqs`.
// Throws MQBrokerException when the response is missing, unusable or not
// successful.
//
// NOTE(review): the element type of `mqs` was lost in extraction;
// MQMessageQueue is assumed - confirm against the header.
void MQClientAPIImpl::lockBatchMQ(const string& addr,
                                  LockBatchRequestBody* requestBody,
                                  vector<MQMessageQueue>& mqs,
                                  int timeoutMillis,
                                  const SessionCredentials& sessionCredentials) {
  RemotingCommand request(LOCK_BATCH_MQ, NULL);

  string body;
  requestBody->Encode(body);
  request.SetBody(body.data(), body.length());
  request.setMsgBody(body);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (pResponse != nullptr) {
    const MemoryBlock* pbody = pResponse->GetBody();
    if (pbody->getSize() == 0 || pbody->getData() != NULL) {
      if (pResponse->getCode() == SUCCESS_VALUE) {
        if (pbody->getSize()) {
          LockBatchResponseBody::Decode(pbody, mqs);
        }
        return;
      }
      THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode());
    }
  }
  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
}

// Sends UNLOCK_BATCH_MQ with `requestBody` to `addr`.
// Throws MQBrokerException when the response is missing or not successful.
void MQClientAPIImpl::unlockBatchMQ(const string& addr,
                                    UnlockBatchRequestBody* requestBody,
                                    int timeoutMillis,
                                    const SessionCredentials& sessionCredentials) {
  RemotingCommand request(UNLOCK_BATCH_MQ, NULL);

  string body;
  requestBody->Encode(body);
  request.SetBody(body.data(), body.length());
  request.setMsgBody(body);
  callSignatureBeforeRequest(addr, request, sessionCredentials);
  request.Encode();

  unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
  if (pResponse == nullptr) {
    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
  }
  if (pResponse->getCode() == SUCCESS_VALUE) {
    return;
  }
  THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode());
}

}  // namespace rocketmq
X Tutup