`
kirayuan
  • 浏览: 38612 次
文章分类
社区版块
存档分类
最新评论

hbase bulkload

 
阅读更多

bulkload的方式导入数据是hbase的一项相当好的数据导入工具,特别适合做为新系统的历史数据导入工具!hbase本身也封装了相关的类importtsv,官网有简单的介绍http://hbase.apache.org/bulk-loads.html。

这里我要说明的是如何去快速定制一些适合自己应用的bulkload。

我们一般需要运行的数据有几种格式,txt的用的最普遍,采用lzo压缩过的txt更专业一些,这里举例lzo格式的源文件。以下代码生成hfile

package com.sina.hbase.mr;

import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.hadoop.mapreduce.LzoTextInputFormat;
import com.sina.hbase.connection.ConnectionPool;
import com.sina.hbase.utils.DataOptUtil;
import com.sina.hbase.utils.Util;


public class BulkLoad {

	public static class ***Mapper extends
			Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// 检查并初始化数据对象
			*** p = Util.checkAndBuild(value.toString());

			if (p != null) {
				byte[] row = Bytes.toBytes(p.getUid());
				ImmutableBytesWritable k = new ImmutableBytesWritable(row);
				KeyValue kv = new KeyValue(row, "c".getBytes(), "c".getBytes(),
						p.toByteArray());
				context.write(k, kv);

			}
		}
	}

	

	/**
	 * 通过表名决定使用哪种Mapper,如果表名不存在则返回null
	 * 
	 * @param tableName
	 * @return
	 */

	@SuppressWarnings("rawtypes")
	public static Class<? extends Mapper> decideMapper(String tableName) {
		if (tableName.equals("***"))
			return ***Mapper.class;
		

		return null;
	}

	public static void main(String[] args) throws Exception {

		if (args.length != 3) {
			System.err
					.println("Usage: BulkLoad <inputPath> <hfilePath> <tablename>");
			System.exit(2);
		}
		Configuration conf = HBaseConfiguration.create();
		ConnectionPool.init(conf, 1000);
		HTable table = null;
		
			table = ConnectionPool.getTable(args[2]);

		Job job = new Job(conf, "BulkLoad-" + args[2] + "-"
				+ DataOptUtil.Date2LongString(new Date()));

		// 根据表的不同选择mapper

		job.setMapperClass(decideMapper(args[2]));

		job.setJarByClass(BulkLoad.class);
		job.setInputFormatClass(LzoTextInputFormat.class);

		HFileOutputFormat.configureIncrementalLoad(job, table);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job,
				Util.RemoveHDFSPath(new Path(args[1]), conf));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}
以上的源代码很简单,但是够用。需要做一些说明的是:

1、一定记得在建表时做region的预切分,HFileOutputFormat.configureIncrementalLoad方法会根据region的数量来觉得reduce的数量以及每个reduce覆盖的rowkey范围。否则当个reduce过大,任务处理不均衡。

2、单个rowkey下的子列不要过多,否则在reduce阶段排序的时候会造成oom,有一种办法是通过二次排序来避免reduce阶段的排序,看应用而定。

3、该代码执行完后需要将hdfs中生成好的hfile写入到hbase表中。采用hadoop jar hbase-version.jarcompletebulkload /hfilepath tablename 命令实现。

4、导入hadoop-lzo的jar包,才有LzoTextInputFormat这个类。

5、MR报google的一些包找不到时是hadoop classpath环境中没有加入hbase相关jar包,可以用-libjars xxx.jar,xxx.jar,xxx.jar来解决




分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics