Integrating Apache Hudi with Spark
This guide covers the essential steps to integrate Apache Hudi with Apache Spark for building data lake solutions. The integration enables ACID transactions, time-travel queries, and efficient upserts on large datasets.
Environment Setup
Before starting, ensure you have Spark installed and Hadoop services running. The integration requires specific Spark configurations to work properly with Hudi.
Spark Configuration Requirements
When launching Spark Shell, you need to configure three critical components: the serializer, catalog, and SQL extensions. These settings enable Hudi's native Spark integration.
Execute the following command to start Spark Shell with the necessary Hudi support:
spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
After startup, you should see the Spark welcome banner reporting the running Spark version (for example, 3.2.2; your version may differ).
Writing Data to Hudi Tables
The following steps demonstrate how to write data to a Hudi table using Spark DataFrames.
Step 1: Import Required Dependencies
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
Step 2: Define Table Parameters
val tableName = "trips_cow_table"
val basePath = "file:///tmp/trips_cow_table"
val dataGenerator = new DataGenerator
These parameters define the table name, storage location, and a data generator instance for creating sample records.
Step 3: Generate and Load Sample Data
val rawRecords = convertToStringList(dataGenerator.generateInserts(10))
val tripsDF = spark.read.json(spark.sparkContext.parallelize(rawRecords, 2))
This creates a DataFrame from JSON-formatted trip records containing fields such as timestamp, unique identifiers, rider/driver information, coordinates, and fare amounts.
Step 4: Write DataFrame to Hudi Table
tripsDF.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
The write operation sets several critical Hudi options: the precombine field ("ts"), which decides the winning version when records with the same key are merged; the record key ("uuid"), which uniquely identifies each record; the partition path field, which controls how data is laid out on storage; and Overwrite mode, appropriate for the initial load.
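The upserts mentioned in the introduction use the same write path. A minimal sketch, assuming the same shell session (with dataGenerator, tableName, and basePath still in scope); generateUpdates produces modified versions of previously inserted records:

```scala
// Generate updates keyed to earlier inserts
val updates = convertToStringList(dataGenerator.generateUpdates(10))
val updatesDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))

// Append mode triggers Hudi's default upsert behavior: records whose
// "uuid" key already exists are merged, with "ts" picking the latest version
updatesDF.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
```

Re-running the snapshot query afterwards shows the updated fare values rather than duplicate rows.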
Reading Data from Hudi Tables
Hudi supports snapshot queries, which read the latest committed state of the table; time-travel queries can additionally read the table as of an earlier commit.
Loading Hudi Tables as DataFrame
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("trips_snapshot")
The loaded DataFrame contains both user-defined fields and Hudi metadata columns such as commit time, sequence number, record key, and partition path.
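Those commit times also drive time-travel reads. A hedged sketch: the as.of.instant read option accepts a commit instant; the timestamp below is a placeholder, and a real value should be taken from the _hoodie_commit_time column of an earlier query:

```scala
// Read the table as of a specific commit instant (yyyyMMddHHmmss format);
// substitute a real value observed in the _hoodie_commit_time column
val timeTravelDF = spark.read.
  format("hudi").
  option("as.of.instant", "20220101000000").
  load(basePath)
```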
Querying Data Using Spark SQL
Once the table is registered as a temporary view, you can execute SQL queries against it:
spark.sql("select fare, begin_lon, begin_lat, ts from trips_snapshot where fare > 20.0").show()
This query filters records where the fare exceeds 20.0, returning columns for fare, longitude, latitude, and timestamp.
To view all columns including Hudi metadata:
spark.sql("select * from trips_snapshot where fare > 20.0").show()
The results display complete record information along with internal Hudi fields like _hoodie_commit_time, _hoodie_record_key, and _hoodie_partition_path.
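To inspect those metadata columns alongside a few user fields without the noise of a full select *, a projection query can be used (column names as listed above):

```scala
// Project Hudi's bookkeeping columns next to selected trip fields
spark.sql("""
  select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path,
         rider, driver, fare
  from trips_snapshot
""").show()
```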
Key Configuration Options
The integration relies on these essential configurations:
- KryoSerializer: Required for efficient serialization of Hudi objects
- HoodieCatalog: Integrates Hudi tables with Spark's session catalog, enabling SQL DDL and table lookups on Hudi tables
- HoodieSparkSessionExtension: Provides SQL syntax extensions for Hudi operations
These settings form the foundation for building robust data lake applications using Apache Hudi and Apache Spark.
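For standalone applications that do not launch through spark-shell, the same three settings can be applied programmatically when building the session. A minimal sketch; the application name and local master are illustrative choices, not requirements:

```scala
import org.apache.spark.sql.SparkSession

// Equivalent of the spark-shell --conf flags, set in application code
val spark = SparkSession.builder().
  appName("hudi-quickstart").   // hypothetical application name
  master("local[*]").           // local mode, for experimentation only
  config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog").
  config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
  getOrCreate()
```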