Configuring TaskManager Hostnames
Each TaskManager node must set taskmanager.host to its own hostname in its local flink-conf.yaml. For example, on hadoop103:
taskmanager.host: hadoop103
On hadoop104:
taskmanager.host: hadoop104
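Because the value differs on every node, it can be convenient to script the change. The following is a minimal sketch, not part of the Flink distribution: set_taskmanager_host is a hypothetical helper that rewrites an existing taskmanager.host line or appends one if the key is missing.

```shell
# set_taskmanager_host CONF_FILE HOSTNAME
# Hypothetical helper: sets taskmanager.host in the given config file.
set_taskmanager_host() {
  if grep -q '^taskmanager\.host:' "$1"; then
    # Key already present: rewrite it in place (sed keeps a .bak backup).
    sed -i.bak "s/^taskmanager\.host:.*/taskmanager.host: $2/" "$1"
  else
    # Key absent: append it as a new line.
    printf 'taskmanager.host: %s\n' "$2" >> "$1"
  fi
}
```

On hadoop103 this might be called as set_taskmanager_host conf/flink-conf.yaml hadoop103, assuming the config file lives under conf/ in the Flink installation directory.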
Starting and Stopping a Standalone Cluster
From the JobManager node (hadoop102):
# Start cluster
bin/start-cluster.sh
# Stop cluster
bin/stop-cluster.sh
Web UI Access
After startup, the Flink dashboard is accessible at:
http://hadoop102:8081
Submitting Jobs in Session Mode
Via Web UI
Jobs can be uploaded and launched through the web interface.
Via Command Line
bin/flink run -m hadoop102:8081 \
-c com.atguigu.flink01.Flink03_WC_Unbound_Socket \
./flink-0918-1.0-SNAPSHOT.jar
Flink Deployment Modes
Standalone Mode
In standalone mode, Flink manages its own cluster resources rather than relying on an external resource manager such as YARN.
Flink on YARN
Environment Setup
Add the following to /etc/profile.d/my_env.sh on all nodes:
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
Reload the environment variables:
source /etc/profile
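Before starting anything on YARN, it may be worth confirming that both variables actually took effect in the current shell. The check below is a sketch only: check_flink_yarn_env is a hypothetical function name, and it tests just that HADOOP_CLASSPATH is non-empty and that HADOOP_CONF_DIR points at an existing directory.

```shell
# check_flink_yarn_env
# Hypothetical sanity check; returns non-zero if either variable looks wrong.
check_flink_yarn_env() {
  status=0
  if [ -z "${HADOOP_CLASSPATH:-}" ]; then
    echo "HADOOP_CLASSPATH is empty" >&2
    status=1
  fi
  if [ ! -d "${HADOOP_CONF_DIR:-}" ]; then
    echo "HADOOP_CONF_DIR is not a directory: ${HADOOP_CONF_DIR:-<unset>}" >&2
    status=1
  fi
  return "$status"
}
```

Calling check_flink_yarn_env && echo "environment looks ready" prints the message only when both checks pass.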
YARN Session Mode
Start a long-running Flink session on YARN:
bin/yarn-session.sh -nm flink-session-cluster01
Submit a job (after session is active):
bin/flink run \
-c com.atguigu.flink01.Flink03_WC_Unbound_Socket \
./flink-0918-1.0-SNAPSHOT.jar
Stop the session via YARN:
yarn application -kill application_1700281106461_0453
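The application ID changes with every session, so it can help to look it up by the name passed to -nm. The sketch below assumes that yarn application -list prints tab-separated columns with the application ID first and the application name second. That layout is an assumption to verify against your Hadoop version, and app_id_by_name is a hypothetical helper.

```shell
# app_id_by_name NAME
# Hypothetical helper: reads `yarn application -list` output on stdin and
# prints the ID of every application whose name equals NAME.
# Assumes tab-separated columns: ID first, name second.
app_id_by_name() {
  awk -F '\t' -v name="$1" '
    {
      id = $1; nm = $2
      # Strip the space padding around each column.
      gsub(/^[ ]+|[ ]+$/, "", id)
      gsub(/^[ ]+|[ ]+$/, "", nm)
      if (nm == name && id ~ /^application_/) print id
    }'
}
```

A possible use is yarn application -list | app_id_by_name flink-session-cluster01, with the printed ID then passed to yarn application -kill.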
Per-Job Mode
Launch a dedicated Flink cluster for a single job:
bin/flink run -d -t yarn-per-job \
-c com.atguigu.flink01.Flink03_WC_Unbound_Socket \
./flink-0918-1.0-SNAPSHOT.jar
To stop it, cancel the job from the Flink Web UI; the per-job cluster shuts down once its job ends.
Application Mode
In this mode, the application’s main() runs on the JobManager within YARN.
First, upload required files to HDFS:
hadoop fs -mkdir /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist
hadoop fs -mkdir /flink-jars
hadoop fs -put ./flink-0918-1.0-SNAPSHOT.jar /flink-jars
Then submit:
bin/flink run-application -d -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hadoop102:9820/flink-dist" \
-c com.atguigu.flink01.Flink03_WC_Unbound_Socket \
hdfs://hadoop102:9820/flink-jars/flink-0918-1.0-SNAPSHOT.jar
Key Difference: In per-job mode, the client executes main() and submits the JobGraph. In application mode, main() runs inside the YARN cluster.
Configuring Flink History Server
Create HDFS Directory
hadoop fs -mkdir -p /logs/flink-job
Update flink-conf.yaml
Append the following:
jobmanager.archive.fs.dir: hdfs://hadoop102:9820/logs/flink-job
historyserver.web.address: hadoop102
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop102:9820/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
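The History Server only shows jobs if it monitors the directory the JobManager archives to, which is why both keys above point at the same HDFS path. The sketch below checks that relationship in a config file. conf_value and archive_dirs_match are hypothetical helpers, and the parsing assumes simple "key: value" lines as shown above.

```shell
# conf_value FILE KEY
# Prints the value of the last "KEY: value" line in FILE (empty if absent).
conf_value() {
  awk -v key="$2" '
    index($0, key ":") == 1 { sub(/^[^:]*:[ \t]*/, ""); v = $0 }
    END { print v }' "$1"
}

# archive_dirs_match FILE
# Succeeds if jobmanager.archive.fs.dir appears in the (comma-separated)
# historyserver.archive.fs.dir list.
archive_dirs_match() {
  jm=$(conf_value "$1" jobmanager.archive.fs.dir)
  hs=$(conf_value "$1" historyserver.archive.fs.dir | tr -d ' ')
  [ -n "$jm" ] || return 1
  case ",$hs," in
    *",$jm,"*) return 0 ;;
    *) return 1 ;;
  esac
}
```

For instance, archive_dirs_match conf/flink-conf.yaml || echo "archive directories differ" would flag a mismatch before the History Server is started.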
Start/Stop History Server
bin/historyserver.sh start
bin/historyserver.sh stop
Access archived jobs at:
http://hadoop102:8082
Architecture Overview (Standalone Session Mode)
Core components:
- Client: Submits applications.
- JobManager: Coordinates job execution via Dispatcher and ResourceManager.
- TaskManager: Executes tasks in slots.
Each job is managed by a JobMaster, which builds an execution graph and schedules tasks across available slots.
Parallelism Configuration
Parallelism determines the number of subtasks per operator.
Three ways to set parallelism:
- In code: operator.setParallelism(n)
- In config: parallelism.default in flink-conf.yaml
- At submission: -p n on the command line
Maximum usable parallelism = number of TaskManagers × slots per TaskManager.
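As a quick worked example of this formula, the sketch below computes the ceiling for a cluster with two TaskManagers (hadoop103 and hadoop104). The value of 4 slots per TaskManager is a made-up figure for illustration, and max_parallelism and fits_cluster are hypothetical helper names.

```shell
# max_parallelism TASKMANAGERS SLOTS_PER_TASKMANAGER
# Prints the total number of slots in the cluster.
max_parallelism() {
  echo $(( $1 * $2 ))
}

# fits_cluster PARALLELISM TASKMANAGERS SLOTS_PER_TASKMANAGER
# Succeeds if the requested parallelism does not exceed the total slots.
fits_cluster() {
  [ "$1" -le "$(max_parallelism "$2" "$3")" ]
}

max_parallelism 2 4   # prints 8
```

With these numbers, fits_cluster 8 2 4 succeeds, while fits_cluster 9 2 4 fails because a parallelism of 9 would need more slots than the cluster offers.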