Using the Phoenix Spark Plugin

Version Notes

If you want to use the Phoenix Spark plugin, note that some versions only support Spark 1.x, and the Scala version has to match accordingly.

The version issue is still unresolved; the official docs say 2.x works.

The version issue can be worked around by upgrading to 4.14.1-HBase-1.2, but reads are still problematic while writes work fine; for reads you can fall back to JDBC.
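
As a sketch of that JDBC fallback (assumptions: the Phoenix client jar is on the classpath, the object name PhoenixJdbcReadTest is made up here, and the cdh01:2181 ZooKeeper address is reused from the examples below):

package com.yibo.examples

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Read a Phoenix table through Spark's generic JDBC data source
// instead of the phoenix-spark plugin (useful when the plugin's reads fail).
object PhoenixJdbcReadTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "phoenix-jdbc-test")
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read
      .format("jdbc")
      .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
      .option("url", "jdbc:phoenix:cdh01:2181") // jdbc:phoenix:<zk quorum>[:<port>]
      .option("dbtable", "TABLE1")
      .load()

    df.show()
  }
}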

The versions I used here are as follows:

Component   Version
Spark       spark-1.6.2-bin-hadoop2.6
Phoenix     apache-phoenix-4.14.0-cdh5.14.2 (connecting to a remote Phoenix)
Scala       2.10.6

References

https://phoenix.apache.org/phoenix_spark.html

Creating a Sample Table

CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');

Creating the Application

Create a Scala project in IDEA.

build.sbt

As follows:

name := "scala-spark-demo"

version := "0.1"

scalaVersion := "2.10.6"

resolvers += "aliyun" at "http://maven.aliyun.com/nexus/content/groups/public/"
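
If you prefer to declare the dependencies in build.sbt instead of adding the jars by hand under Libraries, a sketch could look like the following. The artifact coordinates are assumptions derived from the version table above (the CDH build of Phoenix usually needs the Cloudera repository), so verify them against the jars actually present on your cluster:

// Assumed coordinates -- adjust to match your cluster's Spark/Phoenix builds.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"    % "1.6.2" % "provided",
  "org.apache.spark"   %% "spark-sql"     % "1.6.2" % "provided",
  "org.apache.phoenix" %  "phoenix-spark" % "4.14.0-cdh5.14.2"
)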

Project Structure configuration

Libraries configuration

Reading a Phoenix Table

src/main/resources/log4j.properties

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.spark.util.ShutdownHookManager=OFF
log4j.logger.org.apache.spark.SparkEnv=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

If phoenix.schema.isNamespaceMappingEnabled is enabled, you need to add the following file:

src/main/resources/hbase-site.xml

<?xml version="1.0"?>
<configuration>
  <property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
  </property>
</configuration>

Loading as a DataFrame with the Data Source API

package com.yibo.examples

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object PhoenixTest3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "phoenix-test")
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.load(
      "org.apache.phoenix.spark",
      Map("table" -> "TABLE1", "zkUrl" -> "cdh01:2181")
    )

    df
      .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
      .select(df("ID"))
      .show
  }
}

Output:

+---+
| ID|
+---+
|  1|
+---+
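
sqlContext.load is deprecated in Spark 1.6; an equivalent read through the DataFrameReader API (reusing the sqlContext from the example above) looks roughly like this:

// Same read as above, but via sqlContext.read instead of the deprecated load()
val df2 = sqlContext.read
  .format("org.apache.phoenix.spark")
  .options(Map("table" -> "TABLE1", "zkUrl" -> "cdh01:2181"))
  .load()

df2.filter(df2("COL1") === "test_row_1" && df2("ID") === 1L)
  .select(df2("ID"))
  .show()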

Packaging and Submitting (Test 1)

When packaging, decide whether to bundle the Phoenix dependency into the jar; if the server already has it, you can leave it out. Packaging steps:

Press Ctrl+Alt+Shift+S to open the Project Structure dialog.

Configure the libraries that need to be packaged.

Be sure to select Extract into output root.

Submit the job:

spark-submit --class com.yibo.examples.PhoenixTest3 --master yarn --deploy-mode client --driver-cores 1  --driver-memory 512M --num-executors 2 --executor-cores 2 --executor-memory 512M  D:\zhf\Documents\projects\git\scala-spark-demo\out\artifacts\SparkOperateHBase__jar\scala-spark-demo.jar

Loading as a DataFrame Directly with a Configuration Object

This approach does not seem to load hbase-site.xml.

package com.yibo.examples

import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object PhoenixTest3 {
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration()
    // Configure the ZooKeeper quorum
    configuration.set("hbase.zookeeper.quorum", "cdh01")
    // Required if phoenix.schema.isNamespaceMappingEnabled is turned on
    configuration.set("phoenix.schema.isNamespaceMappingEnabled", "true")
    // Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
    val sc = new SparkContext("local", "phoenix-test")
    val sqlContext = new SQLContext(sc)

    // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
    val df = sqlContext.phoenixTableAsDataFrame(
      "TABLE1", Array("ID", "COL1"), conf = configuration
    )
    df.show
  }
}

Output:

+---+----------+
| ID|      COL1|
+---+----------+
|  1|test_row_1|
|  2|test_row_2|
+---+----------+
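
Besides phoenixTableAsDataFrame, the same import also adds an implicit phoenixTableAsRDD method on SparkContext that returns an RDD of column-name-to-value maps. A short sketch, reusing the sc and configuration from the example above (which already imports org.apache.phoenix.spark._):

// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD[Map[String, AnyRef]]
val rdd = sc.phoenixTableAsRDD("TABLE1", Seq("ID", "COL1"), conf = configuration)

// Each element is a Map keyed by column name
rdd.map(row => (row("ID"), row("COL1"))).collect().foreach(println)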

Writing to a Phoenix Table

Create the following Phoenix table for testing:

CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

Saving RDDs

This also works with Spark 2.

The saveToPhoenix method is an implicit method on RDD[Product], that is, an RDD of tuples. The data types must correspond to Java types supported by Phoenix (here Long maps to BIGINT, String to VARCHAR, and Int to INTEGER).

package com.yibo.examples

import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

// Write an RDD of tuples to a Phoenix table
object PhoenixRDDWriteTest {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "phoenix-test")
    val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

    sc
      .parallelize(dataSet)
      .saveToPhoenix(
        "OUTPUT_TEST_TABLE",
        Seq("ID", "COL1", "COL2"),
        zkUrl = Some("cdh01:2181")
      )
  }

}

Query Phoenix:

0: jdbc:phoenix:> select * from output_test_table;
+-----+-------+-------+
| ID  | COL1  | COL2  |
+-----+-------+-------+
| 1   | 1     | 1     |
| 2   | 2     | 2     |
| 3   | 3     | 3     |
+-----+-------+-------+

Saving DataFrames

save is a method on DataFrame that lets you specify the data source type. You can use org.apache.phoenix.spark, and you must also pass table and zkUrl parameters to specify which table and server to persist the DataFrame to. The column names are derived from the DataFrame's schema field names and must match the Phoenix column names.

The save method also takes a SaveMode option; only SaveMode.Overwrite is supported.

CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

upsert into input_table values(1,'11',111);
upsert into input_table values(2,'22',222);

package com.yibo.examples

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.phoenix.spark._

object PhoenixSaveTest {
  def main(args: Array[String]): Unit = {
    // Load INPUT_TABLE
    val sc = new SparkContext("local", "phoenix-test")
    val sqlContext = new SQLContext(sc)
    val hbaseConnectionString = "cdh01:2181"
    val df = sqlContext.load("org.apache.phoenix.spark",
      Map("table" -> "INPUT_TABLE", "zkUrl" -> hbaseConnectionString))

    // Save to OUTPUT_TABLE
    df.saveToPhoenix(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> hbaseConnectionString))

    // Or alternatively:
    // df.write
    //   .format("org.apache.phoenix.spark")
    //   .mode("overwrite")
    //   .option("table", "OUTPUT_TABLE")
    //   .option("zkUrl", hbaseConnectionString)
    //   .save()
  }
}

Query OUTPUT_TABLE:

0: jdbc:phoenix:> select * from output_table;
+-----+-------+-------+
| ID  | COL1  | COL2  |
+-----+-------+-------+
| 1   | 11    | 111   |
+-----+-------+-------+

Error Messages

Cannot initiate connection as SYSTEM:CATALOG is found but client does not have phoenix.schema.isNamespaceMappingEnabled enabled

Add hbase-site.xml under src/main/resources:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
  </property>
</configuration>

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame

A Spark 2.x compatibility problem (even though the official site says 2.x is supported). In my tests on 2.x, writing via the Dataset API works, but reads fail with a missing Dataset class and writes can throw the error above. Most likely this is because in Spark 2.x DataFrame became a type alias for Dataset[Row], so a phoenix-spark build compiled against Spark 1.x cannot find the org.apache.spark.sql.DataFrame class.

Downgrade to Spark 1.x.

Error:(16, 25) value phoenixTableAsDataFrame is not a member of org.apache.spark.sql.SQLContext
val df = sqlContext.phoenixTableAsDataFrame(

Adding import org.apache.phoenix.spark._ fixes it.
