Experimental Principle
1. InputFormat Concept
The InputFormat class in Hadoop defines how input files are split and read. It provides the following functionality:
- Selects files or objects to process as input
- Defines InputSplits that partition files into tasks
- Provides a factory method for RecordReader to read files
Hadoop includes several built-in input formats. FileInputFormat is an abstract base class that all file-based InputFormat implementations inherit from. When a Hadoop job starts, FileInputFormat receives a path parameter containing the files to process. It reads all files in that directory (by default, excluding subdirectories) and splits them into one or more InputSplits. You can set the input format for your job with the setInputFormat() method on the JobConf object (older mapred API) or the setInputFormatClass() method on the Job object (newer mapreduce API, which this experiment uses).
The default input formats are described below:
- TextInputFormat treats each line of the input file as a separate record without parsing. This is useful for unformatted data or line-based records such as log files.
- KeyValueInputFormat also treats each line as a record, but unlike TextInputFormat, which treats the entire line as the value, it splits the line into a key-value pair at the first tab character. This is particularly useful when the output of one MapReduce job serves as input to another, since the default output format writes data in exactly this form.
- SequenceFileInputFormat reads Hadoop-specific binary files with features optimized for fast reading by mappers. Sequence files are block-compressed and provide direct serialization and deserialization of multiple data types (not just text). Sequence files can serve as MapReduce output and are efficient for intermediate data between jobs.
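As a minimal sketch of selecting one of these built-in formats (assuming the newer org.apache.hadoop.mapreduce API that the experiment code below uses; the class name InputFormatConfigSketch is invented for illustration):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
public class InputFormatConfigSketch {
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration(), "input-format-demo");
// TextInputFormat is the default, so it needs no explicit call;
// this switches the job to tab-separated key/value input instead.
job.setInputFormatClass(KeyValueTextInputFormat.class);
}
}
Note that the class Hadoop ships for tab-separated lines in the newer API is named KeyValueTextInputFormat.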
2. InputSplit
An InputSplit describes a single unit of work for a map task in a MapReduce program. When a MapReduce program is applied to a dataset, a job consists of several (possibly hundreds) tasks. Map tasks may read an entire file or typically just a portion of it. By default, FileInputFormat and its subclasses split files using 64MB as the base unit (matching the default HDFS block size). You can control the split size by setting the mapred.min.split.size parameter in the configuration file or by overriding this parameter in the JobConf object for a specific MapReduce job. Processing files in blocks allows multiple map tasks to operate on a file in parallel. For very large files, this feature can significantly improve performance through parallel processing. More importantly, since files consisting of multiple blocks may be distributed across several nodes in the cluster, tasks can be scheduled on different nodes, allowing all individual blocks to be processed locally without data transfer between nodes. However, some file formats do not support block processing. For such cases, you can write a custom InputFormat to control how files are split (or not split) into blocks.
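For reference, a hedged sketch of adjusting the split size for a single job with the newer API (the property mapreduce.input.fileinputformat.split.minsize replaces mapred.min.split.size; the class name SplitSizeSketch is invented for illustration):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class SplitSizeSketch {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Option 1: set the minimum split size (here 128 MB) directly in the configuration.
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 128L * 1024 * 1024);
Job job = Job.getInstance(conf, "split-size-demo");
// Option 2: the equivalent helper on FileInputFormat.
FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
}
}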
The InputFormat defines the list of map tasks in the mapping phase, with each task corresponding to an input split. These tasks are then assigned to respective nodes based on the physical location of the input file blocks. Multiple map tasks may be assigned to the same node. After task assignment, nodes begin executing tasks to maximize parallelization. The maximum number of parallel tasks on a node is controlled by the mapred.tasktracker.map.tasks.maximum parameter.
3. RecordReader
While InputSplit defines how to divide the work, it does not describe how to access it. The RecordReader class is responsible for actually loading the data and converting it into key-value pairs suitable for the mapper to read. RecordReader instances are defined by the input format. The default input format, TextInputFormat, provides a LineRecordReader that treats each line of the input file as a new value, with the key being the byte offset of that line in the file. The RecordReader is invoked repeatedly on the input split until the entire split has been processed, and each key-value pair it produces triggers a call to the Mapper's map() method.
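This driving loop is essentially what Mapper.run() does in the newer API; the following is a simplified sketch (not the exact Hadoop source) showing how the context delegates to the RecordReader:
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
// Simplified sketch of Mapper.run(): the framework pulls key-value pairs from the
// RecordReader (through the context) and passes each pair to map().
public class RunLoopSketch<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
@Override
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) { // RecordReader.nextKeyValue()
map(context.getCurrentKey(), // RecordReader.getCurrentKey()
context.getCurrentValue(), // RecordReader.getCurrentValue()
context);
}
cleanup(context);
}
}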
4. When to Create Custom InputFormat
When dealing with special key-value pairs, such as keys composed of a file name and a record position, the built-in input formats like TextInputFormat, CombineFileInputFormat, and NLineInputFormat cannot meet the requirements. In such cases, you need to implement custom input splitting. MapReduce defines the InputFormat abstract class with two methods: getSplits() and createRecordReader(). The getSplits() method splits the input files into individual splits (similar to how Hadoop's default InputFormats split files larger than the block size while leaving smaller ones unsplit). The createRecordReader() method returns a RecordReader that parses each split into the key-value pairs consumed by the map phase. The key task is defining how those key-value pairs are generated. Developers need to extend FileInputFormat to implement a new input format and extend RecordReader to implement reading methods for the new key and value types.
FileInputFormat extends InputFormat and provides the logic for splitting files larger than the block size, while leaving createRecordReader() for subclasses to implement. You can create a class like CustomFileInputFormat that extends FileInputFormat and overrides the createRecordReader() method.
Looking at TextInputFormat as a reference, it extends FileInputFormat and overrides createRecordReader(), delegating the actual reading to LineRecordReader. Similarly, you need to create a custom RecordReader class modeled on LineRecordReader and have your custom InputFormat return it. LineRecordReader extends RecordReader and overrides six methods: initialize(), getCurrentKey(), getCurrentValue(), getProgress(), close(), and nextKeyValue(). Your custom RecordReader must also override these methods. The complete implementation used in this experiment is shown in Step 6 below.
Experimental Steps
Step 1: Create Table
Create a comma-separated table file.
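The exact contents are not specified by the lab; any small text file whose fields are separated by commas will do. A purely hypothetical sample (three records, three fields each):
tom,24,reading
jerry,30,fishing
lily,28,hiking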
Step 2: Create Local Directory
mkdir -p /data/mapreduce11
Step 3: Upload Table to Virtual Machine
Transfer the table file to the virtual machine's /data/mapreduce11 directory.
Step 4: Upload and Extract Hadoop Libraries
Upload the Hadoop dependency archive (hadoop2lib) to the virtual machine and extract it; its JAR files will be added to the project classpath in Step 7.
Step 5: Create HDFS Directory and Upload Input Data
hadoop fs -mkdir -p /mymapreduce11/in
hadoop fs -put /data/mapreduce11/cat1 /mymapreduce11/in
Step 6: Implement Java Code in IDE
The following is the main MapReduce job class:
package mapreduce.custom;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FileKeyMR {
public static class MapPhase extends Mapper<Text, Text, Text, Text> {
@Override
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
// The input key is the "(fileName@offset)" Text produced by the custom RecordReader.
String line = value.toString();
System.out.println(line);
String[] fields = line.split(",");
for (String field : fields) {
context.write(key, new Text(field));
}
}
}
public static class ReducePhase extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder result = new StringBuilder(":");
for (Text val : values) {
result.append(val.toString());
}
context.write(key, new Text(result.toString()));
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration config = new Configuration();
Job job = Job.getInstance(config, "FileKeyMR");
job.setJarByClass(FileKeyMR.class);
job.setMapperClass(MapPhase.class);
job.setReducerClass(ReducePhase.class);
job.setInputFormatClass(FileKeyInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,
new Path("hdfs://192.168.149.10:9000/mymapreduce11/in/cat1"));
FileOutputFormat.setOutputPath(job,
new Path("hdfs://192.168.149.10:9000/mymapreduce11/out"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
The custom InputFormat class:
package mapreduce.custom;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class FileKeyInputFormat extends FileInputFormat<Text, Text> {
public FileKeyInputFormat() {
super();
}
@Override
public RecordReader<Text, Text> createRecordReader(
InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// The framework calls initialize(split, context) on the returned reader before use,
// so there is no need to initialize it here.
return new FileKeyRecordReader();
}
// The remaining overrides simply delegate to FileInputFormat's default behavior;
// they are listed here only to show which methods a custom format may tune.
@Override
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return super.computeSplitSize(blockSize, minSize, maxSize);
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
return super.getSplits(context);
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// Allow splitting; LineRecordReader handles records that cross split boundaries.
return true;
}
@Override
protected List<FileStatus> listStatus(JobContext context) throws IOException {
return super.listStatus(context);
}
}
The custom RecordReader class:
package mapreduce.custom;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
public class FileKeyRecordReader extends RecordReader<Text, Text> {
private String fileName;
private LineRecordReader lineReader;
public FileKeyRecordReader() {
this.lineReader = new LineRecordReader();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
lineReader.initialize(split, context);
this.fileName = ((FileSplit) split).getPath().getName();
}
@Override
public void close() throws IOException {
lineReader.close();
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
System.out.println("CurrentKey");
LongWritable position = lineReader.getCurrentKey();
Text key = new Text("(" + fileName + "@" + position + ")");
System.out.println("key--" + key);
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return lineReader.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
// Delegate progress reporting to the wrapped LineRecordReader.
return lineReader.getProgress();
}
@Override
public boolean nextKeyValue() throws IOException {
return lineReader.nextKeyValue();
}
}
Step 7: Add Hadoop Libraries to Project
Copy all JAR files from the hadoop2lib directory to your project's classpath or lib folder.
Step 8: Copy Log4j Configuration
Copy the log4j.properties file to the project resources directory.
Step 9: Execute and View Results
Run the FileKeyMR class (for example, as a Java application from the IDE). After the job finishes, list the output directory and view the result file:
hadoop fs -ls /mymapreduce11/out
hadoop fs -cat /mymapreduce11/out/part-r-00000
Each output key has the form (cat1@offset), and its value is the line's comma-separated fields concatenated after a leading ":".