Saliya's Blogs

Mostly technical stuff with some interesting moments of life

Hadoop: Writing Byte Output

Recently, I wanted to write only the value bytes (no keys) from the reduce task of a Hadoop MapReduce program. The signature of the reduce function looked like this:

public void reduce(Text text, Iterator<BytesWritable> itr, OutputCollector<Text, BytesWritable> output, Reporter reporter)


In the main method I used SequenceFileOutputFormat as the output format. However, this produces a SequenceFile, which carries its own header and record framing, so a separate Java program cannot simply read the raw values back out of it. Maybe I am wrong here, but as far as my searching went, I couldn't find an easy way to do this.

Fed up with searching, I decided to write a custom FileOutputFormat to suit my job. The result is this ByteOutputFormat class, which simply writes the value bytes to a binary file, so that I can later read them with a normal (non-Hadoop-aware) Java program.


import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

/**
* @author Saliya Ekanayake
*/
public class ByteOutputFormat<K, V> extends FileOutputFormat<K, V> {

    /** Writes only the value bytes of each record; keys are ignored. */
    protected static class ByteRecordWriter<K, V> implements RecordWriter<K, V> {
        private final DataOutputStream out;

        public ByteRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        public void write(K key, V value) throws IOException {
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullValue) {
                BytesWritable bw = (BytesWritable) value;
                // The backing array can be longer than the valid data, so write
                // only the first getLength() bytes. (Older Hadoop releases
                // expose these accessors as get() and getSize().)
                out.write(bw.getBytes(), 0, bw.getLength());
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            out.close();
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)
            throws IOException {
        if (!getCompressOutput(job)) {
            // Uncompressed: write the raw value bytes straight to the task output file.
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new ByteRecordWriter<K, V>(fileOut);
        } else {
            // Compressed: wrap the file stream in the configured codec's output stream.
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, DefaultCodec.class);
            CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new ByteRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)));
        }
    }
}
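
For the reading side, here is a minimal sketch of what the "normal" Java program might look like. It is only an illustration under some assumptions of mine: the output was written without compression, the part file has already been copied to the local file system, and every value has the same fixed length. ByteRecordWriter writes the values back to back with no separator, so the reader has to know the record size in advance. The class name RawBytesReader and the recordLength parameter are made up for this example.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads a file of concatenated, fixed-length byte records
// using only the Java standard library (no Hadoop classes needed).
final class RawBytesReader {

    // Splits the file into records of recordLength bytes each.
    // Assumption: all values written by the reducer have this same length.
    static List<byte[]> readRecords(Path file, int recordLength) throws IOException {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        List<byte[]> records = new ArrayList<>();
        try (InputStream in = new BufferedInputStream(Files.newInputStream(file))) {
            while (true) {
                byte[] record = new byte[recordLength];
                int filled = 0;
                // read() may return fewer bytes than requested, so loop until full or EOF.
                while (filled < recordLength) {
                    int n = in.read(record, filled, recordLength - filled);
                    if (n < 0) {
                        break;
                    }
                    filled += n;
                }
                if (filled == 0) {
                    break; // clean end of file
                }
                if (filled < recordLength) {
                    throw new IOException("Truncated record: got " + filled + " of " + recordLength + " bytes");
                }
                records.add(record);
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java RawBytesReader <local-part-file> <record-length>
        List<byte[]> records = readRecords(Paths.get(args[0]), Integer.parseInt(args[1]));
        System.out.println("Read " + records.size() + " records");
    }
}
```

If the values vary in length, this approach does not work as is; the writer would need to store a length before each value so that the reader can find the record boundaries.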


I hope this is helpful to you as well.
