Prerequisites
Exactly Once Processing
For exactly-once semantics to work properly:
- Source systems: Must be able to replay data from a given position (e.g., message queues like Kafka, distributed file systems like HDFS)
- Sink systems: Must support idempotent or transactional (two-phase commit) writes (e.g., Doris supports deduplication)
At Least Once Processing
For at-least-once semantics:
- Source systems: Must be able to replay data from a given position (e.g., Kafka, HDFS)
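A toy model can show why replay and deduplication matter. The sketch below is not Flink or Doris code. It makes three simplifying assumptions: the source is an in-memory list read by offset, a checkpoint completed after the first two records, and the job crashes after writing the third record. On recovery, the source rewinds to the checkpointed offset, so the third record is written twice. An append-only sink keeps the duplicate (at-least-once). A sink that deduplicates by record id keeps each record once (effectively exactly-once). The names `DeliveryModel`, `writesWithReplay`, `appendOnlySink`, and `idempotentSink` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Toy model (not Flink or Doris code): a replayable source, one crash, and two kinds of sink.
final class DeliveryModel {

    // Records the sink receives: everything written before the crash, then the replay
    // from the last checkpointed offset. Rewinding works only because the source is replayable.
    static List<String> writesWithReplay(List<String> log, int checkpointOffset, int crashOffset) {
        List<String> writes = new ArrayList<>(log.subList(0, crashOffset));
        writes.addAll(log.subList(checkpointOffset, log.size()));
        return writes;
    }

    // Append-only sink: replayed records are stored again (at-least-once).
    static List<String> appendOnlySink(List<String> writes) {
        return new ArrayList<>(writes);
    }

    // Deduplicating sink: the record itself serves as its unique id here, so a replayed
    // write has no effect (effectively exactly-once).
    static List<String> idempotentSink(List<String> writes) {
        return new ArrayList<>(new LinkedHashSet<>(writes));
    }

    public static void main(String[] args) {
        List<String> log = List.of("r1", "r2", "r3", "r4");
        // Checkpoint completed after r2 (offset 2); crash after r3 was written (offset 3)
        List<String> writes = writesWithReplay(log, 2, 3);
        System.out.println(appendOnlySink(writes)); // [r1, r2, r3, r3, r4]
        System.out.println(idempotentSink(writes)); // [r1, r2, r3, r4]
    }
}
```

A real sink would deduplicate on a key or load identifier rather than on the record value. The design point is the same either way: a replayed write must not change the result.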
Enabling Checkpoints
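import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 5 seconds (5000 ms) in EXACTLY_ONCE mode
executionEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);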
Essential Configuration Parameters
Final Checkpoints
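// Final checkpoints (checkpointing after some tasks have finished): enabled by default since Flink 1.15; false disables them
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration);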
Changelog State Backend
// The changelog state backend supports at most one concurrent checkpoint, so keep max concurrent checkpoints at 1
executionEnv.enableChangelogStateBackend(true);
HDFS Integration Setup
// When the job accesses HDFS from code (e.g., for checkpoint storage), set the Hadoop user name before the first HDFS access
System.setProperty("HADOOP_USER_NAME", "HADOOP");
Unaligned Checkpoints
// Requirements: EXACTLY_ONCE mode and a maximum of 1 concurrent checkpoint
checkpointConfiguration.enableUnalignedCheckpoints();
// Aligned checkpoint timeout, relevant only when unaligned checkpoints are enabled: the default, 0, starts every checkpoint unaligned
// A positive value starts each checkpoint aligned and switches to unaligned if alignment takes longer than this duration
checkpointConfiguration.setAlignedCheckpointTimeout(Duration.ofMinutes(4));
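The interaction between `enableUnalignedCheckpoints()` and `setAlignedCheckpointTimeout(...)` can be summarized as a small decision function. This is a simplified model, not Flink's implementation. `AlignmentTimeoutModel`, `effectiveMode`, and the `alignmentTime` input are hypothetical. `alignmentTime` is assumed to be how long barrier alignment would take. In a running job, the switch happens while the checkpoint is in progress, not up front.

```java
import java.time.Duration;

// Simplified model of how the aligned checkpoint timeout picks the barrier mode (not Flink code).
final class AlignmentTimeoutModel {

    enum BarrierMode { ALIGNED, UNALIGNED }

    static BarrierMode effectiveMode(boolean unalignedEnabled, Duration alignedTimeout, Duration alignmentTime) {
        if (!unalignedEnabled) {
            return BarrierMode.ALIGNED;   // unaligned checkpoints are switched off
        }
        if (alignedTimeout.isZero()) {
            return BarrierMode.UNALIGNED; // default 0: unaligned from the start
        }
        // Positive timeout: start aligned and fall back to unaligned only if alignment takes longer
        return alignmentTime.compareTo(alignedTimeout) > 0 ? BarrierMode.UNALIGNED : BarrierMode.ALIGNED;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMinutes(4);
        System.out.println(effectiveMode(true, Duration.ZERO, Duration.ofSeconds(1))); // UNALIGNED
        System.out.println(effectiveMode(true, timeout, Duration.ofMinutes(1)));       // ALIGNED
        System.out.println(effectiveMode(true, timeout, Duration.ofMinutes(5)));       // UNALIGNED
    }
}
```

With the 4-minute timeout above, checkpoints stay aligned under normal load. They become unaligned only when backpressure delays barrier alignment past 4 minutes.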
Core Checkpoint Settings
// 1. Enable checkpointing every 5 seconds in exactly-once mode (barriers are aligned unless unaligned checkpoints are enabled)
executionEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfiguration = executionEnv.getCheckpointConfig();
// 2. Define checkpoint storage location
checkpointConfiguration.setCheckpointStorage("hdfs://ip:port/directory");
// 3. Checkpoint timeout in milliseconds: the default is 10 minutes; here it is 60 seconds
checkpointConfiguration.setCheckpointTimeout(60000);
// 4. Maximum number of checkpoints that may run at the same time (default 1)
checkpointConfiguration.setMaxConcurrentCheckpoints(1);
// 5. Minimum pause in milliseconds between the completion of one checkpoint and the start of the next (default 0)
checkpointConfiguration.setMinPauseBetweenCheckpoints(1000);
// 6. Retention of externalized checkpoints when the job is cancelled
// DELETE_ON_CANCELLATION: deletes the externalized checkpoint state when the job is cancelled manually
// RETAIN_ON_CANCELLATION: keeps the externalized checkpoint state when the job is cancelled manually, so the job can later be restored from it
checkpointConfiguration.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 7. Number of consecutive checkpoint failures tolerated: the default, 0, fails the job on the first checkpoint failure
checkpointConfiguration.setTolerableCheckpointFailureNumber(10);
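The sketch below shows how the interval from step 1 and the minimum pause from step 5 interact by computing checkpoint start times. It rests on simplifying assumptions: at most one checkpoint runs at a time (step 4), every checkpoint succeeds, and the next checkpoint starts at the later of "previous start + interval" and "previous completion + minimum pause". This approximates Flink's checkpoint coordinator rather than reproducing it. `CheckpointSchedule` and `startTimes` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified trigger-time model for one-at-a-time checkpoints (not Flink's CheckpointCoordinator).
final class CheckpointSchedule {

    // intervalMs:  checkpoint interval (as passed to enableCheckpointing)
    // minPauseMs:  minimum pause between the end of one checkpoint and the start of the next
    // durationsMs: how long each successive checkpoint takes
    // Returns the start time of each checkpoint, in ms after the first trigger.
    static List<Long> startTimes(long intervalMs, long minPauseMs, long[] durationsMs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (long duration : durationsMs) {
            starts.add(start);
            long end = start + duration;
            // The next checkpoint waits for both the interval and the pause after completion
            start = Math.max(start + intervalMs, end + minPauseMs);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5 s interval and 1 s minimum pause, as in the settings above; the second checkpoint takes 4.5 s
        System.out.println(startTimes(5000, 1000, new long[] {2000, 4500, 2000})); // [0, 5000, 10500]
    }
}
```

With these settings, the slow second checkpoint pushes the third start from 10000 ms to 10500 ms. The minimum pause guarantees the job some checkpoint-free processing time even when checkpoints slow down.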
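Step 7 counts consecutive failures: a completed checkpoint resets the count, and the job fails once the count exceeds the tolerated number. The sketch below models only that counting rule. It assumes every failed checkpoint counts toward the limit, whereas Flink's own failure handling may exclude some failure reasons. `TolerableFailures` and `jobFailsAt` are hypothetical names.

```java
// Simplified model of the consecutive-failure rule behind setTolerableCheckpointFailureNumber (not Flink code).
final class TolerableFailures {

    // tolerable: allowed number of consecutive checkpoint failures
    // outcomes:  checkpoint results in order (true = completed, false = failed)
    // Returns the index of the checkpoint whose failure fails the job, or -1 if the job keeps running.
    static int jobFailsAt(int tolerable, boolean[] outcomes) {
        int consecutiveFailures = 0;
        for (int i = 0; i < outcomes.length; i++) {
            if (outcomes[i]) {
                consecutiveFailures = 0; // a completed checkpoint resets the counter
            } else if (++consecutiveFailures > tolerable) {
                return i;                // one failure too many: the job fails here
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        boolean[] outcomes = {false, true, false, false, true};
        System.out.println(jobFailsAt(0, outcomes));  // 0: the default tolerates no failure
        System.out.println(jobFailsAt(1, outcomes));  // 3: second failure in a row
        System.out.println(jobFailsAt(10, outcomes)); // -1: the job keeps running
    }
}
```

With the value 10 used above, a job survives occasional failed checkpoints and is failed only after 11 failures in a row.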
Additional Configuration Options
| Parameter | Default Value | Type | Description |
|---|---|---|---|
Together, these settings control how often Flink takes checkpoints, where it stores them, how long they are kept, and how the job reacts when checkpoints fail.