Apache Flink Checkpoint Configuration Guide

Prerequisites

Exactly Once Processing

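For end-to-end exactly-once semantics, both ends of the pipeline must cooperate:

  • Source systems: Must support replaying data from a recorded position (e.g., message queues like Kafka, distributed file systems like HDFS)
  • Sink systems: Must support idempotent or transactional writes, so that records replayed after a failure do not produce duplicates (e.g., Doris supports deduplication)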

At Least Once Processing

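For at-least-once semantics, only the source side is constrained; records may reach the sink more than once after a recovery:

  • Source systems: Must support replaying data from a recorded position (e.g., Kafka, HDFS)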
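
The difference between the two guarantees can be sketched in plain Java, without Flink. The class below is a toy model, not Flink code: `ReplaySketch`, its method names, and the integer event ids are all hypothetical. It replays a source from a checkpointed offset after a simulated crash and compares an append-only sink with an idempotent, key-based one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplaySketch {

    /**
     * Delivers the first crashAfter events, then simulates a recovery in which
     * the source rewinds to checkpointedOffset and resends everything from there.
     */
    static List<Integer> deliverWithReplay(List<Integer> source, int checkpointedOffset, int crashAfter) {
        List<Integer> delivered = new ArrayList<>(source.subList(0, crashAfter));
        // Recovery: a replayable source resends from the last checkpointed position.
        delivered.addAll(source.subList(checkpointedOffset, source.size()));
        return delivered;
    }

    /** Append-only sink: every delivery becomes a row, so replays appear as duplicates. */
    static List<Integer> appendSink(List<Integer> delivered) {
        return new ArrayList<>(delivered);
    }

    /** Idempotent sink: writes are upserts keyed by event id, so a replayed event overwrites itself. */
    static Map<Integer, String> upsertSink(List<Integer> delivered) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (int id : delivered) {
            table.put(id, "event-" + id);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4);
        // The checkpoint covered the first 2 events; the job crashed after emitting 3.
        List<Integer> delivered = deliverWithReplay(source, 2, 3);
        System.out.println("append sink rows: " + appendSink(delivered).size());
        System.out.println("upsert sink rows: " + upsertSink(delivered).size());
    }
}
```

With a checkpoint at offset 2 and a crash after 3 events, event 3 is delivered twice. The append-only sink ends up with 5 rows (at-least-once), while the upsert sink still holds 4 (effectively exactly-once).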

Enabling Checkpoints

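// Imports use the Flink 1.x package layout
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 5 seconds (interval in milliseconds) with EXACTLY_ONCE mode
executionEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);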

Essential Configuration Parameters

Final Checkpoints

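// Checkpoints after tasks finish ("final checkpoints"): enabled by default since Flink 1.15; false disables them
// The option is defined in ExecutionCheckpointingOptions; pass the Configuration to getExecutionEnvironment(configuration)
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);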

Changelog State Backend

// Requires maximum concurrent checkpoints to be set to 1
executionEnv.enableChangelogStateBackend(true);

HDFS Integration Setup

// Set the user name the Hadoop client uses when the job accesses HDFS from code; use a user with write access to the checkpoint directory
System.setProperty("HADOOP_USER_NAME", "HADOOP");

Unaligned Checkpoints

// Requirements: EXACTLY_ONCE mode and a maximum of 1 concurrent checkpoint
CheckpointConfig checkpointConfig = executionEnv.getCheckpointConfig();
checkpointConfig.enableUnalignedCheckpoints();

// Aligned checkpoint timeout: takes effect only when unaligned checkpoints are enabled
// Default is 0: checkpoints are unaligned from the start
// If greater than 0: each checkpoint starts aligned and switches to unaligned once alignment takes longer than this duration
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(4));
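
The switching rule described in the comments above can be written as a small decision function. This is a simplified model in plain Java, not Flink's implementation: `AlignedTimeoutSketch`, `BarrierMode`, and `modeFor` are hypothetical names, and the sketch assumes unaligned checkpoints are already enabled.

```java
import java.time.Duration;

class AlignedTimeoutSketch {

    enum BarrierMode { ALIGNED, UNALIGNED }

    /**
     * alignedTimeout:   the configured aligned checkpoint timeout
     * alignmentElapsed: how long the current checkpoint has spent aligning so far
     */
    static BarrierMode modeFor(Duration alignedTimeout, Duration alignmentElapsed) {
        if (alignedTimeout.isZero()) {
            // Timeout of 0 (the default): unaligned from the start.
            return BarrierMode.UNALIGNED;
        }
        // Otherwise stay aligned until alignment has taken longer than the timeout.
        return alignmentElapsed.compareTo(alignedTimeout) > 0
                ? BarrierMode.UNALIGNED
                : BarrierMode.ALIGNED;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMinutes(4); // value used in the snippet above
        System.out.println(modeFor(timeout, Duration.ofSeconds(30)));
        System.out.println(modeFor(timeout, Duration.ofMinutes(5)));
        System.out.println(modeFor(Duration.ZERO, Duration.ZERO));
    }
}
```

With the 4-minute timeout used here, a checkpoint whose alignment finishes in 30 seconds stays aligned, while one still aligning after 5 minutes falls back to unaligned mode.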

Core Checkpoint Settings

// 1. Enable checkpointing (disabled unless enabled explicitly): 5-second interval, exactly-once mode; barriers are aligned by default
executionEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfiguration = executionEnv.getCheckpointConfig();
        
// 2. Define checkpoint storage location (ip:port is the NameNode address)
checkpointConfiguration.setCheckpointStorage("hdfs://ip:port/directory");

// 3. Set checkpoint timeout in milliseconds: default is 10 minutes, set here to 60 seconds
checkpointConfiguration.setCheckpointTimeout(60000);

// 4. Maximum number of concurrent checkpoints running simultaneously
checkpointConfiguration.setMaxConcurrentCheckpoints(1);

// 5. Minimum pause in milliseconds between the completion of one checkpoint and the start of the next
checkpointConfiguration.setMinPauseBetweenCheckpoints(1000);

// 6. Externalized checkpoint cleanup: what happens to retained checkpoints when the job is cancelled
// DELETE_ON_CANCELLATION: Deletes the externalized checkpoint data when the job is manually cancelled
// RETAIN_ON_CANCELLATION: Keeps the externalized checkpoint data when the job is manually cancelled; it must then be cleaned up by hand
checkpointConfiguration.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 7. Tolerable consecutive checkpoint failures: default 0 fails the job on the first checkpoint failure; here up to 10 in a row are tolerated
checkpointConfiguration.setTolerableCheckpointFailureNumber(10);
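
Settings 1, 5, and 7 interact in ways that are easier to see with numbers. The sketch below is a simplified model in plain Java, not Flink's scheduler: `CheckpointPolicySketch` and its methods are hypothetical names, it assumes at most one concurrent checkpoint, and it treats every failure as counting toward the tolerance.

```java
class CheckpointPolicySketch {

    /**
     * Earliest start of the next checkpoint (all times in milliseconds):
     * no sooner than one interval after the previous start, and no sooner
     * than minPause after the previous checkpoint completed.
     */
    static long nextTriggerTime(long previousStart, long previousEnd, long interval, long minPause) {
        return Math.max(previousStart + interval, previousEnd + minPause);
    }

    /**
     * Returns the index of the checkpoint attempt at which the job fails, or -1
     * if it never does. outcomes[i] is true when attempt i succeeded. A success
     * resets the consecutive-failure count; the job fails once that count
     * exceeds the tolerable number.
     */
    static int failingAttempt(boolean[] outcomes, int tolerableFailures) {
        int consecutiveFailures = 0;
        for (int i = 0; i < outcomes.length; i++) {
            if (outcomes[i]) {
                consecutiveFailures = 0;
            } else if (++consecutiveFailures > tolerableFailures) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Interval 5000 ms and minimum pause 1000 ms, as configured above.
        System.out.println(nextTriggerTime(0, 2000, 5000, 1000)); // fast checkpoint
        System.out.println(nextTriggerTime(0, 4500, 5000, 1000)); // slow checkpoint

        boolean[] elevenFailures = new boolean[11]; // all false: 11 failures in a row
        System.out.println(failingAttempt(elevenFailures, 10));
        System.out.println(failingAttempt(elevenFailures, 0));
    }
}
```

With these values, a checkpoint that finishes in 2 seconds leaves the next one on the regular 5-second schedule, while one that takes 4.5 seconds pushes the next start to 5.5 seconds. With a tolerance of 10, the job fails on the eleventh consecutive failure; with the default of 0, it fails on the first.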

Additional Configuration Options

Parameter Default Value Type Description

These configurations provide comprehensive control over Flink's fault tolerance mechanisms and state management capabilities.


Posted on Thu, 07 May 2026 06:14:41 +0000 by smpdawg