Scenario Overview
In modern application architectures, Redis is frequently employed as a caching layer to accelerate read operations. However, a common challenge arises when underlying database records are modified: the cache must be updated accordingly. Embedding cache invalidation or update logic directly within business code often leads to tight coupling and reduced maintainability. A cleaner approach involves decoupling this logic using an event-driven architecture.
System Architecture
Canal is middleware that poses as a MySQL replica (slave) in order to subscribe to the binary log (binlog). Besides its TCP mode, in which clients pull changes from Canal directly, it can forward changes straight to a message queue. It integrates with Kafka, RocketMQ, and RabbitMQ (though RabbitMQ support may have stability issues in some versions).
This guide demonstrates using Kafka as the message broker to propagate database changes to Redis. The required components include:
- MySQL (Source Database)
- ZooKeeper (Coordination service for Kafka)
- Kafka (Message Queue)
- Canal (Binlog Subscriber)
- Redis (Cache Store)
Setting Up Kafka
Download the Kafka binaries from the official website. Extract the archive and modify the config/server.properties file to define the log directory:
log.dirs=./kafka-logs
Start ZooKeeper (assuming version 3.6.1 or similar):
# Navigate to ZooKeeper bin directory
zkServer.cmd
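Next, launch the Kafka server. On Windows the .bat scripts live in bin/windows, which is why the configuration path below is ../../config. Open a Command Prompt in that directory (under Git Bash or WSL, use the equivalent .sh scripts in bin instead) and run: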
kafka-server-start.bat ../../config/server.properties
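Verify that Kafka has registered with ZooKeeper. Then create a topic to receive data events from Canal. The --zookeeper flag used here was removed in Kafka 3.0; on Kafka 2.2 or later, pass --bootstrap-server localhost:9092 instead: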
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic db-sync-topic
Configuring Canal Server
Download the Canal deployer package. Navigate to conf/canal.properties and configure the server mode and Kafka connection:
# Enable Kafka mode
canal.serverMode = kafka
# Optimize parser threads
canal.instance.parser.parallelThreadSize = 16
# Kafka broker address
canal.mq.servers = 127.0.0.1:9092
# Define instance name
canal.destinations = instance1
Now, configure the specific instance. Go to conf/instance1/instance.properties:
# MySQL Connection
canal.instance.master.address=127.0.0.1:3306
# Binlog position (Optional: Canal can auto-track)
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# Credentials (Ensure user has replication privileges)
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
# Target Kafka topic
canal.mq.topic=db-sync-topic
# Partition index
canal.mq.partition=0
Start the Canal server. It will begin streaming binlog events to the Kafka topic.
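A typo in a property key is easy to miss, because a misspelled key is simply an unknown property. As a minimal sketch of a pre-flight check, the hypothetical helper below (CanalConfigCheck is not part of Canal) loads the text of instance.properties with java.util.Properties and reports which of the keys from the listing above are absent or blank. The choice of which keys to treat as required is an assumption made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper, not part of Canal: reports required keys that are absent or blank. */
final class CanalConfigCheck {

    // Keys taken from the instance.properties listing; treating them as required is an assumption.
    static final String[] REQUIRED_KEYS = {
        "canal.instance.master.address",
        "canal.instance.dbUsername",
        "canal.instance.dbPassword",
        "canal.mq.topic"
    };

    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            // Properties.load accepts both "key=value" and "key = value".
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
            "canal.instance.master.address=127.0.0.1:3306",
            "canal.instance.dbUsername=canal",
            "canal.mq.topic = db-sync-topic");
        // The sample omits the password on purpose.
        System.out.println(missingKeys(sample)); // [canal.instance.dbPassword]
    }
}
```

In practice the text would come from reading conf/instance1/instance.properties; the check says nothing about whether the values themselves are correct.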
Verifying the Message Stream
To confirm that data is flowing, read the Kafka topic with the console consumer. Note: Windows CMD uses the system's legacy code page by default (GBK, code page 936, on Chinese-locale systems). If you see garbled text, switch the console to UTF-8 first:
chcp 65001
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic db-sync-topic
You should see JSON payloads representing database events (INSERT, UPDATE, DELETE).
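For orientation, the sketch below holds an illustrative payload for an INSERT into the product_info table used later in this guide, assuming Canal's flat-message JSON output (the canal.mq.flatMessage setting) is enabled. The database name "shop" and the es/ts timestamps are made-up values, and real payloads carry further fields such as mysqlType and sqlType. The stringField helper is hypothetical and only good for eyeballing a payload; the synchronization service should use a real JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SamplePayload {

    // Illustrative flat-message payload; "shop" and the timestamps are invented.
    // Column values are shown as JSON strings, which is how flat messages usually render them.
    static final String INSERT_EVENT =
        "{\"data\":[{\"id\":\"prod_001\",\"product_name\":\"Wireless Mouse\","
      + "\"price\":\"25.50\",\"stock\":\"100\"}],"
      + "\"database\":\"shop\",\"es\":1700000000000,\"isDdl\":false,"
      + "\"old\":null,\"pkNames\":[\"id\"],\"table\":\"product_info\","
      + "\"ts\":1700000000123,\"type\":\"INSERT\"}";

    // Returns the first string-valued field with the given name, or null if none is found.
    // A regex is not a JSON parser; this is only a quick way to peek at a payload.
    static String stringField(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\":\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(stringField(INSERT_EVENT, "type"));  // INSERT
        System.out.println(stringField(INSERT_EVENT, "table")); // product_info
    }
}
```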
Implementing the Synchronization Service
Create a Spring Boot application to consume messages from Kafka and update Redis.
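Dependencies
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- The consumer below calls fastjson's JSON.parseObject / JSON.toJSONString.
     Spring Boot does not manage this artifact, so a version is required;
     1.2.83 is an assumption, use whichever release you have vetted. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>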
Configuration
Define Redis and Kafka settings in application.yml:
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    password: 123456
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: redis-sync-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Redis Helper
A utility class to handle Redis operations:
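import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class CacheHelper {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // Stores the value; a positive ttlSeconds sets an expiry, otherwise the key never expires.
    public void cacheValue(String key, String value, long ttlSeconds) {
        if (ttlSeconds > 0) {
            redisTemplate.opsForValue().set(key, value, ttlSeconds, TimeUnit.SECONDS);
        } else {
            redisTemplate.opsForValue().set(key, value);
        }
    }

    public void removeKey(String key) {
        redisTemplate.delete(key);
    }
}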
Data Model
Define a POJO to map the Canal JSON payload. Note the structure includes metadata and the actual data rows.
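import java.util.List;
import java.util.Map;

public class DbChangeEvent {
    private String database;
    private String table;
    private String type;                    // INSERT, UPDATE, DELETE
    private boolean isDdl;                  // true for schema changes (DDL), which carry no row data
    private List<Map<String, Object>> data; // one map per affected row, column name -> value
    private List<Map<String, Object>> old;  // for updates: previous values of the changed columns
    // Getters and Setters
}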
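The old field matters in one edge case that the key scheme in this guide (table name plus primary key) is sensitive to: an UPDATE that changes the primary key itself. In the flat-message format, each entry of old holds the previous values of only the columns that changed, so the old primary key shows up there when it was modified. The sketch below is a hypothetical helper (StaleKeyResolver and its parameters are not from the original code) that works out which cache key such an update leaves behind.

```java
import java.util.Map;

final class StaleKeyResolver {

    /**
     * Returns the cache key that an UPDATE made stale, or null if the
     * primary-key column was not among the changed columns.
     */
    static String staleKey(String table, String pkColumn,
                           Map<String, Object> newRow, Map<String, Object> oldRow) {
        if (oldRow == null || !oldRow.containsKey(pkColumn)) {
            return null; // primary key untouched: overwriting the current key is enough
        }
        String oldId = String.valueOf(oldRow.get(pkColumn));
        String newId = String.valueOf(newRow.get(pkColumn));
        return oldId.equals(newId) ? null : table + ":" + oldId;
    }
}
```

A consumer could call this for each UPDATE row, pairing data.get(i) with old.get(i), and delete the returned key before writing the new one.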
Kafka Consumer Logic
Implement the consumer that listens to the topic and updates Redis based on the event type.
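import java.util.Map;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DbSyncConsumer {

    private static final Logger logger = LoggerFactory.getLogger(DbSyncConsumer.class);
    private static final long CACHE_TTL = 3600L; // 1 hour

    @Autowired
    private CacheHelper cacheHelper;

    @KafkaListener(topics = "db-sync-topic", groupId = "redis-sync-group")
    public void handleDbEvent(String payload) {
        logger.info("Received DB Event: {}", payload);
        try {
            DbChangeEvent event = JSON.parseObject(payload, DbChangeEvent.class);
            // Skip empty payloads, schema changes, and events without row data.
            if (event == null || event.isDdl() || event.getData() == null) {
                return;
            }
            String eventType = event.getType();
            for (Map<String, Object> row : event.getData()) {
                // Assuming the primary key is 'id'. Adjust based on your table.
                Object recordId = row.get("id");
                if (recordId == null) {
                    logger.warn("Row without 'id' in table {}, skipping", event.getTable());
                    continue;
                }
                // String concatenation avoids a ClassCastException when the id is not a String.
                String cacheKey = event.getTable() + ":" + recordId;
                if ("INSERT".equalsIgnoreCase(eventType) || "UPDATE".equalsIgnoreCase(eventType)) {
                    // Serialize the row map and store it in Redis
                    cacheHelper.cacheValue(cacheKey, JSON.toJSONString(row), CACHE_TTL);
                } else if ("DELETE".equalsIgnoreCase(eventType)) {
                    cacheHelper.removeKey(cacheKey);
                }
            }
        } catch (Exception e) {
            // The exception is logged and swallowed, so a failed message is not retried.
            logger.error("Failed to process message: {}", payload, e);
        }
    }
}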
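The branching on event type is easier to test when it is separated from Kafka and Redis. The following is a minimal sketch under that assumption: CacheSyncSketch is a hypothetical class, a plain HashMap stands in for Redis, TTL handling is left out, and the row is stored via String.valueOf rather than as JSON.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CacheSyncSketch {

    /** Applies one change event to the given map, which stands in for Redis. */
    static void apply(Map<String, String> cache, String table, String type,
                      List<Map<String, Object>> rows) {
        for (Map<String, Object> row : rows) {
            // Same key scheme as the consumer: <table>:<primary key>, with 'id' assumed as the key column.
            String key = table + ":" + row.get("id");
            if ("INSERT".equalsIgnoreCase(type) || "UPDATE".equalsIgnoreCase(type)) {
                cache.put(key, String.valueOf(row)); // the real service stores JSON here
            } else if ("DELETE".equalsIgnoreCase(type)) {
                cache.remove(key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        Map<String, Object> row = new HashMap<>();
        row.put("id", "prod_001");
        row.put("stock", "100");

        apply(cache, "product_info", "INSERT", Collections.singletonList(row));
        System.out.println(cache.containsKey("product_info:prod_001")); // true

        apply(cache, "product_info", "DELETE", Collections.singletonList(row));
        System.out.println(cache.isEmpty()); // true
    }
}
```

Other event types fall through both branches and leave the map untouched, which mirrors the consumer's behavior for types it does not recognize.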
Testing the Flow
Consider a product table in MySQL:
CREATE TABLE `product_info` (
`id` varchar(32) NOT NULL,
`product_name` varchar(255) DEFAULT NULL,
`price` decimal(10,2) DEFAULT NULL,
`stock` int(11) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1. Insert: Execute an INSERT statement in MySQL.
INSERT INTO `product_info` (`id`, `product_name`, `price`, `stock`)
VALUES ('prod_001', 'Wireless Mouse', 25.50, 100);
The consumer should pick up the event and store the JSON representation in Redis under the key product_info:prod_001.
2. Update: Modify the record.
UPDATE `product_info` SET `stock` = 99 WHERE `id` = 'prod_001';
The consumer should overwrite the value under product_info:prod_001 with the updated row, which also resets the key's one-hour TTL.
3. Delete: Remove the record.
DELETE FROM `product_info` WHERE `id` = 'prod_001';
The key product_info:prod_001 should be deleted from Redis.