那曲檬骨新材料有限公司

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

RocketMQ生產者為什么需要負載均衡?

馬哥Linux運維 ? 來源:稀土掘金 ? 2023-11-13 11:04 ? 次閱讀

RocketMQ生產者為什么需要負載均衡?

在RocketMQ中,隊列是消息發送的基本單位。每個Topic下可能存在多個隊列,因此一個生產者實例可以向不同的隊列發送消息。當生產者發送消息時,如果不能均衡的將消息發送到不同的隊列,那么會導致隊列里的消息分布不均衡,這樣最終會導致消息性能下降,因此生產者負載均衡機制也是非常重要的。

RocketMQ生產者原理分析

既然生產者負載均衡如此重要,我們看下是如何實現的。

我們通常使用如下方法發送消息:

構建消息
Message msg = new Message("TopicTest",
    "TagA",
    "OrderID188",
    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//發送消息    
SendResult sendResult = producer.send(msg);

RocketMQ發送消息的核心邏輯在DefaultMQProducerImpl類sendDefaultImpl。

9ad25470-81c0-11ee-939d-92fbcf53809c.jpg

在發送消息流程利里面有一行非常關鍵的邏輯,selectOneMessageQueue,看方法名稱就可以知道其含義,選擇一個消息隊列。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

里面是通過策略類來實現的。

9aee8ece-81c0-11ee-939d-92fbcf53809c.jpg

策略類最終通過org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 實現。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //生產者第一次發消息
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //非第一次,重試發消息的情況,
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //重試的情況,不取上一個broker的隊列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
第一次發消息選擇隊列核心邏輯在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()


//線程安全的index
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();


public MessageQueue selectOneMessageQueue() {
        //獲取一個基礎索引,每次自增1 這個全局存在TopicPublishInfo 每一個topic
        int index = this.sendWhichQueue.getAndIncrement();
        // 基礎索引和 消息寫隊列大小 進行取模 用來實現輪訓的算法
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
            
        return this.messageQueueList.get(pos);
    }

哈哈,這里就是生產者負載均衡輪詢機制的核心邏輯了,使用到了ThreadLocal技術,sendWhichQueue為每個生產者線程維護一個自己的下標索引。

基礎索引計算器,使用ThreadLocal技術針對不同的生產者線程第一次隨機,后面遞增,可以更加負載均衡。

public class ThreadLocalIndex {
    //關鍵技術
    private final ThreadLocal threadLocalIndex = new ThreadLocal();
    private final Random random = new Random();


    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            //第一次隨機
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }
        //第二次索引位置開始自增1
        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;


        this.threadLocalIndex.set(index);
        return index;
    }
}

哈哈,有沒有覺得這個實現非常巧妙了。不同的生產者線程都擁有自己的索引因子,分配隊列更加均衡。

總結

本文分析了RocketMQ生產者底層的實現,設計地方有巧妙之處,值得我們學習,上面是發送非順序消息的場景, 如果是順序消息,我們作為使用者可以指定負載均衡策略。

編輯:黃飛

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 負載均衡
    +關注

    關注

    0

    文章

    113

    瀏覽量

    12391
  • 線程
    +關注

    關注

    0

    文章

    505

    瀏覽量

    19756
  • 消息隊列
    +關注

    關注

    0

    文章

    33

    瀏覽量

    3017

原文標題:RocketMQ生產者負載均衡(輪詢機制)核心原理

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    labviEW一個生產者,多個消費問題

    大家好,我的程序的出發點是希望實現一個生產者,十六個消費模塊的形式。即生產者循環中的事件結構有十六個處理分支,對應每一個分支,它產生一個“開始”元素入隊列,相應的消費模塊中元素出隊
    發表于 04-05 16:42

    生產者與消費循環相關問題

    我是labview初學者,想請問一下各位大神,如果采集卡有緩存那還需要生產者與消費循環嗎?
    發表于 10-21 14:05

    生產者與消費注冊時間的應用

    生產者與消費注冊時間的應用
    發表于 03-29 15:02

    生產者消費模式(事件結構)

    現小弟學習生產者消費的事件結構模式(用隊列傳遞消息),在生產者中用事件結構,但是當我點擊其中一個按鈕響應事件后就無法再點擊其它的按鈕了,這是怎么搞的,請大俠貼出圖片讓小弟看看是什么情況。
    發表于 12-23 14:14

    生產者與消費循環結構當生產者停止發送數據為什么消費還要循環兩次?

    各位大神: 今天用生產者與消費結構做一個程序,需要消費循環每執行一次計數+1。但是發現當生產者停止發送數據后,消費
    發表于 09-17 23:08

    生產者是怎么把要發送的信息傳送到生產者模式里面的?

    誰有關于生產者與消費模式的講解,就是生產者是怎么把要發送的信息傳送到生產者模式里面的,就是誰可以講解下,或是哪里有歷程的視頻講解。先行謝過。
    發表于 10-28 20:57

    生產者消費的事件結構模式(用隊列傳遞消息)

    現小弟學習生產者消費的事件結構模式(用隊列傳遞消息),在生產者中用事件結構,但是當我點擊其中一個按鈕響應事件后,再點擊其它的按鈕了需要點兩次,這是怎么搞的,請大俠貼出圖片讓小弟看看
    發表于 01-17 14:53

    生產者消費循環

    有木有大神知道生產者消費循環中隊列的大小,默認值一般為多少?此外這個大小能否改變?
    發表于 11-28 19:59

    生產者與消費循環程序

    生產者與消費循環程序
    發表于 12-02 19:57

    生產者與消費

    生產者與消費
    發表于 12-22 20:46

    labview的生產者/消費模式

    生產者/消費模式以前在沒有學習隊列這塊,看到生產者/消費模式的時候總認為很困難。今天仔細學習了隊列后,回頭再看著塊時就不是多么難理解。這個編程模式使用到了隊列的函數。首先,字面理解
    發表于 05-05 09:36

    生產者消費循環的問題

    如果將生產者消費循環中的一個生產者同時對應兩個消費的時候,會有一些問題。如圖所示,生產者循環將一個數據入列,然后下面是兩個消費
    發表于 03-25 10:02

    基于生產者消費完整測試程序

    [hide][url=]基于生產者消費完整測試 ...[/url] [/hide]
    發表于 11-01 17:13

    電池生產者與消費要知道的常識

    電池生產者與消費要知道的常識  一、常用電池型號、俗稱及日常適用范圍 國
    發表于 10-22 10:39 ?566次閱讀

    RocketMQ協議是什么?RocketMQ協議特點

    分布式消息系統中生產者和消費之間的高效可靠通信。它支持同步和異步消息傳遞模式,可以實現靈活和響應迅速的通信方式。 RocketMQ協議基于發布-訂閱消息模式,生產者將消息發布到特定的
    的頭像 發表于 01-03 16:11 ?873次閱讀
    百家乐视频台球游戏| 飞天百家乐官网的玩法技巧和规则 | 赌博百家乐下载| 澳门百家乐官网骗人| 金钻娱乐| 百家乐平注法到6| 罗盘24山八卦| 百家乐官网信誉好的平台| 威尼斯人娱乐城代理佣金| 百家乐牌机的破解法| 网上百家乐官网作弊法| 苏州市| 大发88817| 百家乐赌术大揭秘| 单机百家乐官网小游戏| 长春市| 大发888娱乐场 东南网| 百家乐娱乐平台备用网址| 百家乐加牌规则| 买百家乐官网程序| 博彩乐百家乐官网平台| 十六浦娱乐城官网| 天堂鸟百家乐的玩法技巧和规则| 百家乐看不到视频| 百家乐官网博赌场| 百家乐官网的技术与心态| 金冠娱乐城官网| 德州扑克在线玩| 威尼斯人娱乐网网上百家乐的玩法技巧和规则| 百家乐德州扑克轮盘| 免费百家乐官网计划工具| 百盛百家乐官网软件| 百家乐官网太阳城线上| 且末县| 明升投注网 | 大发888dafa888| 皇马百家乐的玩法技巧和规则| 真人百家乐分析软件是骗局| 百家乐官网技巧| 百家乐官网的巧门| 百家乐官网两边|