一、前置知识详解
Spark
sql重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以创建DataFrame,
Save:把DataFrame中的数据保存到
文件或者说与具体的格式来指明我们要读取的
文件的类型以及与具体的格式来指出我们要
输出的
文件是什么类型。
二、Spark sql读写数据代码实战
sql.*;
import org.apache.spark.
sql.types.DataTypes;
import org.apache.spark.
sql.types.StructField;
import org.apache.spark.
sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class SparksqlLoadSaveOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparksqlLoadSaveOps");
JavaSparkContext sc = new JavaSparkContext(conf);
sqlContext = new sqlContext(sc);
/**
- read()是DataFrameReader类型,load可以将数据读取出来
*/
DataFrame peopleDF = sqlContext.read().format("json").load("E:\Spark\Sparkinstanll_package\Big_Data_Software\spark-1.6.0-bin-hadoop2.6\examples\src\main\resources\people.json");
/**
- 直接对DataFrame进行操作
- Json: 是一种自解释的格式,读取Json的时候怎么判断其是什么格式?
- 通过扫描整个Json。扫描之后才会知道元数据
*/
//通过mode来指定输出文件的是append。创建新文件来追加文件
peopleDF.select("name").write().mode(SaveMode.Append).save("E:\personNames");
}
}
读取过程源码分析如下:
1. read方法返回DataFrameReader,用于读取数据。
sqlContext.read.parquet("/path/to/file.parquet")
*
sqlContext.read.schema(schema).json("/path/to/file.json")
* }}}
*
* @group genericdata
* @since 1.4.0
*/
@Experimental
//创建DataFrameReader实例,获得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)
2. 然后再调用DataFrameReader类中的format,指出读取文件的格式。
3. 通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。
至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
下面就是写操作!!!
1. 调用DataFrame中select函数进行对列筛选
2. 然后通过write将结果写入到外部存储系统中。
3. 在保持文件的时候mode指定追加文件的方式
文件,然后追加
* - `SaveMode.Append`: append the data.
* - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
* - `SaveMode.ErrorIfExists`: default option,throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter = {
this.mode = saveMode
this
}
4. 最后,save()方法触发action,将文件输出到指定文件中。
path)
save()
}
三、Spark sql读写整个流程图如下
四、对于流程中部分函数源码详解
sql.sources.default.
*
* @group genericdata
* @deprecated As of 1.4.0,replaced by `read().load(path)`. This will be removed in Spark 2.0.
*/
@deprecated("Use read.load(path). This will be removed in Spark 2.0.","1.4.0")
def load(path: String): DataFrame = {
//此时的read就是DataFrameReader
read.load(path)
}
2. 追踪load源码进去,源码如下:
在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。
3. 追踪load源码如下:
sqlContext,userSpecifiedSchema = userSpecifiedSchema,partitionColumns = Array.empty[String],provider = source,options = extraOptions.toMap)
DataFrame(
sqlContext,LogicalRelation(resolved.relation))
}
DataFrameReader.format()
1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
Spark sql在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.
DataFrame.write()
1. 创建DataFrameWriter实例
2. 追踪DataFrameWriter源码如下:
以DataFrame的方式向外部存储系统中写入数据。
sql](df: DataFrame) {
DataFrameWriter.mode()
1. Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。
2. 通过模式匹配接收外部参数
SaveMode.Overwrite
case "append" => SaveMode.Append
case "ignore" => SaveMode.Ignore
case "error" | "default" => SaveMode.ErrorIfExists
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
"Accepted modes are 'overwrite','append','ignore','error'.")
}
this
}
DataFrameWriter.save()
1. save将结果保存传入的路径。
path)
save()
}
2. 追踪save方法。
sqlContext,source,partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),mode,extraOptions.toMap,df)
}
3. 其中source是sqlConf的defaultDataSourceName
sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。
sql.sources.default",defaultValue = Some("org.apache.spark.
sql.parquet"),doc = "The default data source to use in input/output.")
DataFrame.scala中部分函数详解:
1. toDF函数是将RDD转换成DataFrame
2. show()方法:将结果显示出来
追踪showString源码如下:showString中触发action收集数据。
numRows
val data = takeResult.take(numRows)
val numCols = schema.fieldNames.length
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程之家。