Mostly technical stuff with some interesting moments of life

Hadoop: Writing Byte Output

Recently, I wanted to write only the set of value bytes (no keys) from the reduce task of a Hadoop MapReduce program. The signature of my reduce function looked like this:

public void reduce(Text key, Iterator<BytesWritable> values, OutputCollector<Text, BytesWritable> output, Reporter reporter)


In the main method I used SequenceFileOutputFormat as the output format. But it turned out that this way the output is a SequenceFile, which I could not later read with a separate Java program to extract the values. Maybe I am wrong here, but as far as my searching went, I couldn't find an easy way to do this.

After getting fed up with searching, I decided to write a custom FileOutputFormat to suit my job. So I wrote this ByteOutputFormat class, which simply writes the value bytes as a binary file. Later I can read it with a normal (non-Hadoop-aware) Java program to extract the bytes.


import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

/**
 * @author Saliya Ekanayake
 */
public class ByteOutputFormat<K, V> extends FileOutputFormat<K, V> {

    protected static class ByteRecordWriter<K, V> implements RecordWriter<K, V> {
        private final DataOutputStream out;

        public ByteRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        public void write(K key, V value) throws IOException {
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullValue) {
                // Write only the valid bytes of the value; the key is ignored.
                BytesWritable bw = (BytesWritable) value;
                out.write(bw.get(), 0, bw.getSize());
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            out.close();
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
                                              Progressable progress) throws IOException {
        if (!getCompressOutput(job)) {
            // Uncompressed case: write the raw bytes straight to the task's output file.
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new ByteRecordWriter<K, V>(fileOut);
        } else {
            // Compressed case: wrap the output stream with the configured codec.
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, DefaultCodec.class);
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new ByteRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)));
        }
    }
}
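To plug this format into a job, the driver would set it on the JobConf in place of SequenceFileOutputFormat. Here is a minimal sketch of what that configuration might look like (the MyJob class name and paths are hypothetical):

```java
JobConf conf = new JobConf(MyJob.class);
conf.setJobName("byte-output-example");

// Use the custom format so reduce output is written as raw value bytes.
conf.setOutputFormat(ByteOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(BytesWritable.class);

FileInputFormat.setInputPaths(conf, new Path("input"));
FileOutputFormat.setOutputPath(conf, new Path("output"));

JobClient.runJob(conf);
```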


I hope this is helpful for you as well.
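For completeness, here is a sketch of reading the resulting file back with a normal, non-Hadoop-aware Java program, as mentioned above. It just slurps the whole binary file into a byte array (the file name passed in, e.g. a reduce task's part file, is up to you):

```java
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ByteOutputReader {
    // Reads an entire binary file (e.g. one produced by ByteOutputFormat) into memory.
    public static byte[] readAllBytes(String path) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        InputStream in = new FileInputStream(path);
        try {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        } finally {
            in.close();
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0) {
            byte[] data = readAllBytes(args[0]);
            System.out.println("Read " + data.length + " bytes");
        }
    }
}
```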

3 comments :

  1. Awesome info!

    I am interested in knowing more about MapReduce and large data analytics. This resource looks great:
    High Performance Analytics with Hadoop http://www.impetus.com/featured_webinar?eventid=16

    Could you suggest some more?

  2. I think you would find the publications at http://grids.ucs.indiana.edu/ptliupages/publications/ to be helpful in that sense. I could be of help if I know the exact area that you are trying to tackle.

  3. Hi, is it a rule that the mapper and reducer method signatures have to be as follows?
    public static class Map extends MapReduceBase implements Mapper {
        public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter)
                throws IOException {
        ...
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer {
        public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter)
                throws IOException {
        ...
        }
    }

    or can we have our own signatures?

    I am a student and new to Hadoop, trying to implement data mining in Hadoop.
    So basically I want to try sorting around 10000 numbers. Any help regarding the signatures of the map/reduce methods?
    pls help...
