那曲檬骨新材料有限公司

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

golang中使用kafka的綜合指南

馬哥Linux運維 ? 來源:稀土掘金技術社區 ? 2023-11-30 11:18 ? 次閱讀

介紹

kafka是一個比較流行的分布式、可拓展、高性能、可靠的流處理平臺。在處理kafka的數據時,這里有確保處理效率和可靠性的多種最佳實踐。本文將介紹這幾種實踐方式,并通過sarama實現他們。

以下是一些kafka消費的最佳實踐:

選擇合適的提交策略:Kafka提供兩種提交策略,自動和手動。雖然自動操作很容易使用,但它可能會導致數據丟失或重復。手動提交提供了更高級別的控制,確保消息至少處理一次或恰好一次,具體取決于用例。

盡可能減少Kafka的傳輸次數:大批量讀取消息可以顯著提高吞吐量。這可以通過調整 fetch.min.bytes 和 fetch.max.wait.ms 等參數來實現。

盡可能使用消費者組:Kafka允許多個消費者組成一個消費者組來并行消費數據。這使得 Kafka 能夠將數據分發給一個組中的所有消費者,從而實現高效的數據消費。

調整消費者緩沖區大小:通過調整消費者的緩沖區大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根據消息的預期大小和消費者的內存容量進行調整。這可以提高消費者的表現。

處理rebalance:當新的消費者加入消費者組,或者現有的消費者離開時,Kafka會觸發rebalance以重新分配負載。在此過程中,消費者停止消費數據。因此,快速有效地處理重新平衡可以提高整體吞吐量。

監控消費者:使用 Kafka 的消費者指標來監控消費者的性能。定期監控可以幫助我們識別性能瓶頸并調整消費者的配置。

選擇合適的提交策略

1.自動提交

Sarama 的 ConsumerGroup 默認情況下會自動提交偏移量。這意味著它會定期提交已成功消費的消息的偏移量,這允許消費者在重新啟動或消費失敗時從中斷的地方繼續。

下面是一個自動提交的消費者組消費消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = true  
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second  
  
ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Panicf( "創建消費者組客戶端時出錯: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "來自消費者的錯誤: %v" , err)  
    }  
}

根據config.Consumer.Offsets.AutoCommit.Interval可以看到,消費者會每秒自動提交offset。

2. 手動提交

手動提交使我們更好地控制何時提交消息偏移量。下面是一個手動提交的消費者組消費消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = false 
  
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config)  
if err != nil {  
    log.Panicf( "創建消費者組客戶端時出錯: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "Error from Consumer: %v" , err)  
    }  
}  
  


type Consumer struct {}  
  
func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {  
    for msg : = range Claim.Messages() {  
        
        fmt.Printf( "Message topic:%q partition:%d offset:%d
" , msg.Topic, msg.Partition, msg.Offset)  


        
        sess.MarkMessage(msg, "" )  
    }  
    return nil  
}

在該示例中, 使用MarkMessage手動將消息標記為已處理,最終根據Consumer.Offsets.CommitInterval配置提交。另外這個例子省略了錯誤處理部分,開發時需要注意正確處理生產過程中出現的錯誤。

譯者注:這篇文章雖然是今年5月發布,但是這里的提交方式還是有些過時了,目前sarama已經廢棄了Consumer.Offsets.CommitInterval,相關配置目前在Consumer.Offsets.AutoCommit

盡可能減少Kafka的傳輸次數

減少kafka的傳輸次數可以通過優化從kafka中讀取和寫入數據的方式來實現:

1. 增加批次的大小

使用kafka批量發送消息的效果優于逐個發送消息,批次越大,kafka發送數據效率就越高。但是需要權衡延遲和吞吐量之間的關系。較大的批次雖然代表著更大的吞吐量,但也會增加延遲。因為批次越大,填充批次的時間也越久。

在Go中,我們可以在使用sarama包生成消息時設置批次大小:


config := sarama.NewConfig()  
config.Producer.Flush.Bytes = 1024 * 1024

以及獲取消息的批次大小


config := sarama.NewConfig()  
config.Consumer.Fetch.Default = 1024 * 1024

2. 使用長輪詢

長輪詢是指消費者輪詢時如果Kafka中沒有數據,則消費者將等待數據到達。這減少了往返次數,因為消費者不需要在沒有數據時不斷請求數據。


config := sarama.NewConfig() 
config .Consumer.MaxWaitTime = 500 *time.Millisecond

該配置告訴消費者在返回之前會等待500毫秒

3. 盡可能使用消費者組

消費者組是一組協同工作消費來自kafka主題的消息的消費者。消費者組允許我們在多個消費者之間分配消息,從而提供橫向拓展能力。使用消費者組時,kafka負責將分區分配給組中的消費者,并確保每個分區同時僅被一個消費者消費。

接下來是sarama中消費者組的使用:

使用消費者組需要實現一個ConsumerGroupHandler接口

該接口具有三個方法:Setup、Cleanup、 和ConsumeClaim


type exampleConsumerGroupHandler struct { 
} 


func  (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { 
    for message := range Claim.Messages() { 
        
        fmt.Printf( "Message: %s
" , string (message.Value)) 


        
        session.MarkMessage(message, "" ) 
    }
    返回 nil
 }

創建sarama.ConsumerGroup并開始消費:


brokers := []string{"localhost:9092"} 
topic := "example_topic"  
groupID := "example_consumer_group"  
  


consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Fatalf("Error creating consumer group: %v", err)  
}  
defer consumerGroup.Close()  
  


handler := &exampleConsumerGroupHandler{}  
  


for {  
    err := consumerGroup.Consume(context.Background(), []string{topic}, handler)  
    if err != nil {  
        log.Printf("Error consuming messages: %v", err)  
    }  
}

該示例設置了一個消費組,用于消費來自“example_topic”的消息。消費者組可以通過添加更多消費者來提高處理能力。

使用消費者組時,記得處理消費期間rebalance和錯誤。

調整消費者緩沖區大小

在sarama中,我們可以調整消費者緩沖區的大小,以調整消費者在處理消息之前可以在內存中保存的消息數量。

默認情況下,緩沖區大小設置為256,這代表Sarama在開始處理消息之前將在內存中保存最多256條消息。如果消費者速度很慢,增加緩沖區大小可能有助于提高吞吐量。但是,更大的緩沖區也會消耗更多的內存。

以下是如何增加緩沖區大小的例子:


config := sarama.NewConfig()  
config.Consumer.Return.Errors = true  
config.Version = sarama.V2_1_0_0  
config.Consumer.Offsets.Initial = sarama.OffsetOldest  


config.ChannelBufferSize = 500  
  


group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config)  
if err != nil {  
    panic(err)  
}  
  


ctx := context.Background()  
for {  
    topics := []string{topic}  
    handler := exampleConsumerGroupHandler{}  


    err := group.Consume(ctx, topics, &handler)  
    if err != nil {  
        panic(err)  
    }  
}

處理rebalance

當新消費者添加到消費者組或現有消費者離開消費者組時,kafka會重新平衡該組中的消費者。rebalance是kafka確保消費者組中的所有消費者不會消費同一分區的保證。

在sarama中,處理rebalance是通過 Setup 和CleanUp函數來完成的。

通過正確處理重新平衡事件,您可以確保應用程序正常處理消費者組的更改,例如消費者離開或加入,并且在這些事件期間不會丟失或處理兩次消息。

譯者注:其實更重要的是在ConsumeClaim函數在通道關閉時盡早退出,才能正確的進入CleanUp函數。

監控消費者

監控Kafka消費者對于確保系統的健康和性能至關重要,我們需要時刻關注延遲、處理時間和錯誤率的指標。

Golang沒有內置對 Kafka 監控的支持,但有幾個庫和工具可以幫助我們。讓我們看一下其中的一些:

Sarama的Metrics:Sarama 提供了一個指標注冊表,它報告了有助于監控的各種指標,例如請求、響應的數量、請求和響應的大小等。這些指標可以使用 Prometheus 等監控系統來收集和監控。

JMX Exporter:如果您在 JVM 上運行 Kafka, 則可以使用 JMX Exporter 將kafka的 MBeans 發送給Prometheus

Kafka Exporter:Kafka Exporter是一個第三方工具,可以提供有關Kafka的更詳細的指標。它可以提供消費者組延遲,這是消費kafka消息時要監控的關鍵指標。

Jaeger 或 OpenTelemetry:這些工具可用于分布式追蹤,這有助于追蹤消息如何流經系統以及可能出現瓶頸的位置。

日志:時刻關注應用程序日志,記錄消費者中的任何錯誤或異常行為。這些日志可以幫助我們診斷問題。

消費者組命令, 可以使用kafka-consumer-groups命令來描述消費者組的狀態。

請記住,不僅要追蹤這些指標,還要針對任何需要關注的場景設置警報。通過這些方法,我們可以在問題還在初始階段時快速做出響應。

以上工作有助于確保使用kafka的應用程序健壯、可靠且高效。

審核編輯:湯梓紅

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 數據
    +關注

    關注

    8

    文章

    7145

    瀏覽量

    89584
  • 參數
    +關注

    關注

    11

    文章

    1860

    瀏覽量

    32428
  • kafka
    +關注

    關注

    0

    文章

    52

    瀏覽量

    5244

原文標題:golang中使用kafka的綜合指南

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    如何使用Golang連接MySQL

    首先我們來看如何使用Golang連接MySQL。
    的頭像 發表于 01-08 09:42 ?3441次閱讀
    如何使用<b class='flag-5'>Golang</b>連接MySQL

    Kafka讀取數據操作指南

    Kafka消費者——從 Kafka讀取數據
    發表于 09-16 06:42

    淺析kafka

    kafka常見問題
    發表于 09-29 10:09

    基于發布與訂閱的消息系統Kafka

    Kafka權威指南》——初識 Kafka
    發表于 03-05 13:46

    Kafka基礎入門文檔

    kafka系統入門教程(原理、配置、集群搭建、Java應用、Kafka-manager)
    發表于 03-12 07:22

    Kafka集群環境的搭建

    1、環境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發表于 01-05 17:55

    現代的服務端技術棧:Golang/Protobuf/gRPC詳解

    Golang又稱Go語言,是一個開源的、多用途的編程語言,由Google研發,并由于種種原因,正在日益流行。Golang已經有10年的歷史,并且據Google稱已經在生產環境中使用了接近7年的時間,這一點可能讓大多數人大跌眼鏡。
    的頭像 發表于 12-25 17:32 ?1185次閱讀

    Kafka的概念及Kafka的宕機

    問題要從一次Kafka的宕機開始說起。 筆者所在的是一家金融科技公司,但公司內部并沒有采用在金融支付領域更為流行的 RabbitMQ ,而是采用了設計之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發表于 08-27 11:21 ?2160次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機

    初探Golang內聯

    今天我們來聊聊 Golang 中的內聯。
    的頭像 發表于 12-13 09:51 ?992次閱讀

    GoLang的安裝和使用

    GoLang的安裝和使用
    的頭像 發表于 01-13 14:06 ?1307次閱讀
    <b class='flag-5'>GoLang</b>的安裝和使用

    Kafka 的簡介

    ? 1 kafka簡介 2 為什么要用消息系統 3 kafka基礎知識 4 kafka集群架構 5 總結 ? 1 kafka簡介 其主要設計目標如下: 以時間復雜度為O(1)的方式提供
    的頭像 發表于 07-03 11:10 ?670次閱讀
    <b class='flag-5'>Kafka</b> 的簡介

    物通博聯5G-kafka工業網關實現kafka協議對接到云平臺

    Kafka協議是一種基于TCP層的網絡協議,用于在分布式消息傳遞系統Apache Kafka中發送和接收消息。Kafka協議定義了客戶端和服務器之間的通信方式和數據格式,允許客戶端發送消息到K
    的頭像 發表于 07-11 10:44 ?547次閱讀

    Spring Kafka的各種用法

    最近業務上用到了Spring Kafka,所以系統性的探索了下Spring Kafka的各種用法,發現了很多實用的特性,下面介紹下Spring Kafka的消息重試機制。 0. 前言 原生
    的頭像 發表于 09-25 17:04 ?1071次閱讀

    Kafka架構技術:Kafka的架構和客戶端API設計

    Kafka 給自己的定位是事件流平臺(event stream platform)。因此在消息隊列中經常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發表于 10-10 15:41 ?2449次閱讀
    <b class='flag-5'>Kafka</b>架構技術:<b class='flag-5'>Kafka</b>的架構和客戶端API設計

    kafka相關命令詳解

    kafka常用命令詳解
    的頭像 發表于 10-20 11:34 ?1002次閱讀
    云顶国际娱乐| 武胜县| 托克托县| 在线百家乐官网电脑| 做生意 风水| 韩国百家乐的玩法技巧和规则| 大发888网址是什么| A8百家乐官网现金网| 百家乐官网赌博策略大全| 百家乐会骗人吗| 全讯网网站| 新宝百家乐官网网址| 蓝盾百家乐官网赌场娱乐网规则| 南京百家乐菜籽油| 德州扑克算牌器| 百家乐官网真人游戏投注网| 百家乐的必赢术| 新全讯网22335555| 百家乐官网游戏补牌规则| 百家乐娱乐城优惠| 德州扑克高牌| 百家乐官网赌博大全| 亚洲百家乐博彩的玩法技巧和规则 | 百家乐制胜法| 六合彩下注网| 尊龙百家乐官网娱乐城| 皇冠网百家乐啊| 百家乐官网的玩法视频| 百家乐视频游戏界面| 博彩网站评级| 金世豪百家乐官网的玩法技巧和规则 | 大发888玩法技巧| 678百家乐官网博彩娱乐网| 金百家乐的玩法技巧和规则| 贵宾百家乐的玩法技巧和规则| 美高梅娱乐城网址| 百家乐官网导航| 大发888古怪猴子| 缅甸百家乐官网赌场娱乐网规则| 旧金山百家乐官网的玩法技巧和规则| 威尼斯人娱乐场 送2688元礼金领取lrm64 |