SparkSQL应用

Posted by BY KiloMeter on March 15, 2019

Hive是将HQL语句转换成MR程序,简化了程序的编写,SparkSQL能转换成RDD,同样是简化程序的开发。

SparkSQL提供了两个新的数据抽象,分别是DataFrame(Spark1.3)和DataSet(Spark1.6)

DataFrame

DataFrame与RDD相比,DataFrame更想是一张数据库表,除了知道表中数据以外,还能知道数据的结构信息

从上图可以明显地看出RDD和DataFrame的区别,RDD只知道存储的是person类型,里面的数据结构对于RDD来言相当于一个黑盒,如果想对RDD进行一些操作,比如逐行去查看,比如想要查询出年龄在某个范围之内的数据,RDD必须每一行都去遍历,然后取出年龄那一列然后进行判断,如果想要进行一些聚合操作则更为复杂。DataFrame知道存储内容的详细信息。DataFrame除了提供能丰富的算子以外,还提供了更多的性能优化,比如filter下推,裁剪等。

此外,DataFrame还提供了Schema的视图,可以把DataFrame当做一张表来使用。

DataFrame的性能比较高主要是下面两个原因:

1、定制化内存管理:数据是存放在非堆内存,摆脱了GC的限制

2、优化的执行计划:查询计划通过 Spark catalyst optimiser 进行优化

创建DataFrame的三种方式:

1、从已经存在的RDD进行转换

val peopleRDD = sc.textFile("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.txt")
  import spark.implicits._
  val peopleDF = peopleRDD.map(_.split(" ")).map(
    paras => (paras(0).trim,paras(1).toInt)).toDF("name","age")
  peopleDF.show()

可以看到,创建时需要指定列名,如果DataFrame中的数据是case类的话,会自动将类的字段名映射成表名

因为指定case类后,参数的类型也已经确定了,因此下面这段代码最后也能转换成DataSet

val peopleDF = sc.textFile("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.txt")
    .map(_.split(" ")).map(
    parmas => Person(parmas(0).trim,parmas(1).toLong)).toDF()

  peopleDF.select("name").show()

2、从HiveTable进行查询返回

3、通过Spark的数据源创建

var df = spark.read.json("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.json")

上面讲到,DataFrame可以当做一张数据库表来使用,使用的方式有两种,第一种是DSL风格语法,第二种是熟悉的SQL语法

**DSL 风格语法 **

peopleDF.select("name").show()
peopleDF.select($"name", $"age" + 1).show()
peopleDF.filter($"age" > 21).show()
peopleDF.groupBy("age").count().show()

SQL风格语法

peopleDF.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

DataSet

DateFrame的缺点就是缺少类型检查,可能会因为类型问题在运行时出错。DataSet是DataFram API的扩展,具有了类型检查,Dataframe可以说是是Dataset的特例,DataFrame=Dataset[Row] ,所以可以通过 as 方法将 Dataframe 转换为 Dataset

创建DataSet的方法

由于DataSet是强类型的,需要在创建的时候就指定字段的类型

val personDS = Seq(Person("km",10)).toDS()
personDS.show()

val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect().foreach(println)

val path = "C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

DataFrame、DataSet和RDD相互转换

DataFrame、DataSet转换成RDD比较简单

val rdd1 = testDF.rdd
val rdd2 = testDS.rdd

DataSet转换成DataFrame

import spark.implicits._
val testDF = testDS.toDS()

RDD转换成DataFrame

val peopleDF = sc.textFile("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.txt")
    .map(_.split(" ")).map(
    parmas => (parmas(0).trim,parmas(1).toLong)).toDF()

RDD转换成DataSet

case class Person(name: String, age: Long) extends Serializable
val peopleDF = sc.textFile("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.txt")
    .map(_.split(" ")).map(
    parmas => Person(parmas(0).trim,parmas(1).toLong)).toDF()

DataFrame转换成DataSet

import spark.implicits._
  val peopleDF = sc.textFile("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.txt")
    .map(_.split(" ")).map(parmas => (parmas(0).trim,parmas(1).toLong)).toDF("name","age")
  
  val peopleDS = peopleDF.as[Person]

SparkSession读取和存储文件

Spark SQL 的默认数据源为 Parquet 格式。数据源为 Parquet 文件时, SparkSQL 可以方便的执行所有的操作。当读取的文件格式不是Parquet 时,需要指定文件的读取格式,如json,csv等

val peopleDF = spark.read.format("json").load("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.json")

同样保存的时候,需要指定保存的格式

peopleDF.write.format("csv").save("C:\\Users\\KiloMeter\\Desktop\\各种文档什么玩意\\spark\\person.csv")