scala – 无法使用案例类从RDD的Row创建数据框

前端之家收集整理的这篇文章主要介绍了scala – 无法使用案例类从RDD的Row创建数据框前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
使用Spark 2.x,我似乎无法使用由case类组成的Row的RDD创建Dataframe.

它在Spark 1.6.x上运行良好但在2.x运行异常时失败:

java.lang.RuntimeException: Timestamp is not a valid external type for schema of struct<seconds:bigint,nanos:int>

之前是来自Catalyst的一堆生成代码.

这是片段(我正在做的简化版):

package main

import org.apache.spark.sql.{SparkSession,Row}
import org.apache.spark.sql.types.{IntegerType,LongType,StructField,StructType}

object Test {

  case class Timestamp(seconds: Long,nanos: Int)

  val TIMESTAMP_TYPE = StructType(List(
    StructField("seconds",false),StructField("nanos",IntegerType,false)
  ))

  val SCHEMA = StructType(List(
    StructField("created_at",TIMESTAMP_TYPE,true)
  ))

  def main(args: Array[String]) {

    val spark = SparkSession.builder().getOrCreate()

    val rowRDD = spark.sparkContext.parallelize(Seq((0L,0))).map {
      case (seconds: Long,nanos: Int) => {
        Row(Timestamp(seconds,nanos))
      }
    }

    spark.createDataFrame(rowRDD,SCHEMA).show(1)
  }
}

我不确定这是一个Spark bug还是我在文档中遗漏的东西(我知道Spark 2.x引入了运行时行编码验证,也许这是相关的)

非常感谢

解决方法

我不确定它是否是一个bug,但混合动态类型的Row,case类和显式模式没有多大意义.使用Rows和schema:

import collection.mutable._
import collection.JavaConverters._

spark.createDataFrame(ArrayBuffer(Row(Row(0L,0))).asJava,SCHEMA)

或案例类:

import spark.implicits._

Seq(Tuple1(Timestamp(0L,0))).toDF("created_at")

否则你只是两次做同样的工作.

注意:

如果您希望表示字段可以为空,请使用选项.例如

case class Record(created_at: Option[Timestamp])
case class Timestamp(seconds: Long,nanos: Option[Int])

Seq(Record(Some(Timestamp(0L,Some(0))))).toDF

生成架构,其中created_at和created_at.milliseconds可以为NULL,但如果created_at不为NULL,则必须设置created_at.seconds.

猜你在找的Scala相关文章