Integrating Flink 1.20.0 with DataSophon

Preparing the Flink 1.20.0 Distribution

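Begin by downloading and unpacking the official Flink 1.20.0 binary distribution for Scala 2.12:

wget https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
tar -xzf flink-1.20.0-bin-scala_2.12.tgz

If Hudi support is required, copy the Hudi Flink bundle into the lib directory before repackaging, so that the jar ends up inside the archive. Check that the bundle you use is compatible with Flink 1.20.0:

cp hudi-flink1.19-bundle-0.13.0.jar flink-1.20.0/lib/

Then repackage the directory under the archive name used in the service definition:

tar -czf flink-1.20.0.tar.gz flink-1.20.0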

Generate an MD5 checksum for integrity verification and place both the archive and its checksum in DataSophon’s package repository:

md5sum flink-1.20.0.tar.gz | awk '{print $1}' > flink-1.20.0.tar.gz.md5
cp flink-1.20.0.tar.gz flink-1.20.0.tar.gz.md5 /opt/datasophon/DDP/packages/
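
Because the checksum file exists for integrity verification, it can be worth confirming that it matches the archive after copying. The following is a minimal sketch, assuming the .md5 file contains only the bare hash as produced above; verify_md5 is a hypothetical helper name and is not part of DataSophon:

```shell
#!/usr/bin/env bash
# verify_md5 ARCHIVE: compare ARCHIVE's MD5 with the bare hash stored in ARCHIVE.md5.
# Hypothetical helper for illustration; not shipped with DataSophon.
# Returns 0 on match, 1 on mismatch, 2 if either file is missing.
verify_md5() {
  local archive="$1"
  local expected actual
  [ -f "$archive" ] && [ -f "$archive.md5" ] || return 2
  expected="$(tr -d '[:space:]' < "$archive.md5")"
  actual="$(md5sum "$archive" | awk '{print $1}')"
  [ "$expected" = "$actual" ]
}
```

For example: verify_md5 /opt/datasophon/DDP/packages/flink-1.20.0.tar.gz && echo "checksum OK"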

Updating the Service Definition

Edit the Flink service definition file to register version 1.20.0:

vi /opt/datasophon/datasophon-manager-1.2.1/conf/meta/DDP-1.2.1/FLINK/service_ddl.json

Replace the contents with the following JSON structure, ensuring compatibility with Flink 1.20.0’s configuration format and high-availability model:

{
  "name": "FLINK",
  "label": "Flink",
  "description": "Real-time stream processing engine",
  "version": "1.20.0",
  "sortNum": 6,
  "dependencies": [],
  "packageName": "flink-1.20.0.tar.gz",
  "decompressPackageName": "flink-1.20.0",
  "runAs": "root",
  "roles": [
    {
      "name": "FlinkClient",
      "label": "FlinkClient",
      "roleType": "client",
      "cardinality": "1+",
      "logFile": "logs/flink.log"
    }
  ],
  "configWriter": {
    "generators": [
      {
        "filename": "config.yaml",
        "configFormat": "custom",
        "templateName": "flink-conf.ftl",
        "outputDirectory": "conf",
        "includeParams": [
          "jobmanager.bind-host",
          "jobmanager.rpc.address",
          "jobmanager.rpc.port",
          "jobmanager.memory.process.size",
          "jobmanager.execution.failover-strategy",
          "taskmanager.bind-host",
          "taskmanager.host",
          "taskmanager.numberOfTaskSlots",
          "taskmanager.memory.process.size",
          "parallelism.default",
          "enableJMHA",
          "high-availability.type",
          "high-availability.storageDir",
          "high-availability.zookeeper.quorum",
          "high-availability.zookeeper.path.root",
          "high-availability.zookeeper.client.acl",
          "execution.checkpointing.interval",
          "execution.checkpointing.mode",
          "state.backend",
          "state.checkpoints.dir",
          "state.savepoints.dir",
          "custom.flink.conf.yaml",
          "classloader.check-leaked-classloader"
        ]
      },
      {
        "filename": "masters",
        "configFormat": "custom",
        "templateName": "flink_masters.ftl",
        "outputDirectory": "conf",
        "includeParams": ["flink.master.hostnames"]
      },
      {
        "filename": "workers",
        "configFormat": "custom",
        "templateName": "flink_workers.ftl",
        "outputDirectory": "conf",
        "includeParams": ["flink.worker.hostnames"]
      }
    ]
  },
  "parameters": [
    {
      "name": "jobmanager.bind-host",
      "label": "JobManager Bind Host",
      "description": "Network interface for JobManager to bind to",
      "required": true,
      "type": "input",
      "value": "0.0.0.0",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "0.0.0.0"
    },
    {
      "name": "jobmanager.rpc.address",
      "label": "JobManager RPC Address",
      "description": "Hostname or IP for JobManager RPC communication",
      "required": true,
      "type": "input",
      "value": "bigdata1",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "bigdata1"
    },
    {
      "name": "jobmanager.rpc.port",
      "label": "JobManager RPC Port",
      "description": "Port used for JobManager RPC calls",
      "required": true,
      "type": "input",
      "value": "6123",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "6123"
    },
    {
      "name": "jobmanager.memory.process.size",
      "label": "JobManager Process Memory",
      "description": "Total memory allocated to the JobManager process",
      "required": true,
      "type": "input",
      "value": "1600m",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "1600m"
    },
    {
      "name": "jobmanager.execution.failover-strategy",
      "label": "Failover Strategy",
      "description": "Strategy for job recovery on failure",
      "required": true,
      "type": "input",
      "value": "region",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "region"
    },
    {
      "name": "taskmanager.bind-host",
      "label": "TaskManager Bind Host",
      "description": "Network interface for TaskManager to bind to",
      "required": true,
      "type": "input",
      "value": "0.0.0.0",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "0.0.0.0"
    },
    {
      "name": "taskmanager.host",
      "label": "TaskManager Host",
      "description": "Hostname or IP of the machine running TaskManager",
      "required": true,
      "type": "input",
      "value": "bigdata1",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "bigdata1"
    },
    {
      "name": "taskmanager.numberOfTaskSlots",
      "label": "Task Slots per TaskManager",
      "description": "Number of parallel task slots per TaskManager",
      "required": true,
      "type": "input",
      "value": "1",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "1"
    },
    {
      "name": "taskmanager.memory.process.size",
      "label": "TaskManager Process Memory",
      "description": "Total memory allocated to each TaskManager process",
      "required": true,
      "type": "input",
      "value": "1024m",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "1024m"
    },
    {
      "name": "parallelism.default",
      "label": "Default Parallelism",
      "description": "Default number of parallel tasks for jobs",
      "required": true,
      "type": "input",
      "value": "1",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "1"
    },
    {
      "name": "enableJMHA",
      "label": "Enable JobManager High Availability",
      "description": "Enable HA mode using ZooKeeper",
      "required": true,
      "type": "switch",
      "value": false,
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": false
    },
    {
      "name": "high-availability.type",
      "label": "HA Mode",
      "description": "Use ZooKeeper for leader election",
      "required": true,
      "type": "input",
      "value": "zookeeper",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "zookeeper"
    },
    {
      "name": "high-availability.storageDir",
      "label": "HA Metadata Storage (HDFS)",
      "description": "HDFS path for storing JobManager recovery metadata",
      "required": true,
      "type": "input",
      "value": "hdfs://nameservice1/flink/ha/",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "hdfs://nameservice1/flink/ha/"
    },
    {
      "name": "high-availability.zookeeper.quorum",
      "label": "ZooKeeper Quorum",
      "description": "Comma-separated list of ZooKeeper server addresses",
      "required": true,
      "type": "input",
      "value": "${zkUrls}",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": ""
    },
    {
      "name": "high-availability.zookeeper.path.root",
      "label": "ZooKeeper Root Path",
      "description": "Root znode for Flink HA metadata",
      "required": true,
      "type": "input",
      "value": "/flink",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "/flink"
    },
    {
      "name": "high-availability.zookeeper.client.acl",
      "label": "ZooKeeper ACL Mode",
      "description": "Set to 'creator' if ZooKeeper ACL is enabled",
      "required": true,
      "type": "input",
      "value": "open",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "open"
    },
    {
      "name": "execution.checkpointing.interval",
      "label": "Checkpoint Interval",
      "description": "Time between automatic checkpoints",
      "required": true,
      "type": "input",
      "value": "3min",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "3min"
    },
    {
      "name": "execution.checkpointing.mode",
      "label": "Checkpoint Mode",
      "description": "Guarantee level for checkpointing (EXACTLY_ONCE or AT_LEAST_ONCE)",
      "required": true,
      "type": "input",
      "value": "EXACTLY_ONCE",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "EXACTLY_ONCE"
    },
    {
      "name": "state.backend",
      "label": "State Backend",
      "description": "Storage backend for operator state (filesystem or rocksdb)",
      "required": true,
      "type": "input",
      "value": "filesystem",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "filesystem"
    },
    {
      "name": "state.checkpoints.dir",
      "label": "Checkpoint Storage Directory",
      "description": "HDFS path for persistent checkpoint data",
      "required": true,
      "type": "input",
      "value": "hdfs://nameservice1/flink/flink-checkpoints",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "hdfs://nameservice1/flink/flink-checkpoints"
    },
    {
      "name": "state.savepoints.dir",
      "label": "Savepoint Storage Directory",
      "description": "HDFS path for user-triggered savepoints",
      "required": true,
      "type": "input",
      "value": "hdfs://nameservice1/flink/flink-savepoints",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "hdfs://nameservice1/flink/flink-savepoints"
    },
    {
      "name": "custom.flink.conf.yaml",
      "label": "Custom Flink Configuration",
      "description": "Additional key-value pairs to inject into config.yaml",
      "configType": "custom",
      "required": false,
      "type": "multipleWithKey",
      "value": [],
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": ""
    },
    {
      "name": "classloader.check-leaked-classloader",
      "label": "ClassLoader Leak Check",
      "description": "Whether Flink checks for leaked user-code classloaders; set to false to disable the check",
      "required": true,
      "type": "switch",
      "value": false,
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": false
    },
    {
      "name": "flink.master.hostnames",
      "label": "JobManager Hosts and Ports",
      "description": "Comma-separated list of JobManager hosts and ports (e.g., bigdata1:8081,bigdata2:8081)",
      "required": true,
      "type": "input",
      "value": "bigdata1:8081,bigdata2:8081",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "localhost:8081"
    },
    {
      "name": "flink.worker.hostnames",
      "label": "TaskManager Hosts",
      "description": "Comma-separated list of TaskManager hostnames or IPs",
      "required": true,
      "type": "input",
      "value": "bigdata1,bigdata2,bigdata3",
      "configurableInWizard": true,
      "hidden": false,
      "defaultValue": "localhost"
    }
  ]
}
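
A definition of this size is easy to break with a stray comma while editing by hand, so a syntax check before restarting the services may save a round trip. This is a sketch that assumes python3 is installed on the manager node (any JSON validator would serve equally well); check_json is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# check_json FILE: succeed only if FILE parses as valid JSON.
# Relies on Python's standard json.tool module; output is discarded.
check_json() {
  python3 -m json.tool "$1" > /dev/null 2>&1
}
```

For example: check_json /opt/datasophon/datasophon-manager-1.2.1/conf/meta/DDP-1.2.1/FLINK/service_ddl.json || echo "service_ddl.json is not valid JSON"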

Configuring Template Files

Update the main Flink configuration template to properly render nested YAML structures:

vi /opt/datasophon/datasophon-worker/conf/templates/flink-conf.ftl

Replace its content with:

<#-- Assumes "parameters" is a hash keyed by the flat parameter names from service_ddl.json. -->
<#-- Names containing dots or hyphens need bracket syntax: parameters.a.b-c would be read as nested lookups and a subtraction. -->
<#-- Defaults are quoted strings so numbers are not locale-formatted and booleans do not fail to render. -->
<#-- JobManager Configuration -->
jobmanager:
  bind-host: ${parameters["jobmanager.bind-host"]!"0.0.0.0"}
  rpc:
    address: ${parameters["jobmanager.rpc.address"]!"localhost"}
    port: ${parameters["jobmanager.rpc.port"]!"6123"}
  memory:
    process:
      size: ${parameters["jobmanager.memory.process.size"]!"1600m"}
  execution:
    failover-strategy: ${parameters["jobmanager.execution.failover-strategy"]!"region"}

<#-- TaskManager Configuration -->
taskmanager:
  bind-host: ${parameters["taskmanager.bind-host"]!"0.0.0.0"}
  host: ${parameters["taskmanager.host"]!"localhost"}
  numberOfTaskSlots: ${parameters["taskmanager.numberOfTaskSlots"]!"1"}
  memory:
    process:
      size: ${parameters["taskmanager.memory.process.size"]!"1024m"}

<#-- Default Parameters -->
parallelism.default: ${parameters["parallelism.default"]!"1"}
execution.checkpointing.interval: ${parameters["execution.checkpointing.interval"]!"3min"}
execution.checkpointing.mode: ${parameters["execution.checkpointing.mode"]!"EXACTLY_ONCE"}
state.backend: ${parameters["state.backend"]!"filesystem"}
state.checkpoints.dir: ${parameters["state.checkpoints.dir"]!"hdfs:///flink-checkpoints"}
state.savepoints.dir: ${parameters["state.savepoints.dir"]!"hdfs:///flink-savepoints"}

<#-- HA Configuration -->
high-availability.type: ${parameters["high-availability.type"]!"zookeeper"}
high-availability.storageDir: ${parameters["high-availability.storageDir"]!"hdfs:///flink/ha"}
high-availability.zookeeper.quorum: ${parameters["high-availability.zookeeper.quorum"]!"localhost:2181"}
high-availability.zookeeper.path.root: ${parameters["high-availability.zookeeper.path.root"]!"/flink"}
high-availability.zookeeper.client.acl: ${parameters["high-availability.zookeeper.client.acl"]!"open"}

<#-- Optional ClassLoader Check -->
classloader.check-leaked-classloader: ${parameters["classloader.check-leaked-classloader"]!"false"}

<#-- Custom Configuration Injection (assumes the value is a key/value hash) -->
<#-- The output line is deliberately unindented so the keys stay at the top level of the YAML file. -->
<#if parameters["custom.flink.conf.yaml"]?has_content>
<#assign customConfig = parameters["custom.flink.conf.yaml"]>
<#list customConfig?keys as key>
${key}: ${customConfig[key]}
</#list>
</#if>

Create the master node list template:

vi /opt/datasophon/datasophon-worker/conf/templates/flink_masters.ftl

Content:

<#list itemList as item>
<#list item.value?split(",") as host>
${host?trim}
</#list>
</#list>

Create the worker node list template:

vi /opt/datasophon/datasophon-worker/conf/templates/flink_workers.ftl

Content:

<#list itemList as item>
<#list item.value?split(",") as host>
${host?trim}
</#list>
</#list>
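
Both list templates do the same thing: split a comma-separated value and write one trimmed entry per line. To preview what the rendered masters or workers file should look like for a given input, the same split-and-trim logic can be mimicked in shell. This is only an illustration of the expected output, not how DataSophon renders the file; render_hosts is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# render_hosts "h1, h2,h3": print one entry per line with surrounding whitespace removed,
# mirroring the split(",") and trim steps of the FreeMarker templates.
render_hosts() {
  printf '%s\n' "$1" | tr ',' '\n' | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//'
}
```

For example, render_hosts "bigdata1:8081, bigdata2:8081" prints bigdata1:8081 and bigdata2:8081 on separate lines.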

Setting Environment Variables

Update the DataSophon environment script on all nodes:

vim /etc/profile.d/datasophon-env.sh

Add or update the following lines:

export FLINK_HOME=/opt/datasophon/flink-1.20.0
export HADOOP_CLASSPATH=$(hadoop classpath)

Apply the changes:

source /etc/profile.d/datasophon-env.sh
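
Since the script has to be updated on every node, a quick sanity check after sourcing it can catch a node that was missed. A minimal sketch, assuming that a usable installation has an executable at $FLINK_HOME/bin/flink; check_flink_env is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# check_flink_env: succeed if FLINK_HOME points at a directory with an executable
# bin/flink and HADOOP_CLASSPATH is non-empty; otherwise explain what is missing.
check_flink_env() {
  [ -n "${FLINK_HOME:-}" ] || { echo "FLINK_HOME is not set" >&2; return 1; }
  [ -x "$FLINK_HOME/bin/flink" ] || { echo "no executable at $FLINK_HOME/bin/flink" >&2; return 1; }
  [ -n "${HADOOP_CLASSPATH:-}" ] || { echo "HADOOP_CLASSPATH is empty" >&2; return 1; }
}
```

For example: check_flink_env && echo "environment OK"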

Restarting Services

Restart the DataSophon worker service on all nodes:

sh /opt/datasophon/datasophon-worker/bin/datasophon-worker.sh restart worker

Restart the DataSophon API service on the manager node:

sh /opt/datasophon/datasophon-manager-1.2.1/bin/datasophon-api.sh restart api

Validation and Testing

Verify that Flink 1.20.0 is properly integrated by submitting a sample job in yarn-per-job mode (deprecated since Flink 1.15 but still available in 1.20):

flink run -d -t yarn-per-job $FLINK_HOME/examples/streaming/WordCount.jar

Test application mode with a streaming job:

flink run-application -t yarn-application $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar

Check the Flink Web UI and YARN logs to confirm successful deployment and execution.
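
To fetch the YARN logs you need the application ID, which has the form application_<clusterTimestamp>_<sequence>. The sketch below pulls the first such ID out of saved client output; it assumes the submission output actually contains the ID, and extract_app_id is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# extract_app_id: read text on stdin and print the first YARN application ID found.
# Prints nothing if no ID is present.
extract_app_id() {
  grep -oE 'application_[0-9]+_[0-9]+' | head -n 1
}
```

For example, after saving the client output with "flink run ... 2>&1 | tee submit.log", the logs can be requested with: yarn logs -applicationId "$(extract_app_id < submit.log)"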

Tags: Flink DataSophon YARN ZooKeeper HDFS

Posted on Mon, 22 Jun 2026 18:05:48 +0000 by nyy2000