那曲檬骨新材料有限公司

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Spark的兩種核心Shuffle詳解

微云疏影 ? 來(lái)源:園陌 ? 作者:園陌 ? 2022-08-11 15:54 ? 次閱讀

在 MapReduce 框架中, Shuffle 階段是連接 Map 與 Reduce 之間的橋梁, Map 階段通過(guò) Shuffle 過(guò)程將數(shù)據(jù)輸出到 Reduce 階段中。由于 Shuffle 涉及磁盤(pán)的讀寫(xiě)和網(wǎng)絡(luò) I/O,因此 Shuffle 性能的高低直接影響整個(gè)程序的性能。Spark 也有 Map 階段和 Reduce 階段,因此也會(huì)出現(xiàn) Shuffle 。

Spark Shuffle

Spark Shuffle 分為兩種:一種是基于 Hash 的 Shuffle;另一種是基于 Sort 的 Shuffle。先介紹下它們的發(fā)展歷程,有助于我們更好的理解 Shuffle:

在 Spark 1.1 之前, Spark 中只實(shí)現(xiàn)了一種 Shuffle 方式,即基于 Hash 的 Shuffle 。在 Spark 1.1 版本中引入了基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,并且 Spark 1.2 版本之后,默認(rèn)的實(shí)現(xiàn)方式從基于 Hash 的 Shuffle 修改為基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,即使用的 ShuffleManager 從默認(rèn)的 hash 修改為 sort。在 Spark 2.0 版本中, Hash Shuffle 方式己經(jīng)不再使用。

Spark 之所以一開(kāi)始就提供基于 Hash 的 Shuffle 實(shí)現(xiàn)機(jī)制,其主要目的之一就是為了避免不需要的排序,大家想下 Hadoop 中的 MapReduce,是將 sort 作為固定步驟,有許多并不需要排序的任務(wù),MapReduce 也會(huì)對(duì)其進(jìn)行排序,造成了許多不必要的開(kāi)銷。

在基于 Hash 的 Shuffle 實(shí)現(xiàn)方式中,每個(gè) Mapper 階段的 Task 會(huì)為每個(gè) Reduce 階段的 Task生成一個(gè)文件,通常會(huì)產(chǎn)生大量的文件(即對(duì)應(yīng)為 M*R 個(gè)中間文件,其中, M 表示 Mapper階段的 Task 個(gè)數(shù), R 表示 Reduce 階段的 Task 個(gè)數(shù)) 伴隨大量的隨機(jī)磁盤(pán) I/O 操作與大量的內(nèi)存開(kāi)銷。

為了緩解上述問(wèn)題,在 Spark 0.8.1 版本中為基于 Hash 的 Shuffle 實(shí)現(xiàn)引入了 ShuffleConsolidate 機(jī)制(即文件合并機(jī)制),將 Mapper 端生成的中間文件進(jìn)行合并的處理機(jī)制。通過(guò)配置屬性 spark.shuffie.consolidateFiles=true,減少中間生成的文件數(shù)量。通過(guò)文件合并,可以將中間文件的生成方式修改為每個(gè)執(zhí)行單位為每個(gè) Reduce階段的 Task 生成一個(gè)文件。

執(zhí)行單位對(duì)應(yīng)為:每個(gè) Mapper 端的 Cores 數(shù)/每個(gè) Task分配的 Cores 數(shù)(默認(rèn)為 1) 。最終可以將文件個(gè)數(shù)從 M*R 修改為 E*C/T*R,其中,E 表示 Executors 個(gè)數(shù), C 表示可用 Cores 個(gè)數(shù), T 表示 Task 分配的 Cores 數(shù)。

Spark1.1 版本引入了 Sort Shuffle:

基于 Hash 的 Shuffle 的實(shí)現(xiàn)方式中,生成的中間結(jié)果文件的個(gè)數(shù)都會(huì)依賴于 Reduce 階段的 Task 個(gè)數(shù),即 Reduce 端的并行度,因此文件數(shù)仍然不可控,無(wú)法真正解決問(wèn)題。為了更好地解決問(wèn)題,在 Spark1.1 版本引入了基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,并且在 Spark 1.2 版本之后,默認(rèn)的實(shí)現(xiàn)方式也從基于 Hash 的 Shuffle,修改為基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,即使用的 ShuffleManager 從默認(rèn)的 hash 修改為 sort。

在基于 Sort 的 Shuffle 中,每個(gè) Mapper 階段的 Task 不會(huì)為每 Reduce 階段的 Task 生成一個(gè)單獨(dú)的文件,而是全部寫(xiě)到一個(gè)數(shù)據(jù)(Data)文件中,同時(shí)生成一個(gè)索引(Index)文件, Reduce 階段的各個(gè) Task 可以通過(guò)該索引文件獲取相關(guān)的數(shù)據(jù)。避免產(chǎn)生大量文件的直接收益就是降低隨機(jī)磁盤(pán) I/0 與內(nèi)存的開(kāi)銷。最終生成的文件個(gè)數(shù)減少到 2*M ,其中 M 表示 Mapper 階段的 Task 個(gè)數(shù),每個(gè) Mapper 階段的 Task 分別生成兩個(gè)文件(1 個(gè)數(shù)據(jù)文件、 1 個(gè)索引文件),最終的文件個(gè)數(shù)為 M 個(gè)數(shù)據(jù)文件與 M 個(gè)索引文件。因此,最終文件個(gè)數(shù)是 2*M 個(gè)。

從 Spark 1.4 版本開(kāi)始,在 Shuffle 過(guò)程中也引入了基于 Tungsten-Sort 的 Shuffie 實(shí)現(xiàn)方式,通 Tungsten 項(xiàng)目所做的優(yōu)化,可以極大提高 Spark 在數(shù)據(jù)處理上的性能。(Tungsten 翻譯為中文是鎢絲)

注:在一些特定的應(yīng)用場(chǎng)景下,采用基于 Hash 實(shí)現(xiàn) Shuffle 機(jī)制的性能會(huì)超過(guò)基于 Sort 的 Shuffle 實(shí)現(xiàn)機(jī)制。

一張圖了解下 Spark Shuffle 的迭代歷史:

poYBAGL0tcKAfFHvAADfYGmHtJs077.jpg

Spark Shuffle 迭代歷史

為什么 Spark 最終還是放棄了 HashShuffle ,使用了 Sorted-Based Shuffle?

我們可以從 Spark 最根本要優(yōu)化和迫切要解決的問(wèn)題中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 時(shí)產(chǎn)生大量的文件。當(dāng)數(shù)據(jù)量越來(lái)越多時(shí),產(chǎn)生的文件量是不可控的,這嚴(yán)重制約了 Spark 的性能及擴(kuò)展能力,所以 Spark 必須要解決這個(gè)問(wèn)題,減少 Mapper 端 ShuffleWriter 產(chǎn)生的文件數(shù)量,這樣便可以讓 Spark 從幾百臺(tái)集群的規(guī)模瞬間變成可以支持幾千臺(tái),甚至幾萬(wàn)臺(tái)集群的規(guī)模。

但使用 Sorted-Based Shuffle 就完美了嗎,答案是否定的,Sorted-Based Shuffle 也有缺點(diǎn),其缺點(diǎn)反而是它排序的特性,它強(qiáng)制要求數(shù)據(jù)在 Mapper 端必須先進(jìn)行排序,所以導(dǎo)致它排序的速度有點(diǎn)慢。好在出現(xiàn)了 Tungsten-Sort Shuffle ,它對(duì)排序算法進(jìn)行了改進(jìn),優(yōu)化了排序的速度。Tungsten-SortShuffle 已經(jīng)并入了 Sorted-Based Shuffle,Spark 的引擎會(huì)自動(dòng)識(shí)別程序需要的是 Sorted-BasedShuffle,還是 Tungsten-Sort Shuffle。

下面詳細(xì)剖析每個(gè) Shuffle 的底層執(zhí)行原理:

一、Hash Shuffle 解析

以下的討論都假設(shè)每個(gè) Executor 有 1 個(gè) cpu core。

1. HashShuffleManager

shuffle write 階段,主要就是在一個(gè) stage 結(jié)束計(jì)算之后,為了下一個(gè) stage 可以執(zhí)行 shuffle 類的算子(比如 reduceByKey),而將每個(gè) task 處理的數(shù)據(jù)按 key 進(jìn)行“劃分”。所謂“劃分”,就是對(duì)相同的 key 執(zhí)行 hash 算法,從而將相同 key 都寫(xiě)入同一個(gè)磁盤(pán)文件中,而每一個(gè)磁盤(pán)文件都只屬于下游 stage 的一個(gè) task。在將數(shù)據(jù)寫(xiě)入磁盤(pán)之前,會(huì)先將數(shù)據(jù)寫(xiě)入內(nèi)存緩沖中,當(dāng)內(nèi)存緩沖填滿之后,才會(huì)溢寫(xiě)到磁盤(pán)文件中去。

下一個(gè) stage 的 task 有多少個(gè),當(dāng)前 stage 的每個(gè) task 就要?jiǎng)?chuàng)建多少份磁盤(pán)文件。比如下一個(gè) stage 總共有 100 個(gè) task,那么當(dāng)前 stage 的每個(gè) task 都要?jiǎng)?chuàng)建 100 份磁盤(pán)文件。如果當(dāng)前 stage 有 50 個(gè) task,總共有 10 個(gè) Executor,每個(gè) Executor 執(zhí)行 5 個(gè) task,那么每個(gè) Executor 上總共就要?jiǎng)?chuàng)建 500 個(gè)磁盤(pán)文件,所有 Executor 上會(huì)創(chuàng)建 5000 個(gè)磁盤(pán)文件。由此可見(jiàn),未經(jīng)優(yōu)化的 shuffle write 操作所產(chǎn)生的磁盤(pán)文件的數(shù)量是極其驚人的。

shuffle read 階段,通常就是一個(gè) stage 剛開(kāi)始時(shí)要做的事情。此時(shí)該 stage 的每一個(gè) task 就需要將上一個(gè) stage 的計(jì)算結(jié)果中的所有相同 key,從各個(gè)節(jié)點(diǎn)上通過(guò)網(wǎng)絡(luò)都拉取到自己所在的節(jié)點(diǎn)上,然后進(jìn)行 key 的聚合或連接等操作。由于 shuffle write 的過(guò)程中,map task 給下游 stage 的每個(gè) reduce task 都創(chuàng)建了一個(gè)磁盤(pán)文件,因此 shuffle read 的過(guò)程中,每個(gè) reduce task 只要從上游 stage 的所有 map task 所在節(jié)點(diǎn)上,拉取屬于自己的那一個(gè)磁盤(pán)文件即可。

shuffle read 的拉取過(guò)程是一邊拉取一邊進(jìn)行聚合的。每個(gè) shuffle read task 都會(huì)有一個(gè)自己的 buffer 緩沖,每次都只能拉取與 buffer 緩沖相同大小的數(shù)據(jù),然后通過(guò)內(nèi)存中的一個(gè) Map 進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到 buffer 緩沖中進(jìn)行聚合操作。以此類推,直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。

HashShuffleManager 工作原理如下圖所示:

pYYBAGL0tcKAf5RbAADeo8fkeyA209.jpg

未優(yōu)化的HashShuffleManager工作原理

2. 優(yōu)化的 HashShuffleManager

為了優(yōu)化 HashShuffleManager 我們可以設(shè)置一個(gè)參數(shù):spark.shuffle.consolidateFiles,該參數(shù)默認(rèn)值為 false,將其設(shè)置為 true 即可開(kāi)啟優(yōu)化機(jī)制,通常來(lái)說(shuō),如果我們使用 HashShuffleManager,那么都建議開(kāi)啟這個(gè)選項(xiàng)。

開(kāi)啟 consolidate 機(jī)制之后,在 shuffle write 過(guò)程中,task 就不是為下游 stage 的每個(gè) task 創(chuàng)建一個(gè)磁盤(pán)文件了,此時(shí)會(huì)出現(xiàn)shuffleFileGroup的概念,每個(gè) shuffleFileGroup 會(huì)對(duì)應(yīng)一批磁盤(pán)文件,磁盤(pán)文件的數(shù)量與下游 stage 的 task 數(shù)量是相同的。一個(gè) Executor 上有多少個(gè) cpu core,就可以并行執(zhí)行多少個(gè) task。而第一批并行執(zhí)行的每個(gè) task 都會(huì)創(chuàng)建一個(gè) shuffleFileGroup,并將數(shù)據(jù)寫(xiě)入對(duì)應(yīng)的磁盤(pán)文件內(nèi)。

當(dāng) Executor 的 cpu core 執(zhí)行完一批 task,接著執(zhí)行下一批 task 時(shí),下一批 task 就會(huì)復(fù)用之前已有的 shuffleFileGroup,包括其中的磁盤(pán)文件,也就是說(shuō),此時(shí) task 會(huì)將數(shù)據(jù)寫(xiě)入已有的磁盤(pán)文件中,而不會(huì)寫(xiě)入新的磁盤(pán)文件中。因此,consolidate 機(jī)制允許不同的 task 復(fù)用同一批磁盤(pán)文件,這樣就可以有效將多個(gè) task 的磁盤(pán)文件進(jìn)行一定程度上的合并,從而大幅度減少磁盤(pán)文件的數(shù)量,進(jìn)而提升 shuffle write 的性能。

假設(shè)第二個(gè) stage 有 100 個(gè) task,第一個(gè) stage 有 50 個(gè) task,總共還是有 10 個(gè) Executor(Executor CPU 個(gè)數(shù)為 1),每個(gè) Executor 執(zhí)行 5 個(gè) task。那么原本使用未經(jīng)優(yōu)化的 HashShuffleManager 時(shí),每個(gè) Executor 會(huì)產(chǎn)生 500 個(gè)磁盤(pán)文件,所有 Executor 會(huì)產(chǎn)生 5000 個(gè)磁盤(pán)文件的。但是此時(shí)經(jīng)過(guò)優(yōu)化之后,每個(gè) Executor 創(chuàng)建的磁盤(pán)文件的數(shù)量的計(jì)算公式為:cpu core的數(shù)量 * 下一個(gè)stage的task數(shù)量,也就是說(shuō),每個(gè) Executor 此時(shí)只會(huì)創(chuàng)建 100 個(gè)磁盤(pán)文件,所有 Executor 只會(huì)創(chuàng)建 1000 個(gè)磁盤(pán)文件。

這個(gè)功能優(yōu)點(diǎn)明顯,但為什么 Spark 一直沒(méi)有在基于 Hash Shuffle 的實(shí)現(xiàn)中將功能設(shè)置為默認(rèn)選項(xiàng)呢,官方給出的說(shuō)法是這個(gè)功能還欠穩(wěn)定。

優(yōu)化后的 HashShuffleManager 工作原理如下圖所示:

pYYBAGL0tcKAYLdNAAD7yksWDwA591.jpg

優(yōu)化后的HashShuffleManager工作原理

基于 Hash 的 Shuffle 機(jī)制的優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

可以省略不必要的排序開(kāi)銷。

避免了排序所需的內(nèi)存開(kāi)銷。

缺點(diǎn):

生產(chǎn)的文件過(guò)多,會(huì)對(duì)文件系統(tǒng)造成壓力。

大量小文件的隨機(jī)讀寫(xiě)帶來(lái)一定的磁盤(pán)開(kāi)銷。

數(shù)據(jù)塊寫(xiě)入時(shí)所需的緩存空間也會(huì)隨之增加,對(duì)內(nèi)存造成壓力。

二、SortShuffle 解析

SortShuffleManager 的運(yùn)行機(jī)制主要分成三種:

普通運(yùn)行機(jī)制;

bypass 運(yùn)行機(jī)制,當(dāng) shuffle read task 的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(shí)(默認(rèn)為 200),就會(huì)啟用 bypass 機(jī)制;

Tungsten Sort 運(yùn)行機(jī)制,開(kāi)啟此運(yùn)行機(jī)制需設(shè)置配置項(xiàng) spark.shuffle.manager=tungsten-sort。開(kāi)啟此項(xiàng)配置也不能保證就一定采用此運(yùn)行機(jī)制(后面會(huì)解釋)。

1. 普通運(yùn)行機(jī)制

在該模式下,數(shù)據(jù)會(huì)先寫(xiě)入一個(gè)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,此時(shí)根據(jù)不同的 shuffle 算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是 reduceByKey 這種聚合類的 shuffle 算子,那么會(huì)選用 Map 數(shù)據(jù)結(jié)構(gòu),一邊通過(guò) Map 進(jìn)行聚合,一邊寫(xiě)入內(nèi)存;如果是 join 這種普通的 shuffle 算子,那么會(huì)選用 Array 數(shù)據(jù)結(jié)構(gòu),直接寫(xiě)入內(nèi)存。接著,每寫(xiě)一條數(shù)據(jù)進(jìn)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會(huì)判斷一下,是否達(dá)到了某個(gè)臨界閾值。如果達(dá)到臨界閾值的話,那么就會(huì)嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫(xiě)到磁盤(pán),然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。

在溢寫(xiě)到磁盤(pán)文件之前,會(huì)先根據(jù) key 對(duì)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序。排序過(guò)后,會(huì)分批將數(shù)據(jù)寫(xiě)入磁盤(pán)文件。默認(rèn)的 batch 數(shù)量是 10000 條,也就是說(shuō),排序好的數(shù)據(jù),會(huì)以每批 1 萬(wàn)條數(shù)據(jù)的形式分批寫(xiě)入磁盤(pán)文件。寫(xiě)入磁盤(pán)文件是通過(guò) Java 的 BufferedOutputStream 實(shí)現(xiàn)的。BufferedOutputStream 是 Java 的緩沖輸出流,首先會(huì)將數(shù)據(jù)緩沖在內(nèi)存中,當(dāng)內(nèi)存緩沖滿溢之后再一次寫(xiě)入磁盤(pán)文件中,這樣可以減少磁盤(pán) IO 次數(shù),提升性能。

一個(gè) task 將所有數(shù)據(jù)寫(xiě)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過(guò)程中,會(huì)發(fā)生多次磁盤(pán)溢寫(xiě)操作,也就會(huì)產(chǎn)生多個(gè)臨時(shí)文件。最后會(huì)將之前所有的臨時(shí)磁盤(pán)文件都進(jìn)行合并,這就是merge 過(guò)程,此時(shí)會(huì)將之前所有臨時(shí)磁盤(pán)文件中的數(shù)據(jù)讀取出來(lái),然后依次寫(xiě)入最終的磁盤(pán)文件之中。此外,由于一個(gè) task 就只對(duì)應(yīng)一個(gè)磁盤(pán)文件,也就意味著該 task 為下游 stage 的 task 準(zhǔn)備的數(shù)據(jù)都在這一個(gè)文件中,因此還會(huì)單獨(dú)寫(xiě)一份索引文件,其中標(biāo)識(shí)了下游各個(gè) task 的數(shù)據(jù)在文件中的 start offset 與 end offset。

SortShuffleManager 由于有一個(gè)磁盤(pán)文件 merge 的過(guò)程,因此大大減少了文件數(shù)量。比如第一個(gè) stage 有 50 個(gè) task,總共有 10 個(gè) Executor,每個(gè) Executor 執(zhí)行 5 個(gè) task,而第二個(gè) stage 有 100 個(gè) task。由于每個(gè) task 最終只有一個(gè)磁盤(pán)文件,因此此時(shí)每個(gè) Executor 上只有 5 個(gè)磁盤(pán)文件,所有 Executor 只有 50 個(gè)磁盤(pán)文件。

普通運(yùn)行機(jī)制的 SortShuffleManager 工作原理如下圖所示:

pYYBAGL0tcKASW1gAADMtD5xIRQ752.jpg

普通運(yùn)行機(jī)制的SortShuffleManager工作原理

2. bypass 運(yùn)行機(jī)制

Reducer 端任務(wù)數(shù)比較少的情況下,基于 Hash Shuffle 實(shí)現(xiàn)機(jī)制明顯比基于 Sort Shuffle 實(shí)現(xiàn)機(jī)制要快,因此基于 Sort Shuffle 實(shí)現(xiàn)機(jī)制提供了一個(gè)帶 Hash 風(fēng)格的回退方案,就是 bypass 運(yùn)行機(jī)制。對(duì)于 Reducer 端任務(wù)數(shù)少于配置屬性spark.shuffle.sort.bypassMergeThreshold設(shè)置的個(gè)數(shù)時(shí),使用帶 Hash 風(fēng)格的回退計(jì)劃。

bypass 運(yùn)行機(jī)制的觸發(fā)條件如下:

shuffle map task 數(shù)量小于spark.shuffle.sort.bypassMergeThreshold=200參數(shù)的值。不是聚合類的 shuffle 算子。

此時(shí),每個(gè) task 會(huì)為每個(gè)下游 task 都創(chuàng)建一個(gè)臨時(shí)磁盤(pán)文件,并將數(shù)據(jù)按 key 進(jìn)行 hash 然后根據(jù) key 的 hash 值,將 key 寫(xiě)入對(duì)應(yīng)的磁盤(pán)文件之中。當(dāng)然,寫(xiě)入磁盤(pán)文件時(shí)也是先寫(xiě)入內(nèi)存緩沖,緩沖寫(xiě)滿之后再溢寫(xiě)到磁盤(pán)文件的。最后,同樣會(huì)將所有臨時(shí)磁盤(pán)文件都合并成一個(gè)磁盤(pán)文件,并創(chuàng)建一個(gè)單獨(dú)的索引文件。

該過(guò)程的磁盤(pán)寫(xiě)機(jī)制其實(shí)跟未經(jīng)優(yōu)化的 HashShuffleManager 是一模一樣的,因?yàn)槎家獎(jiǎng)?chuàng)建數(shù)量驚人的磁盤(pán)文件,只是在最后會(huì)做一個(gè)磁盤(pán)文件的合并而已。因此少量的最終磁盤(pán)文件,也讓該機(jī)制相對(duì)未經(jīng)優(yōu)化的 HashShuffleManager 來(lái)說(shuō),shuffle read 的性能會(huì)更好。

而該機(jī)制與普通 SortShuffleManager 運(yùn)行機(jī)制的不同在于:第一,磁盤(pán)寫(xiě)機(jī)制不同;第二,不會(huì)進(jìn)行排序。也就是說(shuō),啟用該機(jī)制的最大好處在于,shuffle write 過(guò)程中,不需要進(jìn)行數(shù)據(jù)的排序操作,也就節(jié)省掉了這部分的性能開(kāi)銷。

bypass 運(yùn)行機(jī)制的 SortShuffleManager 工作原理如下圖所示:

poYBAGL0tcOAVLh4AACV6xSKcyg402.jpg

bypass運(yùn)行機(jī)制的SortShuffleManager工作原理

3. Tungsten Sort Shuffle 運(yùn)行機(jī)制

Tungsten Sort 是對(duì)普通 Sort 的一種優(yōu)化,Tungsten Sort 會(huì)進(jìn)行排序,但排序的不是內(nèi)容本身,而是內(nèi)容序列化后字節(jié)數(shù)組的指針(元數(shù)據(jù)),把數(shù)據(jù)的排序轉(zhuǎn)變?yōu)榱酥羔様?shù)組的排序,實(shí)現(xiàn)了直接對(duì)序列化后的二進(jìn)制數(shù)據(jù)進(jìn)行排序。由于直接基于二進(jìn)制數(shù)據(jù)進(jìn)行操作,所以在這里面沒(méi)有序列化和反序列化的過(guò)程。內(nèi)存的消耗大大降低,相應(yīng)的,會(huì)極大的減少的 GC 的開(kāi)銷。

Spark 提供了配置屬性,用于選擇具體的 Shuffle 實(shí)現(xiàn)機(jī)制,但需要說(shuō)明的是,雖然默認(rèn)情況下 Spark 默認(rèn)開(kāi)啟的是基于 SortShuffle 實(shí)現(xiàn)機(jī)制,但實(shí)際上,參考 Shuffle 的框架內(nèi)核部分可知基于 SortShuffle 的實(shí)現(xiàn)機(jī)制與基于 Tungsten Sort Shuffle 實(shí)現(xiàn)機(jī)制都是使用 SortShuffleManager,而內(nèi)部使用的具體的實(shí)現(xiàn)機(jī)制,是通過(guò)提供的兩個(gè)方法進(jìn)行判斷的:

對(duì)應(yīng)非基于 Tungsten Sort 時(shí),通過(guò) SortShuffleWriter.shouldBypassMergeSort 方法判斷是否需要回退到 Hash 風(fēng)格的 Shuffle 實(shí)現(xiàn)機(jī)制,當(dāng)該方法返回的條件不滿足時(shí),則通過(guò) SortShuffleManager.canUseSerializedShuffle 方法判斷是否需要采用基于 Tungsten Sort Shuffle 實(shí)現(xiàn)機(jī)制,而當(dāng)這兩個(gè)方法返回都為 false,即都不滿足對(duì)應(yīng)的條件時(shí),會(huì)自動(dòng)采用普通運(yùn)行機(jī)制。

因此,當(dāng)設(shè)置了 spark.shuffle.manager=tungsten-sort 時(shí),也不能保證就一定采用基于 Tungsten Sort 的 Shuffle 實(shí)現(xiàn)機(jī)制。

要實(shí)現(xiàn) Tungsten Sort Shuffle 機(jī)制需要滿足以下條件:

Shuffle 依賴中不帶聚合操作或沒(méi)有對(duì)輸出進(jìn)行排序的要求。

Shuffle 的序列化器支持序列化值的重定位(當(dāng)前僅支持 KryoSerializer Spark SQL 框架自定義的序列化器)。

Shuffle 過(guò)程中的輸出分區(qū)個(gè)數(shù)少于 16777216 個(gè)。

實(shí)際上,使用過(guò)程中還有其他一些限制,如引入 Page 形式的內(nèi)存管理模型后,內(nèi)部單條記錄的長(zhǎng)度不能超過(guò) 128 MB (具體內(nèi)存模型可以參考 PackedRecordPointer 類)。另外,分區(qū)個(gè)數(shù)的限制也是該內(nèi)存模型導(dǎo)致的。

所以,目前使用基于 Tungsten Sort Shuffle 實(shí)現(xiàn)機(jī)制條件還是比較苛刻的。

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 二進(jìn)制
    +關(guān)注

    關(guān)注

    2

    文章

    796

    瀏覽量

    41757
  • 磁盤(pán)
    +關(guān)注

    關(guān)注

    1

    文章

    380

    瀏覽量

    25286
  • SPARK
    +關(guān)注

    關(guān)注

    1

    文章

    105

    瀏覽量

    19977
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    詳解Zynq的兩種啟動(dòng)模式

    Zynq-7000AP SOC器件有效利用了片上CPU來(lái)幫忙配置,在沒(méi)有外部JTAG的情況下,處理系統(tǒng)(PS)與可編程邏輯(PL)都必須依靠PS來(lái)完成芯片的初始化配置。 ZYNQ的兩種啟動(dòng)模式:從BootROM主動(dòng)啟動(dòng),從JTAG被動(dòng)啟動(dòng)。
    發(fā)表于 08-02 09:33 ?1632次閱讀
    <b class='flag-5'>詳解</b>Zynq的<b class='flag-5'>兩種</b>啟動(dòng)模式

    山西嵌入式系統(tǒng)課程| Spark與Hadoop計(jì)算模型之Spark比Hadoop更...

    有很多種,不像Hadoop只提供了Map和Reduce兩種操作。比如map, filter, flatMap,sample, groupByKey, reduceByKey, union, join
    發(fā)表于 11-17 16:44

    基于Spark 2.1版本的Apache Spark內(nèi)存管理

    Apache Spark 內(nèi)存管理詳解
    發(fā)表于 04-26 17:13

    詳解Spark Shuffle原理及Shuffle操作問(wèn)題解決

    Spark Shuffle原理、Shuffle操作問(wèn)題解決和參數(shù)調(diào)優(yōu)
    發(fā)表于 04-29 17:11

    Spark簡(jiǎn)介與生態(tài)系統(tǒng)

    Spark詳解(一):Spark及其生態(tài)圈概述
    發(fā)表于 06-21 16:45

    Spark Streaming基礎(chǔ)總結(jié)

    實(shí)用計(jì):Spark Streaming消費(fèi)Kafka數(shù)據(jù)的兩種方案
    發(fā)表于 09-11 08:58

    基于RDMA技術(shù)的Spark Shuffle性能提升

    一篇文章教你使用RDMA技術(shù)提升SparkShuffle性能
    發(fā)表于 10-28 16:46

    兩種典型的ADRC算法介紹

    前言??上篇中詳細(xì)闡述了經(jīng)典的自抗擾控制算法的原理,本篇將圍繞兩種ADRC算法展開(kāi),針對(duì)擴(kuò)張狀態(tài)觀測(cè)器的參數(shù)整定問(wèn)題進(jìn)行詳解,同時(shí),對(duì)跟蹤微分器的幾個(gè)重要應(yīng)用進(jìn)行介紹。兩種典型的ADRC算法??自抗
    發(fā)表于 09-07 08:02

    蘋(píng)果音樂(lè)播放器走到盡頭:關(guān)閉iPod Nano和Shuffle條產(chǎn)品線

    據(jù)美國(guó)科技新聞網(wǎng)站AppleInsider報(bào)道,蘋(píng)果已經(jīng)決定停止銷售iPod nano和iPod shuffle兩種播放器產(chǎn)品,未來(lái)只保留iPod Touch產(chǎn)品。
    發(fā)表于 07-28 09:10 ?1610次閱讀

    蘋(píng)果宣布停售iPod nano/shuffle iPod nano/shuffle全回顧:傷心到講不出再見(jiàn)

    據(jù)美國(guó)科技新聞網(wǎng)站AppleInsider報(bào)道,蘋(píng)果已經(jīng)決定停止銷售iPod nano和iPod shuffle兩種播放器產(chǎn)品,未來(lái)只保留iPod Touch產(chǎn)品。
    發(fā)表于 07-28 17:11 ?3299次閱讀

    剖析Spark兩種核心Shuffle

    Spark Shuffle 分為兩種:一是基于 Hash 的 Shuffle;另一是基于
    的頭像 發(fā)表于 10-11 11:15 ?1978次閱讀
    剖析<b class='flag-5'>Spark</b>的<b class='flag-5'>兩種</b><b class='flag-5'>核心</b><b class='flag-5'>Shuffle</b>

    兩種MOS冗余驅(qū)動(dòng)方案

    兩種MOS冗余驅(qū)動(dòng)方案
    發(fā)表于 10-28 12:00 ?2次下載
    <b class='flag-5'>兩種</b>MOS冗余驅(qū)動(dòng)方案

    詳解PMSM中常用的兩種坐標(biāo)變換

    期介紹了Clarke的Park變化的基本原理,但是經(jīng)過(guò)這兩種變換后會(huì)存在兩種系數(shù),相信大家都很迷惑,這是什么原因? 主要原因是存在兩種遵循的方式:1、變換前后電流所產(chǎn)生的旋轉(zhuǎn)磁場(chǎng)等
    的頭像 發(fā)表于 01-19 15:52 ?2620次閱讀
    <b class='flag-5'>詳解</b>PMSM中常用的<b class='flag-5'>兩種</b>坐標(biāo)變換

    NVIDIA 攜手騰訊開(kāi)發(fā)和優(yōu)化 Spark UCX 實(shí)現(xiàn)性能躍升

    騰訊網(wǎng)絡(luò)平臺(tái)部與數(shù)據(jù)平臺(tái)部,聯(lián)合 NVIDIA 合作開(kāi)發(fā)和優(yōu)化 Spark UCX,最終實(shí)現(xiàn) Spark Shuffle 穩(wěn)定加速 15% - 20%,平均降低現(xiàn)網(wǎng) Spark 任務(wù)
    的頭像 發(fā)表于 08-25 20:50 ?830次閱讀
    NVIDIA 攜手騰訊開(kāi)發(fā)和優(yōu)化 <b class='flag-5'>Spark</b> UCX 實(shí)現(xiàn)性能躍升

    spark為什么比mapreduce快?

    spark為什么比mapreduce快? 首先澄清幾個(gè)誤區(qū): 1:者都是基于內(nèi)存計(jì)算的,任何計(jì)算框架都肯定是基于內(nèi)存的,所以網(wǎng)上說(shuō)的spark是基于內(nèi)存計(jì)算所以快,顯然是錯(cuò)誤的 2;DAG計(jì)算模型
    的頭像 發(fā)表于 09-06 09:45 ?320次閱讀
    大发888ber| 金殿百家乐的玩法技巧和规则| 百家乐官网注码法| 大发888备用网址大发娱乐城| 百家乐代理网址| 太阳城百家乐官网杀祖玛| 博彩评测网| 金樽百家乐的玩法技巧和规则| 玩百家乐官网掉房| 百家乐官网娱乐城新澳博| 大发888游戏平台hg dafa 888 gw| 百家乐玄机| 金海岸百家乐官网的玩法技巧和规则 | 大发888娱乐官方下载| 永利高百家乐怎样开户| 百家乐官网赌法| 龙虎斗网站| 大发888官网网址| 百家乐生活馆拖鞋| 百家乐官网真人游戏| 百家乐官网轮盘技巧| 欧洲娱乐场| 白山在线棋牌游戏| 百家乐园是真的不| 百家乐赌场彩| 免费百家乐官网预测软件| 百家乐官网开和几率| 百家乐官网概率投注| 云博娱乐城| 大发888娱乐城xiazai| 金界百家乐的玩法技巧和规则| 澳门百家乐官网官网| 百家乐官网注册彩金| 百家乐官网概率怎么算| 博九网| 棋牌室高尔夫娱乐场| 全讯网五湖四海| 娱乐百家乐的玩法技巧和规则| 百家乐娱乐城官方网| 真人百家乐网西陆| 做生意看风水|