Imagine that you have a file “data.csv” stored in HDFS and you need to split it into a number of smaller files, each holding different data, so that they can be processed separately. To do this with Pig or Hive you would have to specify a schema and describe the file as a table, which might not be what you need (for instance, if different rows have different schemas). Here’s an example of how it can be done with a MapReduce job that uses MultipleOutputs.
1. Mapper class:
package io.pivotal.splitter;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

public class SplitterMap extends Mapper<LongWritable, Text, NullWritable, Text> {

    Text outVal = new Text();
    MultipleOutputs<NullWritable, Text> mos = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String sVal = value.toString();
        // Everything after the first '|' is the record that gets written out
        outVal.set(sVal.substring(sVal.indexOf('|') + 1));
        // The first field (with underscores removed) becomes the base name of the output file
        mos.write(NullWritable.get(), outVal, sVal.substring(0, sVal.indexOf('|')).replace("_", ""));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
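
To make the string handling in map() concrete, here is a minimal standalone sketch (not part of the job, using a made-up input row) of what happens to each line: everything before the first pipe becomes the base name of the output file, with underscores removed, and the rest of the line is the record written into that file.

public class SplitLogicDemo {
    public static void main(String[] args) {
        String row = "ORDER_2014|123|John|42.50";                    // hypothetical input line
        int pipe = row.indexOf('|');
        String fileName = row.substring(0, pipe).replace("_", "");   // "ORDER2014" - base name of the output file
        String record = row.substring(pipe + 1);                     // "123|John|42.50" - what gets written there
        System.out.println(record + " -> " + fileName);
    }
}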
2. Main class:
package io.pivotal.splitter;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SplitterJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Compress the output files with gzip
        conf.set("mapred.output.compress", "true");
        conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
        conf.set("mapreduce.map.memory.mb", "8000");
        int exitCode = ToolRunner.run(conf, new SplitterJob(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(final String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("File Splitter");
        job.setJarByClass(io.pivotal.splitter.SplitterJob.class);
        job.setJar("/home/gpadmin/xml_parser/splitter/splitter.jar");

        job.setOutputKeyClass(Text.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SplitterMap.class);
        // Map-only job: no reducers
        job.setNumReduceTasks(0);

        System.out.println(args[0]);
        System.out.println(args[1]);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
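
One optional addition that is not in the original driver: because every record is emitted through MultipleOutputs rather than the standard output collector, the job will also leave behind empty part-m-xxxxx files. If you want to suppress them, Hadoop’s LazyOutputFormat creates the default output files only when something is actually written to them. A sketch of the extra lines, assuming they go in run() before waitForCompletion():

// Additional imports for the optional tweak
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Inside run(): create the default output files lazily, so that only
// the MultipleOutputs files (named after the first field) remain.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);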
This is a map-only job that reads the pipe-delimited input and routes each row to one of several output files based on the value of its first field.
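To launch it, submit the jar with the input and output paths as the two arguments (the paths here are placeholders), for example: hadoop jar /home/gpadmin/xml_parser/splitter/splitter.jar io.pivotal.splitter.SplitterJob &lt;input path&gt; &lt;output path&gt;. With the compression settings above, each distinct first field should produce output files named roughly like ORDER2014-m-00000.gz in the output directory, though the exact naming depends on the Hadoop version.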