Spark SQL数据加载和保存实例讲解

前端之家收集整理的这篇文章主要介绍了Spark SQL数据加载和保存实例讲解前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

一、前置知识详解

Spark sql重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以创建DataFrame,
Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出文件是什么类型。

二、Spark sql读写数据代码实战

sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparksqlLoadSaveOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparksqlLoadSaveOps");
JavaSparkContext sc = new JavaSparkContext(conf);
sqlContext = new sqlContext(sc);
/**

  • read()是DataFrameReader类型,load可以将数据读取出来
    */
    DataFrame peopleDF = sqlContext.read().format("json").load("E:\Spark\Sparkinstanll_package\Big_Data_Software\spark-1.6.0-bin-hadoop2.6\examples\src\main\resources\people.json");

/**

  • 直接对DataFrame进行操作
  • Json: 是一种自解释的格式,读取Json的时候怎么判断其是什么格式?
  • 通过扫描整个Json。扫描之后才会知道元数据
    */
    //通过mode来指定输出文件的是append。创建新文件来追加文件
    peopleDF.select("name").write().mode(SaveMode.Append).save("E:\personNames");
    }
    }

读取过程源码分析如下:
1. read方法返回DataFrameReader,用于读取数据。

sqlContext.read.parquet("/path/to/file.parquet") * sqlContext.read.schema(schema).json("/path/to/file.json") * }}} * * @group genericdata * @since 1.4.0 */ @Experimental //创建DataFrameReader实例,获得了DataFrameReader引用 def read: DataFrameReader = new DataFrameReader(this)

2. 然后再调用DataFrameReader类中的format,指出读取文件的格式。

3. 通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。

至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
下面就是写操作!!!

1. 调用DataFrame中select函数进行对列筛选

2. 然后通过write将结果写入到外部存储系统中。

3. 在保持文件的时候mode指定追加文件的方式

文件,然后追加 * - `SaveMode.Append`: append the data. * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). * - `SaveMode.ErrorIfExists`: default option,throw an exception at runtime. * * @since 1.4.0 */ def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this }

4. 最后,save()方法触发action,将文件输出到指定文件中。

path) save() }

三、Spark sql读写整个流程图如下

四、对于流程中部分函数源码详解

sql.sources.default. * * @group genericdata * @deprecated As of 1.4.0,replaced by `read().load(path)`. This will be removed in Spark 2.0. */ @deprecated("Use read.load(path). This will be removed in Spark 2.0.","1.4.0") def load(path: String): DataFrame = { //此时的read就是DataFrameReader read.load(path) }

2. 追踪load源码进去,源码如下:
在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。

3. 追踪load源码如下:

sqlContext,userSpecifiedSchema = userSpecifiedSchema,partitionColumns = Array.empty[String],provider = source,options = extraOptions.toMap) DataFrame(sqlContext,LogicalRelation(resolved.relation)) }

DataFrameReader.format()

1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
Spark sql在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.

DataFrame.write()

1. 创建DataFrameWriter实例

2. 追踪DataFrameWriter源码如下:
以DataFrame的方式向外部存储系统中写入数据。

sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。

2. 通过模式匹配接收外部参数

SaveMode.Overwrite case "append" => SaveMode.Append case "ignore" => SaveMode.Ignore case "error" | "default" => SaveMode.ErrorIfExists case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + "Accepted modes are 'overwrite','append','ignore','error'.") } this }

DataFrameWriter.save()

1. save将结果保存传入的路径。

path) save() }

2. 追踪save方法

sqlContext,source,partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),mode,extraOptions.toMap,df) }

3. 其中source是sqlConf的defaultDataSourceName
sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。

sql.sources.default",defaultValue = Some("org.apache.spark.sql.parquet"),doc = "The default data source to use in input/output.")

DataFrame.scala中部分函数详解:

1. toDF函数是将RDD转换成DataFrame

2. show()方法:将结果显示出来

追踪showString源码如下:showString中触发action收集数据。

numRows val data = takeResult.take(numRows) val numCols = schema.fieldNames.length

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程之家。

猜你在找的MsSQL相关文章