spark – 如何减少JavaPairRDD的shuffle大小?

前端之家收集整理的这篇文章主要介绍了spark – 如何减少JavaPairRDD的shuffle大小?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

我有一个JavaPairRDD< Integer,Integer []>我想在其上执行groupByKey操作.

groupByKey动作给了我一个:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle

如果我没有弄错的话,这实际上是一个OutOfMemory错误.这只发生在大数据集中(在我的情况下,当Web UI中显示的“Shuffle Write”为~96GB时).

我已经设定:

spark.serializer org.apache.spark.serializer.KryoSerializer

在$SPARK_HOME / conf / spark-defaults.conf中,但我不确定Kryo是否用于序列化我的JavaPairRDD.

除了设置这个conf参数之外,我还应该做些什么才能使用Kryo序列化我的RDD?我可以在serialization instructions中看到:

Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.

然后:

Since Spark 2.0.0,we internally use Kryo serializer when shuffling RDDs with simple types,arrays of simple types,or string type.

我还注意到,当我将spark.serializer设置为Kryo时,Web UI中的Shuffle Write从~96GB(默认序列化器)增加到243GB!

编辑:在评论中,我被问及我的程序的逻辑,以防groupByKey可以用reduceByKey替换.我不认为这是可能的,但无论如何它在这里:

>输入具有以下形式:

> key:index bucket id,
> value:此存储桶中的整数实体ID数组

> shuffle write操作以以下形式生成对:

> entityId
>同一个桶中所有实体ID的整数数组(称为邻居)

> groupByKey操作收集每个实体的所有邻居数组,其中一些可能出现多次(在许多存储桶中).
>在groupByKey操作之后,我为每个桶保留一个权重(基于它包含的负实体id的数量),并且对于每个邻居id,我总结它所属的桶的权重.
>我将每个邻居id的分数标准化为另一个值(假设它已经给出)并且每个实体发出前3个邻居.

我得到的不同密钥的数量大约是1000万(大约500万个正实体ID和500万个负数).

EDIT2:我尝试分别使用Hadoop的Writables(VIntWritable和VIntArrayWritable扩展ArrayWritable)而不是Integer和Integer [],但是shuffle大小仍然比默认的JavaSerializer大.

然后我将spark.shuffle.memoryFraction从0.2增加到0.4(即使在版本2.1.0中已弃用,也没有说明应该使用的内容)并启用offHeap内存,并且shuffle大小减少了~20GB.即使这符合标题所要求的内容,我更倾向于使用更算法的解决方案,或者包含更好压缩的解决方案.

最佳答案
我认为这里可以推荐的最佳方法(没有输入数据的更多具体知识)通常是在输入RDD上使用持久化API.

作为第一步,我尝试在输入上调用.persist(MEMORY_ONLY_SER),RDD以降低内存使用量(尽管在某个cpu开销下,对于你的情况来说不应该是针对整数的问题).

如果这还不够,你可以试试.persist(MEMORY_AND_DISK_SER)或者如果你的shuffle仍然占用了大量内存,那么输入数据集需要在内存上更容易.persist(DISK_ONLY)可能是一个选项,但是会强烈选择性能恶化.

猜你在找的Java相关文章