Hadoop: Writing Byte Output
Recently, I wanted to write the set of value bytes (only them; no key) from the Reduce task of a Hadoop MapReduce program. The signature of the reduce function looked like this.
public void reduce(Text text, Iterator<byteswritable> itr, OutputCollector<text,> output, Reporter reporter)
In the main method I used SequenceFileOutputFormat as the output format. But it turned out that this way I get output as a SequenceFile, which I cannot later read by a separate Java program to extract out the values. May be I am wrong here, but as far as my searching went on, I couldn't find a way to easily do this.
After being fed up with searching I thought of writing a custom FileOutputFormat just to suit my job. So I wrote this ByteOutputFormat class, which simply writes the value bytes as a binary file. So later I can read it using a normal (non Hadoop aware) Java program to extract the bytes.
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* @author Saliya Ekanayake
*/
public class ByteOutputFormat<K, V> extends FileOutputFormat {
protected static class ByteRecordWriter<K, V> implements RecordWriter<K, V> {
private DataOutputStream out;
public ByteRecordWriter(DataOutputStream out) {
this.out = out;
}
public void write(K key, V value) throws IOException {
boolean nullValue = value == null || value instanceof NullWritable;
if (!nullValue) {
BytesWritable bw = (BytesWritable) value;
out.write(bw.get(), 0, bw.getSize());
}
}
public synchronized void close(Reporter reporter) throws IOException {
out.close();
}
}
@Override
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException {
if (!getCompressOutput(job)) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new ByteRecordWriter<K, V>(fileOut);
} else {
Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new ByteRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)));
}
}
}
Hope this would be helpful for you as well.
Subscribe to:
Post Comments
(
Atom
)
Awesome info!
ReplyDeleteI am interested in knowing more on Mapreduce and Large Data analytics.. This one resource looks great...
High Performance Analytics with Hadoop http://www.impetus.com/featured_webinar?eventid=16
Could you suggest some more?
I think you would find the publications at http://grids.ucs.indiana.edu/ptliupages/publications/ to be helpful in that sense. I could be of help if I know the exact area that you are trying to tackle.
ReplyDeleteHi is it a rule that the mapper and reducer method signatures to be as follows.
ReplyDeletepublic static class Map extends MapReduceBase implements Mapper
{
public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter)
throws IOException
{
...
}
}
public static class Reduce extends MapReduceBase implements Reducer {
{
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
{
...
}
}
or can we have our own signatures?
I am a student and new to hadoop trying to implement data mining in hadoop.
So basically I wanna try sorting of around 10000 numbers. so any help regarding the signatures of map/reduce methods?
pls help...