RocketMQ生產者為什么需要負載均衡?
在RocketMQ中,隊列是消息發送的基本單位。每個Topic下可能存在多個隊列,因此一個生產者實例可以向不同的隊列發送消息。當生產者發送消息時,如果不能均衡的將消息發送到不同的隊列,那么會導致隊列里的消息分布不均衡,這樣最終會導致消息性能下降,因此生產者負載均衡機制也是非常重要的。
RocketMQ生產者原理分析
既然生產者負載均衡如此重要,我們看下是如何實現的。
我們通常使用如下方法發送消息:
構建消息 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //發送消息 SendResult sendResult = producer.send(msg);
RocketMQ發送消息的核心邏輯在DefaultMQProducerImpl類sendDefaultImpl。
在發送消息流程利里面有一行非常關鍵的邏輯,selectOneMessageQueue,看方法名稱就可以知道其含義,選擇一個消息隊列。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
里面是通過策略類來實現的。
策略類最終通過org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 實現。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //生產者第一次發消息 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //非第一次,重試發消息的情況, for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //重試的情況,不取上一個broker的隊列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } 第一次發消息選擇隊列核心邏輯在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue() //線程安全的index private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); public MessageQueue selectOneMessageQueue() { //獲取一個基礎索引,每次自增1 這個全局存在TopicPublishInfo 每一個topic int index = this.sendWhichQueue.getAndIncrement(); // 基礎索引和 消息寫隊列大小 進行取模 用來實現輪訓的算法 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
哈哈,這里就是生產者負載均衡輪詢機制的核心邏輯了,使用到了ThreadLocal技術,sendWhichQueue為每個生產者線程維護一個自己的下標索引。
基礎索引計算器,使用ThreadLocal技術針對不同的生產者線程第一次隨機,后面遞增,可以更加負載均衡。
public class ThreadLocalIndex { //關鍵技術 private final ThreadLocalthreadLocalIndex = new ThreadLocal (); private final Random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); if (null == index) { //第一次隨機 index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } //第二次索引位置開始自增1 index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } }
哈哈,有沒有覺得這個實現非常巧妙了。不同的生產者線程都擁有自己的索引因子,分配隊列更加均衡。
總結
本文分析了RocketMQ生產者底層的實現,設計地方有巧妙之處,值得我們學習,上面是發送非順序消息的場景, 如果是順序消息,我們作為使用者可以指定負載均衡策略。
編輯:黃飛
-
負載均衡
+關注
關注
0文章
113瀏覽量
12391 -
線程
+關注
關注
0文章
505瀏覽量
19756 -
消息隊列
+關注
關注
0文章
33瀏覽量
3017
原文標題:RocketMQ生產者負載均衡(輪詢機制)核心原理
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論