Preparing the Flink 1.20.0 Distribution
Begin by downloading the official Flink 1.20.0 binary distribution for Scala 2.12:
wget https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
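tar -xzf flink-1.20.0-bin-scala_2.12.tgz
If Hudi support is required, copy the compatible Hudi Flink bundle into the lib directory before repackaging, so that the jar ends up inside the archive:
cp hudi-flink1.19-bundle-0.13.0.jar flink-1.20.0/lib/
Repackage the distribution under the archive name that the service definition expects:
tar -czf flink-1.20.0.tar.gz flink-1.20.0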
Generate an MD5 checksum for integrity verification and place both the archive and its checksum in DataSophon’s package repository:
md5sum flink-1.20.0.tar.gz | awk '{print $1}' > flink-1.20.0.tar.gz.md5
cp flink-1.20.0.tar.gz flink-1.20.0.tar.gz.md5 /opt/datasophon/DDP/packages/
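Before relying on the package, it can be worth confirming that the stored checksum still matches the archive. The following is a minimal sketch; the function name verify_md5 is hypothetical and not part of DataSophon, and it assumes the .md5 file holds only the bare digest, as produced by the command above.

```shell
# Hypothetical helper (not part of DataSophon): compare an archive's MD5
# digest with the value stored in the sibling "<archive>.md5" file.
verify_md5() {
    archive=$1
    # Read the expected digest; fail with status 2 if the .md5 file is missing.
    expected=$(cat "$archive.md5") || return 2
    actual=$(md5sum "$archive" | awk '{print $1}')
    if [ "$expected" = "$actual" ]; then
        echo "OK: $archive"
    else
        echo "MISMATCH: $archive (expected $expected, got $actual)" >&2
        return 1
    fi
}

# Example:
# verify_md5 /opt/datasophon/DDP/packages/flink-1.20.0.tar.gz
```

A non-zero exit status means the archive changed after the checksum was written, or the checksum file is missing.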
Updating the Service Definition
Edit the Flink service definition file to register version 1.20.0:
vi /opt/datasophon/datasophon-manager-1.2.1/conf/meta/DDP-1.2.1/FLINK/service_ddl.json
Replace the contents with the following JSON structure, ensuring compatibility with Flink 1.20.0’s configuration format and high-availability model:
{
"name": "FLINK",
"label": "Flink",
"description": "Real-time stream processing engine",
"version": "1.20.0",
"sortNum": 6,
"dependencies": [],
"packageName": "flink-1.20.0.tar.gz",
"decompressPackageName": "flink-1.20.0",
"runAs": "root",
"roles": [
{
"name": "FlinkClient",
"label": "FlinkClient",
"roleType": "client",
"cardinality": "1+",
"logFile": "logs/flink.log"
}
],
"configWriter": {
"generators": [
{
"filename": "config.yaml",
"configFormat": "custom",
"templateName": "properties3.ftl",
"outputDirectory": "conf",
"includeParams": [
"jobmanager.bind-host",
"jobmanager.rpc.address",
"jobmanager.rpc.port",
"jobmanager.memory.process.size",
"jobmanager.execution.failover-strategy",
"taskmanager.bind-host",
"taskmanager.host",
"taskmanager.numberOfTaskSlots",
"taskmanager.memory.process.size",
"parallelism.default",
"enableJMHA",
"high-availability.type",
"high-availability.storageDir",
"high-availability.zookeeper.quorum",
"high-availability.zookeeper.path.root",
"high-availability.zookeeper.client.acl",
"execution.checkpointing.interval",
"execution.checkpointing.mode",
"state.backend",
"state.checkpoints.dir",
"state.savepoints.dir",
"custom.flink.conf.yaml",
"classloader.check-leaked-classloader"
]
},
{
"filename": "masters",
"configFormat": "custom",
"templateName": "flink_masters.ftl",
"outputDirectory": "conf",
"includeParams": ["flink.master.hostnames"]
},
{
"filename": "workers",
"configFormat": "custom",
"templateName": "flink_workers.ftl",
"outputDirectory": "conf",
"includeParams": ["flink.worker.hostnames"]
}
]
},
"parameters": [
{
"name": "jobmanager.bind-host",
"label": "JobManager Bind Host",
"description": "Network interface for JobManager to bind to",
"required": true,
"type": "input",
"value": "0.0.0.0",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "0.0.0.0"
},
{
"name": "jobmanager.rpc.address",
"label": "JobManager RPC Address",
"description": "Hostname or IP for JobManager RPC communication",
"required": true,
"type": "input",
"value": "bigdata1",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "bigdata1"
},
{
"name": "jobmanager.rpc.port",
"label": "JobManager RPC Port",
"description": "Port used for JobManager RPC calls",
"required": true,
"type": "input",
"value": "6123",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "6123"
},
{
"name": "jobmanager.memory.process.size",
"label": "JobManager Process Memory",
"description": "Total memory allocated to the JobManager process",
"required": true,
"type": "input",
"value": "1600m",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "1600m"
},
{
"name": "jobmanager.execution.failover-strategy",
"label": "Failover Strategy",
"description": "Strategy for job recovery on failure",
"required": true,
"type": "input",
"value": "region",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "region"
},
{
"name": "taskmanager.bind-host",
"label": "TaskManager Bind Host",
"description": "Network interface for TaskManager to bind to",
"required": true,
"type": "input",
"value": "0.0.0.0",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "0.0.0.0"
},
{
"name": "taskmanager.host",
"label": "TaskManager Host",
"description": "Hostname or IP of the machine running TaskManager",
"required": true,
"type": "input",
"value": "bigdata1",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "bigdata1"
},
{
"name": "taskmanager.numberOfTaskSlots",
"label": "Task Slots per TaskManager",
"description": "Number of parallel task slots per TaskManager",
"required": true,
"type": "input",
"value": "1",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "1"
},
{
"name": "taskmanager.memory.process.size",
"label": "TaskManager Process Memory",
"description": "Total memory allocated to each TaskManager process",
"required": true,
"type": "input",
"value": "1024m",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "1024m"
},
{
"name": "parallelism.default",
"label": "Default Parallelism",
"description": "Default number of parallel tasks for jobs",
"required": true,
"type": "input",
"value": "1",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "1"
},
{
"name": "enableJMHA",
"label": "Enable JobManager High Availability",
"description": "Enable HA mode using ZooKeeper",
"required": true,
"type": "switch",
"value": false,
"configurableInWizard": true,
"hidden": false,
"defaultValue": false
},
{
"name": "high-availability.type",
"label": "HA Mode",
"description": "Use ZooKeeper for leader election",
"required": true,
"type": "input",
"value": "zookeeper",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "zookeeper"
},
{
"name": "high-availability.storageDir",
"label": "HA Metadata Storage (HDFS)",
"description": "HDFS path for storing JobManager recovery metadata",
"required": true,
"type": "input",
"value": "hdfs://nameservice1/flink/ha/",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "hdfs://nameservice1/flink/ha/"
},
{
"name": "high-availability.zookeeper.quorum",
"label": "ZooKeeper Quorum",
"description": "Comma-separated list of ZooKeeper server addresses",
"required": true,
"type": "input",
"value": "${zkUrls}",
"configurableInWizard": true,
"hidden": false,
"defaultValue": ""
},
{
"name": "high-availability.zookeeper.path.root",
"label": "ZooKeeper Root Path",
"description": "Root znode for Flink HA metadata",
"required": true,
"type": "input",
"value": "/flink",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "/flink"
},
{
"name": "high-availability.zookeeper.client.acl",
"label": "ZooKeeper ACL Mode",
"description": "Set to 'creator' if ZooKeeper ACL is enabled",
"required": true,
"type": "input",
"value": "open",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "open"
},
{
"name": "execution.checkpointing.interval",
"label": "Checkpoint Interval",
"description": "Time between automatic checkpoints",
"required": true,
"type": "input",
"value": "3min",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "3min"
},
{
"name": "execution.checkpointing.mode",
"label": "Checkpoint Mode",
"description": "Guarantee level for checkpointing (EXACTLY_ONCE or AT_LEAST_ONCE)",
"required": true,
"type": "input",
"value": "EXACTLY_ONCE",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "EXACTLY_ONCE"
},
{
"name": "state.backend",
"label": "State Backend",
"description": "Storage backend for operator state (filesystem or rocksdb)",
"required": true,
"type": "input",
"value": "filesystem",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "filesystem"
},
{
"name": "state.checkpoints.dir",
"label": "Checkpoint Storage Directory",
"description": "HDFS path for persistent checkpoint data",
"required": true,
"type": "input",
"value": "hdfs://nameservice1/flink/flink-checkpoints",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "hdfs://nameservice1/flink/flink-checkpoints"
},
{
"name": "state.savepoints.dir",
"label": "Savepoint Storage Directory",
"description": "HDFS path for user-triggered savepoints",
"required": true,
"type": "input",
"value": "hdfs://nameservice1/flink/flink-savepoints",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "hdfs://nameservice1/flink/flink-savepoints"
},
{
"name": "custom.flink.conf.yaml",
"label": "Custom Flink Configuration",
"description": "Additional key-value pairs to inject into config.yaml",
"configType": "custom",
"required": false,
"type": "multipleWithKey",
"value": [],
"configurableInWizard": true,
"hidden": false,
"defaultValue": ""
},
{
"name": "classloader.check-leaked-classloader",
"label": "ClassLoader Leak Check",
"description": "Whether Flink checks for leaked classloaders; false disables the check",
"required": true,
"type": "switch",
"value": false,
"configurableInWizard": true,
"hidden": false,
"defaultValue": false
},
{
"name": "flink.master.hostnames",
"label": "JobManager Hosts and Ports",
"description": "Comma-separated list of JobManager hosts and ports (e.g., bigdata1:8081,bigdata2:8081)",
"required": true,
"type": "input",
"value": "bigdata1:8081,bigdata2:8081",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "localhost:8081"
},
{
"name": "flink.worker.hostnames",
"label": "TaskManager Hosts",
"description": "Comma-separated list of TaskManager hostnames or IPs",
"required": true,
"type": "input",
"value": "bigdata1,bigdata2,bigdata3",
"configurableInWizard": true,
"hidden": false,
"defaultValue": "localhost"
}
]
}
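A definition of this size is easy to break with a stray comma or a parameter that is listed in includeParams but never defined. The sketch below checks both; it assumes python3 is available on the manager node, and the function name check_service_ddl is hypothetical rather than a DataSophon tool.

```shell
# Hypothetical helper (not part of DataSophon): validate service_ddl.json.
# Fails if the file is not valid JSON, or if a generator lists a parameter
# in includeParams that has no entry under "parameters".
check_service_ddl() {
    python3 - "$1" <<'PY'
import json
import sys

with open(sys.argv[1]) as fh:
    ddl = json.load(fh)  # raises (non-zero exit) on malformed JSON

defined = {p["name"] for p in ddl.get("parameters", [])}
missing = [
    (gen.get("filename", "?"), name)
    for gen in ddl.get("configWriter", {}).get("generators", [])
    for name in gen.get("includeParams", [])
    if name not in defined
]
for filename, name in missing:
    print(f"{filename}: includeParams entry '{name}' has no parameter definition")
sys.exit(1 if missing else 0)
PY
}

# Example:
# check_service_ddl /opt/datasophon/datasophon-manager-1.2.1/conf/meta/DDP-1.2.1/FLINK/service_ddl.json
```

Running it before restarting the API service catches syntax errors early, when they are still cheap to fix.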
Configuring Template Files
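Update the main Flink configuration template so that it renders nested YAML structures. Note that this template is only used if the config.yaml generator's templateName in service_ddl.json refers to it (the definition above names properties3.ftl). Edit the template: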
vi /opt/datasophon/datasophon-worker/conf/templates/flink-conf.ftl
Replace its content with:
<#-- JobManager Configuration -->
jobmanager:
  bind-host: ${parameters["jobmanager.bind-host"]!"0.0.0.0"}
  rpc:
    address: ${parameters["jobmanager.rpc.address"]!"localhost"}
    port: ${parameters["jobmanager.rpc.port"]!"6123"}
  memory:
    process:
      size: ${parameters["jobmanager.memory.process.size"]!"1600m"}
  execution:
    failover-strategy: ${parameters["jobmanager.execution.failover-strategy"]!"region"}
<#-- TaskManager Configuration -->
taskmanager:
  bind-host: ${parameters["taskmanager.bind-host"]!"0.0.0.0"}
  host: ${parameters["taskmanager.host"]!"localhost"}
  numberOfTaskSlots: ${parameters["taskmanager.numberOfTaskSlots"]!"1"}
  memory:
    process:
      size: ${parameters["taskmanager.memory.process.size"]!"1024m"}
<#-- Default Parameters -->
parallelism.default: ${parameters["parallelism.default"]!"1"}
execution.checkpointing.interval: ${parameters["execution.checkpointing.interval"]!"3min"}
execution.checkpointing.mode: ${parameters["execution.checkpointing.mode"]!"EXACTLY_ONCE"}
state.backend: ${parameters["state.backend"]!"filesystem"}
state.checkpoints.dir: ${parameters["state.checkpoints.dir"]!"hdfs:///flink-checkpoints"}
state.savepoints.dir: ${parameters["state.savepoints.dir"]!"hdfs:///flink-savepoints"}
<#-- HA Configuration -->
high-availability.type: ${parameters["high-availability.type"]!"zookeeper"}
high-availability.storageDir: ${parameters["high-availability.storageDir"]!"hdfs:///flink/ha"}
high-availability.zookeeper.quorum: ${parameters["high-availability.zookeeper.quorum"]!"localhost:2181"}
high-availability.zookeeper.path.root: ${parameters["high-availability.zookeeper.path.root"]!"/flink"}
high-availability.zookeeper.client.acl: ${parameters["high-availability.zookeeper.client.acl"]!"open"}
<#-- Optional ClassLoader Check -->
classloader.check-leaked-classloader: ${parameters["classloader.check-leaked-classloader"]!"false"}
<#-- Custom Configuration Injection -->
<#if (parameters["custom.flink.conf.yaml"])?has_content>
<#assign customConfig = parameters["custom.flink.conf.yaml"]>
<#list customConfig?keys as key>
${key}: ${customConfig[key]}
</#list>
</#if>
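Once DataSophon has rendered the configuration on a node, a quick sanity check is to look for template syntax that survived rendering. The sketch below assumes the rendered file is conf/config.yaml under the Flink installation directory and that a valid configuration never contains the literal sequences ${ or <#; the function name check_rendered_config is hypothetical.

```shell
# Hypothetical helper (not part of DataSophon): flag FreeMarker markers that
# were left unrendered in a generated configuration file.
check_rendered_config() {
    config=$1
    # A missing or empty file usually means rendering did not run at all.
    [ -s "$config" ] || { echo "missing or empty: $config" >&2; return 2; }
    # -F treats the patterns as fixed strings; -n prints offending line numbers.
    if grep -n -F -e '${' -e '<#' "$config" >&2; then
        echo "unrendered template markers found in $config" >&2
        return 1
    fi
    echo "no template markers left in $config"
}

# Example:
# check_rendered_config /opt/datasophon/flink-1.20.0/conf/config.yaml
```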
Create the master node list template:
vi /opt/datasophon/datasophon-worker/conf/templates/flink_masters.ftl
Content:
<#list itemList as item>
<#list item.value?split(",") as host>
${host?trim}
</#list>
</#list>
Create the worker node list template:
vi /opt/datasophon/datasophon-worker/conf/templates/flink_workers.ftl
Content:
<#list itemList as item>
<#list item.value?split(",") as host>
${host?trim}
</#list>
</#list>
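Both list templates do the same thing: they split a comma-separated value and emit one trimmed entry per line. The same transformation can be reproduced in the shell to preview what the masters and workers files should contain. This is only an illustration of the logic; split_hosts is a hypothetical name, not something DataSophon provides.

```shell
# Hypothetical helper: mimic the split-and-trim logic of the list templates.
# Prints one host entry per line, with surrounding whitespace removed.
split_hosts() {
    printf '%s\n' "$1" | tr ',' '\n' \
        | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//'
}

# Example:
# split_hosts "bigdata1:8081, bigdata2:8081"
# prints:
#   bigdata1:8081
#   bigdata2:8081
```

Comparing this output with the generated conf/masters and conf/workers files is a simple way to confirm the templates were picked up.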
Setting Environment Variables
Update the DataSophon environment script on all nodes:
vim /etc/profile.d/datasophon-env.sh
Add or update the following lines:
export FLINK_HOME=/opt/datasophon/flink-1.20.0
export HADOOP_CLASSPATH=$(hadoop classpath)
Apply the changes:
source /etc/profile.d/datasophon-env.sh
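After sourcing the script, it helps to confirm that both variables resolved to something usable, since an empty HADOOP_CLASSPATH typically only surfaces later as a failed YARN submission. A minimal sketch follows; check_flink_env is a hypothetical name, and the check for an executable bin/flink assumes the standard layout of the binary distribution.

```shell
# Hypothetical helper: verify the environment variables set above.
# Returns 0 only if FLINK_HOME contains an executable bin/flink and
# HADOOP_CLASSPATH is non-empty.
check_flink_env() {
    status=0
    if [ -z "${FLINK_HOME:-}" ] || [ ! -x "$FLINK_HOME/bin/flink" ]; then
        echo "FLINK_HOME is unset or has no executable bin/flink" >&2
        status=1
    fi
    if [ -z "${HADOOP_CLASSPATH:-}" ]; then
        echo "HADOOP_CLASSPATH is empty; is 'hadoop' on the PATH?" >&2
        status=1
    fi
    return "$status"
}

# Example:
# source /etc/profile.d/datasophon-env.sh && check_flink_env
```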
Restarting Services
Restart the DataSophon worker service on all nodes:
sh /opt/datasophon/datasophon-worker/bin/datasophon-worker.sh restart worker
Restart the DataSophon API service on the manager node:
sh /opt/datasophon/datasophon-manager-1.2.1/bin/datasophon-api.sh restart api
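On clusters with more than a few nodes, the worker restart can be scripted. The sketch below assumes passwordless SSH from the current machine to every node. The function name restart_workers and the DRY_RUN switch are illustrative and not part of DataSophon. By default it only prints the commands it would run.

```shell
# Hypothetical helper: restart the DataSophon worker on each given host.
# Assumes passwordless SSH. DRY_RUN defaults to 1, so nothing is executed
# until it is explicitly set to 0.
restart_workers() {
    cmd="sh /opt/datasophon/datasophon-worker/bin/datasophon-worker.sh restart worker"
    for host in "$@"; do
        if [ "${DRY_RUN:-1}" = "1" ]; then
            echo "ssh $host $cmd"
        else
            ssh "$host" "$cmd"
        fi
    done
}

# Example (host names taken from the defaults in the service definition):
# restart_workers bigdata1 bigdata2 bigdata3
```

Reviewing the dry-run output first reduces the chance of restarting the wrong set of machines.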
Validation and Testing
Verify that Flink 1.20.0 is properly integrated by submitting a sample job in yarn-per-job mode:
flink run -d -t yarn-per-job $FLINK_HOME/examples/streaming/WordCount.jar
Test application mode with a streaming job:
flink run-application -t yarn-application $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar
Check the Flink Web UI and YARN logs to confirm successful deployment and execution.
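To find the YARN logs for a submission, the application ID is needed. YARN application IDs follow the pattern application_<cluster timestamp>_<sequence>, so they can be pulled out of captured client output. This is a minimal sketch: extract_app_ids and the file name submit.log are hypothetical, and it assumes the submission output was saved to a file.

```shell
# Hypothetical helper: read text on stdin and print each distinct YARN
# application ID found in it, one per line.
extract_app_ids() {
    grep -oE 'application_[0-9]+_[0-9]+' | sort -u
}

# Example, assuming the client output was captured in submit.log:
# flink run-application -t yarn-application \
#     $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar 2>&1 | tee submit.log
# extract_app_ids < submit.log | while read -r app; do
#     yarn application -status "$app"
# done
```

The same ID can be passed to yarn logs -applicationId to fetch the aggregated container logs once the application has finished.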