架构对于每一行都是灵活的.这意味着我不能为每个案例创建案例类(我的一些数据可以嵌套).
我试图从我的UDF函数返回元组,但我也没有运气(因为我需要从列表转换为元组),我没有找到一个优雅的解决方案.
我正在返回的数据类型是String,Integer,Double,DateTime,它们的顺序不同.
我试图在DataFrame上使用map,但是我的架构有问题.
import spark.implicits._ def processData(row_type: String) = { /* completely random output here. Tuple/List/Array of elements with a type Integer,String,DateType. */ // pseudo-code starts here if row_type == A (1,"second",3) else (1,3,4) } val processDataUDF = udf((row_type: String) => processData(row_type)) val df = Seq((0,1),(1,2)).toDF("a","b") val df2 = df.select(processDataUDF($"a")) df2.show(5) df2.printSchema()
结果
+------------+ | UDF(a)| +------------+ |[1,second,3]| |[1,3]| +------------+
我该如何处理这个问题?每个row_type有不同的处理结果.所有row_type都是动态设置的.我可以为每个row_type提供很好的Schema,但我不能使用不同的模式生成相同的UDF返回结果.
使用地图是唯一的方法吗?
解决方法
您可以通过以下方式实现一些灵
>定义表示所有可能字段的超集的模式,并将各列标记为可为空.只有在没有类型冲突的情况下才有可能(如果Row包含字段foo,它总是使用相同的sql类型表示).
>使用集合类型(MapType,ArrayType)表示可变大小的字段.所有值和/或键必须属于同一类型.
>将原始数据重塑为可通过固定架构实际表示的点. Spark包括作为其依赖项的json4s
,它为merging,diffing和querying JSON数据提供了一组工具.如果需要,它可用于应用相对复杂的转换.
如果这不实用,我建议保持JSON字段“按原样”并仅按需解析它以提取特定值.您可以使用get_json_object和显式类型转换.这允许测试不同的场景:
coalesce(Seq("$.bar","$.foo.bar","$.foobar.foo.bar") .map(get_json_object($"json_col",_)): _*).cast(DoubleType)
不假设单个文档结构.
你可以通过二进制编码器(Encoders.kryo,Encoders.java)或RDD API获得更多的灵活性,它可以用来存储联合类型(甚至是Any),但是如果你真的期望完全随机的输出,那么它表明一些严重的设计或数据建模问题.即使您可以存储已分析的数据,也很难使用它.