Synchronizing MySQL Data to Redis Using Canal and Kafka

Scenario Overview

In modern application architectures, Redis is frequently employed as a caching layer to accelerate read operations. However, a common challenge arises when underlying database records are modified: the cache must be updated accordingly. Embedding cache invalidation or update logic directly within business code often leads to tight coupling and reduced maintainability. A cleaner approach involves decoupling this logic using an event-driven architecture.

System Architecture

Canal is a middleware component that poses as a MySQL slave (replica) node in order to subscribe to the binary log (binlog). Besides its native TCP mode, it can deliver change events directly to a message queue. It currently integrates with Kafka, RocketMQ, and RabbitMQ (though RabbitMQ support may have stability issues in some versions).

This guide demonstrates using Kafka as the message broker to propagate database changes to Redis. The required components include:

  • MySQL (Source Database)
  • ZooKeeper (Coordination service for Kafka)
  • Kafka (Message Queue)
  • Canal (Binlog Subscriber)
  • Redis (Cache Store)

Setting Up Kafka

Download the Kafka binaries from the official website. Extract the archive and modify the config/server.properties file to define the log directory:

log.dirs=./kafka-logs

Start ZooKeeper (assuming version 3.6.1 or similar):

# Navigate to ZooKeeper bin directory
zkServer.cmd

Next, launch the Kafka server. Open a terminal in Kafka's bin/windows directory, which is where the .bat scripts live and what the relative config path below assumes (on Windows, Git Bash or WSL may offer better compatibility), and run:

kafka-server-start.bat ../../config/server.properties

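Verify that Kafka has registered with ZooKeeper. Then create a topic to receive data events from Canal. The --zookeeper flag shown here applies to older Kafka releases; it was removed in Kafka 3.0, where --bootstrap-server localhost:9092 is used instead: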

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic db-sync-topic

Configuring Canal Server

Download the Canal deployer package. Navigate to conf/canal.properties and configure the server mode and Kafka connection:

# Enable Kafka mode
canal.serverMode = kafka
# Optimize parser threads
canal.instance.parser.parallelThreadSize = 16
# Kafka broker address
canal.mq.servers = 127.0.0.1:9092
# Define instance name
canal.destinations = instance1

Now, configure the specific instance. Go to conf/instance1/instance.properties:

# MySQL Connection
canal.instance.master.address=127.0.0.1:3306
# Binlog position (Optional: Canal can auto-track)
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596

# Credentials (Ensure user has replication privileges)
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8

# Target Kafka topic
canal.mq.topic=db-sync-topic
# Partition index
canal.mq.partition=0

Start the Canal server. It will begin streaming binlog events to the Kafka topic.

Verifying the Message Stream

To confirm that data is flowing, listen to the Kafka topic using the console consumer. Note: on Chinese-locale Windows installations, CMD defaults to GBK encoding (code page 936). If you see garbled text, switch the console to UTF-8 first:

chcp 65001
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic db-sync-topic

You should see JSON payloads representing database events (INSERT, UPDATE, DELETE).
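
For orientation, a single UPDATE might arrive looking roughly like the string in the sketch below. This is a hand-written sample modeled on Canal's flat-message JSON, not captured output: the database name shop is an assumption, column values are shown as strings on the assumption that Canal renders them that way, and real messages carry additional metadata fields. The peek helper is a hypothetical convenience for eyeballing a message without a JSON library; it is not a parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CanalPayloadPeek {

    // Hand-written sample payload; the database name "shop" is an assumption.
    static final String SAMPLE =
        "{\"data\":[{\"id\":\"prod_001\",\"product_name\":\"Wireless Mouse\","
      + "\"price\":\"25.50\",\"stock\":\"99\"}],"
      + "\"database\":\"shop\",\"isDdl\":false,"
      + "\"old\":[{\"stock\":\"100\"}],"
      + "\"table\":\"product_info\",\"type\":\"UPDATE\"}";

    // Returns the first string value found under the given key (at any depth),
    // or null if the key is absent or its value is not a string.
    static String peek(String json, String key) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(peek(SAMPLE, "table") + " / " + peek(SAMPLE, "type"));
    }
}
```

Note that data and old are arrays: a single message can describe several rows changed by one statement.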

Implementing the Synchronization Service

Create a Spring Boot application to consume messages from Kafka and update Redis.

Dependencies

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Configuration

Define Redis and Kafka settings in application.yml:

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    password: 123456
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: redis-sync-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
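
The spring.kafka.consumer entries above are Spring Boot's names for standard Kafka client properties. As a rough illustration of that mapping, the same consumer settings expressed as raw client properties would look like the sketch below. Spring Boot builds this configuration for you, so the class is not needed in the application; the name ConsumerProps is hypothetical.

```java
import java.util.Properties;

final class ConsumerProps {

    // Builds the raw Kafka consumer properties that correspond to the YAML settings.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("group.id", "redis-sync-group");
        // "earliest": a group with no committed offset starts from the oldest retained message.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The producer block in the YAML is not exercised by this service, which only consumes from the topic.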

Redis Helper

A utility class to handle Redis operations:

@Component
public class CacheHelper {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public void cacheValue(String key, String value, long ttlSeconds) {
        if (ttlSeconds > 0) {
            redisTemplate.opsForValue().set(key, value, ttlSeconds, TimeUnit.SECONDS);
        } else {
            redisTemplate.opsForValue().set(key, value);
        }
    }

    public void removeKey(String key) {
        redisTemplate.delete(key);
    }
}

Data Model

Define a POJO to map the Canal JSON payload. Note the structure includes metadata and the actual data rows.

public class DbChangeEvent {
    private String database;
    private String table;
    private String type; // INSERT, UPDATE, DELETE
    private boolean isDdl;
    private List<Map<String, Object>> data;
    private List<Map<String, Object>> old; // For updates
    // Getters and Setters
}
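
For UPDATE events, each entry in data (the row after the change) is paired with the entry at the same index in old, which typically holds only the columns that changed, with their previous values. Under that assumption, a small helper can report what actually changed. RowDiff and changedColumns are hypothetical names used for illustration; they are not part of Canal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class RowDiff {

    // For each column present in oldRow, records "previous -> current" if the value differs.
    static Map<String, String> changedColumns(Map<String, Object> oldRow,
                                              Map<String, Object> newRow) {
        Map<String, String> changes = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : oldRow.entrySet()) {
            Object current = newRow.get(e.getKey());
            if (!Objects.equals(e.getValue(), current)) {
                changes.put(e.getKey(), e.getValue() + " -> " + current);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRow = new LinkedHashMap<>();
        oldRow.put("stock", "100");
        Map<String, Object> newRow = new LinkedHashMap<>();
        newRow.put("id", "prod_001");
        newRow.put("stock", "99");
        System.out.println(changedColumns(oldRow, newRow)); // prints {stock=100 -> 99}
    }
}
```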

Kafka Consumer Logic

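Implement the consumer that listens to the topic and updates Redis based on the event type. The JSON class used here is assumed to be com.alibaba.fastjson.JSON, so the fastjson library must be on the classpath in addition to the dependencies listed earlier.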

@Component
public class DbSyncConsumer {
    
    private static final Logger logger = LoggerFactory.getLogger(DbSyncConsumer.class);
    private static final long CACHE_TTL = 3600L; // 1 hour

    @Autowired
    private CacheHelper cacheHelper;

    @KafkaListener(topics = "db-sync-topic", groupId = "redis-sync-group")
    public void handleDbEvent(String payload) {
        logger.info("Received DB Event: {}", payload);
        
        try {
            DbChangeEvent event = JSON.parseObject(payload, DbChangeEvent.class);
            
            if (event.isDdl() || event.getData() == null) {
                return;
            }

            String eventType = event.getType();
            
            for (Map<String, Object> row : event.getData()) {
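                // Assuming the primary key is 'id'. Adjust based on your table.
                Object recordId = row.get("id");
                if (recordId == null) {
                    continue; // No usable key for this row, so skip it
                }
                // Concatenation avoids a ClassCastException if the id is not a String
                String cacheKey = event.getTable() + ":" + recordId;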

                if ("INSERT".equalsIgnoreCase(eventType) || "UPDATE".equalsIgnoreCase(eventType)) {
                    // Serialize the row map and store in Redis
                    cacheHelper.cacheValue(cacheKey, JSON.toJSONString(row), CACHE_TTL);
                } else if ("DELETE".equalsIgnoreCase(eventType)) {
                    cacheHelper.removeKey(cacheKey);
                }
            }
        } catch (Exception e) {
            logger.error("Failed to process message: {}", payload, e);
        }
    }
}

Testing the Flow

Consider a product table in MySQL:

CREATE TABLE `product_info` (
  `id` varchar(32) NOT NULL,
  `product_name` varchar(255) DEFAULT NULL,
  `price` decimal(10,2) DEFAULT NULL,
  `stock` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

1. Insert: Execute an INSERT statement in MySQL.

INSERT INTO `product_info` (`id`, `product_name`, `price`, `stock`) 
VALUES ('prod_001', 'Wireless Mouse', 25.50, 100);

The consumer should pick up the event and store the JSON representation in Redis under the key product_info:prod_001.

2. Update: Modify the record.

UPDATE `product_info` SET `stock` = 99 WHERE `id` = 'prod_001';

The corresponding value in Redis should be updated.

3. Delete: Remove the record.

DELETE FROM `product_info` WHERE `id` = 'prod_001';

The key should be evicted from Redis.
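
The three steps can also be traced without any infrastructure. The sketch below replays them against a plain HashMap standing in for Redis, using the same key scheme (table:id) and the same INSERT/UPDATE/DELETE routing as the consumer. It is a simplification: TTL handling is omitted, and the stored value is the row's toString() output rather than JSON. FlowSimulation, apply, and row are hypothetical names.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class FlowSimulation {

    // In-memory stand-in for Redis (assumption: no TTL, no network).
    final Map<String, String> cache = new HashMap<>();

    // Mirrors the consumer's routing: upsert on INSERT/UPDATE, remove on DELETE.
    void apply(String table, String type, Map<String, Object> row) {
        String key = table + ":" + row.get("id");
        if ("INSERT".equalsIgnoreCase(type) || "UPDATE".equalsIgnoreCase(type)) {
            cache.put(key, row.toString());
        } else if ("DELETE".equalsIgnoreCase(type)) {
            cache.remove(key);
        }
    }

    // Builds a reduced two-column row for the demonstration.
    static Map<String, Object> row(String id, String stock) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("id", id);
        r.put("stock", stock);
        return r;
    }

    public static void main(String[] args) {
        FlowSimulation sim = new FlowSimulation();
        sim.apply("product_info", "INSERT", row("prod_001", "100"));
        System.out.println(sim.cache.get("product_info:prod_001")); // {id=prod_001, stock=100}
        sim.apply("product_info", "UPDATE", row("prod_001", "99"));
        System.out.println(sim.cache.get("product_info:prod_001")); // {id=prod_001, stock=99}
        sim.apply("product_info", "DELETE", row("prod_001", "99"));
        System.out.println(sim.cache.containsKey("product_info:prod_001")); // false
    }
}
```

Against the real setup, running GET product_info:prod_001 in redis-cli after each step is a quick way to confirm the same sequence of states.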

Posted on Sat, 04 Jul 2026 16:36:57 +0000 by nadeem14375