我从s3文件输入以下DataFrame,需要将数据转换为以下所需的输出.我使用Spark版本1.5.1和
Scala,但可以用
Python改为Spark.欢迎任何建议.
DataFrame输入:
name animal data john mouse aaaaa bob mouse bbbbb bob mouse ccccc bob dog ddddd
期望的输出:
john/mouse/file.csv bob/mouse/file.csv bob/dog/file.csv terminal$cat bob/mouse/file.csv bbbbb ccccc terminal$cat bob/dog/file.csv ddddd
这是我尝试过的现有Spark Scala代码:
val sc = new SparkContext(new SparkConf()) val sqlc = new org.apache.spark.sql.sqlContext(sc) val df = sqlc.read.json("raw.gz") val cols = Seq("name","animal") df.groupBy(cols.head,cols.tail: _*).count().take(100).foreach(println)
电流输出:
[john,mouse,1] [bob,2] [bob,dog,1]
我现有代码的一些问题是groupBy返回一个GroupedData对象,我可能不想对该数据执行count / sum / agg函数.我正在寻找一种更好的技术来分组和输出数据.数据集非常大.
解决方法
这可以使用DataFrameWriter的partitionBy选项来实现.一般语法如下:
df.write.partitionBy("name","animal").format(...).save(...)
不幸的是,支持Spark 1.5中分区的唯一纯文本格式是JSON.
如果您可以将Spark安装更新为:
> 1.6 – 您可以将partitionBy与文本格式一起使用.如果您需要组的单个输出文件(重新分区),则还需要1.6.
> 2.0 – 您可以将partitionBy与csv格式一起使用.
我相信在1.5中你最好的选择是将文件写为JSON并转换单个输出文件.
如果不同名称’,’动物的数量很小,您可以尝试为每个组执行单独的写入:
val dist = df.select("name","animal").rdd.collect.map { case Row(name: String,animal: String) => (name,animal) } for { (name,animal) <- dist } df.where($"name" === name && $"animal" === animal) .select($"data").write.format("csv").save(s"/prefix/$name/$animal")