轉(zhuǎn)帖|使用教程|編輯:龔雪|2017-03-16 11:22:22.000|閱讀 736 次
概述:本文結(jié)合實(shí)例詳細(xì)闡明了Spark數(shù)據(jù)傾斜的幾種場(chǎng)景以及對(duì)應(yīng)的解決方案,包括避免數(shù)據(jù)源傾斜,調(diào)整并行度,使用自定義Partitioner,使用Map側(cè)Join代替Reduce側(cè)Join,給傾斜Key加上隨機(jī)前綴等。
# 界面/圖表報(bào)表/文檔/IDE等千款熱門軟控件火熱銷售中 >>
對(duì)Spark/Hadoop這樣的大數(shù)據(jù)系統(tǒng)來(lái)講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。
在Spark中,同一個(gè)Stage的不同Partition可以并行處理,而具有依賴關(guān)系的不同Stage之間是串行處理的。假設(shè)某個(gè)Spark Job分為Stage 0和Stage 1兩個(gè)Stage,且Stage 1依賴于Stage 0,那Stage 0完全處理結(jié)束之前不會(huì)處理Stage 1。而Stage 0可能包含N個(gè)Task,這N個(gè)Task可以并行進(jìn)行。如果其中N-1個(gè)Task都在10秒內(nèi)完成,而另外一個(gè)Task卻耗時(shí)1分鐘,那該Stage的總時(shí)間至少為1分鐘。換句話說(shuō),一個(gè)Stage所耗費(fèi)的時(shí)間,主要由最慢的那個(gè)Task決定。
由于同一個(gè)Stage內(nèi)的所有Task執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下,不同Task之間耗時(shí)的差異主要由該Task所處理的數(shù)據(jù)量決定。
Stage的數(shù)據(jù)來(lái)源主要分為如下兩類
以Spark Stream通過(guò)DirectStream方式讀取Kafka數(shù)據(jù)為例。由于Kafka的每一個(gè)Partition對(duì)應(yīng)Spark的一個(gè)Task(Partition),所以Kafka內(nèi)相關(guān)Topic的各Partition之間數(shù)據(jù)是否平衡,直接決定Spark處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。
如《Kafka設(shè)計(jì)解析(一)- Kafka背景及架構(gòu)介紹》一文所述,Kafka某一Topic內(nèi)消息在不同Partition之間的分布,主要由Producer端所使用的Partition實(shí)現(xiàn)類決定。如果使用隨機(jī)Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè)Partition中,從而從概率上來(lái)講,各Partition間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源Stage(直接讀取Kafka數(shù)據(jù)的Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
但很多時(shí)候,業(yè)務(wù)場(chǎng)景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù)放于同一個(gè)Partition中。一個(gè)典型的場(chǎng)景是,需要將同一個(gè)用戶相關(guān)的PV信息置于同一個(gè)Partition中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過(guò)其它方式處理。
Spark在做Shuffle時(shí),默認(rèn)使用HashPartitioner(非Hash Shuffle)對(duì)數(shù)據(jù)進(jìn)行分區(qū)。如果并行度設(shè)置的不合適,可能造成大量不相同的Key對(duì)應(yīng)的數(shù)據(jù)被分配到了同一個(gè)Task上,造成該Task所處理的數(shù)據(jù)遠(yuǎn)大于其它Task,從而造成數(shù)據(jù)傾斜。
如果調(diào)整Shuffle時(shí)的并行度,使得原本被分配到同一Task的不同Key發(fā)配到不同Task上處理,則可降低原Task所需處理的數(shù)據(jù)量,從而緩解數(shù)據(jù)傾斜問(wèn)題造成的短板效應(yīng)。
現(xiàn)有一張測(cè)試表,名為student_external,內(nèi)有10.5億條數(shù)據(jù),每條數(shù)據(jù)有一個(gè)唯一的id值。現(xiàn)從中取出id取值為9億到10.5億的共1.5條數(shù)據(jù),并通過(guò)一些處理,使得id為9億到9.4億間的所有數(shù)據(jù)對(duì)12取模后余數(shù)為8(即在Shuffle并行度為12時(shí)該數(shù)據(jù)集全部被HashPartition分配到第8個(gè)Task),其它數(shù)據(jù)集對(duì)其id除以100取整,從而使得id大于9.4億的數(shù)據(jù)在Shuffle時(shí)可被均勻分配到所有Task中,而id小于9.4億的數(shù)據(jù)全部分配到同一個(gè)Task中。處理過(guò)程如下
INSERT OVERWRITE TABLE test
SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 )
ELSE CAST(id/100 AS INTEGER)
END,
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;
通過(guò)上述處理,一份可能造成后續(xù)數(shù)據(jù)傾斜的測(cè)試數(shù)據(jù)即以準(zhǔn)備好。接下來(lái),使用Spark讀取該測(cè)試數(shù)據(jù),并通過(guò)groupByKey(12)
對(duì)id分組處理,且Shuffle并行度為12。代碼如下
public class SparkDataSkew {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.appName("SparkDataSkewTunning")
.config("hive.metastore.uris", "thrift://hadoop1:9083")
.enableHiveSupport()
.getOrCreate();
Dataset dataframe = sparkSession.sql( "select * from test");
dataframe.toJavaRDD()
.mapToPair((Row row) -> new Tuple2(row.getInt(0),row.getString(1)))
.groupByKey(12)
.mapToPair((Tuple2> tuple) -> {
int id = tuple._1();
AtomicInteger atomicInteger = new AtomicInteger(0);
tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
return new Tuple2(id, atomicInteger.get());
}).count();
sparkSession.stop();
sparkSession.close();
}
}
本次實(shí)驗(yàn)所使用集群節(jié)點(diǎn)數(shù)為4,每個(gè)節(jié)點(diǎn)可被Yarn使用的CPU核數(shù)為16,內(nèi)存為16GB。使用如下方式提交上述應(yīng)用,將啟動(dòng)4個(gè)Executor,每個(gè)Executor可使用核數(shù)為12(該配置并非生產(chǎn)環(huán)境下的最優(yōu)配置,僅用于本文實(shí)驗(yàn)),可用內(nèi)存為12GB。
spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar
GroupBy Stage的Task狀態(tài)如下圖所示,Task 8處理的記錄數(shù)為4500萬(wàn),遠(yuǎn)大于(9倍于)其它11個(gè)Task處理的500萬(wàn)記錄。而Task 8所耗費(fèi)的時(shí)間為38秒,遠(yuǎn)高于其它11個(gè)Task的平均時(shí)間(16秒)。整個(gè)Stage的時(shí)間也為38秒,該時(shí)間主要由最慢的Task 8決定。
在這種情況下,可以通過(guò)調(diào)整Shuffle并行度,使得原來(lái)被分配到同一個(gè)Task(即該例中的Task 8)的不同Key分配到不同Task,從而降低Task 8所需處理的數(shù)據(jù)量,緩解數(shù)據(jù)傾斜。
通過(guò)groupByKey(48)
將Shuffle并行度調(diào)整為48,重新提交到Spark。新的Job的GroupBy Stage所有Task狀態(tài)如下圖所示。
從上圖可知,記錄數(shù)最多的Task 20處理的記錄數(shù)約為1125萬(wàn),相比于并行度為12時(shí)Task 8的4500萬(wàn),降低了75%左右,而其耗時(shí)從原來(lái)Task 8的38秒降到了24秒。
在這種場(chǎng)景下,調(diào)整并行度,并不意味著一定要增加并行度,也可能是減小并行度。如果通過(guò)groupByKey(11)
將Shuffle并行度調(diào)整為11,重新提交到Spark。新Job的GroupBy Stage的所有Task狀態(tài)如下圖所示。
從上圖可見(jiàn),處理記錄數(shù)最多的Task 6所處理的記錄數(shù)約為1045萬(wàn),耗時(shí)為23秒。處理記錄數(shù)最少的Task 1處理的記錄數(shù)約為545萬(wàn),耗時(shí)12秒。
適用場(chǎng)景
大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過(guò)大。
解決方案
調(diào)整并行度。一般是增大并行度,但有時(shí)如本例減小并行度也可達(dá)到效果。
優(yōu)勢(shì)
實(shí)現(xiàn)簡(jiǎn)單,可在需要Shuffle的操作算子上直接設(shè)置并行度或者使用spark.default.parallelism
設(shè)置。如果是Spark SQL,還可通過(guò)SET spark.sql.shuffle.partitions=[num_tasks]
設(shè)置并行度。可用最小的代價(jià)解決問(wèn)題。一般如果出現(xiàn)數(shù)據(jù)傾斜,都可以通過(guò)這種方法先試驗(yàn)幾次,如果問(wèn)題未解決,再嘗試其它方法。
劣勢(shì)
適用場(chǎng)景少,只能將分配到同一Task的不同Key分散開(kāi),但對(duì)于同一Key傾斜嚴(yán)重的情況該方法并不適用。并且該方法一般只能緩解數(shù)據(jù)傾斜,沒(méi)有徹底消除問(wèn)題。從實(shí)踐經(jīng)驗(yàn)來(lái)看,其效果一般。
使用自定義的Partitioner(默認(rèn)為HashPartitioner),將原本被分配到同一個(gè)Task的不同Key分配到不同Task。
以上述數(shù)據(jù)集為例,繼續(xù)將并發(fā)度設(shè)置為12,但是在groupByKey
算子上,使用自定義的Partitioner
(實(shí)現(xiàn)如下)
.groupByKey(new Partitioner() {
@Override
public int numPartitions() {
return 12;
}
@Override
public int getPartition(Object key) {
int id = Integer.parseInt(key.toString());
if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
return (id - 9500000) / 12;
} else {
return id % 12;
}
}
})
由下圖可見(jiàn),使用自定義Partition后,耗時(shí)最長(zhǎng)的Task 6處理約1000萬(wàn)條數(shù)據(jù),用時(shí)15秒。并且各Task所處理的數(shù)據(jù)集大小相當(dāng)。
適用場(chǎng)景
大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過(guò)大。
解決方案
使用自定義的Partitioner實(shí)現(xiàn)類代替默認(rèn)的HashPartitioner,盡量將所有不同的Key均勻分配到不同的Task中。
優(yōu)勢(shì)
不影響原有的并行度設(shè)計(jì)。如果改變并行度,后續(xù)Stage的并行度也會(huì)默認(rèn)改變,可能會(huì)影響后續(xù)Stage。
劣勢(shì)
適用場(chǎng)景有限,只能將不同Key分散開(kāi),對(duì)于同一Key對(duì)應(yīng)數(shù)據(jù)集非常大的場(chǎng)景不適用。效果與調(diào)整并行度類似,只能緩解數(shù)據(jù)傾斜而不能完全消除數(shù)據(jù)傾斜。而且需要根據(jù)數(shù)據(jù)特點(diǎn)自定義專用的Partitioner,不夠靈活。
通過(guò)如下SQL創(chuàng)建一張具有傾斜Key且總記錄數(shù)為1.5億的大表test。
INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 )
ELSE CAST(id/10 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;
使用如下SQL創(chuàng)建一張數(shù)據(jù)分布均勻且總記錄數(shù)為50萬(wàn)的小表test_new。
INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/10 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;
直接通過(guò)Spark Thrift Server提交如下SQL將表test與表test_new進(jìn)行Join并將Join結(jié)果存于表test_join中。
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;
該SQL對(duì)應(yīng)的DAG如下圖所示。從該圖可見(jiàn),該執(zhí)行過(guò)程總共分為三個(gè)Stage,前兩個(gè)用于從Hive中讀取數(shù)據(jù),同時(shí)二者進(jìn)行Shuffle,通過(guò)最后一個(gè)Stage進(jìn)行Join并將結(jié)果寫入表test_join中。
從下圖可見(jiàn),最近Join Stage各Task處理的數(shù)據(jù)傾斜嚴(yán)重,處理數(shù)據(jù)量最大的Task耗時(shí)7.1分鐘,遠(yuǎn)高于其它無(wú)數(shù)據(jù)傾斜的Task約2s秒的耗時(shí)。
接下來(lái),嘗試通過(guò)Broadcast實(shí)現(xiàn)Map側(cè)Join。實(shí)現(xiàn)Map側(cè)Join的方法,并非直接通過(guò)CACHE TABLE test_new
將小表test_new進(jìn)行cache。現(xiàn)通過(guò)如下SQL進(jìn)行Join。
CACHE TABLE test_new;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;
通過(guò)如下DAG圖可見(jiàn),該操作仍分為三個(gè)Stage,且仍然有Shuffle存在,唯一不同的是,小表的讀取不再直接掃描Hive表,而是掃描內(nèi)存中緩存的表。
并且數(shù)據(jù)傾斜仍然存在。如下圖所示,最慢的Task耗時(shí)為7.1分鐘,遠(yuǎn)高于其它Task的約2秒。
正確的使用Broadcast實(shí)現(xiàn)Map側(cè)Join的方式是,通過(guò)SET spark.sql.autoBroadcastJoinThreshold=104857600;
將Broadcast的閾值設(shè)置得足夠大。
再次通過(guò)如下SQL進(jìn)行Join。
SET spark.sql.autoBroadcastJoinThreshold=104857600;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;
通過(guò)如下DAG圖可見(jiàn),該方案只包含一個(gè)Stage。
并且從下圖可見(jiàn),各Task耗時(shí)相當(dāng),無(wú)明顯數(shù)據(jù)傾斜現(xiàn)象。并且總耗時(shí)為1.5分鐘,遠(yuǎn)低于Reduce側(cè)Join的7.3分鐘。
適用場(chǎng)景
參與Join的一邊數(shù)據(jù)集足夠小,可被加載進(jìn)Driver并通過(guò)Broadcast方法廣播到各個(gè)Executor中。
解決方案
在Java/Scala代碼中將小數(shù)據(jù)集數(shù)據(jù)拉取到Driv
更多行業(yè)資訊,更新鮮的技術(shù)動(dòng)態(tài),盡在。
本站文章除注明轉(zhuǎn)載外,均為本站原創(chuàng)或翻譯。歡迎任何形式的轉(zhuǎn)載,但請(qǐng)務(wù)必注明出處、不得修改原文相關(guān)鏈接,如果存在內(nèi)容上的異議請(qǐng)郵件反饋至chenjj@fc6vip.cn