1、可以先从数据的根源进行数据倾斜的解决。Spark读取的数据一般来源于HDFS,而HDFS上的数据大部分来源于Hive的数据清洗,可以通过hive的数据清洗过程,在源头就把数据进行聚合,这样Spark程序直接获取聚合后的数据,就无需进行shuffle操作,不存在shuffle过程,也就不存在数据倾斜的问题。
2、过滤掉导致倾斜的key。根据业务需求,如果导致倾斜的key不是很重要的话,可以直接在读取数据时进行过滤。
3、如果前两种方法不能使用的话,可以增加shuffle操作的reduce并行度,把key分散到不同的reduce上(在shuffle算子如groupbykey,reducebykey等方法中额外传入分区数量参数)。
4、使用随机前缀双重聚合。适用范围:groupbykey和reducebykey
把一次聚合分解成两次聚合,第一次先进行局部的聚合。
具体方法是在所有的key前面加上一个随机前缀(比如0-100),这样就算是相同的key,加上前缀后,也会被打散到不同的reduce端去,先进行第一轮的局部聚合,然后再把前缀去掉,再进行第二轮的聚合,完成全局聚合。
5、将reduce join转换成map join。
在join操作过程中,所有分区的数据将会按照key打散到不同的reduce中,在shuffle前先回把两个表分别进行reduce join,把key相同的数据的values汇聚起来(形成类似Map<key,List<values>>)的形式,然后再进行join,
这种情况下,可以采用广播变量的形式,把其中一个表给广播出去,所有的executor上都存有一份,那么就无需进行shuffle过程,直接使用map进行聚合即可。
但是这种方式仅适用于两个表中至少有一个表比较小的情况下,如果两个表都是大表的话,broadcast可能会导致内存不足,最终导致内存溢出。
6、sample采样倾斜key进行两次join。
先使用sample算子对大表进行采样,得到倾斜程度最大的几个key,然后把这几个key对应的数据从原来的RDD中分离出来,形成两份RDD,分别对另一个表join后再进行union。
如果这几个key分离出来之后,还是占据了大部分的数据,这种情况下,可以使用更近一步地优化,在另一个表中,把这几个key的数据也给筛选分离出来,然后对这个分离出来的包含倾斜key的数据集进行扩展,每个key都加上一个前缀(0-100),注意 这里是每个key都要加上所有的前缀(比如一开始有(key1,value1),扩展后成为(0_key1,value1),(1_key1,value1)…(100_key1,value1)),然后对第一个表分离出来的含有倾斜key的RDD的所有key都加上一个随机前缀,然后这个RDD和扩展的RDD进行join操作(此时这两个RDD均只包含了倾斜key,一个加了随机前缀,另一个进行了扩展),由于加上了随机前缀,即使这些key对应的数据量很大,也能够打散到不同的reduce上去。然后剩下的两个没有包含倾斜key的RDD也进行join,然后将两次join的结果进行union即可得到结果。
但是,如果倾斜的key很多的情况下,上面的方法就不是很好用了,可以采用下面的最后一个方案。
7、随机数扩容表join。
其实这个方法和第6个方法基本思路是差不多的,就是更为直接,直接对其中一个表进行扩容,然后另一个表全部加上随机前缀,然后进行join,然后再把结果去掉前缀即可。
这几个方法最好自上而下使用,前面的方法行不通了再考虑使用后面的方法。最后一个方法是最后的无奈之举,一般是用于两个大表之间的join,因为是大表之间的join,因此直接扩容的倍数不能太大,一般是10倍,如果是第6种方法的话,如果数据量不是特别多的话,可以扩容100倍甚至更多。