HBase本身提供了很多种数据导入的方式,通常有两种常用方式:
- 使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase
- 另一种方式就是使用HBase原生Client API
本文就是示范如何通过MapReduce作业从一个文件读取数据并写入到HBase中。
首先启动Hadoop与HBase,然后创建一个空表,用于后面导入数据:1
2
3
4
5
6
7
8
9
10
11
12
13hbase(main):006:0> create 'mytable','cf'
0 row(s) in 10.8310 seconds
=> Hbase::Table - mytable
hbase(main):007:0> list
TABLE
mytable
1 row(s) in 0.1220 seconds
=> ["mytable"]
hbase(main):008:0> scan 'mytable'
ROW COLUMN+CELL
0 row(s) in 0.2130 seconds
一、示例程序
下面的示例程序通过TableOutputFormat
将HDFS上具有一定格式的文本数据导入到HBase中。
首先创建MapReduce作业,目录结构如下:1
2
3
4
5
6Hdfs2HBase/
├── classes
└── src
├── Hdfs2HBase.java
├── Hdfs2HBaseMapper.java
└── Hdfs2HBaseReducer.java
Hdfs2HBaseMapper.java
1 | package com.lisong.hdfs2hbase; |
Hdfs2HBaseReducer.java
1 | package com.lisong.hdfs2hbase; |
Hdfs2HBase.java
1 | package com.lisong.hdfs2hbase; |
配置javac
编译依赖环境:
1 | $HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar |
这里要操作HBase,故除了上面三个jar包,还需要$HBASE_HOME/lib
目录下的jar包。为了方便,我们在/etc/profile
的CLASSPATH
里包含所有的依赖包:1
2
3TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$HBASE_JARS
编译
1 | $ javac -d classes/ src/*.java |
打包
1 | $ jar -cvf hdfs2hbase.jar classes |
运行
创建一个data.txt
文件,内容如下(列族是建表时创建的列族cf
):1
2
3r1:cf:c1:value1
r2:cf:c2:value2
r3:cf:c3:value3
将文件复制到hdfs上:1
$ hadoop/bin/hadoop fs -put data.txt /hbase
运行MapReduce作业:1
$ hadoop/bin/hadoop jar Hdfs2HBase/hdfs2hbase.jar com.lisong.hdfs2hbase.Hdfs2HBase /hbase/data.txt mytable
报错NoClassDefFoundError
找不到类定义:1
2
3
4
5
6
7
8
9
10
11Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/io/ImmutableBytesWritable
at com.lisong.hdfs2hbase.Hdfs2HBase.main(Hdfs2HBase.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
原因是我没有把HBase的jar包加到hadoop-env.sh
中。1
2
3TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
HADOOP_CLASSPATH=$HBASE_JARS
再次运行发现又报了Unable to initialize MapOutputCollector
的错误:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1815/08/10 08:55:44 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1008)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:401)
...
at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapred.LocalJobRunner: map task executor complete.
15/08/10 08:55:44 WARN mapred.LocalJobRunner: job_local2138114942_0001
java.lang.Exception: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapreduce.Job: Job job_local2138114942_0001 failed with state FAILED due to: NA
15/08/10 08:55:45 INFO mapreduce.Job: Counters: 0
原因是我没有指明Map输出的Key/Value类型,在Hdfs2HBase.java
中添加以下两句:1
2job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
如果没有专门定义Mapper输出类型的话,job.setOutputKeyClass
和job.setOutputValueClass
设置的是Mapper和Reducer两个的输出类型。1
2job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
而Hdfs2HBaseMapper输出类型是Text/Text,所以这里需要单独指定。
修改Hdfs2HBase.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.lisong.hdfs2hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Hdfs2HBase {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <infile> <table>");
System.exit(2);
}
Job job = new Job(conf, "hdfs2hbase");
job.setJarByClass(Hdfs2HBase.class);
job.setMapperClass(Hdfs2HBaseMapper.class);
job.setReducerClass(Hdfs2HBaseReducer.class);
job.setMapOutputKeyClass(Text.class); // +
job.setMapOutputValueClass(Text.class); // +
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);
System.exit(job.waitForCompletion(true)?0:1);
}
}
再次编译、打包,然后运行成功!
查询HBase表,验证数据是否已导入:1
2
3
4
5
6hbase(main):001:0> scan 'mytable'
ROW COLUMN+CELL
r1 column=cf:c1, timestamp=1439223857492, value=value1
r2 column=cf:c2, timestamp=1439223857492, value=value2
r3 column=cf:c3, timestamp=1439223857492, value=value3
3 row(s) in 1.3820 seconds
可以看到,数据导入成功!
由于需要频繁的与存储数据的RegionServer通信,占用资源较大,一次性入库大量数据时,TableOutputFormat效率并不好。
二、拓展-TableReducer
我们可以将Hdfs2HBaseReducer.java
代码改成下面这样,作用是一样的:
1 | package com.lisong.hdfs2hbase; |
这里直接继承了TableReducer
,TableReducer
是部分特例化的Reducer
,它只有三个类型参数:输入Key/Value是对应Mapper的输出,输出Key可以是任意的类型,但是输出Value必须是一个Put
或Delete
实例。
编译打包运行,结果与前面的一样!