Practical Implementation of Flink SQL Connectors: Kafka, JDBC, and CDC

Stream Integration via Flink SQL Connectors

Apache Flink SQL abstracts data movement through a modular connector ecosystem. This section outlines configuration patterns for three widely used integrations: upsert-capable Kafka sinks, relational database JDBC bridges, and real-time Change Data Capture (CDC) sources.

1. Upsert-Kafka Connector and Changelog Processing

The upsert-kafka connector reads and writes keyed changelogs in Kafka topics. Unlike the append-only kafka connector, it requires a primary key: the key columns become the Kafka message key, an insert or update is written as a regular record carrying the latest value for that key, and a delete is written as a record with a null value (a tombstone).
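To make the keyed-update idea concrete, here is a minimal plain-Java sketch of how a reader of such a topic could fold records into a "latest value per key" view. It does not use Flink or Kafka; the class name UpsertTopicView and the sample keys are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: replays keyed records the way a reader of an upsert topic might. */
class UpsertTopicView {
    // Latest value per key; insertion order is kept only to make the printed output predictable.
    private final Map<String, String> latest = new LinkedHashMap<>();

    /** Applies one record: a null value is treated as a tombstone, anything else as an upsert. */
    void apply(String key, String value) {
        if (value == null) {
            latest.remove(key);
        } else {
            latest.put(key, value);
        }
    }

    /** Returns a copy of the current key-to-value state. */
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(latest);
    }

    public static void main(String[] args) {
        UpsertTopicView view = new UpsertTopicView();
        view.apply("bolts", "1");  // first value for the key
        view.apply("nuts", "1");
        view.apply("bolts", "2");  // a later record overwrites the earlier value
        view.apply("nuts", null);  // tombstone removes the key
        System.out.println(view.snapshot()); // prints {bolts=2}
    }
}
```

The point of the sketch is that the order of records per key matters, which is why the primary key has to decide how messages are keyed.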

Stream Preparation and View Registration

Convert raw network input into a typed stream and expose it to the Table API:

DataStreamSource<String> rawSocket = env.socketTextStream("192.168.5.10", 9090);
SingleOutputStreamOperator<InventoryItem> typedStream = rawSocket.map(line -> {
    String[] tokens = line.split(";");
    return new InventoryItem(tokens[0], Integer.parseInt(tokens[1]));
});
tableEnv.createTemporaryView("inventory_stream", typedStream);

Target Table Definition

Define the Kafka sink using DDL. The primary key must be declared NOT ENFORCED: Flink does not own the data and cannot validate uniqueness, so it trusts the declaration and uses the key columns as the Kafka message key.

CREATE TABLE kafka_stock_sink (
  item_category STRING PRIMARY KEY NOT ENFORCED,
  stock_level BIGINT
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'warehouse_updates',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);

Aggregation and Sink Insertion

INSERT INTO kafka_stock_sink
SELECT item_category, COUNT(*) AS stock_level
FROM inventory_stream
GROUP BY item_category;

Managing JOIN Retractions (+I, -D, +I)

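When joining two dynamic streams, Flink generates changelog records to keep the result consistent as new rows arrive. In a left join, an order that arrives before its matching location is first emitted as an insert (+I) with NULL in the right-side columns. When the match arrives, Flink deletes that NULL-padded row (-D) and inserts the fully joined row (+I). The upsert-kafka sink writes the -D as a tombstone for the key.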

DataStream<String> orderSrc = env.socketTextStream("192.168.5.10", 8081);
DataStream<Order> orderStream = orderSrc.map(row -> {
    String[] parts = row.split(",");
    return new Order(Integer.parseInt(parts[0]), parts[1], Integer.parseInt(parts[2]));
});

DataStream<String> detailSrc = env.socketTextStream("192.168.5.10", 8082);
DataStream<Location> locationStream = detailSrc.map(row -> {
    String[] parts = row.split(",");
    return new Location(Integer.parseInt(parts[0]), parts[1], Long.parseLong(parts[2]));
});

tableEnv.createTemporaryView("order_flow", orderStream);
tableEnv.createTemporaryView("location_flow", locationStream);

CREATE TABLE kafka_enriched_sink (
  order_id INT PRIMARY KEY NOT ENFORCED,
  item_name STRING,
  quantity INT,
  warehouse_loc STRING,
  contact_phone BIGINT
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'enriched_orders',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'csv',
  'value.format' = 'json'
);

INSERT INTO kafka_enriched_sink
SELECT o.order_id, o.item_name, o.quantity, l.warehouse_loc, l.contact_phone
FROM order_flow AS o
LEFT JOIN location_flow AS l ON o.order_id = l.order_id;

// Inspect the changelog stream by reading the sink topic back as a source
tableEnv.executeSql("SELECT * FROM kafka_enriched_sink").print();
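The +I, -D, +I sequence from this section can be reproduced with a small plain-Java model. This is only a sketch under a strong assumption: each order_id arrives at most once on each side. The class LeftJoinChangelog and its methods are hypothetical and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical one-row-per-key model of the changelog a streaming LEFT JOIN emits. */
class LeftJoinChangelog {
    private final Map<Integer, String> orders = new HashMap<>();    // left side by order_id
    private final Map<Integer, String> locations = new HashMap<>(); // right side by order_id
    private final List<String> emitted = new ArrayList<>();

    /** A left row is emitted immediately, padded with null when no match exists yet. */
    void onOrder(int orderId, String item) {
        if (orders.putIfAbsent(orderId, item) != null) {
            return; // model assumption: one left row per key
        }
        emitted.add("+I[" + orderId + ", " + item + ", " + locations.get(orderId) + "]");
    }

    /** A late right row retracts the null-padded result and emits the joined row. */
    void onLocation(int orderId, String loc) {
        if (locations.putIfAbsent(orderId, loc) != null) {
            return; // model assumption: one right row per key
        }
        String item = orders.get(orderId);
        if (item != null) {
            emitted.add("-D[" + orderId + ", " + item + ", null]");
            emitted.add("+I[" + orderId + ", " + item + ", " + loc + "]");
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        LeftJoinChangelog join = new LeftJoinChangelog();
        join.onOrder(1, "bolt");    // +I[1, bolt, null]
        join.onLocation(1, "A7");   // -D[1, bolt, null] then +I[1, bolt, A7]
        join.emitted().forEach(System.out::println);
    }
}
```

If the location arrives before the order, the model emits a single +I with the joined row and no retraction is needed.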

2. JDBC Connector: Scan and Upsert Modes

The JDBC connector interacts with relational databases. Its operational mode is dictated by the presence of a primary key in the DDL. A defined primary key activates upsert semantics (handling UPDATE and DELETE via dialect-specific syntax), whereas its absence defaults to an append-only sink that cannot process retractions.

Bounded Source (Scan Mode)

The JDBC source is bounded: reading from the table executes a one-time snapshot query against the target table and then finishes.

CREATE TABLE jdbc_profile_source (
  user_id INT PRIMARY KEY NOT ENFORCED,
  full_name STRING,
  age_group INT,
  region_code STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://db-primary:3306/analytics_db?useSSL=false',
  'table-name' = 'user_profiles',
  'username' = 'read_only_svc',
  'password' = 'encrypted_pass'
);

Streaming Sink Execution

Inserting query results into the JDBC table triggers continuous writes. If a primary key is present, rows with the same key overwrite one another (upsert); otherwise, every record is appended as a new row.

-- order_stream is assumed to be a registered table or view exposing these four columns.
INSERT INTO jdbc_profile_source
SELECT o.order_id, o.item_name, o.quantity, o.region_code
FROM order_stream AS o;

Retrieving data from this table via SELECT remains a bounded operation, executing a single scan rather than subscribing to real-time updates.
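To illustrate the difference between the append and upsert sink modes described in this section, here is a rough plain-Java sketch that builds the kind of statement a sink could issue in each case. It uses MySQL's INSERT ... ON DUPLICATE KEY UPDATE as the example dialect; the class JdbcWriteStatement is hypothetical, and the exact SQL Flink generates may differ.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: picks an append or upsert statement based on the primary key. */
class JdbcWriteStatement {

    /** Builds a parameterized MySQL-style write statement for the given table. */
    static String build(String table, List<String> columns, List<String> primaryKey) {
        String cols = String.join(", ", columns);
        String params = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        String insert = "INSERT INTO " + table + " (" + cols + ") VALUES (" + params + ")";
        if (primaryKey.isEmpty()) {
            return insert; // no key declared: plain append
        }
        // Key declared: on a key collision, overwrite every column with the incoming values.
        String updates = columns.stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return insert + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        List<String> columns = List.of("user_id", "full_name");
        System.out.println(build("user_profiles", columns, List.of()));
        System.out.println(build("user_profiles", columns, List.of("user_id")));
    }
}
```

The append form has no way to express a later correction to a row, which is why a sink without a primary key cannot process retractions.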

3. Change Data Capture (CDC) Integration

Flink CDC connectors utilize Debezium to capture row-level changes directly from database transaction logs. This architecture eliminates polling overhead and enables low-latency synchronization between source systems and Flink pipelines.

Source Compilation and Version Alignment

For production deployments, compiling the connector from source helps ensure compatibility with specific Flink or Debezium releases. After cloning the repository, remove unused submodules to reduce build time, update the flink.version and scala.binary.version properties in pom.xml, and execute mvn clean install to publish the customized artifact to the local repository.

Database Configuration

MySQL must have binary logging enabled to supply row-level change events. Update the server configuration file (my.cnf or my.ini) under the [mysqld] section:

[mysqld]
log_bin=mysql-bin
binlog_format=ROW
server-id=1
expire_logs_days=7

Verify the configuration and capture the current binary log coordinates:

SHOW VARIABLES LIKE 'log_%';
SHOW VARIABLES LIKE 'binlog_format';
SHOW MASTER STATUS;
SHOW BINLOG EVENTS IN 'mysql-bin.000001';
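Before restarting the server, the option file itself can be sanity-checked. The following plain-Java sketch looks only at the three settings this section relies on (log_bin, binlog_format, server-id) inside the [mysqld] group. The class BinlogConfigCheck is hypothetical, and it inspects the file text only, not the server's effective values, which can come from defaults.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical checker for the binlog-related options in a my.cnf / my.ini text. */
class BinlogConfigCheck {

    /** Returns readable problems found in the [mysqld] group; an empty list means the checks passed. */
    static List<String> problems(String cnfText) {
        Map<String, String> mysqld = new HashMap<>();
        boolean inSection = false;
        for (String raw : cnfText.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // skip blanks and comments
            }
            if (line.startsWith("[")) {
                inSection = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inSection) {
                continue;
            }
            String[] kv = line.split("=", 2);
            // Dashes and underscores are interchangeable in MySQL option names.
            String key = kv[0].trim().replace('-', '_').toLowerCase();
            mysqld.put(key, kv.length > 1 ? kv[1].trim() : "");
        }
        List<String> problems = new ArrayList<>();
        if (!mysqld.containsKey("log_bin")) {
            problems.add("log_bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(mysqld.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        if (!mysqld.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nlog_bin=mysql-bin\nbinlog_format=ROW\nserver-id=1\nexpire_logs_days=7\n";
        System.out.println(problems(cnf)); // prints []
    }
}
```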

CDC Table Definition

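Once the database is prepared, declare the CDC source table in Flink SQL. The connector translates binlog events into Flink changelog records. With the default startup mode (initial), it first takes a snapshot of the existing rows and then switches to incremental binlog reading. The definition below sets 'scan.startup.mode' to 'latest-offset', which skips the snapshot and captures only changes made after the job starts.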

CREATE TABLE mysql_cdc_source (
  record_id INT NOT NULL,
  payload STRING,
  event_ts TIMESTAMP(3),
  PRIMARY KEY (record_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-primary',
  'port' = '3306',
  'username' = 'cdc_user',
  'password' = 'cdc_secret',
  'database-name' = 'production_db',
  'table-name' = 'events_log',
  'scan.startup.mode' = 'latest-offset'
);
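As a rough illustration of how captured events turn into changelog records, the plain-Java sketch below maps Debezium's single-letter op codes to the row kinds a downstream Flink query would observe. The class DebeziumOpMapping is hypothetical and covers only the four row-level operations; other event types are rejected.

```java
import java.util.List;

/** Hypothetical sketch: Debezium "op" code to changelog row kinds. */
class DebeziumOpMapping {

    /** Returns the row kinds, in emission order, for one Debezium row-level event. */
    static List<String> rowKinds(String op) {
        switch (op) {
            case "r": // row read during the initial snapshot
            case "c": // row created
                return List.of("+I");
            case "u": // row updated: old image retracted, new image emitted
                return List.of("-U", "+U");
            case "d": // row deleted
                return List.of("-D");
            default:
                throw new IllegalArgumentException("Unsupported Debezium op: " + op);
        }
    }

    public static void main(String[] args) {
        for (String op : List.of("r", "c", "u", "d")) {
            System.out.println(op + " -> " + rowKinds(op));
        }
    }
}
```

An update is the only operation that produces two records, because the previous row image has to be withdrawn before the new one takes effect.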

Tags: flink-sql upsert-kafka jdbc-connector mysql-cdc debezium

Posted on Wed, 01 Jul 2026 17:13:40 +0000 by sander_ESP