Understanding and Implementing ShardingSphere-JDBC for Distributed Databases

When discussing distributed database middleware, Apache ShardingSphere-JDBC is a prominent solution. This guide delves into its core concepts and practical applications.

The ShardingSphere Ecosystem

Apache ShardingSphere is a robust distributed database ecosystem comprising two primary products:

  • ShardingSphere-Proxy: Positioned as a transparent database proxy, it is a server-side component that encapsulates the database binary protocol, so clients written in any language can connect to it. Because this proxy layer sits between applications and databases, forwarding each request adds slight latency. In return, it offers significant benefits, including minimal application changes, language independence, and lower connection consumption through shared connections.

  • ShardingSphere-JDBC: As the original ShardingSphere product, often referred to as sharding-jdbc, it functions as a lightweight Java framework. It enhances the standard JDBC layer within Java applications, providing services as a JAR package. This client-side approach connects directly to databases, eliminating the need for additional deployments. It's essentially an augmented JDBC driver, fully compatible with standard JDBC and various ORM frameworks.

Here’s a comparison to help choose between ShardingSphere-JDBC and ShardingSphere-Proxy:

Feature               ShardingSphere-JDBC     ShardingSphere-Proxy
--------------------  ----------------------  ----------------------
Supported databases   Any                     MySQL/PostgreSQL
Connection usage      High                    Low
Language support      Java only               Any
Performance overhead  Minimal                 Slightly higher
Centralized           No (client-side)        Yes (proxy server)
Static entry point    No                      Yes

ShardingSphere-JDBC is widely adopted in production environments, primarily because it is simple in principle, in implementation, and in day-to-day operation.

Fundamental Principles

JDBC programming forms the bedrock of backend development. Regardless of whether ORM frameworks like MyBatis, Hibernate, or Spring Data JPA are used, their underlying implementation relies on the JDBC model. ShardingSphere-JDBC fundamentally achieves its capabilities by implementing the core JDBC interfaces.

JDBC interface      ShardingSphere implementation
------------------  -----------------------------
DataSource          ShardingDataSource
Connection          ShardingConnection
Statement           ShardingStatement
PreparedStatement   ShardingPreparedStatement
ResultSet           ShardingResultSet

While the underlying concept is simple, the practical implementation involves numerous intricate details. The core processing flow for ShardingSphere-JDBC (and Proxy) includes:

  1. SQL Parsing: This stage involves lexical and syntactic analysis. A lexical parser tokenizes the SQL statement into indivisible words. Subsequently, a syntactic parser interprets the SQL, extracting a parsing context. This context encapsulates information such as tables, select items, order-by clauses, group-by clauses, aggregate functions, pagination details, query conditions, and placeholder markers.
  2. Execution Plan Optimization: Sharding conditions are merged and optimized, particularly for OR clauses.
  3. SQL Routing: Based on the parsed context, the system matches user-defined sharding strategies to generate routing paths. Both sharding routes (to specific shards) and broadcast routes (to all shards) are supported.
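  4. SQL Rewriting: The logical SQL statement is transformed into statements that execute correctly on the actual database instances. This involves correctness rewriting (e.g., replacing logical table names with actual table names, or rewriting LIMIT 10, 10 to LIMIT 0, 20 on each shard so that pagination stays correct after merging) and optimization rewriting.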
  5. SQL Execution: The rewritten SQL statements are executed asynchronously across multiple database instances using a multi-threaded executor.
  6. Result Merging: Results from different database instances are merged to provide a unified JDBC ResultSet interface. This merging can occur via streaming, in-memory aggregation, or an appending merge strategy using the decorator pattern.
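The interplay between rewriting and merging is easiest to see with pagination. The following Java sketch is a toy model, not ShardingSphere's code: each shard is represented by an already-sorted list of integers standing in for rows ordered by the ORDER BY column, a logical LIMIT offset, count is rewritten to LIMIT 0, offset + count on every shard, and the partial results are combined with a priority-queue merge (in the spirit of stream merging) before the first offset rows are skipped. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PaginationMergeDemo {

    // Rewrite: every shard must return its first (offset + count) rows, because any of them
    // may belong to the requested global page. Simulates "LIMIT 0, offset + count".
    static List<Integer> queryShard(List<Integer> sortedShardRows, int offset, int count) {
        int limit = Math.min(sortedShardRows.size(), offset + count);
        return sortedShardRows.subList(0, limit);
    }

    // Merge: k-way merge of the sorted per-shard results, then skip `offset` rows and keep `count`.
    static List<Integer> paginate(List<List<Integer>> shards, int offset, int count) {
        List<List<Integer>> partial = new ArrayList<>();
        for (List<Integer> shard : shards) {
            partial.add(queryShard(shard, offset, count));
        }
        // Heap entries are {shardIndex, positionInPartialResult}, ordered by the value they point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparingInt((int[] e) -> partial.get(e[0]).get(e[1])));
        for (int i = 0; i < partial.size(); i++) {
            if (!partial.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Integer> page = new ArrayList<>();
        int consumed = 0;
        while (!heap.isEmpty() && page.size() < count) {
            int[] top = heap.poll();
            int value = partial.get(top[0]).get(top[1]);
            if (consumed++ >= offset) {
                page.add(value);
            }
            if (top[1] + 1 < partial.get(top[0]).size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return page;
    }

    public static void main(String[] args) {
        List<List<Integer>> shards = List.of(
                List.of(1, 4, 7, 10, 13),
                List.of(2, 5, 8, 11, 14),
                List.of(3, 6, 9, 12, 15));
        // Logical query: ... ORDER BY value LIMIT 4, 3
        System.out.println(paginate(shards, 4, 3)); // [5, 6, 7]
    }
}
```

Sending the original LIMIT offset, count to each shard instead would skip offset rows per shard rather than globally and return the wrong page. The price of the rewrite is that deep pages fetch offset + count rows from every shard.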

Practical Implementation Example

Consider an order service for an O2O company where each purchase order generates records across several tables:

  • t_ent_order: Primary order record (single entry)
  • t_ent_order_detail: Order details (single entry)
  • t_ent_order_item: Multiple order line items (N entries)

The sharding strategy applied is as follows:

  • t_ent_order and t_ent_order_detail are sharded by ent_id (enterprise user ID) across databases.
  • t_ent_order_item is sharded by ent_id across databases AND also by ent_id across tables within each database.

For this setup, four physical databases are created: ds0, ds1, ds2, and ds3. Each database contains t_ent_order, t_ent_order_detail, and eight t_ent_order_item tables (t_ent_order_item_0 to t_ent_order_item_7).

To integrate ShardingSphere-JDBC into a Spring Boot project, include the necessary dependency:

<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>

Configuration within application.yml or application.properties typically involves:

  • Defining the actual data sources (e.g., ds0, ds1, ds2, ds3).
  • Enabling sql.show for debugging purposes, especially in development environments.
  • Specifying sharding rules for each logical table under spring.shardingsphere.sharding.tables.

spring:
  shardingsphere:
    datasource:
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/ds0?serverTimezone=UTC
        username: root
        password: password
      # ... similar configurations for ds1, ds2, ds3

    sharding:
      tables:
        t_ent_order:
          actual-data-nodes: ds$->{0..3}.t_ent_order
          database-strategy:
            standard:
              sharding-column: ent_id
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdDatabaseShardingAlgorithm
        t_ent_order_detail:
          actual-data-nodes: ds$->{0..3}.t_ent_order_detail
          database-strategy:
            standard:
              sharding-column: ent_id
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdDatabaseShardingAlgorithm
        t_ent_order_item:
          actual-data-nodes: ds$->{0..3}.t_ent_order_item_$->{0..7}
          database-strategy:
            standard:
              sharding-column: ent_id
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdDatabaseShardingAlgorithm
          table-strategy:
            standard:
              sharding-column: ent_id
              precise-algorithm-class-name: com.example.algorithm.EnterpriseIdTableShardingAlgorithm
      # ... other sharding configurations

    props:
      sql.show: true

Key aspects of sharding rule configuration include:

  1. Actual Data Nodes: This defines the mapping between a logical table (e.g., t_ent_order_item) and its physical instances across data sources and tables. For example, ds$->{0..3}.t_ent_order_item_$->{0..7} indicates that the logical table t_ent_order_item exists across data sources ds0 to ds3, and within each, it's sharded into t_ent_order_item_0 to t_ent_order_item_7.
  2. Sharding Algorithms: With the standard strategy used here, both database sharding and table sharding require a sharding column (sharding-column) and a custom algorithm class. In ShardingSphere 4.x, precise-algorithm-class-name names a class implementing PreciseShardingAlgorithm, which handles = and IN conditions; the optional range-algorithm-class-name names a RangeShardingAlgorithm for BETWEEN. Both classes need a no-argument constructor.
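To make the inline expression and the two custom algorithm classes more concrete, here is a plain-Java sketch that enumerates the 32 physical nodes described by ds$->{0..3}.t_ent_order_item_$->{0..7} and routes an ent_id to one data source and one table. The routing rules are assumptions (ent_id % 4 for the database, (ent_id / 4) % 8 for the table), since the article does not show the bodies of EnterpriseIdDatabaseShardingAlgorithm or EnterpriseIdTableShardingAlgorithm. The doDatabaseSharding and doTableSharding methods only imitate the contract of a 4.x precise algorithm (return exactly one of the candidate names) and do not depend on ShardingSphere.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class EnterpriseRoutingSketch {

    // Expands the equivalent of "ds$->{0..3}.t_ent_order_item_$->{0..7}": every table suffix in every data source.
    static List<String> actualDataNodes(int dataSourceCount, int tablesPerDataSource) {
        List<String> nodes = new ArrayList<>();
        for (int ds = 0; ds < dataSourceCount; ds++) {
            for (int t = 0; t < tablesPerDataSource; t++) {
                nodes.add("ds" + ds + ".t_ent_order_item_" + t);
            }
        }
        return nodes;
    }

    // Hypothetical database rule: ent_id % 4 selects ds0..ds3.
    static String doDatabaseSharding(Collection<String> availableDataSources, long entId) {
        return pick(availableDataSources, "ds" + Math.floorMod(entId, 4L));
    }

    // Hypothetical table rule: (ent_id / 4) % 8 selects t_ent_order_item_0..7 inside the chosen database.
    static String doTableSharding(Collection<String> availableTables, long entId) {
        return pick(availableTables, "t_ent_order_item_" + Math.floorMod(entId / 4, 8L));
    }

    // Like a precise algorithm, return exactly one of the names offered as candidates.
    private static String pick(Collection<String> candidates, String target) {
        if (!candidates.contains(target)) {
            throw new IllegalStateException("Route target not configured: " + target);
        }
        return target;
    }

    public static void main(String[] args) {
        List<String> nodes = actualDataNodes(4, 8);
        System.out.println(nodes.size() + " nodes, first=" + nodes.get(0)); // 32 nodes, first=ds0.t_ent_order_item_0
        System.out.println(doDatabaseSharding(List.of("ds0", "ds1", "ds2", "ds3"), 10086L)); // ds2
    }
}
```

Dividing by 4 before taking the table modulo matters under these assumed rules: with ent_id % 8 alone, the ent_ids routed to ds2 (ent_id % 4 == 2) could only ever reach tables 2 and 6.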

Data-Embedded Sharding and Custom Complex Algorithms

Consider a scenario where an order table needs to be distributed across four databases (shard0, shard1, shard2, shard3). A common approach is to hash a specific string (or substring) from the record, take a modulo (e.g., 1024), and route based on which quarter-interval the result (slot) falls into. For example, [0-255] for shard0, [256-511] for shard1, and so on.
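A minimal Java sketch of this slot scheme follows. It assumes CRC32 as the hash function (the article does not name one) and uses the shard names shard0 to shard3 from the paragraph above; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class SlotRouting {
    static final int SLOT_COUNT = 1024;
    static final int SHARD_COUNT = 4;
    static final int SLOTS_PER_SHARD = SLOT_COUNT / SHARD_COUNT; // 256

    // Hash the routing string with CRC32 (an assumption; any stable hash works), then take it modulo 1024.
    static int slotOf(String routingKey) {
        CRC32 crc = new CRC32();
        crc.update(routingKey.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % SLOT_COUNT);
    }

    // [0, 255] -> shard0, [256, 511] -> shard1, [512, 767] -> shard2, [768, 1023] -> shard3
    static String shardOf(int slot) {
        return "shard" + (slot / SLOTS_PER_SHARD);
    }

    public static void main(String[] args) {
        int slot = slotOf("10086");
        System.out.println("slot=" + slot + " -> " + shardOf(slot));
    }
}
```

Hashing into a fixed number of slots first, rather than directly modulo the shard count, means a later expansion can reassign slot ranges to new shards without changing how slots are computed.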

However, if queries frequently rely on order_id (e.g., to fetch order details), and the sharding is based on ent_id, directly querying by order_id would necessitate broadcasting the query to all shards, leading to inefficient performance. The solution lies in embedding sharding information into the ID itself (often referred to as the "gene method") and using custom complex sharding algorithms.

By leveraging a distributed ID generation scheme like the Snowflake algorithm, specific bits within the order_id can store the slot or workerId that directly maps to the sharded database. For instance, the 10-bit worker ID field in Snowflake can be utilized to store the database shard identifier.
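On the generation side, a Snowflake-style generator can write the slot into the 10 worker-ID bits. The Java sketch below assumes the common layout of 1 sign bit, 41 timestamp bits, 10 worker-ID bits, and 12 sequence bits, plus a hypothetical custom epoch; the caller is expected to pass the slot computed from ent_id. One consequence worth noting: once those 10 bits carry the slot instead of a machine ID, two application instances generating IDs for the same slot in the same millisecond are no longer kept apart by the worker field, so a real deployment would need another way to guarantee uniqueness across instances (for example, a single ID service).

```java
class GeneSnowflakeSketch {
    // Hypothetical custom epoch; any fixed instant in the past works.
    static final long EPOCH_MILLIS = 1_600_000_000_000L;
    static final int SEQUENCE_BITS = 12;
    static final int SLOT_BITS = 10; // the Snowflake worker-ID field, repurposed to hold the slot
    static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1; // 4095
    static final long MAX_SLOT = (1L << SLOT_BITS) - 1;         // 1023

    private long lastMillis = -1L;
    private long sequence = 0L;

    // Layout: [1 sign bit = 0][41-bit timestamp offset][10-bit slot][12-bit sequence]
    static long compose(long timestampMillis, int slot, long sequence) {
        if (slot < 0 || slot > MAX_SLOT) {
            throw new IllegalArgumentException("slot out of range: " + slot);
        }
        long elapsed = timestampMillis - EPOCH_MILLIS;
        return (elapsed << (SLOT_BITS + SEQUENCE_BITS))
                | ((long) slot << SEQUENCE_BITS)
                | (sequence & MAX_SEQUENCE);
    }

    // Reads the slot back out: drop the sequence bits, then mask away the timestamp.
    static int slotOf(long id) {
        return (int) ((id >> SEQUENCE_BITS) & MAX_SLOT);
    }

    // Issues an ID whose worker-ID bits carry the given slot.
    synchronized long nextId(int slot) {
        long now = System.currentTimeMillis();
        if (now < lastMillis) {
            throw new IllegalStateException("clock moved backwards");
        }
        if (now == lastMillis) {
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0) {
                // 4096 IDs already issued in this millisecond: wait for the next one.
                while ((now = System.currentTimeMillis()) <= lastMillis) {
                    Thread.onSpinWait();
                }
            }
        } else {
            sequence = 0L;
        }
        lastMillis = now;
        return compose(now, slot, sequence);
    }

    public static void main(String[] args) {
        System.out.println(slotOf(new GeneSnowflakeSketch().nextId(300))); // 300
    }
}
```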

class OrderIdHelper {
    // Assuming the workerId part of the Snowflake ID directly corresponds to a shard index.
    // Assumed layout: [1 sign bit][41-bit timestamp][10-bit worker ID][12-bit sequence]
    public int extractShardIdFromOrderId(long orderIdentifier) {
        // Shift right by 12 bits to drop the sequence bits,
        // then mask with 0x03ff (1023) to drop the timestamp bits and keep the 10-bit worker ID
        long workerIdentifier = (orderIdentifier >> 12) & 0x03ffL;
        return (int) workerIdentifier;
    }
}

This approach allows the target shard to be determined directly from the order_id. Because the ID generator writes the slot derived from the enterprise user ID into the worker-ID bits, every generated order_id carries the routing information of its ent_id, so lookups by order_id no longer need to be broadcast.

For scenarios where sharding decisions depend on multiple columns (e.g., order_id AND ent_id), ShardingSphere-JDBC supports custom complex sharding algorithms. This is achieved by implementing the org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm interface. The logic for a complex sharding algorithm typically involves:

  1. If the sharding keys contain a primary key value (e.g., order_id) which inherently encodes shard information, directly resolve the routing partition from it.
  2. If the primary key value is absent, use other available sharding key fields (e.g., ent_id) to determine the routing partition.
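The two-step decision can be sketched in plain Java. This is not a ComplexKeysShardingAlgorithm implementation (that would require the ShardingSphere 4.x API on the classpath); doSharding only mimics its shape, taking the available target names plus a map from sharding column to candidate values, roughly what ComplexKeysShardingValue.getColumnNameAndShardingValuesMap() supplies. The data source names ds0 to ds3, the quarter-interval split of 1024 slots, and the CRC32-based slot for ent_id are assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.CRC32;

class OrderComplexRoutingSketch {

    static final int SLOT_COUNT = 1024;
    static final int SLOTS_PER_SHARD = 256; // 1024 slots split into four quarter intervals

    // Slot stored in the Snowflake worker-ID bits of order_id (same shift and mask as OrderIdHelper).
    static int slotFromOrderId(long orderId) {
        return (int) ((orderId >> 12) & 0x3FFL);
    }

    // Hypothetical slot rule for ent_id: CRC32 of its decimal string, modulo 1024.
    static int slotFromEntId(long entId) {
        CRC32 crc = new CRC32();
        crc.update(Long.toString(entId).getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % SLOT_COUNT);
    }

    // Same shape as ComplexKeysShardingAlgorithm.doSharding: candidate targets plus column -> values.
    static Collection<String> doSharding(Collection<String> availableTargetNames,
                                         Map<String, Collection<Long>> columnValues) {
        Collection<Long> orderIds = columnValues.getOrDefault("order_id", Collections.emptyList());
        Collection<Long> entIds = columnValues.getOrDefault("ent_id", Collections.emptyList());
        Set<String> targets = new LinkedHashSet<>();
        if (!orderIds.isEmpty()) {
            // Step 1: the primary key encodes the slot, so resolve the shard from it directly.
            for (long orderId : orderIds) {
                targets.add("ds" + (slotFromOrderId(orderId) / SLOTS_PER_SHARD));
            }
        } else if (!entIds.isEmpty()) {
            // Step 2: no primary key in the condition, so fall back to ent_id.
            for (long entId : entIds) {
                targets.add("ds" + (slotFromEntId(entId) / SLOTS_PER_SHARD));
            }
        } else {
            // Neither key present: the query has to be broadcast to every candidate.
            targets.addAll(availableTargetNames);
        }
        targets.retainAll(availableTargetNames); // never route outside the configured targets
        return targets;
    }

    public static void main(String[] args) {
        List<String> all = List.of("ds0", "ds1", "ds2", "ds3");
        long orderId = (123L << 22) | (300L << 12) | 5L; // slot 300 falls in ds1
        System.out.println(doSharding(all, Map.of("order_id", List.of(orderId)))); // [ds1]
        System.out.println(doSharding(all, Map.of()));                           // [ds0, ds1, ds2, ds3]
    }
}
```

If the generator embedded slotFromEntId(ent_id) into each order_id, both branches agree, which is exactly what lets a query by order_id alone hit a single shard.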

Scaling and Data Migration Strategies

Implementing sharding also necessitates a robust strategy for smooth horizontal scaling (elastic scaling).

1. Migration Scope Definition

Before any data synchronization, a clear understanding of the migration scope is crucial:

  • Unique Business Identifiers: Identify all unique business IDs across tables. These are essential for data synchronization operations. Special attention must be paid if database auto-increment IDs are used as business IDs, requiring potential business logic refactoring. Ensure all tables have unique indexes (possibly composite unique indexes) to prevent data duplication during synchronization.
  • Tables to Migrate and New Sharding Rules: Define which tables will be migrated and their new sharding rules. Different sharding rules (e.g., sharding by user ID, sharding by a non-user ID, sharding databases only, or no sharding) affect both the rehash process and data validation during migration.

2. Data Synchronization Process

The overall data synchronization solution typically leverages database binlogs, using an independent middleware service to perform synchronization without altering business application code.

  • Full Historical Data Synchronization: This involves migrating existing data from old shards to new ones. A dedicated service uses a cursor-based approach to select data from old shards, rehashes it according to the new sharding rules, and then bulk inserts it into the new target shards. When the target is MySQL, set the Connector/J connection parameter rewriteBatchedStatements=true; without it, the driver sends a JDBC batch as individual INSERT statements instead of rewriting it into multi-row inserts, which makes bulk loading much slower.

    To maintain data consistency, historical full synchronization should be carefully orchestrated with incremental synchronization. Incremental data synchronization (from old to new shards) is initiated first, but messages are only accumulated (e.g., in Kafka) without immediate consumption. Then, full historical synchronization begins. Once completed, the accumulated incremental messages are processed, ensuring that even data updated during the full sync period is eventually consistent. This strategy also aims to reduce backlog and improve full sync efficiency.

  • Incremental Data Synchronization: Considering stability, disaster recovery, and rollback capabilities during a grey release, a real-time bi-directional synchronization approach is often preferred. If stability or data consistency issues arise in the new cluster during cutover, a rapid rollback to the old cluster is possible.

    Key considerations for real-time incremental data synchronization include:

    1. Filtering Loop Messages: Mechanisms must be in place to prevent infinite loops of binlog messages in bi-directional sync.
    2. Data Consolidation: For performance, the synchronization component often buffers operations in a local blocking queue. A scheduled task then periodically drains the queue and consolidates multiple operations on the same primary key into their final state (similar to Redis AOF rewriting).
    3. UPDATE to INSERT Transformation: Consolidation can collapse an INSERT followed by an UPDATE on the same record into just the UPDATE. If the record does not yet exist in the target table, that UPDATE matches no rows and the data is silently lost. UPDATE operations may therefore need to be transformed into INSERT or UPSERT statements (for example, INSERT ... ON DUPLICATE KEY UPDATE in MySQL).
    4. Batching by New Table: The final set of consolidated operations is grouped by the new target tables, enabling efficient database batch operations (e.g., batch insert or batch update) per target table.
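The full-synchronization ordering described above can be simulated in memory. In the Java sketch below, which is illustrative only, a TreeMap stands in for one old shard, fetchPage imitates a cursor query of the form WHERE id > ? ORDER BY id LIMIT ?, the hypothetical rule ent_id % 8 picks the new table, and upsertBatch stands in for a batched upsert keyed by the unique business ID. Because that write is idempotent, replaying the buffered incremental events after the copy converges on the latest state even when the copy read an older version of a row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class FullSyncSketch {

    static final class Row {
        final long id;        // unique business ID, also used as the paging cursor
        final long entId;     // sharding key under the new rule
        final String payload;

        Row(long id, long entId, String payload) {
            this.id = id;
            this.entId = entId;
            this.payload = payload;
        }
    }

    // Hypothetical new rule: the rehash target table is chosen by ent_id % 8.
    static String newTableOf(Row row) {
        return "t_ent_order_" + Math.floorMod(row.entId, 8L);
    }

    // Stands in for "SELECT ... WHERE id > ? ORDER BY id LIMIT ?" against one old shard.
    static List<Row> fetchPage(NavigableMap<Long, Row> oldShard, long lastId, int pageSize) {
        List<Row> page = new ArrayList<>();
        for (Row row : oldShard.tailMap(lastId, false).values()) {
            if (page.size() == pageSize) {
                break;
            }
            page.add(row);
        }
        return page;
    }

    // Stands in for one batched upsert per target table; keyed by the unique ID, so re-applying a row is harmless.
    static void upsertBatch(Map<String, Map<Long, Row>> target, String table, List<Row> rows) {
        Map<Long, Row> tableRows = target.computeIfAbsent(table, k -> new HashMap<>());
        for (Row row : rows) {
            tableRows.put(row.id, row);
        }
    }

    static void fullSync(NavigableMap<Long, Row> oldShard, Map<String, Map<Long, Row>> target, int pageSize) {
        long cursor = Long.MIN_VALUE;
        List<Row> page;
        while (!(page = fetchPage(oldShard, cursor, pageSize)).isEmpty()) {
            // Rehash the page and group it by new table so each table receives one batch.
            Map<String, List<Row>> byTable = new HashMap<>();
            for (Row row : page) {
                byTable.computeIfAbsent(newTableOf(row), k -> new ArrayList<>()).add(row);
            }
            byTable.forEach((table, rows) -> upsertBatch(target, table, rows));
            cursor = page.get(page.size() - 1).id; // advance past the last row read
        }
    }

    // After the full copy, consume the incremental events that were buffered (e.g. in Kafka) in the meantime.
    static void replayBuffered(List<Row> bufferedEvents, Map<String, Map<Long, Row>> target) {
        for (Row event : bufferedEvents) {
            upsertBatch(target, newTableOf(event), List.of(event));
        }
    }
}
```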
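The four incremental-sync steps can likewise be sketched in Java. Assumptions, none of which come from the article: each binlog event carries a hypothetical sourceTag telling whether the write came from the application or from the sync component itself (one way to detect loop messages), the new target table has already been computed by rehashing, and UPDATE events carry the full row image (as MySQL produces with binlog_row_image=FULL), which is what makes rewriting them as UPSERTs safe. Statements are rendered as strings rather than executed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class IncrementalMergeSketch {

    enum Op { INSERT, UPDATE, DELETE }

    // Hypothetical marker for writes issued by the sync component itself.
    static final String SYNC_TAG = "sync";

    static final class BinlogEvent {
        final String sourceTag; // who produced the write (hypothetical field)
        final String newTable;  // target table after rehashing with the new rule
        final long id;          // primary key
        final Op op;
        final String rowImage;  // full row after the change (null for DELETE)

        BinlogEvent(String sourceTag, String newTable, long id, Op op, String rowImage) {
            this.sourceTag = sourceTag;
            this.newTable = newTable;
            this.id = id;
            this.op = op;
            this.rowImage = rowImage;
        }
    }

    // What a scheduled task would run: drain everything queued since the last tick, then consolidate.
    static Map<String, List<String>> drainAndConsolidate(BlockingQueue<BinlogEvent> queue) {
        List<BinlogEvent> drained = new ArrayList<>();
        queue.drainTo(drained);
        return consolidate(drained);
    }

    static Map<String, List<String>> consolidate(List<BinlogEvent> events) {
        // Steps 1 and 2: drop loop events, then keep only the last event per (table, primary key).
        Map<String, BinlogEvent> latest = new LinkedHashMap<>();
        for (BinlogEvent e : events) {
            if (SYNC_TAG.equals(e.sourceTag)) {
                continue; // came from the other sync direction; re-applying it would ping-pong forever
            }
            latest.put(e.newTable + "#" + e.id, e); // later events overwrite earlier ones
        }
        // Steps 3 and 4: INSERT/UPDATE become UPSERT, and statements are grouped per target table.
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (BinlogEvent e : latest.values()) {
            String statement = e.op == Op.DELETE
                    ? "DELETE id=" + e.id
                    : "UPSERT id=" + e.id + " row=" + e.rowImage;
            batches.computeIfAbsent(e.newTable, k -> new ArrayList<>()).add(statement);
        }
        return batches;
    }
}
```

Each list in the returned map can then be sent to its table as one batch, which is what makes grouping by the new table worthwhile.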

Conclusion

ShardingSphere-JDBC operates by implementing the core JDBC interfaces, which keeps its architecture relatively straightforward. A practical implementation involves configuring data sources, defining logical-to-physical table mappings via actual-data-nodes, and specifying sharding strategies with appropriate sharding columns and algorithm classes.

To achieve direct routing based on distributed primary keys, combining data-embedded IDs (gene method) with custom ComplexKeysShardingAlgorithm is highly effective. Smooth horizontal scaling and data migration rely on a meticulously planned full historical synchronization paired with real-time bi-directional incremental synchronization, involving numerous engineering intricacies.

For a complete working example, refer to the practical demonstration code: https://github.com/makemyownlife/shardingsphere-jdbc-demo

Tags: ShardingSphere JDBC Database Sharding Distributed Systems scaling

Posted on Wed, 17 Jun 2026 16:05:16 +0000 by yobo