Environment: Windows 10, CentOS 7.9, Hadoop 3.2, HBase 2.5.3, and ZooKeeper 3.8 in fully distributed mode. Environment setup procedures can be found in these earlier articles: "CentOS 7 Hadoop 3.x Fully Distributed Environment Setup" and "Hadoop 3.x Fully Distributed Environment Setup with ZooKeeper and HBase".
1. Integrating MapReduce and HBase
- Copy hbase-site.xml to the $HADOOP_HOME/etc/hadoop directory
cp $HBASE_HOME/conf/hbase-site.xml $HADOOP_HOME/etc/hadoop/
Note: in a fully distributed environment, this file must be copied to every node. The same applies to the subsequent steps.
- Edit hadoop-env.sh to add the HBase libraries to HADOOP_CLASSPATH, so that MapReduce programs can access them at runtime
vim $HADOOP_HOME/etc/hadoop/hadoop-env.sh
# Add the following content to the file
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/hbase/lib/*
- Run the bundled test jar hbase-server-2.5.3-tests.jar
cd $HBASE_HOME/lib
# 'test' refers to an existing table in the HBase database
hadoop jar hbase-server-2.5.3-tests.jar org.apache.hadoop.hbase.mapreduce.RowCounter test
Upon successful execution:
Test completed successfully.
2. Bulk Data Import
Import the required data into HBase.
2.1 Uploading Data to HDFS
First, upload data to HDFS to prepare for bulk import into HBase.
- Create a new folder /data/songs in HDFS
hadoop fs -mkdir -p /data/songs
- Upload the data files (songs1.txt, songs2.txt, songs3.txt) to the host machine
rz # upload the files via Xshell's rz command and select the appropriate files
- Upload the files to HDFS under the /data/songs directory
hadoop fs -put songs1.txt songs2.txt songs3.txt /data/songs # Upload files
hadoop fs -ls /data/songs/ # View files
2.2 Importing Data into HBase
- Use importtsv to generate HFiles from the prepared data and create the table; either of the two equivalent invocations below works (the first via hadoop jar, the second via the hbase launcher)
cd $HBASE_HOME/lib # Enter HBase lib directory containing various jar packages
hadoop jar hbase-server-2.5.3.jar org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.bulk.output=temp_storage -Dimporttsv.columns=HBASE_ROW_KEY,metadata:title,metadata:artist,metadata:sex,metadata:rhythm,metadata:device -Dcreate.table=yes tracks /data/songs
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.bulk.output=temp_storage -Dimporttsv.columns=HBASE_ROW_KEY,metadata:title,metadata:artist,metadata:sex,metadata:rhythm,metadata:device -Dcreate.table=yes tracks /data/songs
- hbase-server-2.5.3.jar is the jar containing the ImportTsv MapReduce tool for importing data into HBase.
- -Dimporttsv.bulk.output=temp_storage specifies an output directory in HDFS. With this option set, ImportTsv generates HFiles in HBase's internal format and places them in that directory instead of writing the data directly into HBase.
- -Dimporttsv.columns=HBASE_ROW_KEY,metadata:title,metadata:artist,metadata:sex,metadata:rhythm,metadata:device specifies the columns to import. Each column consists of a column family name and a qualifier separated by a colon. The command specifies six columns, including HBASE_ROW_KEY (the unique identifier of each row) and five information columns.
- tracks is the name of the target table.
- /data/songs is the location of the data in HDFS.
- -Dcreate.table=yes enables automatic table creation. Note that -D options must precede the table name and input path on the command line.
Normal execution completion result:
- Load HFile data into HBase
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles temp_storage tracks
Normal execution completion result:
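The same load can also be triggered from Java by reusing the tool's entry point (the same class the hbase command above invokes). A minimal sketch, assuming the temp_storage directory and tracks table from the previous steps:
package com.songs.ranking;

import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class BulkLoadDemo {
    public static void main(String[] args) throws Exception {
        // Same arguments as the command-line invocation above:
        // the HFile directory and the target table name.
        LoadIncrementalHFiles.main(new String[] { "temp_storage", "tracks" });
    }
}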
- View the content of the tracks table in HBase (for example, run scan 'tracks' in the HBase shell)
The data has now been loaded into the HBase tracks table.
3. Data Processing
About the HBase storage unit (cell)
A cell, the basic storage unit in HBase, consists of the following fields:
1) row
2) column family
3) column qualifier
4) timestamp
5) type
6) MVCC version
7) value
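To make these fields concrete, here is a minimal sketch that fetches one row from the tracks table and prints each cell field through the public client API. The row key "song001" is a made-up example, and the MVCC version is internal bookkeeping with no public accessor used here:
package com.songs.ranking;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class CellFieldsDemo {
    public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection();
             Table table = conn.getTable(TableName.valueOf("tracks"))) {
            // Fetch a single row; "song001" is a placeholder row key.
            Result result = table.get(new Get(Bytes.toBytes("song001")));
            if (result.listCells() == null) return; // row not found
            for (Cell cell : result.listCells()) {
                System.out.println("row:       " + Bytes.toString(CellUtil.cloneRow(cell)));
                System.out.println("family:    " + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("timestamp: " + cell.getTimestamp());
                System.out.println("type:      " + cell.getType()); // e.g. Put
                System.out.println("value:     " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}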
3.1 Project Source Code
3.1.1 HBaseConnection
package com.songs.ranking;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
public class HBaseConnection {
// Set static property for HBase connection
public static Connection conn = null;
static {
try {
// Create a connection; with no arguments, the configuration is read from hbase-site.xml on the classpath
conn = ConnectionFactory.createConnection();
} catch (IOException e) {
System.out.println("Connection retrieval failed");
e.printStackTrace();
}
}
public static void closeConnection() throws IOException {
if (conn != null) {
conn.close();
}
}
}
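With no arguments, ConnectionFactory.createConnection() builds its configuration via HBaseConfiguration.create(), which reads the hbase-site.xml placed on the classpath in section 1. If that file is not available, the connection parameters can be set explicitly instead; a sketch, where the quorum host names are placeholders for your own nodes:
package com.songs.ranking;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ExplicitConnectionDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Placeholder host names -- replace with your own ZooKeeper quorum.
        conf.set("hbase.zookeeper.quorum", "master-node,slave1,slave2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            System.out.println("Connected: " + !conn.isClosed());
        }
    }
}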
3.1.2 HBaseSchemaOperations
package com.songs.ranking;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class HBaseSchemaOperations {
// Add static property connection pointing to singleton connection
public static Connection conn = HBaseConnection.conn;
/**
* Check if table exists
*
* @param namespace namespace name
* @param tableName table name
* @return returns check result
* @throws IOException exception
*/
public static boolean doesTableExist(String namespace, String tableName) throws IOException {
// Get admin
Admin admin = conn.getAdmin();
// Use method to check if table exists
boolean exists = false;
try {
exists = admin.tableExists(TableName.valueOf(namespace, tableName));
} catch (IOException e) {
e.printStackTrace();
}
// Close admin
admin.close();
return exists;
}
/**
* Create table
*
* @param namespace namespace name
* @param tableName table name
* @param columnFamilies column family names
*/
public static void createTable(String namespace, String tableName, String... columnFamilies) throws IOException {
// Check for at least one column family
if (columnFamilies.length == 0) {
System.out.println("Table creation requires at least one column family");
return;
}
// Check if table exists
if (doesTableExist(namespace, tableName)) {
System.out.println("Table already exists");
return;
}
// Get admin
Admin admin = conn.getAdmin();
// Call method to create table
// Create table descriptor builder
// Just add parameters in builder, no need to generate descriptor objects
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));
// Add parameters
for (String columnFamily : columnFamilies) {
// Create column family descriptor builder
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
// Add parameters for current column family
columnFamilyDescriptorBuilder.setMaxVersions(5);
// Create column family descriptor with added parameters
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
}
// Create corresponding table descriptor
try {
admin.createTable(tableDescriptorBuilder.build());
} catch (IOException e) {
// Since we already checked existence, any exception means other issues, just print stack trace
e.printStackTrace();
}
// Close admin
admin.close();
}
/**
* Delete table
*
* @param namespace namespace name
* @param tableName table name
* @return true if deletion succeeded, false otherwise
*/
public static boolean removeTable(String namespace, String tableName) throws IOException {
// Check if table exists
if (!doesTableExist(namespace, tableName)) {
System.out.println("Table doesn't exist, cannot delete");
return false;
}
// Get admin
Admin admin = conn.getAdmin();
// Call delete table method
try {
// Before deleting table, mark as disabled
TableName tableName1 = TableName.valueOf(namespace, tableName);
admin.disableTable(tableName1);
admin.deleteTable(tableName1);
} catch (IOException e) {
e.printStackTrace();
}
// Close admin
admin.close();
return true;
}
}
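A minimal usage sketch for this class, using the titlelist table and statistics column family that appear later in this project:
package com.songs.ranking;

import java.io.IOException;

public class SchemaOperationsDemo {
    public static void main(String[] args) throws IOException {
        // Create the output table used by the first MapReduce job.
        HBaseSchemaOperations.createTable("default", "titlelist", "statistics");
        System.out.println(HBaseSchemaOperations.doesTableExist("default", "titlelist")); // true
        // Disable and drop it again.
        HBaseSchemaOperations.removeTable("default", "titlelist");
        HBaseConnection.closeConnection();
    }
}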
3.1.3 CountAggregatorReducer
package com.songs.ranking;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
class CountAggregatorReducer extends TableReducer<Text, IntWritable, Text> {
/**
* Sum total plays for each song
*
* @param key // Song title
* @param values // Play count collection {1, 1, 1, 1}
* @param context // Context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// Count total plays per song
int totalPlays = 0;
for (IntWritable num : values) {
totalPlays += num.get();
}
// Specify row key for Put operation
Put put = new Put(Bytes.toBytes(key.toString()));
// Specify column and value for Put operation
put.addColumn(Bytes.toBytes("statistics"), Bytes.toBytes("position"),
Bytes.toBytes(totalPlays));
context.write(key, put);
}
}
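Note that Bytes.toBytes(totalPlays) stores the play count as a 4-byte binary int, not as a printable string, so any consumer of the statistics:position column (such as ProcessedDataMapper in 3.1.6) must decode it with Bytes.toInt. A quick illustration:
import org.apache.hadoop.hbase.util.Bytes;

public class BytesRoundTrip {
    public static void main(String[] args) {
        byte[] raw = Bytes.toBytes(42);       // 4-byte big-endian encoding, not the string "42"
        System.out.println(raw.length);       // 4
        System.out.println(Bytes.toInt(raw)); // 42
    }
}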
3.1.4 DescendingOrderComparator
package com.songs.ranking;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
/**
* Implement descending order sorting class
*/
class DescendingOrderComparator extends
IntWritable.Comparator {
@Override
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b); // Negative comparison result for descending order
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
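MapReduce sorts map output keys in ascending order by default, using the comparator registered for the key type; negating the result of IntWritable.Comparator therefore flips the shuffle sort to descending. A quick standalone check of the comparator (not part of the project itself):
package com.songs.ranking;

import org.apache.hadoop.io.IntWritable;

public class ComparatorDemo {
    public static void main(String[] args) {
        DescendingOrderComparator cmp = new DescendingOrderComparator();
        // 7 > 3, but the negated comparator reports 7 as "smaller",
        // so 7 sorts before 3 -- i.e., descending order.
        System.out.println(cmp.compare(new IntWritable(7), new IntWritable(3)) < 0); // true
    }
}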
3.1.5 SongScannerMapper
package com.songs.ranking;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.List;
/**
* Scan metadata:title column in each row
*/
class SongScannerMapper extends TableMapper<Text, IntWritable> {
/**
* Scan file content, output key-value pairs <"song_title": 1>
* @param key // Row key
* @param value // A record
* @param context // Context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
List<Cell> cells = value.listCells();
for (Cell cell : cells) {
if (Bytes.toString(CellUtil.cloneFamily(cell)).equals("metadata") &&
Bytes.toString(CellUtil.cloneQualifier(cell)).equals("title")) {
context.write(
new Text(Bytes.toString(CellUtil.cloneValue(cell))), // Song title
new IntWritable(1));
}
}
}
}
3.1.6 ProcessedDataMapper
package com.songs.ranking;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.List;
/**
* Process data after first MapReduce
* Scan all song titles and get play counts per song
* Output key/value: play_count/song_title
* Output destination: HDFS file
*/
class ProcessedDataMapper extends TableMapper<IntWritable, Text> {
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
List<Cell> cells = value.listCells();
for (Cell cell : cells) {
context.write(
new IntWritable(Bytes.toInt(CellUtil.cloneValue(cell))), // Play count
new Text(Bytes.toString(key.get(), key.getOffset(), key.getLength()))); // Song title (respect the writable's offset and length)
}
}
}
3.1.7 MusicRankingSystem
package com.songs.ranking;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MusicRankingSystem {
static final String TRACKS_TABLE = "tracks";
static final String TITLE_LIST_TABLE = "titlelist";
static final String RESULT_PATH = "hdfs://master-node:9000/results/topsongs";
// Set HBase static configuration
static Configuration config = HBaseConfiguration.create();
/**
* Configure job: First MapReduce, count total plays per song
* @param args command line arguments
* @return whether the Job ran successfully
* @throws IOException IO exception
* @throws ClassNotFoundException Class not found exception
* @throws InterruptedException Exception thrown when blocked method receives interrupt request
*/
public static boolean calculatePlayCounts(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
// Set Job instance
Job job = Job.getInstance(config);
// Basic MapReduce program configuration
job.setJarByClass(MusicRankingSystem.class);
// Set two ReduceTasks
job.setNumReduceTasks(2);
// Set scanned column family: column name i.e. metadata:title
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("metadata"), Bytes.toBytes("title"));
// Use HBase utility class to set job
// Set input table name, scan object, Mapper type, output key-value types, Job object
TableMapReduceUtil.initTableMapperJob(TRACKS_TABLE, scan,
SongScannerMapper.class, Text.class, IntWritable.class, job);
// Check if output table exists, create if not, delete and recreate if exists
if (!HBaseSchemaOperations.doesTableExist("default", TITLE_LIST_TABLE)) {
HBaseSchemaOperations.createTable("default", TITLE_LIST_TABLE, "statistics");
} else {
if (HBaseSchemaOperations.removeTable("default", TITLE_LIST_TABLE)) {
System.out.println("Table deleted successfully");
HBaseSchemaOperations.createTable("default", TITLE_LIST_TABLE, "statistics");
} else {
System.exit(0);
}
}
// Set output table name, Reducer type, Job object
TableMapReduceUtil.initTableReducerJob(TITLE_LIST_TABLE,
CountAggregatorReducer.class, job);
return job.waitForCompletion(true);
}
/**
* Configure job: second MapReduce (only the map function is overridden); a custom comparator makes the shuffle phase sort the data in descending order
* @param args command line arguments
* @return whether the job ran successfully
* @throws IOException IO exception
* @throws ClassNotFoundException Class not found exception
* @throws InterruptedException Exception thrown when blocked method receives interrupt request
*/
public static boolean sortSongs(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
// Set Job instance
Job job = Job.getInstance(config, "sort-songs");
job.setJarByClass(MusicRankingSystem.class);
job.setNumReduceTasks(1);
// Set comparator class
job.setSortComparatorClass(DescendingOrderComparator.class);
// Set input table name, scan object, Mapper class, output key-value types, job instance
TableMapReduceUtil.initTableMapperJob(TITLE_LIST_TABLE, new Scan(),
ProcessedDataMapper.class, IntWritable.class, Text.class, job);
// Output sorted data files to specified path
Path output = new Path(RESULT_PATH);
if (FileSystem.get(config).exists(output))
FileSystem.get(config).delete(output, true);
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true);
}
/**
* View output file, get final ranking data
* @throws IllegalArgumentException Illegal argument exception
* @throws IOException IO exception
*/
public static void displayResults() throws IllegalArgumentException, IOException{
// Get filesystem object
FileSystem fs = FileSystem.get(config);
// Output file contents under output path
InputStream in = null;
try {
in = fs.open(new Path(RESULT_PATH+"/part-r-00000"));
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
// Main function
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
// About GenericOptionsParser: a basic class in the Hadoop framework for parsing command-line arguments.
// It recognizes the standard options, enabling applications to easily specify the namenode,
// the resource manager, and other additional configuration resources.
GenericOptionsParser parser = new GenericOptionsParser(config, args);
String[] remainingArgs = parser.getRemainingArgs(); // Get command line arguments
// If calculatePlayCounts() succeeds, run sortSongs(); if sortSongs() succeeds, call displayResults() to show the results
if (calculatePlayCounts(remainingArgs)) {
if (sortSongs(remainingArgs)) {
displayResults();
}
}
// Release the shared HBase connection before exiting
HBaseConnection.closeConnection();
}
}
3.2 Execution Results
Console output results:
Output results stored in HDFS:
HBase titlelist table content:
This completes the big data development project: a music ranking system.