Spark-Dataframe笔记一
说明
这里以连接 phoenix
为示例,见
这里用的Spark版本为 1.6.2
测试表
1 | create table patient_test(id varchar primary key,name varchar, id_number varchar, phone varchar, sex bigint, file_number varchar,empi varchar); |
1 | upsert into patient_test values( '1000' ,'王五','1234','12345',1,'0001'); |
SparkSql使用
1 | val sparkConf = new SparkConf().setMaster("local").setAppName("empi-test") |
输出结果
1 | +----+----+---------+-----+---+-----------+ |
$说明
1 | val updatedDf = df.withColumn("EMPI", empi(df("ID"))) |
也可以写成
1 | val updatedDf = df.withColumn("EMPI", empi($"ID")) |
不过要用$
,需要在使用之前引用以下语句
1 | import sqlContext.implicits._ |
show
1 | df.show(1) |
1 | +----+----+---------+-----+---+-----------+ |
select
1 | df.select("FILE_NUMBER").show() |
输出结果
1 | +-----------+ |
map
1 | df.map(t => "NAME: " + t.getAs[String]("NAME")).collect().foreach(println) |
1 | NAME: 王五 |
1 | df.map(_.getValuesMap[Any](List("ID", "NAME"))).collect().foreach(println) |
1 | Map(ID -> 1000, NAME -> 王五) |
filter
排除为空的 ID_NUMBER
用 !==
===
1 | val idNumberNotNullList = df.filter($"EMPI".isNotNull.and(df(columnName).isNotNull)) |
groupBy
1 | columnsDf.groupBy(columnNoIdListBuffer:_*).count().show() |
1 |
1 | //排除为空的身份证的人,找出所有身份证和ID,并按身份证分组 |
1 | (Some(1236),CompactBuffer(Map(ID -> 1002, ID_NUMBER -> 1236))) |
1 | //排除为空的身份证的人,找出所有身份证和ID,并按身份证分组 |
1 | Some(1236) |
合并字段
1 | columnsNotNullDf.select($"ID",concat($"ID_NUMBER",$"SEX")).show() |
null值替换
1 | df.na.fill("") |
修改值说明
需要用新的 Dataframe
一些说明
要用 $
要引用
1 | // this is used to implicitly convert an RDD to a DataFrame. |