HBase MapReduce

Introduction

MapReduce is a programming model designed to process huge datasets, ranging from terabytes to petabytes, in a scalable way. Performance scales roughly linearly as physical machines are added to the cluster.

It follows a divide-and-conquer approach, splitting the data into chunks stored on a distributed file system. The servers in the cluster can then access these chunks and process them in parallel.

MapReduce involves two stages:

  1. Mapping - performs filtering and sorting of the input records.
  2. Reducing - performs a summary operation, such as combining the mapped results (a minimal generic sketch follows below).
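To make the two stages concrete, here is a minimal word-count sketch against plain text input, using the standard Hadoop MapReduce API. The class names are illustrative and not part of HBase.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountExample {

  // Mapping stage: filter and transform each input line into (word, 1) pairs.
  public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String word : value.toString().split("\\s+")) {
        if (!word.isEmpty()) {
          context.write(new Text(word), ONE);   // emit one count per word
        }
      }
    }
  }

  // Reducing stage: summarize all the counts emitted for the same word.
  public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum)); // (word, total count)
    }
  }
}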

In the case of HBase, a MapReduce job can use a table in two ways: reading from HBase (as a source) and writing to HBase (as a sink). We will study these one by one.

1. Reading from HBase

Here TableInputFormat is used to read an HBase table and feed it into the MapReduce job. In this stage the input is split per region of the table, so there is one map task per region. For example, if there are 100 regions in the table, there will be 100 map tasks for the job, regardless of how many column families are selected in the Scan.

Example

In this example we configure a read-only job that uses HBase as a MapReduce source.

Configuration config = HBaseConfiguration.create();
config.set(                                   // speculative execution will
    "mapred.map.tasks.speculative.execution", // decrease performance
    "false");                                 // or damage the data
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);           // class that contains the mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
    tableName,        // input HBase table name
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper
    null,             // mapper output key
    null,             // mapper output value
    job);
job.setOutputFormatClass(NullOutputFormat.class);  // because we aren't emitting anything from the mapper

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}

Here the mapper instance extends TableMapper, as shown below.

public static class MyMapper extends TableMapper<Text, Text> {

    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        // process data from the Result instance here
    }
}
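Because the read-only job above uses NullOutputFormat and discards the mapper output, a typical mapper body simply reads from the Result. Below is a minimal sketch that extracts one cell and reports what it saw through counters; the column family "cf" and qualifier "name" are assumptions, not values taken from the example above, and Bytes is org.apache.hadoop.hbase.util.Bytes.

public static class MyMapper extends TableMapper<Text, Text> {

    private static final byte[] CF = Bytes.toBytes("cf");          // assumed column family
    private static final byte[] QUALIFIER = Bytes.toBytes("name"); // assumed qualifier

    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        byte[] cell = value.getValue(CF, QUALIFIER);   // read one cell from the Result
        if (cell != null) {
            // for a read-only job, counters are a simple way to report what was seen
            context.getCounter("MyReadJob", "ROWS_WITH_NAME").increment(1);
        }
        context.getCounter("MyReadJob", "ROWS").increment(1);
    }
}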

2. Writing to HBase

Example

In this example we configure a job that uses HBase as both the MapReduce source and sink: data is read from one table and written to another.

Configuration config = ...;   // configuring reading
Job job = ...;                // from the HBase table
Scan scan = ...;              // is the same as in the
TableMapReduceUtil            // read-only example
    .initTableMapperJob(...); // above

TableMapReduceUtil.initTableReducerJob(
    targetTable,          // output table
    MyTableReducer.class, // reducer class
    job);
job.setNumReduceTasks(1); // at least one, adjust as required

boolean b = job.waitForCompletion(true);
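As in the read-only example above, the return value of waitForCompletion should be checked so a failed job is not silently ignored:

if (!b) {
    throw new IOException("error with job!");
}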

Here the reducer instance extends TableReducer, as shown below.

public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        Put put = ...; // data to be written
        context.write(null, put);
    }
}
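To show what the elided Put might look like, here is a sketch that sums the IntWritable values for each key and writes the total to the target table. The column family "cf" and qualifier "count" are assumptions, and put.add(...) is the HBase 0.94-era API (addColumn in newer releases).

public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    private static final byte[] CF = Bytes.toBytes("cf");       // assumed column family
    private static final byte[] COUNT = Bytes.toBytes("count"); // assumed qualifier

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();                             // summarize the mapper output for this key
        }
        Put put = new Put(Bytes.toBytes(key.toString())); // row key in the target table
        put.add(CF, COUNT, Bytes.toBytes(sum));           // put.addColumn(...) in newer HBase versions
        context.write(null, put);                         // TableOutputFormat takes the row key from the Put
    }
}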

Reference

http://hbase.apache.org/0.94/book/mapreduce.example.html