
Map side join (replicated join)
If one of the datasets is small enough to fit into main memory, a map side join is a good choice. In a map side join, the small dataset is loaded into an in-memory map during the setup phase of the mapper. The large dataset is read as the mapper's input so that each record is joined against the in-memory dataset, and the joined output is emitted to a file. There is no reduce phase, and therefore no shuffle and sort phases.
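For example, assume a hypothetical user details file (the small dataset, loaded into memory) and a purchase file (the large dataset, streamed through the mapper), both comma-separated with the user ID as the first field; the file names and layouts here are illustrative only:
userdetails.csv (small):  u001,Alice,Delhi
                          u002,Bob,Mumbai
purchases.csv (large):    u001,item101,250
                          u003,item102,900
Joining on the user ID emits one output record for every purchase whose user ID is found in the in-memory map (inner join), or for every purchase regardless of a match (left outer join).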
A map side join is widely used for left outer join and inner join use cases. Let's look at how we can create the Mapper class and the Driver class for a map side join:
- Mapper class: The following Mapper class is a template for a map side join; you can use it and modify the logic according to your input dataset. The data read from the distributed cache is stored in RAM, so the job can throw an out-of-memory exception if the file does not fit into memory; the only way around this is to increase the memory available to the mapper. The setup method is executed only once during the mapper life cycle, while the map function is called for each record. Inside the map function, each record is processed and checked against the in-memory map for a matching record in order to perform the join operation (a sketch of this join logic is given after the template).
The following is the code for the Mapper class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;

public class UserPurchaseMapSideJoinMapper extends
        Mapper<LongWritable, Text, Text, Text> {

    private HashMap<String, String> userDetails = new HashMap<String, String>();
    private Configuration conf;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
        // The small dataset is shipped to each mapper through the distributed cache.
        URI[] cacheFiles = Job.getInstance(conf).getCacheFiles();
        for (URI cacheFile : cacheFiles) {
            Path filePath = new Path(cacheFile.getPath());
            String userDetailFile = filePath.getName();
            readFile(userDetailFile);
        }
    }

    private void readFile(String filePath) {
        try {
            BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath));
            String userInfo = null;
            while ((userInfo = bufferedReader.readLine()) != null) {
                /* Add the record to the map here. You can modify the key and value accordingly. */
                userDetails.put(userInfo.split(",")[0], userInfo.toLowerCase());
            }
        } catch (IOException ex) {
            System.err.println("Exception while reading user details file: " + ex.getMessage());
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String purchaseDetailUserId = value.toString().split(",")[0];
        String userDetail = userDetails.get(purchaseDetailUserId);
        /* Perform the join operation here */
    }
}
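The template leaves the actual join logic as a comment. The following is a minimal sketch of the map function body for an inner join, assuming both files are comma-separated with the user ID as the first field; the output format used here is an assumption, not part of the original template:
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String purchaseRecord = value.toString();
    String purchaseDetailUserId = purchaseRecord.split(",")[0];
    String userDetail = userDetails.get(purchaseDetailUserId);
    if (userDetail != null) {
        // Inner join: emit only when a matching user record exists in memory.
        context.write(new Text(purchaseDetailUserId), new Text(userDetail + "," + purchaseRecord));
    }
    // For a left outer join, also emit the purchase record when userDetail is null,
    // for example with an empty string in place of the user details.
}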
- Driver class: In the Driver class, we add the path of the small file that will be shipped to each mapper through the distributed cache during execution. Let's look into the Driver class template, as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapSideJoinDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MapSideJoinDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "map join");
        job.setJarByClass(MapSideJoinDriver.class);
        if (args.length < 3) {
            System.out.println("Jar requires 3 parameters : \""
                    + job.getJar()
                    + " input_path output_path distributedcachefile\"");
            return 1;
        }
        // Ship the small dataset to every mapper through the distributed cache.
        job.addCacheFile(new Path(args[2]).toUri());
        job.setMapperClass(UserPurchaseMapSideJoinMapper.class);
        // Map-only job: there is no reduce phase in a map side join.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path filePath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, filePath);
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
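Once the classes are packaged into a JAR, the job can be submitted with the hadoop jar command; the JAR name and the HDFS paths below are placeholders for illustration:
hadoop jar mapsidejoin.jar MapSideJoinDriver /input/purchases /output/user_purchase_join /cache/userdetails.csv
The third argument is the small file that is added to the distributed cache and loaded into memory by every mapper.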