解決springboot集成rocketmq關於tag的坑
springboot集成rocketmq關於tag的坑
新項目使用springboot的若依框架集成rocketmq,選擇集成RocketMQTemplate這種方式實現消息的發送和接收。
1.客戶端發送代碼
此處回調方法裡有些業務不用關註,隻關心發送方法
@Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private RocketMQTemplate rocketMQTemplate; public void send(ReqMsg msg){ rocketMQTemplate.asyncSend(msg.getMsg().getTopic()+":"+msg.getMsg().getTags(), msg.getMsg(), new SendCallback(){ @Override public void onSuccess(SendResult sendResult) { logger.debug("msgid:{} 發送成功" , sendResult.getMsgId()); logger.debug("發送mq成功後要執行的service: {}",msg.getMsg().getSendAfterMethod()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),sendResult,"0")); } @Override public void onException(Throwable throwable) { logger.error("mq發送異常!{}",throwable.toString()); logger.debug("發送mq失敗後執行的service: {}",msg.getMsg().getSendAfterMethod()); //異常描述截取500 length入庫 msg.getMsg().putUserProperty("exceptionDesc",throwable.toString()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),"1")); } }); } }
2.服務端監聽消息
@Service @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}", selectorExpression="${rocketmq.tags}") public class CbiRocketmqConsumer implements RocketMQListener<CbiMsg> { Logger logger = LoggerFactory.getLogger(CbiRocketmqConsumer.class); @Override public void onMessage(CbiMsg message) { String msgBody = new String(message.getBody()); String serviceName = message.getTags(); logger.info("本次消費服務名稱:{}",serviceName); AbSaveReceiveAfter saveReceiveAfter = SpringUtils.getBean(serviceName); saveReceiveAfter.saveReceiveAfter(new RecevieAfterLog(message, Constants.CONSUME_SUCCESS));//默認消費成功 } }
@RocketMQMessageListener這個註解裡selectorExpression默認是*,接收topic下全部消息。想動態對tags進行配置。於是利用springboot獲取yml配置。寫死的時候沒有問題,但是改成$表達式配置後怎麼都收不到消息,經排查居然是selectorExpression這個不支持配置,會原封的按表達式進入MQ容器初始化。然而註解裡面的topic,comsumerGroup都可以正常拿到配置值。
翻源碼發現問題所在,項目啟動時,在ListenerContainerConfiguration在這個類裡初始化mq容器時,對配置進行賦值
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setNameServer(rocketMQProperties.getNameServer()); container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders (annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); return container; }
topic和comsumerGroup都在springboot環境裡獲取配置值瞭,唯獨selectorExpression這個沒有,直接默認註解裡的。下面的問題就是需要自己在項目啟動,springboot容器起來,但是rocketmq容器未起的時候,動態去改註解裡配置的值。然後讓Rocketmq啟動。
** * 因為RocketMQMessageListener不提供動態配置功能 * springboot初始化後rocket容器初始化前利用反射動態改變 * RocketMQMessageListener註解selectorExpression的值 * * */ @Component public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean { @Value("${rocketmq.consumer.tags}") private String tags; @Override public void afterPropertiesSet() throws Exception { RocketMQMessageListener annoTable = CbiRocketmqConsumer.class.getAnnotation(RocketMQMessageListener.class); // 獲取代理處理器 InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable); // 獲取私有 memberValues 屬性 Field f = invocationHandler.getClass().getDeclaredField("memberValues"); f.setAccessible(true); // 獲取實例的屬性map Map<String, Object> memberValues = (Map<String, Object>) f.get(invocationHandler); // 修改屬性值 memberValues.put("selectorExpression", tags); } }
問題解決。。
SpringBoot集成RocketMQ及報錯處理
項目場景:
【說明】:
springBoot集成RocketMQ開發
【環境】:
阿裡雲+Centos8+RocketMQ+SpringBoot+Docker
【啟動】:
docker start rmqserver rmqbroker[因為RocketMQ安裝在Docket容器中,所以這樣啟動]
服務器broker.conf配置信息:
brokerIP1=外網ip namesrvAddr=外網ip:9876 brokerName=broker_tanhua autoCreateTopicEnable=true
【說明】:
1.brokerIP1 當前broker監聽的IP
2.Broker是RocketMq的核心,負責消息的傳遞(提供者=》消費者)以及消息的持久化存儲,消息的HA機制以及服務器過濾功能。
3.autoCreateTopicEnable:自動創建Topic路由
問題一描述:
【說明】:
我第一次配置時,broker.conf配置文件中沒有配置autoCreateTopicEnable,因此在程序運行時會提示沒有路由信息:No route info of this topic: tanhua-sso-login
【說明】
我發送消息路由名字是tanhua-sso-login
錯誤信息:
No route info of this topic: tanhua-sso-login
【錯誤信息截圖】:我沒有截圖網上找瞭一個,差不多
解決方式:
【說明】:
我當時也在網上找瞭很多,有在啟動時添加自動創建的也有說防火墻開啟的原因,但是我感覺會這個的話應該都知道關防火墻。在啟動時添加自動創建可能也好使,但是我沒試過,因為我在搜索時發現問題統一指向說沒有自動創建,因此我想的是直接在配置文件中進行修改,然後重啟
【解決方式】:
在broker.conf配置文件中添加如下配置:
autoCreateTopicEnable=true
SpringBoot集成信息:
【application.properties】:
# RocketMQ相關配置 rocketmq.nameServer=外網IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
【註】:這裡配置的開通沒有spring,我之前加spring怎麼也連接不上
【pom.xml】:
<!--RocketMQ相關--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.1</version> </dependency>
問題二描述:
【說明】:我在修改上面的錯誤後,緊接著又報
【錯誤信息】:
RemotingTooMuchRequestException: sendDefaultImpl call timeout
【錯誤信息截圖】:也是沒有截圖網上找瞭一個,差不多
*【思路】:錯誤信息中提示call timeout,timeout一般想到到時連接或響應超時,因此在網上找到的是在發送MQ時出錯,網上解決方案是:修改Mq配置文件中的sendMsgTimeout,因此想到修改可以修改SpringBoot連接MQ時的配置設置
【解決方案】:添加rocketmq.producer.send-message-timeout= 6000
【說明】:給大一點發送信息超時時間。
*【說明】:同時在SpringBoot集成RoctetMQ配置中沒有sendMsgTimeout因此用rocketmq=>輸入’.’=>輸入sendtimeout=>查看有哪些關於這個的配置。
*【完整配置】:
# RocketMQ相關配置 rocketmq.nameServer=外網IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- RocketMQTemplate 註入失敗的解決
- 分佈式消息隊列RocketMQ概念詳解
- 解決SpringBoot整合RocketMQ遇到的坑
- springboot整合rocketmq實現分佈式事務
- SpringBoot整合RocketMQ實現消息發送和接收的詳細步驟