我有一个使用Apache Spark的
Java程序.该程序中最有趣的部分如下所示:
long seed = System.nanoTime(); JavaRDD<AnnotatedDocument> annotated = documents .mapPartitionsWithIndex(new InitialAnnotater(seed),true); annotated.cache(); for (int iter = 0; iter < 2000; iter++) { GlobalCounts counts = annotated .mapPartitions(new GlobalCounter()) .reduce((a,b) -> a.sum(b)); // update overall counts (*) seed = System.nanoTime(); // copy overall counts which CountChanger uses to compute a stochastic thing (**) annotated = annotated .mapPartitionsWithIndex(new CountChanger(counts,seed),true); annotated.cache(); // adding these lines causes constant time complexity like i want //List<AnnotatedDocument> ll = annotated.collect(); //annotated = sc.parallelize(ll,8); }
因此,实际上,行(**)会生成带有表单的RDD
documents .mapPartitionsWithIndex(initial) .mapPartitionsWithIndex(nextIter) .mapPartitionsWithIndex(nextIter) .mapPartitionsWithIndex(nextIter) ... 2000 more
确实是一个很长的地图链.此外,由于需要更新计数,因此行(*)会在每次迭代时强制计算(非惰性).
我遇到的问题是,我得到的时间复杂度随着每次迭代而线性增加,因此总体上呈二次方式:
我认为这是因为Spark试图“记住”链中的每个RDD,以及容错算法或导致其增长的任何因素.但是,我真的不知道.
我真正想做的是在每次迭代时告诉Spark“崩溃”RDD,以便只有最后一个保存在内存中并继续工作.我认为这应该导致每次迭代的时间不变.这可能吗?还有其他解决方案吗?
谢谢!
解决方法
尝试使用rdd.checkpoint.这将把RDD保存到hdfs并清除沿袭.
每次转换RDD时,都会增加谱系,Spark必须跟踪可用内容和必须重新计算的内容.处理DAG是昂贵的,并且大型DAG倾向于非常快地杀死性能.通过“检查点”,您可以指示Spark计算并保存生成的RDD,并丢弃其创建方式的信息.这使得它类似于简单地保存RDD并将其读回,从而最大限度地减少DAG操作.
在旁注中,由于您遇到了这个问题,因此最好知道union也会通过添加步骤来影响RDD性能,并且由于沿袭信息的方式,也可能抛出StackOverflowError. See this post
This link有更多细节和漂亮的图表,主题也提到了in this SO post.