Spark2-笔记二(Spark Sql 基础)

按照官方文档 sql-getting-started,学习Spark2的使用

https://spark.apache.org/docs/latest/sql-getting-started.html

创建 SprinSession

创建 SprinSession

1
2
3
4
5
6
7
8
9
10
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

创建 DataFrames

以以下json文件示例

1
2
3
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package org.apache.spark.examples.learn
import org.apache.spark.sql.SparkSession
object Demo1 {

def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("spark- demo")
.config("spark.master", "local")
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.json("src/main/resources/people.json")
df.show()
}
}

报错

1
Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX

commongs-lang3版本问题

1
2
3
4
5
6
7
8
9
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
</dependencyManagement>

1
Exception in thread "main" java.lang.NoClassDefFoundError: org/codehaus/janino/InternalCompilerException

增加依赖

1
2
3
4
5
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>

额,ok了,输出如下

1
2
3
4
5
6
7
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

详细使用在

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

无类型数据集操作(又名DataFrame操作)

如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中Rows的数据集。与“类型转换”相比,这些操作也称为“无类型转换”,带有强类型Scala / Java数据集。

这里我们包括使用数据集进行结构化数据处理的一些基本示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

以编程方式运行SQL查询

SparkSession上的sql函数使应用程序能够以编程方式运行SQL查询并将结果作为DataFrame返回。

1
2
3
4
5
6
7
8
9
10
11
12
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

全局临时视图

Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,它将消失。如果您希望拥有一个在所有会话之间共享的临时视图并保持活动状态,直到Spark应用程序终止,您可以创建一个全局临时视图。全局临时视图与系统保留的数据库 global_temp 绑定,我们必须使用限定名称来引用它,例如 SELECT * FROM global_temp.view1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

创建数据集Dataset

数据集与RDD类似,但是,它们不使用Java序列化或Kryo,而是使用专用的编码器来序列化对象以便通过网络进行处理或传输。编码器和标准序列化都负责将对象转换为字节,编码器是动态生成的代码,它使用一种格式,允许Spark执行许多操作,如过滤,排序和散列,而无需将字节反序列化回对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

与RDD进行转换

Spark SQL支持两种不同的方法将现有RDD转换为数据集。第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以提供更简洁的代码,并且在您编写Spark应用程序时已经了解模式时可以很好地工作。

创建数据集的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有RDD。虽然此方法更详细,但它允许您在直到运行时才知道列及其类型时构造数据集。

使用反射推断模式(schema)

Spark SQL的Scala接口支持自动将包含RDD的案例类转换为DataFrame。case类定义了表的模式。使用反射读取case类的参数名称,并成为列的名称。案例类也可以嵌套或包含复杂类型,如Seqs或Arrays。此RDD可以隐式转换为DataFrame,然后注册为表。表可以在后续SQL语句中使用。

people.txt内容如下

1
2
3
Michael, 29
Andy, 30
Justin, 19
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.yibo.examples.sparksql

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)

object SparkSqlTest {
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()
val sc = spark.sparkContext
def main(args: Array[String]): Unit = {

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = sc.textFile("src/main/resources/people.txt")
.map(_.split(","))
.map(attributes=>Person(attributes(0), attributes(1).trim.toInt))
.toDF()
//注册成临时视图
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// 结果中的行的列可以通过字段索引访问
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// 或者通过字段名
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

// 对 Dataset[Map[K,V]],无预定义的编码器, 需要显式声明
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] 一次检索多个列到 Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
}
}

以编程方式指定 schema

如果无法提前定义案例类(例如,记录结构以字符串形式编码,或者文本数据集将被解析,字段将针对不同用户进行不同的投影),可以通过三个步骤以编程方式创建 DataFrame

  1. 从原始RDD创建行的RDD;
  2. 创建由与步骤1中创建的RDD中的行结构匹配的 StructType 表示的模式。
  3. 通过SparkSession提供的createDataFrame方法将模式应用于行的RDD。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.yibo.examples.sparksql

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

case class Person(name: String, age: Long)

object SparkSqlTest {
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()
val sc = spark.sparkContext
def main(args: Array[String]): Unit = {

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

//创建 RDD
val peopleRDD = sc.textFile("src/main/resources/people.txt")

//用字符串编码
val schemaString = "name age"

//根据schema字符串创建schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

//转换 RDD(people) 成 Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes=>Row(attributes(0), attributes(1).trim))

//将schema应用到 RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

//使用 DataFrame 创建临时视图
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+

}
}

聚合

内置的DataFrames函数提供常见的聚合,例如count(), countDistinct(), avg(), max(), min(), etc. 虽然这些函数是为DataFrames设计的,但Spark SQL还为Scala和Java中的某些函数提供了类型安全版本,以便使用强类型数据集。此外,用户不限于预定义的聚合函数,并且可以创建自己的聚合函数。

用户定义的无类型聚合函数

用户必须扩展UserDefinedAggregateFunction抽象类以实现自定义无类型聚合函数。例如,用户定义的平均值可能如下所示:

0%
隐藏