Spark学习-读写Hbase数据

说明

主要记录 Spark中读写Hbase 的操作记录

参考资料

Spark入门:读写HBase数据

准备工作

创建Hbase表

1
2
D:\BIGDATA\hbase-1.2.0\bin
λ hbase shell

查看所有表

1
2
3
4
5
6
hbase(main):001:0> list
TABLE

0 row(s) in 0.1630 seconds

=> []

创建 student

1
hbase>  create 'student','info'

可通过 describe 命令查看“ student” 表的基本信息:

1
hbase> describe 'student'

录入示例数据

1
2
3
4
5
6
put 'student','1','info:name','Xueqian'
put 'student','1','info:gender','F'
put 'student','1','info:age','23'
put 'student','2','info:name','Weiliang'
put 'student','2','info:gender','M'
put 'student','2','info:age','24'

查看录入的数据

1
2
3
4
//如果每次只查看一行,就用下面命令
hbase> get 'student','1'
//如果每次查看全部数据,就用下面命令
hbase> scan 'student'

配置Spark

拷贝 hbase 的 jar 包到 D:\BIGDATA\spark\spark-1.6.2-bin-hadoop2.6\lib\

需要拷贝的jar文件包括:所有hbase开头的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar

windows 上不知咋配~(下面我将 这些 jar作为 Libraries 来引用)

参考 博客 里配才是正确的吧~

编写读取Hbase的应用

程序一

Libiaries配置

如下引用 hbase 的 jar

如下引用 spark 的jar(这里我全引了~)

记得要 Add to modules

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yibo.examples

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._

object HbaseReader {

//读取hbase数据
def main(args: Array[String]) {
//序列化
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val conf = HBaseConfiguration.create()
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("SparkOperateHBase"))
//设置查询的表名
conf.set(TableInputFormat.INPUT_TABLE, "student")
val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = stuRDD.count()
println("Students RDD Count:" + count)
stuRDD.cache()
//遍历输出
stuRDD.foreach({ case (_, result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
val gender = Bytes.toString(result.getValue("info".getBytes, "gender".getBytes))
val age = Bytes.toString(result.getValue("info".getBytes, "age".getBytes))
println("Row key:" + key + " Name:" + name + " Gender:" + gender + " Age:" + age)
})
}
}

运行程序

直接启动 com.yibo.examples.HbaseReader

输出结果

提交程序一jar包到spark

打包

Build -> Build Arxx -> Build

1
spark-submit --class com.yibo.examples.HbaseReader --master yarn --deploy-mode client --driver-cores 1  --driver-memory 512M --num-executors 2 --executor-cores 2 --executor-memory 512M  D:\zhf\Documents\projects\git\scala-spark-demo\out\artifacts\SparkOperateHBase__jar\scala-spark-demo.jar

输出结果有以下

1
2
3
Students RDD Count:2
Row key:1 Name:Xueqian Gender:F Age:23
Row key:2 Name:Weiliang Gender:M Age:24

不打印 Row key 问题,增加 以下语句

1
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

编写写入Hbase的应用

在程序中增加 SparkWriteHBase

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yibo.examples

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._

object SparkWriteHBase {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "student"
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26", "4,Guanhua,M,27")) //构建两行记录
val rdd = indataRDD.map(_.split(',')).map { arr => {
val put = new Put(Bytes.toBytes(arr(0))) //行健的值
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1))) //info:name列的值
put.add(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2))) //info:gender列的值
put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt)) //info:age列的值
(new ImmutableBytesWritable, put)
}
}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
}
}

查询表数据

1
hbase> scan 'student'

结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ROW                                    COLUMN+CELL                                                                                                   
1 column=info:age, timestamp=1479640712163, value=23
1 column=info:gender, timestamp=1479640704522, value=F
1 column=info:name, timestamp=1479640696132, value=Xueqian
2 column=info:age, timestamp=1479640752474, value=24
2 column=info:gender, timestamp=1479640745276, value=M
2 column=info:name, timestamp=1479640732763, value=Weiliang
3 column=info:age, timestamp=1479643273142, value=\x00\x00\x00\x1A
3 column=info:gender, timestamp=1479643273142, value=M
3 column=info:name, timestamp=1479643273142, value=Rongcheng
4 column=info:age, timestamp=1479643273142, value=\x00\x00\x00\x1B
4 column=info:gender, timestamp=1479643273142, value=M
4 column=info:name, timestamp=1479643273142, value=Guanhua
4 row(s) in 0.3240 seconds

一些问题

提交任务 时报错

1
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

原因为没有配置Spark 中引用 hbase 的jar,如上图是将 hbase的依赖打进 jar


运行任务时出错

1
java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge

缺少 metrics-core-2.2.0.jar

日志打印问题

Libiaries 中将 sbtlogch 去掉,仅使用 spark 下的 slf4j ,然后将 SPARK_HOME/conf/log4j.properties复制到项目中的 src/main/resources

修改为 WARN ,就不会再打印一大堆日志了~

1
2
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
0%
隐藏