Preventing Duplicate Data in High-Concurrency Systems

Background

Recently, our testing team reported a bug where a batch product duplication API was generating duplicate product data. After investigating, I discovered this issue was more complex than initially apparent, involving multiple challenges.

Initial Requirements

The product team requested a feature where users could select brands and, upon clicking a confirmation button, the system would create new products based on existing default product data.

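Initially, I implemented a straightforward synchronous API for the e-commerce system to call. The workflow was simple: the e-commerce system called the API, and the base system created the new products from the default product data before returning a response.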

When duplicating a small number of products, this synchronous approach worked without significant issues.

Performance Optimization

However, since the system needed to handle duplicating thousands of products at once, the synchronous approach began to show performance limitations.

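I refactored the product duplication logic to use a message queue (MQ) for asynchronous processing: the API sends a message and returns immediately, and a consumer creates the products in the background.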

The system also needed to notify the e-commerce platform when duplication was complete. This solution appeared effective initially.

The Problem Emerges

Eventually, the testing team reported that the batch product duplication API was creating duplicate product data.

Upon investigation, I discovered that the e-commerce platform had also switched to asynchronous processing for performance reasons. They weren't directly calling our base system's product duplication API from their interface but instead invoking it through a background job.

From their perspective, the workflow was: the interface saves a request record, and a background job later reads the records and calls our duplication API. However, an internal bug in their system created duplicate entries in the request record table, so the job made duplicate calls to our base system's duplication API.

Since our base system was using RocketMQ for asynchronous processing, and their job was processing batches of records (e.g., 20 records) in rapid succession within a for loop, duplicate requests with identical parameters were being sent in quick succession. This led to concurrent insertion of duplicate data.

Multi-threaded Consumption

The root cause was that RocketMQ consumers, for performance optimization, use multi-threaded concurrent consumption by default, supporting up to 64 threads:

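import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

// consumeMode is not set, so it defaults to ConsumeMode.CONCURRENTLY:
// onMessage can run on several consumer threads at the same time.
@RocketMQMessageListener(topic = "${com.susan.topic:PRODUCT_TOPIC}",
        consumerGroup = "${com.susan.group:PRODUCT_TOPIC_GROUP}")
@Service
public class ProductMessageReceiver implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String payload = new String(message.getBody(), StandardCharsets.UTF_8);
        processProductDuplication(payload); // creates the products (defined elsewhere)
    }
}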

When duplicate messages are sent in quick succession, they can be processed by different threads.

Even with code that checks for existing data before insertion:

Product existingProduct = productRepository.findByHashcode(hashCode);
if(existingProduct == null) {
    productRepository.save(product);
}

In concurrent scenarios, multiple threads may all determine that the product doesn't exist and then attempt the insertion simultaneously, resulting in duplicate data.
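To make the race concrete, here is a minimal in-memory sketch. It does not use RocketMQ or a database: a list stands in for the product table, two plain threads stand in for consumer threads, and a `CyclicBarrier` forces both lookups to finish before either insert, so the unlucky interleaving happens every time. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

class CheckThenActDemo {
    // Hypothetical in-memory stand-in for the product table.
    static final List<String> table = new CopyOnWriteArrayList<>();

    // Same pattern as the snippet above: look up first, insert only if absent.
    static void createIfAbsent(String hashCode, CyclicBarrier barrier) {
        boolean exists = table.contains(hashCode);
        try {
            // Both threads finish their lookup before either inserts, which
            // reproduces the unlucky interleaving deterministically.
            barrier.await();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        if (!exists) {
            table.add(hashCode);
        }
    }

    // Runs two "consumer threads" on the same message; returns rows created.
    static int run() {
        table.clear();
        CyclicBarrier barrier = new CyclicBarrier(2);
        Runnable consumer = () -> createIfAbsent("same-product-hash", barrier);
        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(consumer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return table.size();
    }

    public static void main(String[] args) {
        System.out.println("rows for one product: " + run()); // rows for one product: 2
    }
}
```

Both threads see "not found", so the same product is inserted twice. In production the window is smaller and the outcome is random, which is why the bug showed up only under bursts of identical requests.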

Sequential Consumption

To address this issue of duplicate messages from concurrent consumption, we implemented a two-pronged approach:

  1. Fixed the bug in the e-commerce system that was generating duplicate records
  2. Modified our base system to use single-threaded sequential message consumption

Upon further consideration, relying solely on the e-commerce system to fix their bug wasn't sufficient. Similar issues could occur if users rapidly clicked the create product button multiple times or if the system initiated retries.

Therefore, our base system needed additional safeguards.

RocketMQ supports sequential consumption, requiring changes in both the producer and consumer:

Producer Changes:

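// hashKey decides which message queue receives the message: messages with the
// same hashKey (e.g. built from the product's unique fields) land on the same queue.
rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("Product duplication message sent successfully");
    }

    @Override
    public void onException(Throwable e) {
        log.error("Failed to send product duplication message", e);
    }
});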

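The key change is using the `asyncSendOrderly` method to send ordered messages. It routes every message with the same `hashKey` to the same message queue, so duplicate requests for the same product end up in one queue.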
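The routing idea can be sketched in a few lines. This is modeled loosely on RocketMQ's hash-based queue selector (`SelectMessageQueueByHash`: hash code modulo queue count, made non-negative); it is an illustration, not the library code. The `hashKeyFor` helper and the queue count of 4 are assumptions.

```java
class HashQueueSelectorDemo {
    // Hash-based queue selection in the spirit of RocketMQ's SelectMessageQueueByHash:
    // hashCode modulo the queue count, made non-negative.
    static int selectQueue(String hashKey, int queueCount) {
        int value = hashKey.hashCode() % queueCount;
        return value < 0 ? Math.abs(value) : value;
    }

    // Hypothetical hashKey built from the product's unique business fields.
    static String hashKeyFor(String name, String model) {
        return name + ":" + model;
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        String first = hashKeyFor("Phone X", "PX-128");
        String duplicate = hashKeyFor("Phone X", "PX-128");
        // Identical requests always map to the same queue index.
        System.out.println(selectQueue(first, queueCount) == selectQueue(duplicate, queueCount)); // true
    }
}
```

The choice of `hashKey` matters: if it were a random request ID, duplicates could still be spread across queues and consumed in parallel.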

Consumer Changes:

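import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@RocketMQMessageListener(topic = "${com.susan.topic:PRODUCT_TOPIC}",
        consumeMode = ConsumeMode.ORDERLY, // each queue is consumed by one thread at a time
        consumerGroup = "${com.susan.group:PRODUCT_TOPIC_GROUP}")
@Service
public class ProductMessageReceiver implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String payload = new String(message.getBody(), StandardCharsets.UTF_8);
        processProductDuplication(payload);
    }
}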

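The critical change is setting `consumeMode = ConsumeMode.ORDERLY` in the `@RocketMQMessageListener` annotation. In orderly mode, each message queue is consumed by only one thread at a time, so messages in the same queue, including duplicate requests routed there by the same `hashKey`, are processed sequentially instead of concurrently.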
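The combined effect of hash routing and orderly consumption can be simulated without a broker. In this sketch (an assumption-laden model, not RocketMQ's implementation) each queue is a single-thread executor and a synchronized list stands in for the product table; it requires Java 9+ for `List.of`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderlyConsumptionDemo {
    // Models ORDERLY consumption as "one single-thread executor per queue".
    // This simulates the effect only; it is not how RocketMQ is implemented.
    static List<String> consume(List<String> hashKeys, int queueCount) {
        List<String> table = Collections.synchronizedList(new ArrayList<>());
        List<ExecutorService> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(Executors.newSingleThreadExecutor());
        }
        for (String key : hashKeys) {
            // Same routing idea as the producer: equal keys go to the same queue.
            int queue = Math.abs(key.hashCode() % queueCount);
            queues.get(queue).submit(() -> {
                // Check-then-insert is safe here because every message with this
                // key is handled by the same thread, one after another.
                if (!table.contains(key)) {
                    table.add(key);
                }
            });
        }
        for (ExecutorService queue : queues) {
            queue.shutdown();
            try {
                queue.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("A:1", "A:1", "B:2", "A:1", "B:2", "C:3");
        System.out.println(consume(messages, 4).size()); // 3
    }
}
```

Different queues still run in parallel, so throughput is not reduced to a single thread overall; only messages sharing a key are serialized.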

After these modifications to both systems, the product duplication functionality no longer created duplicate products.

Beyond Single Use Case

However, after fixing this bug, I realized that product duplication was only one of several entry points for creating products. What if another creation method ran at the same time as the duplication feature?

Wouldn't that also potentially create duplicate products?

While the probability was extremely low, the consequences of duplicate products would be severe, requiring complex data merging operations.

This experience highlighted the need to prevent duplicate products proactively, regardless of the entry point, whether the request comes from user actions or internal systems.

Unique Index Constraints

The fastest, lowest-cost solution to prevent duplicate product data would be to add unique indexes to the database table.

However, our organization has a standard requiring all business tables to implement logical deletion rather than physical deletion.

With logical deletion, records aren't actually removed from the database but are marked as deleted through an update statement:

UPDATE product SET deleted = true, modified_time = now(3) WHERE id = 123;

Logical deletion requires an additional status field in the table to track whether data has been deleted. All business queries must filter out records marked as deleted.

For tables using logical deletion, creating unique indexes becomes problematic.

Suppose we create a unique index on the name and model fields. If a user deletes a product (setting deleted=true), then later tries to add a product with identical name and model, the unique index would prevent the insertion, even though the previous product was logically deleted.

One might suggest creating a unique index across name, model, and deleted status fields. While this would allow adding a product with the same name and model after one was deleted, it would cause issues if that newly added product was later deleted and the user tried to add the same product again.
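The failure mode of the (name, model, deleted) index can be reproduced with a small in-memory model. This is a hypothetical simulation of how a unique index would behave on insert and on the `UPDATE ... SET deleted = true` statement, not MySQL itself.

```java
import java.util.ArrayList;
import java.util.List;

class LogicalDeleteIndexDemo {
    static final class Row {
        final String name;
        final String model;
        boolean deleted;

        Row(String name, String model) {
            this.name = name;
            this.model = model;
        }
    }

    final List<Row> table = new ArrayList<>();

    // Would another row share (name, model, deleted)? The unique index forbids that.
    private boolean violates(String name, String model, boolean deleted, Row self) {
        for (Row r : table) {
            if (r != self && r.name.equals(name) && r.model.equals(model) && r.deleted == deleted) {
                return true;
            }
        }
        return false;
    }

    boolean insert(String name, String model) {
        if (violates(name, model, false, null)) {
            return false; // duplicate key on INSERT
        }
        table.add(new Row(name, model));
        return true;
    }

    // Logical delete: the index is checked against the row's new deleted value too.
    boolean logicalDelete(String name, String model) {
        for (Row r : table) {
            if (!r.deleted && r.name.equals(name) && r.model.equals(model)) {
                if (violates(name, model, true, r)) {
                    return false; // duplicate key on UPDATE
                }
                r.deleted = true;
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        LogicalDeleteIndexDemo db = new LogicalDeleteIndexDemo();
        System.out.println(db.insert("Phone X", "PX-128"));        // true
        System.out.println(db.logicalDelete("Phone X", "PX-128")); // true
        System.out.println(db.insert("Phone X", "PX-128"));        // true
        System.out.println(db.logicalDelete("Phone X", "PX-128")); // false: (name, model, true) already exists
    }
}
```

The second delete fails because a deleted row with the same name and model already occupies the (name, model, true) slot.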

Thus, unique indexes aren't practical for tables with logical deletion.

Distributed Locking

The second potential solution to prevent duplicate data is implementing distributed locks, with Redis being a popular choice for high-performance distributed locking.

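A simplified Jedis example of Redis distributed locking (assumes Jedis 3.x or later, where `SetParams` lives in `redis.clients.jedis.params`; `processProductCreation` and `releaseLock` are application methods):

// SET lockKey requestId NX PX expireTime: succeeds only if the key does not exist yet
String result = jedis.set(lockKey, requestId, SetParams.setParams().nx().px(expireTime));
if (!"OK".equals(result)) {
    return false; // another request holds the lock
}
try {
    processProductCreation();
    return true;
} finally {
    // Release only our own lock: releaseLock should delete the key only if its
    // value still equals requestId (typically done atomically with a Lua script)
    releaseLock(lockKey, requestId);
}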

The lockKey is typically a combination of unique fields (like name and model), requestId ensures proper lock release, and expireTime prevents permanent lock failures.

For creating a single product or a small number of products, distributed locking works well. The main workflow would be:

Before creating each product, attempt to acquire a lock. If successful, check if the product exists and create it if not. If lock acquisition fails or the product already exists, return immediately.
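The workflow above can be sketched with an in-memory stand-in for Redis. Here `ConcurrentHashMap.putIfAbsent` plays the role of `SET ... NX`, `remove(key, value)` plays the role of a compare-and-delete release, and lock expiry is omitted for brevity. The key format and all names are hypothetical; this illustrates the control flow, not a production lock.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class LockWorkflowDemo {
    // In-memory stand-in for Redis "SET lockKey requestId NX" (no expiry here).
    static final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the product table, keyed by name and model.
    static final Set<String> products = ConcurrentHashMap.newKeySet();

    static boolean tryLock(String lockKey, String requestId) {
        return locks.putIfAbsent(lockKey, requestId) == null;
    }

    // Compare-and-delete: removes the lock only if this request still owns it.
    static void releaseLock(String lockKey, String requestId) {
        locks.remove(lockKey, requestId);
    }

    // Returns true only if this call created the product.
    static boolean createProduct(String name, String model) {
        String lockKey = "product:lock:" + name + ":" + model; // hypothetical key format
        String requestId = UUID.randomUUID().toString();
        if (!tryLock(lockKey, requestId)) {
            return false; // another request is creating this product right now
        }
        try {
            String key = name + ":" + model;
            if (products.contains(key)) {
                return false; // already created by an earlier request
            }
            products.add(key);
            return true;
        } finally {
            releaseLock(lockKey, requestId);
        }
    }

    // Fires `threads` concurrent requests for the same product; returns how many created it.
    static int concurrentCreates(int threads) {
        products.clear();
        AtomicInteger created = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (createProduct("Phone X", "PX-128")) {
                    created.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(concurrentCreates(8)); // 1
    }
}
```

The lock turns the check-then-insert into a critical section per (name, model), which is exactly why it costs one acquire and one release per product.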

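However, for batch operations involving thousands of products, locking each product individually would severely impact performance: every product needs at least two extra Redis round trips (acquire and release), adding thousands of network calls to a single batch.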

Thus, Redis distributed locking isn't ideal for batch product creation interfaces.

Unified MQ Processing

We had already addressed the duplicate product issue in the batch duplication API by using RocketMQ's ordered messages with single-threaded asynchronous processing.

But this only fixed one entry point for product creation. What if we unified all product creation logic to use the same underlying code path, with RocketMQ's ordered messages ensuring single-threaded asynchronous creation?

This approach would indeed prevent duplicate products but introduces two significant problems:

  1. All product creation functionality becomes asynchronous, requiring changes to frontend interfaces to avoid poor user experience
  2. Previously parallel product creation processes now become serialized, reducing overall creation efficiency

After considering these trade-offs, this solution was ultimately rejected.

INSERT ... ON DUPLICATE KEY UPDATE

MySQL provides a syntax for handling duplicate data: `INSERT ... ON DUPLICATE KEY UPDATE`.

This syntax inserts data if it doesn't exist, or updates it if a unique index or primary key constraint is violated.

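However, our product table uses a snowflake-generated ID as its primary key. Because every insert gets a new ID, the primary key never conflicts and cannot detect duplicate products. Detecting them would require a unique index on name and model, which the logical deletion requirement rules out.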

Additionally, this approach can cause deadlocks under high concurrency, requiring special attention.

INSERT IGNORE

MySQL also offers `INSERT IGNORE`, which skips rows that would cause a duplicate-key error and reports a warning instead of raising an error.

Like the previous solution, it requires unique indexes or primary keys, which we can't implement due to logical deletion requirements.

Using `INSERT IGNORE` can also potentially lead to deadlocks.

Deduplication Table

Given the limitations of previous approaches, we considered an alternative: creating a dedicated deduplication table.

This table would contain the same unique fields (name and model) with a unique index, separate from the main product table:

CREATE TABLE `product_deduplication` (
  `id` bigint(20) NOT NULL COMMENT 'corresponds to product id',
  `name` varchar(130) NOT NULL COMMENT 'product name',
  `model` varchar(255) NOT NULL COMMENT 'product model',
  `user_id` bigint(20) unsigned NOT NULL COMMENT 'creator user id',
  `user_name` varchar(30) NOT NULL COMMENT 'creator name',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'creation timestamp',
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_name_model` (`name`, `model`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='product deduplication table';

The workflow becomes: before inserting product data, first insert into the deduplication table. If successful, proceed with product insertion. If it fails (due to duplicate key), handle according to business requirements.

For scenarios where duplicates should cause the entire batch to fail:

try {
    transactionTemplate.execute(status -> {
        // Insert into the deduplication table first: a duplicate (name, model)
        // throws DuplicateKeyException and rolls back the whole batch.
        productDeduplicationRepository.batchInsert(deduplicationList);
        productRepository.batchInsert(productList);
        return Boolean.TRUE;
    });
} catch (DuplicateKeyException e) {
    throw new BusinessException("Duplicate products detected. Please refresh and try again.");
}

For scenarios where duplicates should be skipped without failing the entire batch:

// Fast path: return the product if it already exists (logically deleted rows excluded).
Product existingProduct = productRepository.findByUniqueFields(product);
if (existingProduct != null) {
    return existingProduct;
}

try {
    transactionTemplate.execute(status -> {
        productDeduplicationRepository.insert(deduplication);
        productRepository.insert(product);
        return Boolean.TRUE;
    });
} catch (DuplicateKeyException e) {
    // Another request created the same product concurrently; return that one instead.
    return productRepository.findByUniqueFields(product);
}
return product;

Crucially, the deduplication insert and the product insert must occur within the same transaction. Additionally, when a product is logically deleted from the main table, its row should be physically deleted from the deduplication table, so the same name and model can be used again.
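The interplay between the two tables can be checked with an in-memory model. In this sketch a `HashMap` keyed by (name, model) plays the deduplication table's unique key, and `synchronized` stands in for "unique index plus one transaction"; both are simplifications, and all names are hypothetical (Java 9+ for `List.of`).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DedupTableDemo {
    static final class Product {
        final long id;
        final String name;
        final String model;
        boolean deleted;

        Product(long id, String name, String model) {
            this.id = id;
            this.name = name;
            this.model = model;
        }
    }

    // Product table: logical deletion, no unique index on (name, model).
    final Map<Long, Product> productTable = new HashMap<>();
    // Deduplication table: unique key on (name, model); rows are physically deleted.
    final Map<List<String>, Long> dedupTable = new HashMap<>();
    private long nextId = 1; // hypothetical stand-in for the snowflake ID generator

    // `synchronized` stands in for "unique index + one transaction": both writes
    // happen together, and a duplicate key returns the existing product.
    synchronized Product create(String name, String model) {
        List<String> uniqueKey = List.of(name, model);
        Long existingId = dedupTable.get(uniqueKey);
        if (existingId != null) {
            return productTable.get(existingId); // the DuplicateKeyException branch
        }
        Product product = new Product(nextId++, name, model);
        dedupTable.put(uniqueKey, product.id);
        productTable.put(product.id, product);
        return product;
    }

    // Logical delete in the product table plus physical delete in the dedup table.
    synchronized void delete(long id) {
        Product product = productTable.get(id);
        if (product == null || product.deleted) {
            return;
        }
        product.deleted = true;
        dedupTable.remove(List.of(product.name, product.model));
    }

    public static void main(String[] args) {
        DedupTableDemo db = new DedupTableDemo();
        Product first = db.create("Phone X", "PX-128");
        System.out.println(db.create("Phone X", "PX-128").id == first.id); // true
        db.delete(first.id);
        Product second = db.create("Phone X", "PX-128"); // allowed after delete
        db.delete(second.id);
        Product third = db.create("Phone X", "PX-128");  // still allowed after a second delete
        System.out.println(third.id != second.id && db.productTable.size() == 3); // true
    }
}
```

Unlike the (name, model, deleted) index, repeated delete-and-recreate cycles keep working, because the deduplication table never retains deleted rows.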

Multiple solutions exist for preventing duplicate data, each with trade-offs. The optimal approach depends on the specific business requirements and constraints.

Posted on Sun, 28 Jun 2026 17:05:38 +0000 by Goop3630