Two-Phase Commit in Flink Sink Operators
Flink achieves exactly-once guarantees for external systems through an implementation of the Two-Phase Commit (2PC) protocol whose transaction boundaries are aligned with checkpoint barriers. A sink operator opens a transaction in the external system at the start of each checkpoint interval and writes every record of that interval under it, so the data remains uncommitted. When the checkpoint barrier reaches the sink, the sink flushes the open transaction (the pre-commit phase), stores the transaction handle in its checkpoint state, and opens a new transaction for the records that follow the barrier. Only after the JobManager confirms that every parallel task has successfully persisted its checkpoint state does the sink commit the pre-committed transaction, making the data visible to consumers.
This design protects the records written since the last completed checkpoint. Records covered by an earlier completed checkpoint have already been committed and are final. If a failure occurs before the current checkpoint completes, Flink restores from the last successful checkpoint and the source replays the records that followed it. Without 2PC, records already written during the current interval would be written a second time on replay, producing duplicates. The transaction boundary ensures that uncommitted data is either rolled back (on failure) or atomically committed (on success), preserving consistency.
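The lifecycle described above can be sketched with a toy in-memory sink. This is a minimal illustration, not Flink's actual sink implementation: the class ToyTwoPhaseCommitSink and all of its method names are hypothetical, and the sketch leaves out how a restored job re-commits transactions that were pre-committed as part of a completed checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a transactional sink (hypothetical, for illustration only). */
class ToyTwoPhaseCommitSink {
    private final List<String> committed = new ArrayList<>();             // visible to readers
    private final Map<Long, List<String>> preCommitted = new HashMap<>(); // flushed, awaiting commit
    private List<String> open = new ArrayList<>();                        // current transaction

    /** Records go into the open transaction and stay invisible. */
    void write(String record) {
        open.add(record);
    }

    /** Phase 1: on a checkpoint barrier, flush the open transaction and start a new one. */
    void preCommit(long checkpointId) {
        preCommitted.put(checkpointId, open);
        open = new ArrayList<>();
    }

    /** Phase 2: the checkpoint was confirmed, so publish its transaction. */
    void notifyCheckpointComplete(long checkpointId) {
        List<String> txn = preCommitted.remove(checkpointId);
        if (txn != null) {
            committed.addAll(txn);
        }
    }

    /** Failure before the checkpoint completed: discard everything uncommitted. */
    void abortUncommitted() {
        preCommitted.clear();
        open = new ArrayList<>();
    }

    List<String> visibleRecords() {
        return new ArrayList<>(committed);
    }
}
```

Calling write("a"), preCommit(1) and then notifyCheckpointComplete(1) makes "a" visible, whereas a record written and then discarded by abortUncommitted() never reaches readers, which is the property that prevents duplicates after a replay.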
Configuration Prerequisites for End-to-End Consistency
To guarantee exactly-once semantics across the entire pipeline, configurations must be synchronized across Flink, the Kafka source, the Kafka sink, and the consuming applications.
- Flink Runtime: Checkpointing must be explicitly enabled and configured with appropriate intervals and timeouts.
- Kafka Source: The consumer must persist read offsets within Flink's operator state and commit them back to Kafka when a checkpoint completes rather than on a periodic timer. This ensures offset recovery aligns with state restoration.
- Kafka Sink: The producer must be instantiated with the exactly-once semantic flag, enabling transactional writes.
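- Consumer Isolation Level: Downstream applications consuming from the sink topic must configure isolation.level=read_committed. The default read_uncommitted setting lets consumers read transactional messages before they are committed, including messages from transactions that are later aborted, which breaks exactly-once guarantees. With read_committed, records become visible only once their transaction commits, so end-to-end latency grows to roughly the checkpoint interval.
- Transaction Timeout Alignment: The Flink sink's transaction.timeout.ms must not exceed the Kafka broker's transaction.max.timeout.ms; otherwise the broker rejects the producer. It should also be long enough to cover the checkpoint interval plus the expected recovery time, so that brokers do not forcibly abort long-running Flink transactions that are still awaiting commit.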
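The last two prerequisites can be expressed as a small helper. This is a sketch under stated assumptions: the class ExactlyOnceKafkaSettings and its methods are hypothetical names, the fallback of 3600000 ms reflects the one-hour transaction timeout that FlinkKafkaProducer is assumed to apply when the property is unset, and the broker limit has to be supplied by the caller (Kafka's default for transaction.max.timeout.ms is 900000 ms).

```java
import java.util.Properties;

/** Hypothetical helper; class and method names are illustrative only. */
class ExactlyOnceKafkaSettings {

    /** Settings for an application that reads the sink topic. */
    static Properties downstreamConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Hide records that belong to open or aborted transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    /** Returns true when the producer's transaction timeout fits within the broker's cap. */
    static boolean timeoutFitsBroker(Properties producerProps, long brokerMaxTimeoutMs) {
        // Assumption: an unset value falls back to a one-hour timeout.
        long producerTimeoutMs =
                Long.parseLong(producerProps.getProperty("transaction.timeout.ms", "3600000"));
        return producerTimeoutMs <= brokerMaxTimeoutMs;
    }
}
```

Under these assumptions, a sink configured with transaction.timeout.ms=600000 passes the check against a 900000 ms broker limit, while a sink that leaves the property unset does not, so either the sink value has to be lowered or the broker limit raised.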
Maven Dependencies
<properties>
    <java.version>1.8</java.version>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- Flink Core & Streaming -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Kafka Connectors -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
Implementation Guide
The following example demonstrates a complete stream processing job that reads from a Kafka topic, performs word aggregation, and writes results back to Kafka with exactly-once guarantees. Failure simulation is included to demonstrate checkpoint recovery behavior.
package com.streamprocessing.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;

import java.util.Properties;
import java.util.Random;

public class KafkaExactlyOnceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Configure Flink Checkpointing
        CheckpointConfig ckptConfig = env.getCheckpointConfig();
        ckptConfig.setCheckpointInterval(10_000);
        ckptConfig.setCheckpointTimeout(5_000);
        ckptConfig.setMinPauseBetweenCheckpoints(15_000);
        ckptConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        ckptConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
        );
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // 2. Configure Kafka Source
        Properties sourceProps = new Properties();
        sourceProps.setProperty("bootstrap.servers", "broker-1:9092");
        sourceProps.setProperty("group.id", "flink-exactly-once-group");
        sourceProps.setProperty("auto.offset.reset", "latest");
        sourceProps.setProperty("flink.partition-discovery.interval-millis", "5000");

        FlinkKafkaConsumer<String> kafkaReader = new FlinkKafkaConsumer<>(
                "input-events", new SimpleStringSchema(), sourceProps
        );
        kafkaReader.setStartFromLatest();
        // Commit offsets to Kafka only when a checkpoint completes
        kafkaReader.setCommitOffsetsOnCheckpoints(true);

        DataStream<String> sourceStream = env.addSource(kafkaReader);

        // 3. Transform and Aggregate
        DataStream<Tuple2<String, Integer>> wordCounts = sourceStream.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    private final Random rng = new Random();

                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String token : line.split("\\s+")) {
                            // Simulate intermittent processing failures
                            if (rng.nextInt(10) == 0) {
                                throw new RuntimeException("Simulated transient failure");
                            }
                            out.collect(Tuple2.of(token, 1));
                        }
                    }
                });

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordCounts.keyBy(record -> record.f0);
        DataStream<Tuple2<String, Integer>> aggregatedStream = keyedStream.sum(1);
        DataStream<String> formattedOutput = aggregatedStream.map(record -> record.f0 + " :: " + record.f1);

        // 4. Configure Kafka Sink with Exactly-Once
        Properties sinkProps = new Properties();
        sinkProps.setProperty("bootstrap.servers", "broker-1:9092");
        sinkProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes

        FlinkKafkaProducer<String> kafkaWriter = new FlinkKafkaProducer<>(
                "output-aggregated",
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                sinkProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        formattedOutput.addSink(kafkaWriter);

        env.execute("Flink-Kafka Exactly-Once Job");
    }
}
Checkpoint Mechanics and Barrier Alignment
Flink's consistency model relies on distributed snapshots inspired by the Chandy-Lamport algorithm. The process begins when the JobManager's checkpoint coordinator triggers a save operation. Instead of halting the pipeline, the system injects special control messages known as barriers into the data stream at the source operators. These barriers carry a unique checkpoint identifier and travel downstream alongside regular data records.
When an operator encounters a barrier, it captures a consistent snapshot of its internal state (including buffered records, window aggregates, and custom counters) and stores it in the configured state backend. A critical requirement is barrier alignment: if an operator has multiple upstream inputs, it must receive barriers from all upstream tasks before it can initiate its own snapshot. This ensures that the captured state represents a globally consistent point in time across the entire job topology.
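Barrier alignment can be illustrated with a toy operator that sums integers from several inputs. This is a simplified sketch, not Flink's runtime code: ToyAligningOperator and its methods are hypothetical, and the "state" is a single running sum.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Toy multi-input operator that aligns barriers before snapshotting (hypothetical). */
class ToyAligningOperator {
    private final int inputCount;
    private final Set<Integer> blockedInputs = new HashSet<>(); // inputs whose barrier arrived
    private final Queue<Integer> heldBack = new ArrayDeque<>(); // records behind a barrier
    private final List<Integer> snapshots = new ArrayList<>();
    private int runningSum = 0;                                 // the operator's state

    ToyAligningOperator(int inputCount) {
        this.inputCount = inputCount;
    }

    void onRecord(int input, int value) {
        if (blockedInputs.contains(input)) {
            heldBack.add(value);   // belongs to the next checkpoint interval
        } else {
            runningSum += value;
        }
    }

    void onBarrier(int input) {
        blockedInputs.add(input);
        if (blockedInputs.size() == inputCount) {
            snapshots.add(runningSum);      // every input is aligned: snapshot the state
            blockedInputs.clear();
            while (!heldBack.isEmpty()) {   // then resume with the held-back records
                runningSum += heldBack.poll();
            }
        }
    }

    List<Integer> snapshots() {
        return new ArrayList<>(snapshots);
    }

    int currentSum() {
        return runningSum;
    }
}
```

A record that arrives on an input after that input's barrier is excluded from the snapshot and applied only afterwards, so the snapshot reflects exactly the records that preceded the barrier on every input.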
While an operator waits for alignment, records arriving on inputs that have already delivered their barrier are held back and processed only after the remaining barriers arrive and the snapshot has been taken. If any single task fails to persist its state within the configured timeout, the entire checkpoint attempt is aborted, and the coordinator triggers a fresh checkpoint at the next scheduled interval. This atomicity prevents partial state corruption and guarantees that recovery always restores a valid, consistent snapshot.
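The all-or-nothing rule described above can be modelled as a pending checkpoint that completes only when every task acknowledges before the deadline. This is a minimal sketch with hypothetical names (ToyPendingCheckpoint, acknowledge, expireIfLate); timestamps are passed in explicitly to keep the example deterministic.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of one checkpoint attempt (hypothetical, not a Flink class). */
class ToyPendingCheckpoint {
    enum Status { PENDING, COMPLETED, ABORTED }

    private final Set<String> awaitingAck;
    private final long deadlineMs;
    private Status status = Status.PENDING;

    ToyPendingCheckpoint(Set<String> taskIds, long triggeredAtMs, long timeoutMs) {
        this.awaitingAck = new HashSet<>(taskIds);
        this.deadlineMs = triggeredAtMs + timeoutMs;
    }

    /** A task reports at time nowMs that its state has been persisted. */
    Status acknowledge(String taskId, long nowMs) {
        expireIfLate(nowMs);
        if (status == Status.PENDING && awaitingAck.remove(taskId) && awaitingAck.isEmpty()) {
            status = Status.COMPLETED; // the last outstanding task has reported in time
        }
        return status;
    }

    /** Aborts the whole attempt once the deadline has passed. */
    Status expireIfLate(long nowMs) {
        if (status == Status.PENDING && nowMs > deadlineMs) {
            status = Status.ABORTED;
        }
        return status;
    }
}
```

A single late acknowledgement is enough to abort the attempt, even if every other task reported in time; no partially acknowledged checkpoint is ever marked as completed.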
Checkpoints versus Savepoints
Checkpoints and savepoints are produced by the same snapshotting mechanism, but their lifecycle management differs fundamentally. Checkpoints are automated, lightweight snapshots managed internally by Flink for fault tolerance. They are created periodically and are cleaned up or retained according to configuration. Savepoints, conversely, are explicit, user-triggered state exports designed for operational control.
Savepoints are primarily utilized during application upgrades, topology modifications, or parallelism adjustments. Because savepoints retain additional metadata regarding operator structure and state naming, they enable safe migration of state between different versions of a streaming job. To ensure state compatibility across savepoint restores, developers must assign static identifiers (UIDs) to all stateful operators using the .uid() method. This guarantees that Flink correctly maps historical state to the corresponding operators in the updated job topology, preventing state loss or mismatch errors during deployment.
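Why stable identifiers matter can be shown with a toy model of a savepoint restore. This sketch does not use Flink's API: ToySavepointRestore is a hypothetical class, state is reduced to one number per operator, and generatedId is only a stand-in for the identifier Flink derives from the job structure when no explicit UID is set.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of mapping saved state onto a new job topology (hypothetical). */
class ToySavepointRestore {

    /** Stand-in for an auto-generated ID: it changes whenever the upstream path changes. */
    static String generatedId(List<String> pathFromSource) {
        return String.join(">", pathFromSource);
    }

    /** Hands saved state only to operators whose ID also appears in the savepoint. */
    static Map<String, Long> restore(Map<String, Long> savepoint, List<String> newOperatorIds) {
        Map<String, Long> restored = new HashMap<>();
        for (String id : newOperatorIds) {
            if (savepoint.containsKey(id)) {
                restored.put(id, savepoint.get(id));
            }
        }
        return restored;
    }
}
```

With an explicit identifier such as "word-counter", the state is found again after a new operator is inserted upstream. With a structure-derived identifier, the same change produces a different ID and the saved state no longer matches any operator.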