我试图将一个元组列表传递给
scala中的udf.我不确定如何为此精确定义数据类型.我试图将它作为整行传递,但它无法真正解决它.我需要根据元组的第一个元素对列表进行排序,然后再发送n个元素.我为udf尝试了以下定义
def udfFilterPath = udf((id: Long,idList: Array[structType[Long,String]] ) def udfFilterPath = udf((id: Long,idList: Array[Tuple2[Long,idList: Row)
这就是idList的样子:
[[1234,"Tony"],[2345,"Angela"]] [[1234,[234545,"Ruby"],[353445,"Ria"]]
这是一个如上所述的100行数据帧.我把udf称为如下:
testSet.select("id","idList").withColumn("result",udfFilterPath($"id",$"idList")).show
当我打印数据帧的架构时,它将其作为结构数组读取. idList本身是通过在由密钥分组并存储在数据帧中的元组列上执行收集列表来生成的.关于我做错了什么的任何想法?谢谢!
解决方法
定义UDF时,应使用普通的Scala类型(例如Tuples,Primitives …)而不是Spark sql类型(例如StructType)作为输出类型.
至于输入类型 – 这是它变得棘手(并且没有太多文档记录) – 一个元组数组实际上是一个mutable.WrappedArray [Row].所以 – 你必须先将每一行“转换”成一个元组,然后你可以进行排序并返回结果.
最后,根据您的描述,似乎根本没有使用id列,因此我将其从UDF定义中删除,但可以轻松地将其添加回来.
val udfFilterPath = udf { idList: mutable.WrappedArray[Row] => // converts the array items into tuples,sorts by first item and returns first two tuples: idList.map(r => (r.getAs[Long](0),r.getAs[String](1))).sortBy(_._1).take(2) } df.withColumn("result",udfFilterPath($"idList")).show(false) +------+-------------------------------------------+----------------------------+ |id |idList |result | +------+-------------------------------------------+----------------------------+ |1234 |[[1234,Tony],Angela]] |[[1234,Angela]]| |234545|[[1234,[2345454,Ruby],Ria]]|[[1234,Ria]] | +------+-------------------------------------------+----------------------------+