public void reduce(Text key, Iterator<BytesWritable> itr, OutputCollector<Text, BytesWritable> output, Reporter reporter)
In the main method I used SequenceFileOutputFormat as the output format. But it turned out that this way the output is a SequenceFile, which I could not later read with a separate Java program to extract the values. Maybe I am wrong here, but as far as my searching went, I couldn't find a way to do this easily.
After getting fed up with searching, I thought of writing a custom FileOutputFormat to suit my job. So I wrote the ByteOutputFormat class below, which simply writes the value bytes as a binary file. Later I can read it with a normal (non Hadoop aware) Java program to extract the bytes.
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* @author Saliya Ekanayake
*/
public class ByteOutputFormat<K, V> extends FileOutputFormat<K, V> {

    protected static class ByteRecordWriter<K, V> implements RecordWriter<K, V> {
        private final DataOutputStream out;

        public ByteRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        public void write(K key, V value) throws IOException {
            // Skip null and NullWritable values; write only the raw value bytes.
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullValue) {
                BytesWritable bw = (BytesWritable) value;
                out.write(bw.getBytes(), 0, bw.getLength());
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            out.close();
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
                                              Progressable progress) throws IOException {
        if (!getCompressOutput(job)) {
            // No compression: write the raw bytes straight to the task's output file.
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new ByteRecordWriter<K, V>(fileOut);
        } else {
            // Compression requested: wrap the output stream with the configured codec.
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, DefaultCodec.class);
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new ByteRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)));
        }
    }
}
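Since the files this format produces are just raw bytes, reading them back needs no Hadoop at all. A minimal sketch of such a reader (the default file name "part-00000" is the usual name of the first reducer's output, but adjust it to your job's output directory):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ByteOutputReader {
    // Reads a reducer's binary output file fully into memory.
    public static byte[] readAll(String path) throws IOException {
        return Files.readAllBytes(Paths.get(path));
    }

    public static void main(String[] args) throws IOException {
        // "part-00000" is the usual name of the first reducer's output file.
        String file = args.length > 0 ? args[0] : "part-00000";
        byte[] data = readAll(file);
        System.out.println("Read " + data.length + " bytes from " + file);
    }
}
```

For large outputs you would stream the file with a FileInputStream instead of loading it whole, but the point is that no Hadoop classes are needed on the reading side.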
I hope this is helpful for you as well.
Awesome info!
I am interested in knowing more about MapReduce and large data analytics. This one resource looks great:
High Performance Analytics with Hadoop http://www.impetus.com/featured_webinar?eventid=16
Could you suggest some more?
I think you would find the publications at http://grids.ucs.indiana.edu/ptliupages/publications/ to be helpful in that sense. I could be of more help if I knew the exact area you are trying to tackle.
Hi, is it a rule that the mapper and reducer method signatures have to be as follows?
public static class Map extends MapReduceBase implements Mapper
{
public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter)
throws IOException
{
...
}
}
public static class Reduce extends MapReduceBase implements Reducer
{
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
{
...
}
}
Or can we have our own signatures?
I am a student, new to Hadoop, trying to implement data mining in Hadoop.
So basically I want to try sorting around 10000 numbers. Any help regarding the signatures of the map/reduce methods?
Please help...