Building a Music Ranking System with HBase and MapReduce

Environment: Windows 10, CentOS 7.9, Hadoop 3.2, HBase 2.5.3, and Zookeeper 3.8 in fully distributed mode. Environment setup procedures can be found in these articles: "CentOS 7 Hadoop 3.x Fully Distributed Environment Setup" and "Hadoop 3.x Fully Distributed Environment Setup with Zookeeper and HBase".

1. Integrating MapReduce and HBase

  1. Copy hbase-site.xml to $HADOOP_HOME/etc/hadoop directory
cp $HBASE_HOME/conf/hbase-site.xml $HADOOP_HOME/etc/hadoop/

Note: In a fully distributed environment, this file must be copied to every node. The same applies to the subsequent configuration steps.

  2. Edit hadoop-env.sh to include the HBase libraries in HADOOP_CLASSPATH, allowing MapReduce programs to access them at runtime
vim $HADOOP_HOME/etc/hadoop/hadoop-env.sh

# Add the following content to the file
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/hbase/lib/*
  3. Run the RowCounter test from the test jar hbase-server-2.5.3-tests.jar
cd $HBASE_HOME/lib

# 'test' is the name of an existing HBase table whose rows will be counted
hadoop jar hbase-server-2.5.3-tests.jar org.apache.hadoop.hbase.mapreduce.RowCounter test

Upon successful execution:

Test completed successfully.

2. Bulk Data Import

Import the required data into HBase.

2.1 Uploading Data to HDFS

First, upload data to HDFS to prepare for bulk import into HBase.

  1. Create a new folder /data/songs in HDFS
hadoop fs -mkdir -p /data/songs
  2. Upload the data files (songs1.txt, songs2.txt, songs3.txt) to the host machine
rz  # Upload the files via the rz command in Xshell; select the appropriate files
  3. Upload the files to HDFS under the /data/songs directory
hadoop fs -put songs1.txt songs2.txt songs3.txt /data/songs  # Upload files
hadoop fs -ls /data/songs/  # View files

2.2 Importing Data into HBase

  1. Use ImportTsv to generate HFiles from the prepared data and create the table
cd $HBASE_HOME/lib  # Enter HBase lib directory containing various jar packages
hadoop jar hbase-server-2.5.3.jar org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.bulk.output=temp_storage -Dimporttsv.columns=HBASE_ROW_KEY,metadata:title,metadata:artist,metadata:sex,metadata:rhythm,metadata:device tracks /data/songs -Dcreate.table=yes

# Alternatively, the same tool can be launched through the hbase command:
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.bulk.output=temp_storage -Dimporttsv.columns=HBASE_ROW_KEY,metadata:title,metadata:artist,metadata:sex,metadata:rhythm,metadata:device tracks /data/songs -Dcreate.table=yes
  • hbase-server-2.5.3.jar is the Hadoop MapReduce job jar containing the ImportTsv class for importing data into HBase.
  • -Dimporttsv.bulk.output=temp_storage specifies the output directory in HDFS. With this option, ImportTsv generates HFiles in HBase's internal format and places them in the specified directory (here temp_storage) instead of writing the data directly to HBase.
  • -Dimporttsv.columns=HBASE_ROW_KEY,metadata:title,metadata:artist,metadata:sex,metadata:rhythm,metadata:device specifies the columns to import into HBase. Each column consists of a column family name and a qualifier separated by a colon. The command specifies 6 columns, including HBASE_ROW_KEY (the unique identifier of each row) and the other information columns. The tracks argument that follows is the target table name, not part of the column list. A sample input line is shown after this list.
  • /data/songs is the location of data in HDFS.
  • -Dcreate.table=yes tells ImportTsv to create the table automatically if it does not already exist.
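
For reference, ImportTsv expects tab-separated input whose field order matches the -Dimporttsv.columns list: the row key first, then title, artist, sex, rhythm, and device. A line in songs1.txt would therefore take roughly the following shape (the values below are made up purely for illustration; <TAB> marks a tab character):

row0001<TAB>SongTitleA<TAB>ArtistA<TAB>male<TAB>fast<TAB>mobile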

Normal execution completion result:

  2. Load the HFile data into HBase
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles temp_storage tracks

Normal execution completion result:

  3. View the content of the tracks table in HBase

Data has been loaded into HBase tracks table.
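
To double-check the load programmatically, a minimal Java client sketch such as the one below can preview a few rows. This snippet is not part of the project source; it assumes hbase-site.xml is on the classpath and reuses the table and column names from the import above.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class TracksPreview {
    public static void main(String[] args) throws Exception {
        // Connection settings are read from hbase-site.xml on the classpath
        try (Connection conn = ConnectionFactory.createConnection();
             Table table = conn.getTable(TableName.valueOf("tracks"))) {
            Scan scan = new Scan().setLimit(5);  // Preview only the first 5 rows
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result r : scanner) {
                    String title = Bytes.toString(
                            r.getValue(Bytes.toBytes("metadata"), Bytes.toBytes("title")));
                    System.out.println(Bytes.toString(r.getRow()) + " -> " + title);
                }
            }
        }
    }
}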

3. Data Processing

Regarding the HBase Storage Unit (Cell)

A cell, the basic storage unit in HBase, consists of the following fields:

1) row
2) column family
3) column qualifier
4) timestamp
5) type
6) MVCC version
7) value
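
In client code these fields are read through the Cell interface, which is exactly what the mappers below do. A small sketch of the relevant accessors (a standalone helper, not part of the project source):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class CellFields {
    // Print each field of a cell obtained from a scan or get result
    static void printCell(Cell cell) {
        System.out.println("row:       " + Bytes.toString(CellUtil.cloneRow(cell)));
        System.out.println("family:    " + Bytes.toString(CellUtil.cloneFamily(cell)));
        System.out.println("qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
        System.out.println("timestamp: " + cell.getTimestamp());
        System.out.println("type:      " + cell.getType());  // e.g. Put, Delete
        System.out.println("value:     " + Bytes.toString(CellUtil.cloneValue(cell)));
        // The MVCC version (sequence id) is internal and not exposed via the public API
    }
}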

3.1 Project Source Code

3.1.1 HBaseConnection
package com.songs.ranking;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class HBaseConnection {

    // Set static property for HBase connection
    public static Connection conn = null;
    static {
        try {
            // Create the connection; configuration is read from hbase-site.xml on the classpath
            conn = ConnectionFactory.createConnection();
        } catch (IOException e) {
            System.out.println("Connection retrieval failed");
            e.printStackTrace();
        }
    }

    public static void closeConnection() throws IOException {
        if (conn != null) {
            conn.close();
        }
    }
}
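
The other classes reuse this singleton connection. For illustration, obtaining a Table handle from it looks like the following (a hypothetical snippet, not part of the project source):

package com.songs.ranking;

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;

public class ConnectionUsageDemo {
    public static void main(String[] args) throws IOException {
        // Table handles are lightweight; obtain one per use from the shared connection
        try (Table tracks = HBaseConnection.conn.getTable(TableName.valueOf("tracks"))) {
            System.out.println("Obtained table: " + tracks.getName());
        }
        HBaseConnection.closeConnection();  // Close once, at application shutdown
    }
}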
3.1.2 HBaseSchemaOperations
package com.songs.ranking;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseSchemaOperations {
    // Add static property connection pointing to singleton connection
    public static Connection conn = HBaseConnection.conn;

    /**
     * Check if table exists
     *
     * @param namespace namespace name
     * @param tableName table name
     * @return returns check result
     * @throws IOException exception
     */
    public static boolean doesTableExist(String namespace, String tableName) throws IOException {
        // Get admin
        Admin admin = conn.getAdmin();

        // Use method to check if table exists
        boolean exists = false;

        try {
            exists = admin.tableExists(TableName.valueOf(namespace, tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Close admin
        admin.close();

        return exists;
    }

    /**
     * Create table
     *
     * @param namespace namespace name
     * @param tableName table name
     * @param columnFamilies column family names
     */
    public static void createTable(String namespace, String tableName, String... columnFamilies) throws IOException {
        // Check for at least one column family
        if (columnFamilies.length == 0) {
            System.out.println("Table creation requires at least one column family");
            return;
        }

        // Check if table exists
        if (doesTableExist(namespace, tableName)) {
            System.out.println("Table already exists");
            return;
        }

        // Get admin
        Admin admin = conn.getAdmin();

        // Call method to create table
        // Create table descriptor builder
        // Parameters are accumulated on the builders; descriptor objects are only built at the end
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));
        // Add parameters
        for (String columnFamily : columnFamilies) {
            // Create column family descriptor builder
            ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
            // Add parameters for current column family
            columnFamilyDescriptorBuilder.setMaxVersions(5);
            // Create column family descriptor with added parameters
            tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
        }

        // Create corresponding table descriptor
        try {
            admin.createTable(tableDescriptorBuilder.build());
        } catch (IOException e) {
            // Since we already checked existence, any exception means other issues, just print stack trace
            e.printStackTrace();
        }

        // Close admin
        admin.close();
    }

    /**
     * Delete table
     *
     * @param namespace namespace name
     * @param tableName table name
     * @return true if deletion succeeds, otherwise false
     */
    public static boolean removeTable(String namespace, String tableName) throws IOException {
        // Check if table exists
        if (!doesTableExist(namespace, tableName)) {
            System.out.println("Table doesn't exist, cannot delete");
            return false;
        }

        // Get admin
        Admin admin = conn.getAdmin();

        // Call delete table method

        boolean success = true;
        try {
            // Before deleting the table, it must be disabled
            TableName tn = TableName.valueOf(namespace, tableName);
            admin.disableTable(tn);
            admin.deleteTable(tn);
        } catch (IOException e) {
            e.printStackTrace();
            success = false;
        }

        // Close admin
        admin.close();
        return success;
    }

}
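
A minimal usage sketch of these helpers (demo_table is a hypothetical table name; assumes the cluster is reachable and the classes above are on the classpath):

package com.songs.ranking;

import java.io.IOException;

public class SchemaDemo {
    public static void main(String[] args) throws IOException {
        // Create a table with one column family, check that it exists, then remove it
        HBaseSchemaOperations.createTable("default", "demo_table", "statistics");
        System.out.println(HBaseSchemaOperations.doesTableExist("default", "demo_table"));
        HBaseSchemaOperations.removeTable("default", "demo_table");
        HBaseConnection.closeConnection();
    }
}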
3.1.3 CountAggregatorReducer
package com.songs.ranking;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

class CountAggregatorReducer extends TableReducer<Text, IntWritable, Text> {
    /**
     * Sum total plays for each song
     *
     * @param key     song title
     * @param values  play count collection, e.g. {1, 1, 1, 1}
     * @param context context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {

        // Count total plays per song
        int totalPlays = 0;
        for (IntWritable num : values) {
            totalPlays += num.get();
        }
        // Specify row key for Put operation
        Put put = new Put(Bytes.toBytes(key.toString()));
        // Specify column and value for Put operation
        put.addColumn(Bytes.toBytes("statistics"), Bytes.toBytes("position"),
                Bytes.toBytes(totalPlays));
        context.write(key, put);
    }

}
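
Note that the reducer stores the count via Bytes.toBytes(totalPlays), i.e. as a 4-byte big-endian int; ProcessedDataMapper later reads it back with Bytes.toInt. A quick standalone round-trip check (illustrative only):

import org.apache.hadoop.hbase.util.Bytes;

public class BytesRoundTrip {
    public static void main(String[] args) {
        byte[] raw = Bytes.toBytes(42);        // 4-byte big-endian encoding of the int
        System.out.println(raw.length);        // prints 4
        System.out.println(Bytes.toInt(raw));  // prints 42, the round trip ProcessedDataMapper relies on
    }
}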
3.1.4 DescendingOrderComparator
package com.songs.ranking;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

/**
 * Implement descending order sorting class
 */
class DescendingOrderComparator extends
        IntWritable.Comparator {
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b); // Negative comparison result for descending order
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}
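
The comparator simply negates IntWritable's natural ordering, so larger play counts sort first during the shuffle. A quick standalone check (illustrative only; assumes the class sits in the same com.songs.ranking package, since DescendingOrderComparator is package-private):

package com.songs.ranking;

import org.apache.hadoop.io.IntWritable;

public class ComparatorDemo {
    public static void main(String[] args) {
        DescendingOrderComparator cmp = new DescendingOrderComparator();
        // Natural order puts 3 before 10; negated, the result is negative, so 10 sorts first
        System.out.println(cmp.compare(new IntWritable(10), new IntWritable(3)));
    }
}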
3.1.5 SongScannerMapper
package com.songs.ranking;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.List;

/**
 * Scan metadata:title column in each row
 */
class SongScannerMapper extends TableMapper<Text, IntWritable> {

    /**
     * Scan file content, output key-value pairs <"song_title": 1>
     *
     * @param key     row key
     * @param value   a record
     * @param context context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
                       Context context) throws IOException, InterruptedException {

        List<Cell> cells = value.listCells();
        for (Cell cell : cells) {
            if (Bytes.toString(CellUtil.cloneFamily(cell)).equals("metadata") &&
                    Bytes.toString(CellUtil.cloneQualifier(cell)).equals("title")) {
                context.write(
                        new Text(Bytes.toString(CellUtil.cloneValue(cell))),    // Song title
                        new IntWritable(1));
            }
        }
    }
}
3.1.6 ProcessedDataMapper
package com.songs.ranking;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.List;

/**
 * Process data after first MapReduce
 * Scan all song titles and get play counts per song
 * Output key/value: play_count/song_title
 * Output destination: HDFS file
 */
class ProcessedDataMapper extends TableMapper<IntWritable, Text> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
                       Context context) throws IOException, InterruptedException {
        List<Cell> cells = value.listCells();
        for (Cell cell : cells) {
            context.write(
                    new IntWritable(Bytes.toInt(CellUtil.cloneValue(cell))),    // Play count
                    new Text(Bytes.toString(key.get(), key.getOffset(), key.getLength())));    // Song title (row key)
        }
    }
}
3.1.7 MusicRankingSystem.java
package com.songs.ranking;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class MusicRankingSystem {

	static final String TRACKS_TABLE = "tracks";
	static final String TITLE_LIST_TABLE = "titlelist";
	static final String RESULT_PATH = "hdfs://master-node:9000/results/topsongs";
	// Set HBase static configuration
	static Configuration config = HBaseConfiguration.create();

	/**
	 * Configure job: First MapReduce, count total plays per song
	 * @param args                      command line arguments
	 * @return                          whether the Job task completed successfully
	 * @throws IOException              IO exception
	 * @throws ClassNotFoundException   Class not found exception
	 * @throws InterruptedException     Exception thrown when blocked method receives interrupt request
	 */
	public static boolean calculatePlayCounts(String[] args)
			throws IOException, ClassNotFoundException, InterruptedException {
		// Set Job instance
		Job job = Job.getInstance(config);

		// Basic MapReduce program configuration
		job.setJarByClass(MusicRankingSystem.class);
		// Set two ReduceTasks
		job.setNumReduceTasks(2);
		// Set scanned column family: column name i.e. metadata:title
		Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes("metadata"), Bytes.toBytes("title"));
		// Use HBase utility class to set job
		// Set input table name, scan object, Mapper type, output key-value types, Job object
		TableMapReduceUtil.initTableMapperJob(TRACKS_TABLE, scan,
				SongScannerMapper.class, Text.class, IntWritable.class, job);

		// Check if output table exists, create if not, delete and recreate if exists
		if (!HBaseSchemaOperations.doesTableExist("default", TITLE_LIST_TABLE)) {
			HBaseSchemaOperations.createTable("default", TITLE_LIST_TABLE, "statistics");
		} else {
			if (HBaseSchemaOperations.removeTable("default", TITLE_LIST_TABLE)) {
				System.out.println("Table deleted successfully");
				HBaseSchemaOperations.createTable("default", TITLE_LIST_TABLE, "statistics");
			} else {
				System.exit(0);
			}
		}

		// Set output table name, Reducer type, Job object
		TableMapReduceUtil.initTableReducerJob(TITLE_LIST_TABLE,
				CountAggregatorReducer.class, job);

		return job.waitForCompletion(true);
	}

	/**
	 * Configure job: Second MapReduce (only map function overridden), custom comparator, use shuffle to sort data in descending order
	 * @param args                      command line arguments
	 * @return                          whether the job instance completed successfully
	 * @throws IOException              IO exception
	 * @throws ClassNotFoundException   Class not found exception
	 * @throws InterruptedException     Exception thrown when blocked method receives interrupt request
	 */
	public static boolean sortSongs(String[] args)
			throws IOException, ClassNotFoundException, InterruptedException {
		// Set Job instance
		Job job = Job.getInstance(config, "sort-songs");
		job.setJarByClass(MusicRankingSystem.class);
		job.setNumReduceTasks(1);
		// Set comparator class
		job.setSortComparatorClass(DescendingOrderComparator.class);
		// Set input table name, scan object, Mapper class, key-value types, job instance
		TableMapReduceUtil.initTableMapperJob(TITLE_LIST_TABLE, new Scan(),
				ProcessedDataMapper.class, IntWritable.class, Text.class, job);

		// Output sorted data files to specified path
		Path output = new Path(RESULT_PATH);
		if (FileSystem.get(config).exists(output))
			FileSystem.get(config).delete(output, true);
		FileOutputFormat.setOutputPath(job, output);
		return job.waitForCompletion(true);
	}

	/**
	 * View output file, get final ranking data
	 * @throws IllegalArgumentException   Illegal argument exception
	 * @throws IOException                IO exception
	 */
	public static void displayResults() throws IllegalArgumentException, IOException{
		// Get filesystem object
		FileSystem fs = FileSystem.get(config);

		// Output file contents under output path
		InputStream in = null;
		try {
			in = fs.open(new Path(RESULT_PATH+"/part-r-00000"));
			IOUtils.copyBytes(in, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(in);
		}
	}

	// Main function
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		// About GenericOptionsParser: Basic class for parsing command line arguments in Hadoop framework.
		// It can recognize standard command line arguments, enabling applications to easily specify namenode, jobtracker,
		// and other additional configuration resources.
		GenericOptionsParser parser = new GenericOptionsParser(config, args);
		String[] remainingArgs = parser.getRemainingArgs(); // Get command line arguments

		// If calculatePlayCounts() executes successfully, run sortSongs(); if sortSongs() succeeds, call displayResults() to show the results
		if (calculatePlayCounts(remainingArgs)) {
			if (sortSongs(remainingArgs)) {
				displayResults();
			}
		}

	}
}

3.2 Execution Results

Console output results:

Output results stored in HDFS:

HBase titlelist table content:

At this point, the big data development project - music ranking system is complete.
