Hadoop: Writing Byte Output
Recently, I wanted to write only the value bytes (no keys) from the reduce task of a Hadoop MapReduce program. The signature of the reduce function looked like this:
public void reduce(Text text, Iterator<BytesWritable> itr, OutputCollector<Text, BytesWritable> output, Reporter reporter)
In the main method I used SequenceFileOutputFormat as the output format. But it turned out that this way I get the output as a SequenceFile, which I cannot later read from a separate Java program to extract the values. Maybe I am wrong here, but as far as my searching went, I couldn't find an easy way to do this.
After getting fed up with searching, I decided to write a custom FileOutputFormat just to suit my job. So I wrote this ByteOutputFormat class, which simply writes the value bytes to a binary file. Later I can read that file using a normal (non-Hadoop-aware) Java program to extract the bytes.
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.DataOutputStream;
import java.io.IOException;
/**
 * A {@link FileOutputFormat} that writes only the raw bytes of each
 * {@link BytesWritable} value to the task output file. Keys are never written
 * and no record delimiters or headers are added, so the resulting file is the
 * plain concatenation of the value bytes in the order they were emitted.
 *
 * <p>If output compression is enabled for the job, the bytes are passed through
 * the configured codec (falling back to {@link DefaultCodec}) and the codec's
 * default extension is appended to the file name.
 *
 * @param <K> key type; keys are ignored by the writer
 * @param <V> value type; non-null values other than {@link NullWritable} must
 *            be {@link BytesWritable}
 * @author Saliya Ekanayake
 */
public class ByteOutputFormat<K, V> extends FileOutputFormat<K, V> {

    /**
     * Record writer that appends the valid bytes of each value to a stream.
     */
    protected static class ByteRecordWriter<K, V> implements RecordWriter<K, V> {

        private final DataOutputStream out;

        /**
         * @param out the stream the value bytes are written to; it is closed by
         *            {@link #close(Reporter)}
         */
        public ByteRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        /**
         * Writes the bytes of {@code value} to the underlying stream. The key is
         * ignored. A {@code null} value or a {@link NullWritable} writes nothing.
         *
         * @param key   ignored
         * @param value the value whose bytes are written
         * @throws IOException        if writing to the stream fails
         * @throws ClassCastException if {@code value} is neither {@code null},
         *                            a {@link NullWritable} nor a {@link BytesWritable}
         */
        public synchronized void write(K key, V value) throws IOException {
            if (value == null || value instanceof NullWritable) {
                return;
            }
            BytesWritable bytes = (BytesWritable) value;
            // The backing array may be longer than the payload, so only the
            // first getLength() bytes are written.
            out.write(bytes.getBytes(), 0, bytes.getLength());
        }

        /**
         * Closes the underlying stream.
         *
         * @param reporter unused
         * @throws IOException if closing the stream fails
         */
        public synchronized void close(Reporter reporter) throws IOException {
            out.close();
        }
    }

    /**
     * Creates the record writer for one task's output file.
     *
     * @param ignored  unused; the file system is resolved from the output path
     * @param job      the job configuration
     * @param name     the output file name for this task
     * @param progress progress callback handed to the file system on create
     * @return a writer that emits raw value bytes, compressed if the job asks for it
     * @throws IOException if the output file or the compression stream cannot be created
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)
            throws IOException {
        if (!getCompressOutput(job)) {
            return new ByteRecordWriter<K, V>(createTaskFile(job, name, progress));
        }

        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, DefaultCodec.class);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
        FSDataOutputStream fileOut = createTaskFile(job, name + codec.getDefaultExtension(), progress);
        try {
            return new ByteRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)));
        } catch (IOException e) {
            // Do not leak the already-opened file if the codec stream cannot be set up.
            try {
                fileOut.close();
            } catch (IOException closeFailure) {
                // Deliberately dropped: the original failure is the one worth reporting.
            }
            throw e;
        }
    }

    /** Opens the task output file called {@code name} on the file system that owns its path. */
    private static FSDataOutputStream createTaskFile(JobConf job, String name, Progressable progress)
            throws IOException {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        return fs.create(file, progress);
    }
}
I hope this is helpful for you as well.
Subscribe to:
Posts
(
Atom
)
3 comments :
Post a Comment