Spark DataFrame Notes (Part 1)

Overview

This post uses a connection to Phoenix as the example data source; see

Phoenix Spark Plugin Usage

The Spark version used here is 1.6.2.

Test table

create table patient_test(id varchar primary key,name varchar, id_number varchar, phone varchar, sex bigint, file_number varchar,empi varchar);
The empi column is intentionally left empty here; it is computed later in the DataFrame examples.

upsert into patient_test(id, name, id_number, phone, sex, file_number) values('1000', '王五', '1234', '12345', 1, '0001');
upsert into patient_test(id, name, id_number, phone, sex, file_number) values('1001', '李四', '1235', '12346', 1, '0002');
upsert into patient_test(id, name, id_number, phone, sex, file_number) values('1002', '张三', '1236', '12347', 1, '0003');
upsert into patient_test(id, name, id_number, phone, sex, file_number) values('1003', '赵四', '1237', '12348', 1, '0004');
upsert into patient_test(id, name, id_number, phone, sex, file_number) values('1004', '武松', '1238', '12349', 1, '0005');
upsert into patient_test(id, name, id_number, phone, sex, file_number) values('1005', '王五', '1234', '12345', 1, '0006');
upsert into patient_test(id, name, id_number, phone, sex, file_number) values('1006', '小明', '1239', '12350', 1, '0007');

Using Spark SQL

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf().setMaster("local").setAppName("empi-test")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

// read the Phoenix table as a DataFrame via the phoenix-spark plugin
val df = sqlContext.read
  .format("org.apache.phoenix.spark")
  .option("table", "patient_test")
  .option("zkUrl", "cdh01:2181")
  .load()
df.show()

Output

+----+----+---------+-----+---+-----------+
| ID|NAME|ID_NUMBER|PHONE|SEX|FILE_NUMBER|
+----+----+---------+-----+---+-----------+
|1000| 王五| 1234|12345| 1| 0001|
|1001| 李四| 1235|12346| 1| 0002|
|1002| 张三| 1236|12347| 1| 0003|
|1003| 赵四| 1237|12348| 1| 0004|
|1004| 王五| 1238|12345| 1| 0006|
+----+----+---------+-----+---+-----------+
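Because the table is now a DataFrame, it can also be queried with plain SQL by registering a temporary table. A minimal sketch using the Spark 1.6 API (the temp table name is chosen here for illustration):

// register the DataFrame as a temporary table and query it with SQL
df.registerTempTable("patient_test")
sqlContext.sql("SELECT ID, NAME FROM patient_test WHERE SEX = 1").show()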

Notes on the $ syntax

// empi is a user-defined function (not defined in these notes) that computes the EMPI value from the ID column
val updatedDf = df.withColumn("EMPI", empi(df("ID")))

This can also be written as

val updatedDf = df.withColumn("EMPI", empi($"ID"))

However, to use the $ syntax you need to add the following import before using it:

import sqlContext.implicits._
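empi itself is not defined in these notes; it is presumably a UDF that derives an EMPI value from the patient ID. A purely illustrative sketch of such a UDF (the derivation logic below is made up for the example):

import org.apache.spark.sql.functions.udf

// hypothetical UDF: build an EMPI string from the ID value
val empi = udf((id: String) => "EMPI-" + id)
val updatedDf = df.withColumn("EMPI", empi($"ID"))
updatedDf.show()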

show

df.show(1)
+----+----+---------+-----+---+-----------+
| ID|NAME|ID_NUMBER|PHONE|SEX|FILE_NUMBER|
+----+----+---------+-----+---+-----------+
|1000| 王五| 1234|12345| 1| 0001|
+----+----+---------+-----+---+-----------+
only showing top 1 row

select

df.select("FILE_NUMBER").show()

Output

+-----------+
|FILE_NUMBER|
+-----------+
| 0001|
| 0002|
| 0003|
| 0004|
| 0006|
+-----------+
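select also accepts several columns and column expressions at once. A small sketch against the same table:

// select multiple columns plus a derived column
df.select($"ID", $"NAME", ($"SEX" + 1).alias("SEX_PLUS_ONE")).show()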

map

df.map(t => "NAME: " + t.getAs[String]("NAME")).collect().foreach(println)
NAME: 王五
NAME: 李四
NAME: 张三
NAME: 赵四
NAME: 王五
df.map(_.getValuesMap[Any](List("ID", "NAME"))).collect().foreach(println)
Map(ID -> 1000, NAME -> 王五)
Map(ID -> 1001, NAME -> 李四)
Map(ID -> 1002, NAME -> 张三)
Map(ID -> 1003, NAME -> 赵四)
Map(ID -> 1004, NAME -> 王五)
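Note that in Spark 1.6, df.map(...) returns an RDD rather than a DataFrame. A minimal sketch of converting the mapped result back into a DataFrame (this relies on the implicits import discussed later in these notes):

import sqlContext.implicits._

// map each Row to an (ID, NAME) tuple; the result is an RDD[(String, String)]
val namesRdd = df.map(r => (r.getAs[String]("ID"), r.getAs[String]("NAME")))
// toDF comes from sqlContext.implicits._ and turns the RDD back into a DataFrame
val namesDf = namesRdd.toDF("ID", "NAME")
namesDf.show()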

filter

Exclude rows whose ID_NUMBER is null.

!== and === are the Column not-equal and equal operators.

// columnName is a String variable holding the column to check for nulls (per the description above, ID_NUMBER)
val idNumberNotNullList = df.filter($"EMPI".isNotNull.and(df(columnName).isNotNull))
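A self-contained sketch of === and !== against the test table (column names taken from the table above):

// keep rows whose ID_NUMBER is not null and whose SEX equals 1
df.filter($"ID_NUMBER".isNotNull && $"SEX" === 1).show()
// keep rows whose NAME is not 王五
df.filter($"NAME" !== "王五").show()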

groupBy

// columnsDf is a DataFrame and columnNoIdListBuffer is a collection of Columns to group by, expanded as varargs
columnsDf.groupBy(columnNoIdListBuffer: _*).count().show()
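A self-contained version against the test table, a sketch using the df loaded above:

// group by ID_NUMBER and count how many patients share each ID number
df.groupBy($"ID_NUMBER").count().show()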

// exclude patients without an ID number, keep only ID and ID_NUMBER, then group by ID_NUMBER
val idIdNumbers = df.filter(df("ID_NUMBER") > 0).map(_.getValuesMap[Any](List("ID", "ID_NUMBER"))).groupBy(_.get("ID_NUMBER")).collect()
// print each (ID_NUMBER, records) group
idIdNumbers.foreach(println)
(Some(1236),CompactBuffer(Map(ID -> 1002, ID_NUMBER -> 1236)))
(Some(1238),CompactBuffer(Map(ID -> 1004, ID_NUMBER -> 1238)))
(Some(1234),CompactBuffer(Map(ID -> 1000, ID_NUMBER -> 1234), Map(ID -> 1005, ID_NUMBER -> 1234)))
(Some(1235),CompactBuffer(Map(ID -> 1001, ID_NUMBER -> 1235)))
(Some(1237),CompactBuffer(Map(ID -> 1003, ID_NUMBER -> 1237)))
// same grouping as above, but print the key and each record on its own line
val idIdNumbers = df.filter(df("ID_NUMBER") > 0).map(_.getValuesMap[Any](List("ID", "ID_NUMBER"))).groupBy(_.get("ID_NUMBER")).collect()
idIdNumbers.foreach(e => {
  println(e._1)
  e._2.foreach(println)
})
Some(1236)
Map(ID -> 1002, ID_NUMBER -> 1236)
Some(1238)
Map(ID -> 1004, ID_NUMBER -> 1238)
Some(1234)
Map(ID -> 1000, ID_NUMBER -> 1234)
Map(ID -> 1005, ID_NUMBER -> 1234)
Some(1235)
Map(ID -> 1001, ID_NUMBER -> 1235)
Some(1237)
Map(ID -> 1003, ID_NUMBER -> 1237)

Concatenating columns

// needs: import org.apache.spark.sql.functions.concat
columnsNotNullDf.select($"ID", concat($"ID_NUMBER", $"SEX")).show()
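A self-contained sketch against df; concat_ws joins columns with a separator, and SEX (a BIGINT) is cast to a string explicitly:

import org.apache.spark.sql.functions.concat_ws

// combine ID_NUMBER and SEX into one column named ID_SEX, separated by "_"
df.select($"ID", concat_ws("_", $"ID_NUMBER", $"SEX".cast("string")).alias("ID_SEX")).show()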

Replacing null values

// replace nulls in string columns with the empty string
df.na.fill("")
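na.fill can also be limited to specific columns, with a different replacement per column. A small sketch (replacement values chosen arbitrarily):

// replace nulls only in the listed columns
df.na.fill(Map("ID_NUMBER" -> "unknown", "FILE_NUMBER" -> ""))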

Notes on modifying values

A DataFrame is immutable, so values cannot be changed in place; you have to derive a new DataFrame (for example with withColumn).
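A sketch of "modifying" a value by deriving a new DataFrame with withColumn; when/otherwise come from org.apache.spark.sql.functions, and the label values are made up for the example:

import org.apache.spark.sql.functions.when

// produces a new DataFrame with an extra SEX_LABEL column; df itself is unchanged
val relabeled = df.withColumn("SEX_LABEL", when($"SEX" === 1, "male").otherwise("unknown"))
relabeled.show()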

Additional notes

To use the $ syntax, add the following import:

// enables the $ column syntax and the implicit conversion of RDDs to DataFrames
import sqlContext.implicits._