@H_404_2@
使用spark保存记录时,有没有办法获取写入的记录数?虽然我知道目前不符合规范,但我希望能够做到这样的事情:
val count = df.write.csv(path)
或者,能够进行步骤结果的内联计数(优选地不使用标准累加器)将(几乎)同样有效.即:
dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()
有任何想法吗?
解决方法
我使用
SparkListener
可以拦截可用于访问任务指标的onTaskEnd或onStageCompleted事件.
任务指标为您提供Spark用于在sql选项卡中显示指标的累加器(在查询的详细信息中).
例如,以下查询:
spark. read. option("header",true). csv("../datasets/people.csv"). limit(10). write. csv("people")
给出了10个输出行,因此Spark知道它(你也可以).
您还可以探索Spark sql的QueryExecutionListener:
The interface of query execution listener that can be used to analyze execution metrics.
您可以使用可用作spark.listenerManager的ExecutionListenerManager
注册QueryExecutionListener.
scala> :type spark.listenerManager org.apache.spark.sql.util.ExecutionListenerManager scala> spark.listenerManager. clear clone register unregister
我认为它更接近“裸机”,但之前没有使用过.
@D3V(在评论部分中)提到使用结构化查询的QueryExecution访问numOutputRows sql指标.值得考虑的事情.
scala> :type q org.apache.spark.sql.DataFrame scala> :type q.queryExecution.executedPlan.metrics Map[String,org.apache.spark.sql.execution.metric.sqlMetric] q.queryExecution.executedPlan.metrics("numOutputRows").value
@H_404_2@