我有一个JavaPairRDD< Integer,Integer []>我想在其上执行groupByKey操作.
groupByKey动作给了我一个:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
如果我没有弄错的话,这实际上是一个OutOfMemory错误.这只发生在大数据集中(在我的情况下,当Web UI中显示的“Shuffle Write”为~96GB时).
我已经设定:
spark.serializer org.apache.spark.serializer.KryoSerializer
在$SPARK_HOME / conf / spark-defaults.conf中,但我不确定Kryo是否用于序列化我的JavaPairRDD.
除了设置这个conf参数之外,我还应该做些什么才能使用Kryo序列化我的RDD?我可以在serialization instructions中看到:
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.
然后:
Since Spark 2.0.0,we internally use Kryo serializer when shuffling RDDs with simple types,arrays of simple types,or string type.
我还注意到,当我将spark.serializer设置为Kryo时,Web UI中的Shuffle Write从~96GB(默认序列化器)增加到243GB!
编辑:在评论中,我被问及我的程序的逻辑,以防groupByKey可以用reduceByKey替换.我不认为这是可能的,但无论如何它在这里:
>输入具有以下形式:
> key:index bucket id,
> value:此存储桶中的整数实体ID数组
> shuffle write操作以以下形式生成对:
> entityId
>同一个桶中所有实体ID的整数数组(称为邻居)
> groupByKey操作收集每个实体的所有邻居数组,其中一些可能出现多次(在许多存储桶中).
>在groupByKey操作之后,我为每个桶保留一个权重(基于它包含的负实体id的数量),并且对于每个邻居id,我总结它所属的桶的权重.
>我将每个邻居id的分数标准化为另一个值(假设它已经给出)并且每个实体发出前3个邻居.
我得到的不同密钥的数量大约是1000万(大约500万个正实体ID和500万个负数).
EDIT2:我尝试分别使用Hadoop的Writables(VIntWritable和VIntArrayWritable扩展ArrayWritable)而不是Integer和Integer [],但是shuffle大小仍然比默认的JavaSerializer大.
然后我将spark.shuffle.memoryFraction从0.2增加到0.4(即使在版本2.1.0中已弃用,也没有说明应该使用的内容)并启用offHeap内存,并且shuffle大小减少了~20GB.即使这符合标题所要求的内容,我更倾向于使用更算法的解决方案,或者包含更好压缩的解决方案.