When discussing distributed database middleware, Apache ShardingSphere-JDBC is a prominent solution. This guide delves into its core concepts and practical applications.
The ShardingSphere Ecosystem
Apache ShardingSphere is a robust distributed database ecosystem comprising two primary products:
- ShardingSphere-Proxy: Positioned as a transparent database proxy, it is a server-side component that implements database binary protocols, so clients written in any language can connect to it. Because this proxy layer sits between applications and databases, forwarding each request adds slight latency. In exchange, it requires minimal application changes, is language-independent, and reduces connection consumption through shared connections.
- ShardingSphere-JDBC: The original ShardingSphere product, often referred to as sharding-jdbc, is a lightweight Java framework. It enhances the standard JDBC layer within Java applications and is delivered as a JAR package. This client-side approach connects directly to databases, so no additional deployment is required. It is essentially an augmented JDBC driver, fully compatible with standard JDBC and various ORM frameworks.
Here’s a comparison to help choose between ShardingSphere-JDBC and ShardingSphere-Proxy:
| Feature | ShardingSphere-JDBC | ShardingSphere-Proxy |
|---|---|---|
| Supported Databases | Any | MySQL/PostgreSQL |
| Connection Usage | High | Low |
| Language Support | Java Only | Any |
| Performance | Minimal Overhead | Slightly Higher Overhead |
| Centralized | No (client-side) | Yes (proxy server) |
| Static Entrypoint | No | Yes |
ShardingSphere-JDBC is widely adopted in production environments primarily due to its simplicity in principle, implementation, and operational management.
Fundamental Principles
JDBC programming is the bedrock of Java backend development. Whether ORM frameworks like MyBatis, Hibernate, or Spring Data JPA are used, their underlying implementation relies on the JDBC model. ShardingSphere-JDBC fundamentally achieves its capabilities by implementing the core JDBC interfaces.
| JDBC Interface | ShardingSphere Implementation |
|---|---|
| DataSource | ShardingDataSource |
| Connection | ShardingConnection |
| Statement | ShardingStatement |
| PreparedStatement | ShardingPreparedStatement |
| ResultSet | ShardingResultSet |
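The wrapping idea can be made concrete with a minimal decorator around `java.sql.Connection` that intercepts SQL before it reaches the real driver. This is an illustration only. ShardingSphere ships dedicated adapter classes such as `ShardingConnection` rather than using dynamic proxies. The class name `RewritingConnections` and the rewrite rule used in the test are hypothetical. Application code keeps calling the plain JDBC interface while the wrapper decides which SQL actually runs.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.function.UnaryOperator;

final class RewritingConnections {

    private RewritingConnections() {
    }

    // Returns a Connection that behaves like `target`, except that the SQL argument of every
    // prepareStatement(...) overload is passed through `sqlRewriter` first.
    static Connection wrap(Connection target, UnaryOperator<String> sqlRewriter) {
        return (Connection) Proxy.newProxyInstance(
                RewritingConnections.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    Object[] effectiveArgs = args;
                    if ("prepareStatement".equals(method.getName())
                            && args != null && args.length > 0 && args[0] instanceof String) {
                        effectiveArgs = args.clone();
                        effectiveArgs[0] = sqlRewriter.apply((String) args[0]);
                    }
                    try {
                        // Every other call is forwarded unchanged to the real connection
                        return method.invoke(target, effectiveArgs);
                    } catch (InvocationTargetException e) {
                        // Rethrow the driver's original exception (e.g. SQLException)
                        throw e.getCause();
                    }
                });
    }
}
```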
While the underlying concept is simple, the practical implementation involves numerous intricate details. The core processing flow for ShardingSphere-JDBC (and Proxy) includes:
- SQL Parsing: This stage involves lexical and syntactic analysis. A lexical parser splits the SQL statement into indivisible tokens. A syntactic parser then interprets the token stream and extracts a parsing context. This context captures tables, select items, order-by clauses, group-by clauses, aggregate functions, pagination details, query conditions, and placeholder markers.
- Execution Plan Optimization: Sharding conditions are merged and optimized, particularly for `OR` clauses.
- SQL Routing: Based on the parsed context, the system matches user-defined sharding strategies to generate routing paths. Both sharding routes (to specific shards) and broadcast routes (to all shards) are supported.
- SQL Rewriting: The original SQL statement is transformed into a version that can execute correctly on the actual database instances. This involves correctness rewriting and optimization rewriting. Examples of correctness rewriting include replacing logical table names with physical ones and rewriting `LIMIT` offsets so that pagination stays correct after merging.
- SQL Execution: The rewritten SQL statements are executed concurrently across multiple database instances using a multi-threaded executor.
- Result Merging: Results from different database instances are merged and exposed through a single JDBC `ResultSet` interface. Merging can be streaming, in-memory, or an appending merge implemented with the decorator pattern.
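Pagination shows why the rewriting and merging steps must cooperate. A query ending in `ORDER BY id LIMIT 10, 5` cannot be sent unchanged to every shard, because the globally 11th to 15th rows may come from any mix of shards. ShardingSphere therefore rewrites the per-shard clause to `LIMIT 0, 15` and applies the original offset after merging.

The sketch below models this with plain Java lists standing in for per-shard result sets (a simplification, since real result sets are streamed). It first computes the rewritten per-shard limit. It then performs an ORDER BY stream merge with a priority queue (a k-way merge), skipping the offset and stopping after the requested count. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class PaginationMergeSketch {

    // Rows each shard must return for "LIMIT offset, count": the global page
    // may come from any shard, so every shard returns its first offset + count rows.
    static int rewrittenShardLimit(int offset, int count) {
        return offset + count;
    }

    // One cursor per shard, positioned on that shard's current (smallest unconsumed) row
    private static final class Cursor {
        final Iterator<Long> rows;
        long current;

        Cursor(Iterator<Long> rows) {
            this.rows = rows;
            this.current = rows.next();
        }
    }

    // Streaming k-way merge of per-shard results that are each sorted ascending,
    // then the original offset/count are applied to the merged stream.
    static List<Long> mergePage(List<List<Long>> shardResults, int offset, int count) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>(Comparator.comparingLong((Cursor c) -> c.current));
        for (List<Long> rows : shardResults) {
            if (!rows.isEmpty()) {
                heap.add(new Cursor(rows.iterator()));
            }
        }
        List<Long> page = new ArrayList<>();
        int skipped = 0;
        while (!heap.isEmpty() && page.size() < count) {
            Cursor cursor = heap.poll(); // globally smallest remaining row
            if (skipped < offset) {
                skipped++;
            } else {
                page.add(cursor.current);
            }
            if (cursor.rows.hasNext()) {
                cursor.current = cursor.rows.next();
                heap.add(cursor); // re-insert so the heap orders it by its new current row
            }
        }
        return page;
    }
}
```

Consider three shards returning `[1, 4, 7, 10, 13]`, `[2, 5, 8, 11, 14]` and `[3, 6, 9, 12, 15]`. For these, `mergePage(shards, 3, 4)` returns `[4, 5, 6, 7]`. Only one row per shard sits in the heap at any time, which is what makes this a streaming merge rather than an in-memory one.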
Practical Implementation Example
Consider an order service for an O2O company where each purchase order generates records across several tables:
- `t_ent_order`: primary order record (single entry)
- `t_ent_order_detail`: order details (single entry)
- `t_ent_order_item`: order line items (N entries)
The sharding strategy applied is as follows:
- `t_ent_order` and `t_ent_order_detail` are sharded across databases by `ent_id` (enterprise user ID).
- `t_ent_order_item` is sharded across databases by `ent_id` and, within each database, also across tables by `ent_id`.
For this setup, four databases are created: `ds0`, `ds1`, `ds2`, and `ds3`. Each contains `t_ent_order`, `t_ent_order_detail`, and eight `t_ent_order_item` tables (`t_ent_order_item_0` to `t_ent_order_item_7`).
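ShardingSphere describes such a layout with inline expressions such as `ds$->{0..3}.t_ent_order_item_$->{0..7}`. Each `$->{a..b}` range is expanded, and multiple ranges combine as a Cartesian product.

The expander below is a hypothetical, simplified illustration that handles only integer `a..b` ranges. ShardingSphere itself evaluates these expressions with Groovy and supports more forms. The sketch enumerates the physical tables behind a logical table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class InlineRangeExpander {

    // Matches one "$->{start..end}" segment and captures the two integer bounds
    private static final Pattern RANGE = Pattern.compile("\\$->\\{(\\d+)\\.\\.(\\d+)\\}");

    // Expands range segments left to right, yielding the Cartesian product of all ranges
    static List<String> expand(String expression) {
        List<String> results = new ArrayList<>();
        Matcher matcher = RANGE.matcher(expression);
        if (!matcher.find()) {
            results.add(expression); // no ranges left: this is a concrete name
            return results;
        }
        int start = Integer.parseInt(matcher.group(1));
        int end = Integer.parseInt(matcher.group(2));
        String prefix = expression.substring(0, matcher.start());
        String suffix = expression.substring(matcher.end());
        for (int i = start; i <= end; i++) {
            // Substitute this value, then recursively expand the ranges remaining in the suffix
            for (String tail : expand(suffix)) {
                results.add(prefix + i + tail);
            }
        }
        return results;
    }
}
```

`expand("ds$->{0..3}.t_ent_order_item_$->{0..7}")` returns 32 data nodes, from `ds0.t_ent_order_item_0` to `ds3.t_ent_order_item_7`.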
To integrate ShardingSphere-JDBC into a Spring Boot project, include the necessary dependency:
```xml
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>
```
Configuration within `application.yml` or `application.properties` typically involves:
- Defining the actual data sources (e.g., `ds0`, `ds1`, `ds2`, `ds3`).
- Enabling `sql.show`, which logs both the logical SQL and the rewritten actual SQL. This is useful for debugging, especially in development environments.
- Specifying sharding rules for each logical table under the `sharding.tables` key.
```yaml
spring:
  shardingsphere:
    datasource:
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ds0?serverTimezone=UTC
        username: root
        password: password
      # ... similar configurations for ds1, ds2, ds3
    sharding:
      tables:
        t_ent_order:
          actual-data-nodes: ds$->{0..3}.t_ent_order
          database-strategy:
            standard:
              sharding-column: ent_id
              # 4.x standard strategy: a class implementing PreciseShardingAlgorithm
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdDatabaseShardingAlgorithm
        t_ent_order_detail:
          actual-data-nodes: ds$->{0..3}.t_ent_order_detail
          database-strategy:
            standard:
              sharding-column: ent_id
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdDatabaseShardingAlgorithm
        t_ent_order_item:
          actual-data-nodes: ds$->{0..3}.t_ent_order_item_$->{0..7}
          database-strategy:
            standard:
              sharding-column: ent_id
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdDatabaseShardingAlgorithm
          table-strategy:
            standard:
              sharding-column: ent_id
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdTableShardingAlgorithm
        # ... other sharding configurations
    props:
      sql.show: true
```
Key aspects of sharding rule configuration include:
- Actual Data Nodes: This defines the mapping between a logical table (e.g., `t_ent_order_item`) and its physical instances across data sources and tables. For example, `ds$->{0..3}.t_ent_order_item_$->{0..7}` indicates that the logical table `t_ent_order_item` exists in data sources `ds0` to `ds3`. Within each data source, it is split into `t_ent_order_item_0` to `t_ent_order_item_7`, for 32 physical tables in total.
- Sharding Algorithms: Both database and table sharding strategies require a sharding column (`sharding-column`) and a custom sharding algorithm class.
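The configuration names two custom algorithm classes, `EnterpriseIdDatabaseShardingAlgorithm` and `EnterpriseIdTableShardingAlgorithm`, but does not show their logic. One plausible rule, assumed here purely for illustration, is database index = `ent_id % 4` and table index = `(ent_id / 4) % 8`.

In ShardingSphere 4.x, such logic would live in `doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue)` of classes implementing `PreciseShardingAlgorithm<Long>`. The stdlib-only sketch below keeps that shape but takes the `ent_id` value directly, so it runs without the ShardingSphere JAR.

```java
import java.util.Collection;

final class EnterpriseIdRoutingSketch {

    static final long DATABASE_COUNT = 4L;      // ds0..ds3
    static final long TABLES_PER_DATABASE = 8L; // t_ent_order_item_0..7

    // Hypothetical rule: the database is chosen by ent_id modulo 4
    static int databaseIndex(long entId) {
        return (int) Math.floorMod(entId, DATABASE_COUNT);
    }

    // Hypothetical rule: divide out the database part first so all 8 tables in a database get used
    static int tableIndex(long entId) {
        return (int) Math.floorMod(Math.floorDiv(entId, DATABASE_COUNT), TABLES_PER_DATABASE);
    }

    // Same role as doSharding for the database strategy: return one of the offered data source names
    static String routeDatabase(Collection<String> availableTargetNames, long entId) {
        return pick(availableTargetNames, "ds" + databaseIndex(entId));
    }

    // Same role as doSharding for the table strategy: return one of the offered physical table names
    static String routeTable(Collection<String> availableTargetNames, String logicTable, long entId) {
        return pick(availableTargetNames, logicTable + "_" + tableIndex(entId));
    }

    private static String pick(Collection<String> availableTargetNames, String expected) {
        if (!availableTargetNames.contains(expected)) {
            throw new IllegalStateException("No target named " + expected + " in " + availableTargetNames);
        }
        return expected;
    }
}
```

Dividing before the second modulo matters when both strategies use the same column. If the table index were simply `ent_id % 8`, every row in `ds0` would have `ent_id % 4 == 0`, and only `t_ent_order_item_0` and `t_ent_order_item_4` would ever receive data.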
Data-Embedded Sharding and Custom Complex Algorithms
Consider a scenario where an order table needs to be distributed across four databases (shard0, shard1, shard2, shard3). A common approach is to hash a specific string (or substring) from the record, take a modulo (e.g., 1024), and route based on which quarter-interval the result (slot) falls into. For example, [0-255] for shard0, [256-511] for shard1, and so on.
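The slot scheme fits in a few lines. CRC32 is an assumed choice of hash, since the text only says "hash". Any stable hash works as long as every writer and reader uses the same one. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class HashSlotRouter {

    static final int SLOT_COUNT = 1024;
    static final int SHARD_COUNT = 4;
    static final int SLOTS_PER_SHARD = SLOT_COUNT / SHARD_COUNT; // 256

    // Hashes the routing string (or substring) of a record into one of 1024 slots
    static int slotOf(String routingKey) {
        CRC32 crc = new CRC32();
        crc.update(routingKey.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative 32-bit value, so the slot is always in [0, 1023]
        return (int) (crc.getValue() % SLOT_COUNT);
    }

    // [0-255] -> shard0, [256-511] -> shard1, [512-767] -> shard2, [768-1023] -> shard3
    static int shardOfSlot(int slot) {
        return slot / SLOTS_PER_SHARD;
    }

    static String shardNameOf(String routingKey) {
        return "shard" + shardOfSlot(slotOf(routingKey));
    }
}
```

Routing through a fixed set of slots, rather than directly on `hash % 4`, leaves room for scaling. Adding shards changes only the slot-range-to-shard mapping, while each record's slot stays the same.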
However, queries frequently look up orders by order_id (e.g., to fetch order details). If sharding is based on ent_id, a query by order_id alone must be broadcast to all shards, which performs poorly. The solution is to embed sharding information into the ID itself (often referred to as the "gene method") and to use a custom complex sharding algorithm.
By leveraging a distributed ID generation scheme like the Snowflake algorithm, specific bits within the order_id can store the slot or workerId that directly maps to the sharded database. For instance, the 10-bit worker ID field in Snowflake can be utilized to store the database shard identifier.
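```java
class OrderIdHelper {

    // Standard Snowflake layout, from the lowest bit up:
    // 12-bit sequence | 10-bit worker ID | 41-bit timestamp | 1 sign bit
    private static final int SEQUENCE_BITS = 12;
    private static final long WORKER_ID_MASK = 0x3FFL; // 10 bits = 1023

    // Assumes the worker-ID bits of the Snowflake ID were set to the shard index when the ID was generated
    public int extractShardIdFromOrderId(long orderIdentifier) {
        // Shift right by 12 bits to drop the sequence bits,
        // then mask with 0x3FF to drop the timestamp bits and keep the 10-bit worker ID
        return (int) ((orderIdentifier >>> SEQUENCE_BITS) & WORKER_ID_MASK);
    }
}
```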
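This approach lets the target shard be determined directly from the order_id itself. The Snowflake algorithm does not provide this on its own. The ID generator must write the shard information derived from the order's enterprise user ID into the worker-ID bits when the order is created. The order_id then carries its routing information, enabling efficient routing.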
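A minimal generator that writes shard information into the worker-ID bits might look like the following. It is a hypothetical sketch built on several assumptions:

- the standard 41/10/12 Snowflake layout;
- an arbitrary custom epoch;
- a caller that passes the 1024-way slot of the order's `ent_id`, which fits exactly into the 10 worker bits.

The slot-to-shard mapping `slot / 256` follows the quarter-interval example earlier in this section.

```java
final class GeneSnowflakeIdSketch {

    private static final long EPOCH_MILLIS = 1577836800000L; // hypothetical epoch: 2020-01-01T00:00:00Z
    private static final int SEQUENCE_BITS = 12;
    private static final int SLOT_BITS = 10;
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1; // 4095
    private static final long SLOT_MASK = (1L << SLOT_BITS) - 1;         // 1023

    private long lastMillis = -1L;
    private long sequence = 0L;

    // Builds a Snowflake-style ID whose 10 worker-ID bits hold the given slot
    synchronized long nextId(int slot) {
        if (slot < 0 || slot > SLOT_MASK) {
            throw new IllegalArgumentException("slot must be in [0, 1023]: " + slot);
        }
        long now = System.currentTimeMillis();
        if (now < lastMillis) {
            throw new IllegalStateException("clock moved backwards");
        }
        if (now == lastMillis) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                while ((now = System.currentTimeMillis()) <= lastMillis) {
                    // busy-wait for the next millisecond after 4096 IDs in this one
                }
            }
        } else {
            sequence = 0L;
        }
        lastMillis = now;
        return ((now - EPOCH_MILLIS) << (SLOT_BITS + SEQUENCE_BITS))
                | ((long) slot << SEQUENCE_BITS)
                | sequence;
    }

    // Reads the slot back out of an ID (the inverse of the middle term above)
    static int extractSlot(long id) {
        return (int) ((id >>> SEQUENCE_BITS) & SLOT_MASK);
    }

    // Four shards of 256 consecutive slots each
    static int slotToShard(int slot) {
        return slot / 256;
    }
}
```

This design has a trade-off. Once the 10 worker bits carry the slot, they no longer distinguish application instances. Two instances issuing IDs for the same slot in the same millisecond could therefore collide. A deployment would need another safeguard, for example reserving a few bits for an instance ID (shrinking the sequence space) or allocating sequence ranges centrally.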
For scenarios where sharding decisions depend on multiple columns (e.g., order_id AND ent_id), ShardingSphere-JDBC supports custom complex sharding algorithms. This is achieved by implementing the org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm interface. The logic for a complex sharding algorithm typically involves:
- If the sharding keys contain a primary key value (e.g., `order_id`) that inherently encodes shard information, resolve the routing partition directly from it.
- If the primary key value is absent, use the other available sharding key fields (e.g., `ent_id`) to determine the routing partition.
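The two-branch logic can be sketched without the ShardingSphere JAR. In 4.x it would live in `doSharding(Collection<String> availableTargetNames, ComplexKeysShardingValue<Long> shardingValue)`, reading per-column values from `shardingValue.getColumnNameAndShardingValuesMap()`. The sketch takes that map directly.

It assumes the following:

- order IDs carry a 1024-way slot in bits 12 to 21;
- an `ent_id`'s slot is `ent_id % 1024` (a hypothetical choice that must match whatever the ID generator used);
- data sources are named `ds0` to `ds3`, each owning 256 consecutive slots.

When neither column is present, the sketch falls back to every target, which is the broadcast case.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class OrderComplexRoutingSketch {

    private static final int SEQUENCE_BITS = 12;
    private static final long SLOT_MASK = 0x3FFL;   // 10-bit slot stored in the Snowflake worker-ID field
    private static final long SLOT_COUNT = 1024L;
    private static final int SLOTS_PER_SHARD = 256; // 4 shards x 256 slots

    static Collection<String> route(Collection<String> availableTargetNames,
                                    Map<String, Collection<Long>> columnValues) {
        Set<Integer> shardIndexes = new LinkedHashSet<>();
        Collection<Long> orderIds = columnValues.getOrDefault("order_id", Collections.emptyList());
        Collection<Long> entIds = columnValues.getOrDefault("ent_id", Collections.emptyList());
        if (!orderIds.isEmpty()) {
            // Primary key present: the shard is encoded in the ID itself
            for (long orderId : orderIds) {
                int slot = (int) ((orderId >>> SEQUENCE_BITS) & SLOT_MASK);
                shardIndexes.add(slot / SLOTS_PER_SHARD);
            }
        } else if (!entIds.isEmpty()) {
            // No primary key: fall back to the other sharding column
            for (long entId : entIds) {
                int slot = (int) Math.floorMod(entId, SLOT_COUNT);
                shardIndexes.add(slot / SLOTS_PER_SHARD);
            }
        } else {
            // Neither column in the condition: broadcast to every data source
            return availableTargetNames;
        }
        Collection<String> targets = new LinkedHashSet<>();
        for (String name : availableTargetNames) {
            for (int index : shardIndexes) {
                if (name.equals("ds" + index)) {
                    targets.add(name);
                }
            }
        }
        return targets;
    }
}
```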
Scaling and Data Migration Strategies
Implementing sharding also necessitates a robust strategy for smooth horizontal scaling (elastic scaling).
1. Migration Scope Definition
Before any data synchronization, a clear understanding of the migration scope is crucial:
- Unique Business Identifiers: Identify the unique business ID of every table, since synchronization relies on these IDs to locate and deduplicate rows. Pay special attention if database auto-increment IDs are used as business IDs. Each old shard generates its own sequence, so the same value can appear in several shards and collide once rows are rehashed, which may require refactoring business logic. Ensure all tables have unique indexes (possibly composite unique indexes) to prevent data duplication during synchronization.
- Tables to Migrate and New Sharding Rules: Define which tables will be migrated and their new sharding rules. Different sharding rules (e.g., by user ID, non-user ID, sharding only databases, or no sharding) impact the rehash process and data validation during migration.
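The impact of a rule change on the rehash step can be checked mechanically. Assume the 1024-slot scheme described earlier is kept and the cluster grows from four to eight databases (a hypothetical expansion). Only the slot-to-shard mapping then changes, from `slot / 256` to `slot / 128`. The sketch computes, for each old shard, which new shards its rows move to.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class SlotRehashSketch {

    static final int SLOT_COUNT = 1024;

    // Slot -> shard when SLOT_COUNT slots are split evenly into shardCount contiguous ranges
    static int shardOf(int slot, int shardCount) {
        return slot / (SLOT_COUNT / shardCount);
    }

    // For every old shard, the set of new shards that its slots (and therefore rows) move to
    static Map<Integer, Set<Integer>> migrationTargets(int oldShardCount, int newShardCount) {
        Map<Integer, Set<Integer>> targets = new TreeMap<>();
        for (int slot = 0; slot < SLOT_COUNT; slot++) {
            targets.computeIfAbsent(shardOf(slot, oldShardCount), k -> new TreeSet<>())
                   .add(shardOf(slot, newShardCount));
        }
        return targets;
    }
}
```

`migrationTargets(4, 8)` yields `{0=[0, 1], 1=[2, 3], 2=[4, 5], 3=[6, 7]}`. Each migration task therefore reads one old shard and writes to exactly two new ones. Changing the sharding column itself would break this property, because a row's new location would no longer follow from its old slot.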
2. Data Synchronization Process
The overall data synchronization solution typically leverages database binlogs, using an independent middleware service to perform synchronization without altering business application code.
- Full Historical Data Synchronization: This migrates existing data from the old shards to the new ones. A dedicated service reads each old shard with a cursor, rehashes every row according to the new sharding rules, and bulk inserts the rows into the new target shards. Crucially, the JDBC connection string parameter `rewriteBatchedStatements=true` must be configured for efficient batch operations. Without it, MySQL Connector/J sends the statements of a batch one at a time.

  To maintain data consistency, the full historical sync must be carefully orchestrated with incremental synchronization. Incremental data synchronization (from old to new shards) is initiated first, but its messages are only accumulated (e.g., in Kafka) and not yet consumed. Then the full historical synchronization begins. Once it completes, the accumulated incremental messages are processed, so data updated during the full sync period still becomes eventually consistent. This strategy also aims to reduce backlog and improve full sync efficiency.
- Incremental Data Synchronization: Considering stability, disaster recovery, and rollback capabilities during a grey release, real-time bi-directional synchronization is often preferred. Because the old cluster is kept up to date, a rapid rollback to it is possible if stability or data consistency issues arise in the new cluster during cutover.
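The full historical sync step can be sketched as a keyset-paginated copy loop. JDBC plumbing is hidden behind two hypothetical interfaces, `PageReader` and `BatchWriter`, so the sketch runs without a database. Their comments show the statement shapes a real implementation would use, and the new sharding rule is passed in as a function.

Reading with `id > lastId ORDER BY id` rather than a growing `LIMIT offset, n` keeps each page cheap on large tables. Grouping each page by target table lets the writer issue one batch per physical table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class FullSyncSketch {

    // Minimal stand-in for a row read from an old shard
    static final class Row {
        final long id;    // unique business ID, also used as the cursor
        final long entId; // sharding column

        Row(long id, long entId) {
            this.id = id;
            this.entId = entId;
        }
    }

    // Real reader: SELECT ... FROM old_table WHERE id > ? ORDER BY id LIMIT ?
    interface PageReader {
        List<Row> readAfter(long afterId, int limit);
    }

    // Real writer: one PreparedStatement per target table, addBatch() per row, then executeBatch()
    interface BatchWriter {
        void insertBatch(String targetTable, List<Row> rows);
    }

    // Copies one old shard page by page, rehashing each row with the new rule.
    // Returns the number of rows copied.
    static long copyShard(PageReader reader, Function<Row, String> newRule, BatchWriter writer, int pageSize) {
        long cursor = 0L; // assumes business IDs are positive
        long copied = 0L;
        while (true) {
            List<Row> page = reader.readAfter(cursor, pageSize);
            if (page.isEmpty()) {
                return copied;
            }
            Map<String, List<Row>> byTarget = new LinkedHashMap<>();
            for (Row row : page) {
                byTarget.computeIfAbsent(newRule.apply(row), t -> new ArrayList<>()).add(row);
            }
            byTarget.forEach(writer::insertBatch);   // one batch per target table per page
            copied += page.size();
            cursor = page.get(page.size() - 1).id;   // next page starts after the last ID seen
        }
    }
}
```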
Key considerations for real-time incremental data synchronization include:
- Filtering Loop Messages: In bi-directional sync, changes written by the sync service itself reappear in the target's binlog. They must be recognized and dropped to prevent infinite message loops.
- Data Consolidation: For performance, the synchronization component often collects multiple operations on the same record within a local blocking queue. A scheduled task then periodically processes these records, consolidating multiple operations on the same primary key into only the final state (similar to Redis AOF rewriting).
- `UPDATE` to `INSERT` Transformation: Consolidation may reduce an `INSERT` followed by an `UPDATE` on the same record to just the `UPDATE`. If the record does not yet exist in the target, that `UPDATE` matches no rows and the change is silently lost. Therefore, `UPDATE` operations may need to be transformed into `INSERT` or `UPSERT` statements (e.g., MySQL `INSERT ... ON DUPLICATE KEY UPDATE`) where appropriate.
- Batching by New Table: The final set of consolidated operations is grouped by new target table, enabling efficient database batch operations (e.g., `batch insert` or `batch update`) per target table.
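Loop filtering, consolidation, the UPSERT transformation, and per-table grouping can be combined in one pass over a buffered batch of binlog events. The sketch is hypothetical throughout. The event shape is assumed. The `fromSync` flag is an assumed way to recognize the sync service's own writes; in practice this might be a dedicated database account or a marker visible in the binlog. The sketch also assumes row images are full after-images (MySQL `binlog_row_image=FULL`), so the last event alone describes the final row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class BinlogConsolidationSketch {

    enum Type { INSERT, UPDATE, DELETE }

    enum Action { UPSERT, DELETE }

    // One buffered binlog change
    static final class Event {
        final String table;            // source table
        final long pk;                 // primary key of the changed row
        final Type type;
        final boolean fromSync;        // true if the sync service itself wrote this change
        final Map<String, Object> row; // full after-image; null for DELETE

        Event(String table, long pk, Type type, boolean fromSync, Map<String, Object> row) {
            this.table = table;
            this.pk = pk;
            this.type = type;
            this.fromSync = fromSync;
            this.row = row;
        }
    }

    // One consolidated write against a new target table
    static final class Op {
        final Action action;
        final long pk;
        final Map<String, Object> row;

        Op(Action action, long pk, Map<String, Object> row) {
            this.action = action;
            this.pk = pk;
            this.row = row;
        }
    }

    static Map<String, List<Op>> consolidate(List<Event> buffered, Function<Event, String> targetTableOf) {
        // Drop echoed sync writes, then keep only the last event per (table, pk)
        Map<String, Event> latest = new LinkedHashMap<>();
        for (Event event : buffered) {
            if (event.fromSync) {
                continue; // loop filtering: this change originated from the sync service
            }
            latest.put(event.table + "#" + event.pk, event); // later events overwrite earlier ones
        }
        // Express the final state as UPSERT or DELETE and group by new target table
        Map<String, List<Op>> byTarget = new LinkedHashMap<>();
        for (Event event : latest.values()) {
            Action action = event.type == Type.DELETE ? Action.DELETE : Action.UPSERT;
            byTarget.computeIfAbsent(targetTableOf.apply(event), t -> new ArrayList<>())
                    .add(new Op(action, event.pk, event.row));
        }
        return byTarget;
    }
}
```

Mapping every surviving `INSERT` or `UPDATE` to an UPSERT also makes replaying the accumulated messages after the full sync idempotent. Rows the full sync already copied are simply overwritten with their latest state.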
Conclusion
ShardingSphere-JDBC operates by implementing core JDBC interfaces, offering a relatively straightforward architecture for distributed database solutions. Practical implementation involves three steps: configuring data sources, defining logical-to-physical table mappings via `actual-data-nodes`, and specifying sharding strategies with appropriate sharding columns and sharding algorithms.
To achieve direct routing based on distributed primary keys, combining data-embedded IDs (gene method) with custom ComplexKeysShardingAlgorithm is highly effective. Smooth horizontal scaling and data migration rely on a meticulously planned full historical synchronization paired with real-time bi-directional incremental synchronization, involving numerous engineering intricacies.
For a complete working example, refer to the practical demonstration code: https://github.com/makemyownlife/shardingsphere-jdbc-demo