This example demonstrates how to use Hadoop to count DNA bases.
Notice: the method used in this example is not efficient. The example aims to exercise as many Hadoop features as possible. Specifically, it shows a custom InputFormat and RecordReader, a custom partitioner, a sort comparator, and a grouping comparator.
The input file for this test is a FASTA file:
>a
cGTAaccaataaaaaaacaagcttaacctaattc
>a
cggcGGGcagatcta
>b
agcttagTTTGGatctggccgggg
>c
gcggatttactcCCCCCAAAAANNaggggagagcccagataaatggagtctgtgcgtccaca
gaattcgcacca
>c
gcggatttactcaggggagagcccagGGataaatggagtctgtgcgtccaca
gaattcgcacca
>d
tccgtgaaacaaagcggatgtaccggatNNttttattccggctatggggcaa
ttccccgtcgcggagcca
>d
atttatactcatgaaaatcttattcgagttNcattcaagacaagcttgaca
ttgatctacagaccaacagtacttacaaagaATGCCGaaatttaaaatgtggtcac
The file contains the characters [ATGCatgcN], and we want to count the occurrences of each character, ignoring case. The example uses a customized FastaInputFormat to split the file and a FastaRecordReader to turn each split into records. The mapper emits every single character of each FASTA record with a count of 1, e.g. <a,1>, <T,1>, <G,1>, <g,1>. We use 3 reducers, and the partitioner class routes the mapper output so that [ATat] goes to reducer 0, [GCgc] to reducer 1, and everything else to reducer 2. A custom sort comparator orders the key-value pairs before they are passed to the reducers; it compares keys case-insensitively, so the same DNA base in different cases ends up adjacent. Note the asymmetry between the two comparator settings: if you call setSortComparatorClass and leave the grouping comparator unset, grouping falls back to the sort comparator, but the reverse is not true. You can use this example to test this behavior.
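As a concrete illustration of the flow (a hedged sketch with a hypothetical record, not output from an actual run): for a record like ">x" followed by the sequence line "cGTA", the mapper emits <c,1>, <G,1>, <T,1>, <A,1>; the partitioner sends <T,1> and <A,1> to reducer 0 and <c,1> and <G,1> to reducer 1; and within each reducer's input, the case-insensitive sort comparator places c next to C and g next to G, so a case-insensitive grouping comparator can then merge them into a single reduce call.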
Pasted below is the code for each file:
FastaInputFormat.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * An input format, adapted from Hadoop's NLineInputFormat, that splits
 * the input so that each split contains N complete FASTA records.
 *
 * NLineInputFormat is intended for "pleasantly" parallel applications
 * in which each process/mapper processes the same input file(s), but
 * with computations controlled by different parameters ("parameter
 * sweeps"): one parameter set per line of a control file, which is the
 * input path of the map-reduce application, while the input dataset is
 * specified via a config variable in JobConf.
 *
 * Here the same mechanism is reused, but the unit is a whole FASTA
 * record rather than a line: each split is fed to one map task, with
 * the byte offset as the key, i.e. (k,v) is (LongWritable, Text).
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FastaInputFormat extends FileInputFormat<LongWritable, Text> {
  public static final String LINES_PER_MAP =
    "mapreduce.input.lineinputformat.linespermap";
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
return new FastaRecordReader();
}
/**
* Logically splits the set of input files for the job, splits N lines
* of the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job)
throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status,
job.getConfiguration(), numLinesPerSplit));
}
return splits;
}
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit> ();
Path fileName = status.getPath();
    if (status.isDir()) {
      throw new IOException("Not a file: " + fileName);
    }
    FileSystem fs = fileName.getFileSystem(conf);
    LineReader lr = null;
    try {
      FSDataInputStream in = fs.open(fileName);
      lr = new LineReader(in, conf);
      Text line = new Text();
      long begin = 0;          // start offset of the current split
      long length = 0;         // total bytes read so far
      long record_length = 0;  // bytes accumulated in the current split
      int recordsRead = 0;     // FASTA headers seen in the current split
      int num = -1;
      while ((num = lr.readLine(line)) > 0) {
        if (line.toString().indexOf(">") >= 0) {
          recordsRead++;
        }
        // The header that begins record numLinesPerSplit + 1 closes the
        // current split; that header line starts the next split.
        if (recordsRead > numLinesPerSplit) {
          splits.add(new FileSplit(fileName, begin, record_length, new String[]{}));
          begin = length;
          record_length = 0;
          recordsRead = 1;
        }
        length += num;
        record_length += num;
      }
      splits.add(new FileSplit(fileName, begin, record_length, new String[]{}));
    } finally {
      if (lr != null) {
        lr.close();
      }
    }
    return splits;
  }
  /**
   * Set the number of FASTA records per split
   * @param job the job to modify
   * @param numLines the number of FASTA records per split
   */
public static void setNumLinesPerSplit(Job job, int numLines) {
job.getConfiguration().setInt(LINES_PER_MAP, numLines);
}
  /**
   * Get the number of FASTA records per split
   * @param job the job
   * @return the number of FASTA records per split
   */
public static int getNumLinesPerSplit(JobContext job) {
return job.getConfiguration().getInt(LINES_PER_MAP, 1);
}
}
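Note that getNumLinesPerSplit defaults to 1, so each split holds a single FASTA record unless the driver asks for more. A minimal sketch of how a driver could request larger splits (the driver below does not make this call):

FastaInputFormat.setNumLinesPerSplit(job, 10); // pack 10 FASTA records into each split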
FastaRecordReader.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Treats keys as byte offsets in the file and values as blocks of
 * complete FASTA records.
 */
public class FastaRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(FastaRecordReader.class);
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
FSDataInputStream fileIn;
Configuration job;
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
        --start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
    if (skipFirstLine) { // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
    int newSize = 0;
    StringBuilder text = new StringBuilder();
    int record_length = 0;
    Text line = new Text();
    int recordsRead = 0;
    while (pos < end) {
      key.set(pos);
      newSize = in.readLine(line, maxLineLength,
          Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
      if (line.toString().indexOf(">") >= 0) {
        if (recordsRead > 9) { // emit 10 FASTA records per key-value pair
          value.set(text.toString());
          // Rewind to the start of this header line so the next call to
          // nextKeyValue() re-reads it as the start of the next block.
          fileIn.seek(pos);
          in = new LineReader(fileIn, job);
          return true;
        }
        recordsRead++;
      }
      record_length += newSize;
      text.append(line.toString());
      text.append("\n");
      pos += newSize;
      if (newSize == 0) {
        break;
      }
    }
    if (record_length == 0) {
      return false;
    }
    value.set(text.toString());
    return true;
}
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos – start) / (float)(end – start));
}
}
public synchronized void close() throws IOException {
if (in != null) {
in.close();
}
}
}
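With the default of one record per split, each call to nextKeyValue() hands the mapper a single FASTA record, header included and lines rejoined with newlines. For the sample file, the value of the first split would be:
>a
cGTAaccaataaaaaaacaagcttaacctaattc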
CountBaseMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
public class CountBaseMapper
extends Mapper<Object, Text, Text, IntWritable>
{
private Text base = new Text();
private IntWritable one = new IntWritable(1);
private static final Log LOG = LogFactory.getLog(CountBaseMapper.class);
public void map(Object key,
Text value,
Context context)
throws IOException, InterruptedException
{
    // Split the record block into lines; skip FASTA header lines and
    // emit every base character with a count of 1.
    String fasta = value.toString();
    String[] lines = fasta.split("[\\r\\n]+");
    for (int j = 0; j < lines.length; j++) {
      if (lines[j].startsWith(">")) {
        continue; // header line, not sequence data
      }
      char[] array = lines[j].toCharArray();
      for (char c : array) {
        base.set(String.valueOf(c));
        context.write(base, one);
      }
    }
}
}
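For the first record of the sample file, the mapper skips the >a header line and emits one pair per sequence character: <c,1>, <G,1>, <T,1>, <A,1>, <a,1>, <c,1>, <c,1>, and so on.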
CountBaseReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
public class CountBaseReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> // the types must correspond to the output types of map and of reduce
{
//private IntWritable occurrencesOfWord = new IntWritable();
private static final Log LOG = LogFactory.getLog(CountBaseReducer.class);
public void reduce(Text key,
Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException
{
    LOG.info("--------------reducer----------------");
    int count = 0;
    IntWritable out = new IntWritable();
    for (IntWritable val : values) {
      count += val.get(); // sum the counts (also correct if a combiner is enabled)
    }
    out.set(count);
    LOG.info("<" + key.toString() + ">");
//key.set(“>”+key.toString()+”<“);
context.write(key,out);
}
}
BasePartitioner.java
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by bases{A,T,G,C,a,t,g,c}. */
public class BasePartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value,
int numReduceTasks) {
String base = key.toString();
if(base.compareToIgnoreCase(“A”) == 0){
return 0;
}else if(base.compareToIgnoreCase(“C”) == 0){
return 1;
}else if(base.compareToIgnoreCase(“G”) == 0){
return 1;
}else if(base.compareToIgnoreCase(“T”) == 0){
return 0;
}else{
return 2;
}
}
}
BaseComparator.java
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class BaseComparator extends WritableComparator {
protected BaseComparator(){
super(Text.class,true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
Text t1 = (Text) w1;
Text t2 = (Text) w2;
//compare bases ignoring cases
String s1 = t1.toString().toUpperCase();
String s2 = t2.toString().toUpperCase();
int cmp = s1.compareTo(s2);
return cmp;
}
}
CountBaseDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class CountBaseDriver {
  /**
   * The "driver" class: it sets everything up, then gets the job started.
   */
public static void main(String[] args)
throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
      System.err.println("Usage: CountBaseDriver <inputFile> <outputDir>");
System.exit(2);
}
    Job job = new Job(conf, "count bases");
job.setJarByClass(CountBaseMapper.class);
job.setMapperClass(CountBaseMapper.class);
job.setNumReduceTasks(3);
//job.setCombinerClass(CountBaseReducer.class);
job.setReducerClass(CountBaseReducer.class);
job.setInputFormatClass(FastaInputFormat.class);
job.setPartitionerClass(BasePartitioner.class);
    job.setSortComparatorClass(BaseComparator.class); // if the grouping comparator is left unset, it defaults to the sort comparator, so we do not need to call setGroupingComparatorClass explicitly; the reverse is not true. You can change these settings to test it.
//job.setGroupingComparatorClass(BaseComparator.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
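To observe the asymmetry noted in the driver comment, you could invert the configuration, for example:

//job.setSortComparatorClass(BaseComparator.class);
job.setGroupingComparatorClass(BaseComparator.class);

With only the grouping comparator set, the shuffle sorts keys case-sensitively, so the upper- and lower-case forms of a base are not adjacent in the sorted stream and cannot be merged into a single reduce call.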
Compile the source code files (make sure all the required jars are on the classpath):
javac -classpath .:/home/ubuntu/hadoop-1.0.1/hadoop-core-1.0.1.jar:/home/ubuntu/hadoop-1.0.1/commons-logging-1.1.1/commons-logging-1.1.1.jar:/home/ubuntu/hadoop-1.0.1/lib/commons-cli-1.2.jar:/home/ubuntu/hadoop-1.0.1/contrib/streaming/hadoop-streaming-1.0.1.jar *.java
Create the jar:
jar cef CountBaseDriver countbasedriver.jar .
Put your input FASTA files into the input directory and run Hadoop:
bin/hadoop fs -mkdir input
bin/hadoop fs -put input/test_fasta.fasta input
bin/hadoop jar countbasedriver.jar input output
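Each reducer writes one part file; to inspect the three outputs (standard Hadoop part-file naming):
bin/hadoop fs -cat output/part-r-00000
bin/hadoop fs -cat output/part-r-00001
bin/hadoop fs -cat output/part-r-00002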
Results:
[screenshot] output without the customized SortComparatorClass
[screenshot] output with the customized SortComparatorClass
Download the source code jar file