Developing Custom Dynamic Table Connectors for Apache Flink

Introduction to Custom Table Connectors

When working with Apache Flink's Table API and SQL, users often interact with data through predefined connectors. While Flink provides a rich set of built-in connectors capable of handling a wide array of data sources and sinks, specific scenarios frequently arise where these standard implementations fall short. The necessity for custom connectors emerges in situations such as:

  1. Integrating with unique data formats (e.g., parsing proprietary log files, custom binary formats).
  2. Implementing specific pre-processing or post-processing logic during data ingestion or emission (e.g., data cleansing before writing, schema transformation, or performing DDL operations like TRUNCATE before an insert).
  3. Connecting to specialized or enterprise-specific databases and data systems not supported out-of-the-box.

Flink's standard connectors cover many common systems, as shown below:

Name | Version | Source | Sink
Filesystem | - | Bounded and Unbounded Scan | Streaming Sink, Batch Sink
Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink
Opensearch | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink
Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink
Amazon DynamoDB | - | Not supported | Streaming Sink, Batch Sink
Amazon Kinesis Data Streams | - | Unbounded Scan | Streaming Sink
Amazon Kinesis Data Firehose | - | Not supported | Streaming Sink
JDBC | - | Bounded Scan, Lookup | Streaming Sink, Batch Sink
Apache HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink
Apache Hive | Supported Versions | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink
MongoDB | 3.6.x & 4.x & 5.x & 6.0.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink

Despite this comprehensive list, custom implementations are often the solution for niche requirements.

Core Components of a Custom Dynamic Table Connector

A custom Flink table connector generally starts with a factory that creates a dynamic table source, a dynamic table sink, or both. The primary interfaces for this are:

  • org.apache.flink.table.factories.DynamicTableSourceFactory: Responsible for creating a DynamicTableSource, which defines how Flink reads data from an external system. Its key method is createDynamicTableSource(Context context).
  • org.apache.flink.table.factories.DynamicTableSinkFactory: Responsible for creating a DynamicTableSink, which defines how Flink writes data to an external system. Its key method is createDynamicTableSink(Context context).

However, building a functional custom connector involves more than just these two factory interfaces. It entails a hierarchical structure of classes to handle configuration, logical planning, and runtime execution.

Illustrative Example: A Basic Custom JDBC Sink Connector

Let's consider creating a simplified custom JDBC sink connector that supports basic insert and row-level delete operations. This example highlights the typical class structure and interactions involved.

Key Classes and Their Functions

A custom JDBC connector, especially for a sink, will typically involve several components working together:

Class | Primary Function | Notes
CustomJdbcTableFactory | Implements DynamicTableSourceFactory and DynamicTableSinkFactory. Acts as the entry point for Flink to discover and instantiate the connector. | Parses table properties and creates the appropriate DynamicTableSource or DynamicTableSink.
CustomJdbcDynamicSink | Implements DynamicTableSink and optionally SupportsRowLevelDelete. Describes the capabilities of the sink. | Responsible for providing the runtime implementation (e.g., an OutputFormatProvider or a SinkFunctionProvider).
CustomJdbcSinkRuntimeProvider | Implements OutputFormatProvider. Bridges the logical sink definition with the physical runtime execution. | Supplies the OutputFormat that Flink tasks will use for writing data.
CustomJdbcOutputFormat | Implements OutputFormat<RowData>. Handles the lifecycle of data writing for individual tasks. | Manages connections, prepares statements, and orchestrates calls to the underlying data writer.
JdbcDataWriter | Contains the core logic for database interaction (e.g., building SQL queries, parameter binding, executing statements). | Performs the actual INSERT or DELETE operations against the database, ideally with batching.

Implementation Details

1. The Connector Factory: CustomJdbcTableFactory

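This factory class is the primary entry point for Flink. It defines the identifier for the connector and specifies the required and optional configuration options that users can provide in their DDL statements (e.g., CREATE TABLE ... WITH ('connector' = 'custom-jdbc', ...)). Flink discovers factories through Java's ServiceLoader mechanism, so the fully qualified class name must also be listed in a META-INF/services/org.apache.flink.table.factories.Factory file inside the connector JAR.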

For a custom sink, it parses these options and instantiates CustomJdbcDynamicSink.


import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.HashSet;
import java.util.Set;
import java.util.Collections;

public class CustomJdbcTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    public static final String IDENTIFIER = "custom-jdbc";

    private static final ConfigOption<String> DRIVER_CLASS = ConfigOptions.key("driver-class")
            .stringType().noDefaultValue().withDescription("JDBC driver class.");
    private static final ConfigOption<String> URL = ConfigOptions.key("url")
            .stringType().noDefaultValue().withDescription("JDBC URL.");
    private static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
            .stringType().noDefaultValue().withDescription("Target database table name.");
    private static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType().noDefaultValue().withDescription("Database username.");
    private static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType().noDefaultValue().withDescription("Database password.");
    private static final ConfigOption<String> PRIMARY_KEY_COL = ConfigOptions.key("primary-key-column")
            .stringType().noDefaultValue().withDescription("Column used as primary key for deletes.");
    private static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions.key("sink.batch-size")
            .intType().defaultValue(200).withDescription("Batch size for JDBC sink operations.");
    private static final ConfigOption<Integer> MAX_RETRIES = ConfigOptions.key("sink.max-retries")
            .intType().defaultValue(3).withDescription("Max retries for JDBC sink operations.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        options.add(TABLE_NAME);
        options.add(DRIVER_CLASS);
        options.add(PRIMARY_KEY_COL); // Required for row-level deletes
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(BATCH_SIZE);
        options.add(MAX_RETRIES);
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();

        // Extract connection options
        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions(
                config.get(DRIVER_CLASS),
                config.get(URL),
                config.get(TABLE_NAME),
                config.getOptional(USERNAME).orElse(null),
                config.getOptional(PASSWORD).orElse(null),
                config.get(PRIMARY_KEY_COL)
        );

        // Extract sink-specific options
        JdbcSinkOptions sinkOptions = new JdbcSinkOptions(
                config.get(BATCH_SIZE),
                config.get(MAX_RETRIES)
        );

        DataType physicalDataType = context.getPhysicalRowDataType();

        return new CustomJdbcDynamicSink(connectionOptions, sinkOptions, physicalDataType);
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // For brevity, we'll assume a custom source implementation would mirror
        // the complexity of Flink's built-in JDBC source factory, handling
        // connection options, read options, lookup options, etc.
        // A full implementation would parse options and return a CustomJdbcDynamicSource instance.
        throw new UnsupportedOperationException("Custom JDBC Table Source not implemented for this example.");
    }
}
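
To make the role of requiredOptions() and optionalOptions() more concrete, here is a simplified, Flink-free model of the kind of check that helper.validate() performs: every required key must be present, and no key outside the declared sets may appear. The class OptionValidationSketch and its method are hypothetical names used only for illustration. Flink's real validation lives in FactoryUtil and covers more cases, so treat this as a sketch of the idea rather than the actual implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, Flink-free model of required/optional option validation. */
final class OptionValidationSketch {

    // Keys mirror the ConfigOption keys declared in CustomJdbcTableFactory.
    static final Set<String> REQUIRED =
            Set.of("url", "table-name", "driver-class", "primary-key-column");
    static final Set<String> OPTIONAL =
            Set.of("username", "password", "sink.batch-size", "sink.max-retries");

    /** Returns a sorted list of problems; an empty list means the options are acceptable. */
    static List<String> validate(Map<String, String> withOptions) {
        Set<String> problems = new TreeSet<>();
        for (String key : REQUIRED) {
            if (!withOptions.containsKey(key)) {
                problems.add("missing required option: " + key);
            }
        }
        for (String key : withOptions.keySet()) {
            // Assumption of this sketch: the 'connector' key selects the factory and is always allowed.
            boolean known = REQUIRED.contains(key) || OPTIONAL.contains(key) || key.equals("connector");
            if (!known) {
                problems.add("unsupported option: " + key);
            }
        }
        return List.copyOf(problems);
    }

    public static void main(String[] args) {
        Map<String, String> ddlOptions = Map.of(
                "connector", "custom-jdbc",
                "url", "jdbc:postgresql://localhost:5432/demo",
                "table-name", "orders",
                "driver-class", "org.postgresql.Driver",
                "batch-size", "500"); // misspelled key: the factory declares sink.batch-size
        validate(ddlOptions).forEach(System.out::println);
    }
}
```

Running main reports both the missing primary-key-column option and the misspelled batch-size key, which is the kind of early feedback a user gets when a CREATE TABLE statement does not match the options the factory declares.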

The JdbcConnectionOptions and JdbcSinkOptions are simple data classes to hold the parsed configurations:


// JdbcConnectionOptions.java
import java.io.Serializable;
import java.util.Objects;

public class JdbcConnectionOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String driverClass;
    private final String dbURL;
    private final String tableName;
    private final String username;
    private final String password;
    private final String primaryKeyColumn;

    public JdbcConnectionOptions(String driverClass, String dbURL, String tableName, String username, String password, String primaryKeyColumn) {
        this.driverClass = driverClass;
        this.dbURL = dbURL;
        this.tableName = tableName;
        this.username = username;
        this.password = password;
        this.primaryKeyColumn = primaryKeyColumn;
    }

    public String getDriverClass() { return driverClass; }
    public String getDbURL() { return dbURL; }
    public String getTableName() { return tableName; }
    public String getUsername() { return username; }
    public String getPassword() { return password; }
    public String getPrimaryKeyColumn() { return primaryKeyColumn; }

    // Equals and hashCode methods omitted for brevity
}

// JdbcSinkOptions.java
import java.io.Serializable;

public class JdbcSinkOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int batchSize;
    private final int maxRetries;

    public JdbcSinkOptions(int batchSize, int maxRetries) {
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
    }

    public int getBatchSize() { return batchSize; }
    public int getMaxRetries() { return maxRetries; }
}

2. The Dynamic Table Sink: CustomJdbcDynamicSink

This class bridges the logical plan and the physical execution. It holds the parsed options and, crucially, returns the SinkRuntimeProvider (here a SinkFunctionProvider) that supplies the function Flink runs to write data.


import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.Collections;
import java.util.List;

public class CustomJdbcDynamicSink implements DynamicTableSink, SupportsRowLevelDelete {

    private final JdbcConnectionOptions connectionOptions;
    private final JdbcSinkOptions sinkOptions;
    private final DataType physicalDataType;

    public CustomJdbcDynamicSink(
            JdbcConnectionOptions connectionOptions,
            JdbcSinkOptions sinkOptions,
            DataType physicalDataType) {
        this.connectionOptions = connectionOptions;
        this.sinkOptions = sinkOptions;
        this.physicalDataType = physicalDataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // This sink supports inserts, updates (handled as delete+insert), and deletes
        // We accept any requested changelog mode.
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // SinkFunctionProvider is an interface; instances are created through its static factory method
        return SinkFunctionProvider.of(
                new CustomJdbcSinkFunction(connectionOptions, sinkOptions, physicalDataType));
    }

    @Override
    public DynamicTableSink copy() {
        return new CustomJdbcDynamicSink(connectionOptions, sinkOptions, physicalDataType);
    }

    @Override
    public String asSummaryString() {
        return "CustomJdbcSink";
    }

    @Override
    public RowLevelDeleteInfo applyRowLevelDelete(
            org.apache.flink.table.connector.RowLevelModificationScanContext context) {
        // SupportsRowLevelDelete (available since Flink 1.17) declares this single method.
        // The returned RowLevelDeleteInfo can narrow the columns Flink passes for each deleted
        // row via requiredColumns() and choose the delete mode via getRowLevelDeleteMode().
        // The defaults are kept here (all columns, DELETED_ROWS mode), so the writer locates
        // the primary key column itself via connectionOptions.getPrimaryKeyColumn().
        return new RowLevelDeleteInfo() {};
    }
}

Note: A fuller implementation could override requiredColumns() in the returned RowLevelDeleteInfo so that Flink passes only the primary key column, resolved from the schema and the primary-key-column option, for deleted rows. For brevity, the defaults are used here.

3. The Runtime Implementation: CustomJdbcSinkFunction and JdbcDataWriter

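Rather than implementing the older OutputFormat directly, this example uses a SinkFunction (recent Flink versions also offer the Sink V2 API). The CustomJdbcSinkFunction below therefore takes over the roles listed for CustomJdbcSinkRuntimeProvider and CustomJdbcOutputFormat in the table above, and it delegates batched writes to JdbcDataWriter.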


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CustomJdbcSinkFunction extends RichSinkFunction<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomJdbcSinkFunction.class);
    private final JdbcConnectionOptions connectionOptions;
    private final JdbcSinkOptions sinkOptions;
    private final DataType physicalDataType;
    private final RowType rowType; // Logical row type for schema information

    private transient Connection connection;
    private transient JdbcDataWriter dataWriter;
    private transient List<RowData> batchBuffer;
    private transient ScheduledExecutorService scheduler;

    public CustomJdbcSinkFunction(
            JdbcConnectionOptions connectionOptions,
            JdbcSinkOptions sinkOptions,
            DataType physicalDataType) {
        this.connectionOptions = connectionOptions;
        this.sinkOptions = sinkOptions;
        this.physicalDataType = physicalDataType;
        this.rowType = (RowType) physicalDataType.getLogicalType();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        LOG.info("Opening CustomJdbcSinkFunction for Flink task {}-{}", getRuntimeContext().getTaskNameWithSubtasks(), getRuntimeContext().getIndexOfThisSubtask());

        try {
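            // Load the configured driver explicitly; the driver-class option is otherwise unused,
            // and automatic driver discovery can miss drivers shipped in the job JAR.
            Class.forName(connectionOptions.getDriverClass());

            // Establish connection once per task
            this.connection = DriverManager.getConnection(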
                    connectionOptions.getDbURL(),
                    connectionOptions.getUsername(),
                    connectionOptions.getPassword()
            );
            this.connection.setAutoCommit(false); // Enable manual transaction control

            // Initialize data writer with connection and schema info
            this.dataWriter = new JdbcDataWriter(connection, connectionOptions, rowType);
            this.batchBuffer = new ArrayList<>(sinkOptions.getBatchSize());

            // Schedule periodic flush if needed (for small batches or long running jobs)
            if (sinkOptions.getBatchSize() > 1) { // Only schedule if batching is enabled
                 scheduler = Executors.newSingleThreadScheduledExecutor();
                 scheduler.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.SECONDS); // Flush every 1 second
            }

        } catch (SQLException e) {
            LOG.error("Failed to open JDBC connection or initialize writer", e);
            throw new IOException("Failed to initialize JDBC sink.", e);
        }
    }

    @Override
    public synchronized void invoke(RowData row, Context context) throws Exception {
        // Synchronized because the scheduler thread also calls flush() on the same buffer
        batchBuffer.add(row);
        if (batchBuffer.size() >= sinkOptions.getBatchSize()) {
            flush();
        }
    }

    private synchronized void flush() {
        if (batchBuffer.isEmpty()) {
            return;
        }

        int attempt = 0;
        while (attempt <= sinkOptions.getMaxRetries()) {
            try {
                dataWriter.executeBatch(batchBuffer);
                connection.commit();
                int flushed = batchBuffer.size(); // capture before clearing, otherwise the log always reports 0
                batchBuffer.clear();
                LOG.debug("Successfully flushed batch of {} rows.", flushed);
                return; // Batch committed successfully
            } catch (SQLException e) {
                LOG.warn("Failed to flush batch (attempt {} of {}).", attempt + 1, sinkOptions.getMaxRetries() + 1, e);
                try {
                    connection.rollback(); // Rollback on failure
                } catch (SQLException ex) {
                    LOG.error("Failed to rollback transaction", ex);
                }
                attempt++;
                if (attempt > sinkOptions.getMaxRetries()) {
                    LOG.error("Exceeded max retries ({}) for batch. Failing task.", sinkOptions.getMaxRetries(), e);
                    throw new RuntimeException("Failed to flush batch after multiple retries.", e);
                }
                // Wait before retrying; the delay grows linearly with the attempt number
                try {
                    Thread.sleep(1000L * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted during retry backoff.", ie);
                }
            }
        }
    }

    @Override
    public void close() throws Exception {
        LOG.info("Closing CustomJdbcSinkFunction for Flink task {}-{}", getRuntimeContext().getTaskNameWithSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
        // Flush any remaining data
        try {
            flush();
        } finally {
            if (scheduler != null) {
                scheduler.shutdown();
                try {
                    if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                        scheduler.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    scheduler.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    LOG.error("Failed to close JDBC connection", e);
                }
            }
        }
        super.close();
    }
}
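
The retry loop in flush() waits 1000 ms multiplied by the attempt number between attempts. If a delay that doubles after every failure is preferred, the retry logic can be factored into a small helper. The following sketch is independent of Flink and JDBC; RetrySketch, runWithRetries, and the Sleeper interface are hypothetical names introduced here, and the pluggable Sleeper exists only so the delays can be observed without actually waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an action with exponentially growing delays. */
final class RetrySketch {

    /** Abstraction over Thread.sleep so that delays can be recorded in tests. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /**
     * Runs the action once and retries up to maxRetries times after a failure.
     * The delay before retry n (1-based) is baseDelayMillis * 2^(n-1).
     */
    static <T> T runWithRetries(
            Callable<T> action, int maxRetries, long baseDelayMillis, Sleeper sleeper)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                sleeper.sleep(baseDelayMillis << (attempt - 1));
            }
            try {
                return action.call();
            } catch (InterruptedException ie) {
                // Never retry after an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw ie;
            } catch (Exception e) {
                last = e; // remember the failure and retry if attempts remain
            }
        }
        throw new Exception("Action failed after " + (maxRetries + 1) + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        List<Long> delays = new ArrayList<>();
        int[] calls = {0};
        String result = runWithRetries(
                () -> {
                    if (++calls[0] < 3) {
                        throw new IllegalStateException("transient failure");
                    }
                    return "committed";
                },
                3, 100L, ms -> delays.add(ms));
        // prints: committed after 3 calls, delays=[100, 200]
        System.out.println(result + " after " + calls[0] + " calls, delays=" + delays);
    }
}
```

In a sink, the action would wrap the write-and-commit step and the Sleeper would simply be Thread::sleep.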

The JdbcDataWriter encapsulates the SQL generation and batch execution logic. This is where the actual database interaction takes place, optimized for performance.


import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.math.BigDecimal;
import java.util.List;
import java.util.stream.Collectors;

public class JdbcDataWriter implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcDataWriter.class);
    private final JdbcConnectionOptions connectionOptions;
    private final RowType rowType; // Flink's Logical RowType
    private final String[] fieldNames; // Extracted field names for SQL

    private transient Connection connection; // Managed by the sink function
    private transient PreparedStatement insertStatement;
    private transient PreparedStatement deleteStatement;

    public JdbcDataWriter(Connection connection, JdbcConnectionOptions options, RowType rowType) throws SQLException {
        this.connection = connection;
        this.connectionOptions = options;
        this.rowType = rowType;
        this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
        prepareStatements();
    }

    private void prepareStatements() throws SQLException {
        String insertSql = buildInsertSQL();
        String deleteSql = buildDeleteSQL();
        LOG.debug("Prepared INSERT SQL: {}", insertSql);
        LOG.debug("Prepared DELETE SQL: {}", deleteSql);

        this.insertStatement = connection.prepareStatement(insertSql);
        this.deleteStatement = connection.prepareStatement(deleteSql);
    }

    private String buildInsertSQL() {
        String columns = String.join(", ", fieldNames);
        String placeholders = String.join(", ", java.util.Collections.nCopies(fieldNames.length, "?"));
        return String.format("INSERT INTO %s (%s) VALUES (%s)",
                connectionOptions.getTableName(), columns, placeholders);
    }

    private String buildDeleteSQL() {
        return String.format("DELETE FROM %s WHERE %s = ?",
                connectionOptions.getTableName(), connectionOptions.getPrimaryKeyColumn());
    }

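    public void executeBatch(List<RowData> rows) throws SQLException {
        // Drop anything left over from a failed attempt so that retries do not apply rows twice
        insertStatement.clearBatch();
        deleteStatement.clearBatch();
        for (RowData row : rows) {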
            switch (row.getRowKind()) {
                case INSERT:
                case UPDATE_AFTER:
                    bindParameters(insertStatement, row);
                    insertStatement.addBatch();
                    break;
                case UPDATE_BEFORE: // Flink sends UPDATE_BEFORE + UPDATE_AFTER for updates
                case DELETE:
                    // Deletes are keyed on the configured primary key column, looked up by name in the schema
                    int pkIndex = rowType.getFieldIndex(connectionOptions.getPrimaryKeyColumn());
                    if (pkIndex == -1) {
                         throw new IllegalArgumentException("Primary key column '" + connectionOptions.getPrimaryKeyColumn() + "' not found in schema.");
                    }
                    bindPrimaryKeyParameter(deleteStatement, row, pkIndex);
                    deleteStatement.addBatch();
                    break;
                default:
                    LOG.warn("Unsupported RowKind: {}. Skipping row.", row.getRowKind());
                    break;
            }
        }
        executeBatchStatements();
    }

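    private void executeBatchStatements() throws SQLException {
        // Run deletes first: an update arrives as UPDATE_BEFORE (delete) plus UPDATE_AFTER (insert)
        // for the same key, so inserting first would let the delete remove the new row.
        // Simplification: an INSERT followed by a DELETE of the same key inside one batch
        // is still applied out of order.
        if (deleteStatement != null) {
            deleteStatement.executeBatch();
        }
        if (insertStatement != null) {
            insertStatement.executeBatch();
        }
    }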

    private void bindParameters(PreparedStatement stmt, RowData row) throws SQLException {
        for (int i = 0; i < fieldNames.length; i++) {
            bindField(stmt, i + 1, row, i, rowType.getTypeAt(i));
        }
    }

    private void bindPrimaryKeyParameter(PreparedStatement stmt, RowData row, int pkIndex) throws SQLException {
        bindField(stmt, 1, row, pkIndex, rowType.getTypeAt(pkIndex));
    }

    private void bindField(PreparedStatement stmt, int parameterIndex, RowData row, int fieldIndex, LogicalType fieldType) throws SQLException {
        if (row.isNullAt(fieldIndex)) {
            stmt.setNull(parameterIndex, getSqlType(fieldType));
            return;
        }

        switch (fieldType.getTypeRoot()) {
            case BOOLEAN:
                stmt.setBoolean(parameterIndex, row.getBoolean(fieldIndex));
                break;
            case TINYINT:
                stmt.setByte(parameterIndex, row.getByte(fieldIndex));
                break;
            case SMALLINT:
                stmt.setShort(parameterIndex, row.getShort(fieldIndex));
                break;
            case INTEGER:
                stmt.setInt(parameterIndex, row.getInt(fieldIndex));
                break;
            case BIGINT:
                stmt.setLong(parameterIndex, row.getLong(fieldIndex));
                break;
            case FLOAT:
                stmt.setFloat(parameterIndex, row.getFloat(fieldIndex));
                break;
            case DOUBLE:
                stmt.setDouble(parameterIndex, row.getDouble(fieldIndex));
                break;
            case CHAR:
            case VARCHAR:
                StringData strData = row.getString(fieldIndex);
                stmt.setString(parameterIndex, strData.toString());
                break;
            case BINARY:
            case VARBINARY:
                stmt.setBytes(parameterIndex, row.getBinary(fieldIndex));
                break;
            case DECIMAL:
                DecimalType decimalType = (DecimalType) fieldType;
                BigDecimal decimalValue = row.getDecimal(fieldIndex, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();
                stmt.setBigDecimal(parameterIndex, decimalValue);
                break;
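            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                // TIMESTAMP_LTZ is a LocalZonedTimestampType, so a cast to TimestampType would fail;
                // LogicalTypeChecks.getPrecision works for both timestamp types.
                int tsPrecision =
                        org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision(fieldType);
                TimestampData tsData = row.getTimestamp(fieldIndex, tsPrecision);
                stmt.setTimestamp(parameterIndex, tsData.toTimestamp());
                break;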
            // Add other type mappings as needed
            default:
                // Fallback for unsupported types, may require custom serialization
                LOG.warn("Unsupported Flink LogicalTypeRoot: {}. Attempting to set as Object.", fieldType.getTypeRoot());
                Object genericValue = RowData.createFieldGetter(fieldType, fieldIndex).getFieldOrNull(row);
                stmt.setObject(parameterIndex, genericValue);
                break;
        }
    }

    // Helper to get JDBC SQL Type for setNull (simplified, real impl uses Dialect)
    private int getSqlType(LogicalType fieldType) {
        switch (fieldType.getTypeRoot()) {
            case BOOLEAN: return java.sql.Types.BOOLEAN;
            case TINYINT: return java.sql.Types.TINYINT;
            case SMALLINT: return java.sql.Types.SMALLINT;
            case INTEGER: return java.sql.Types.INTEGER;
            case BIGINT: return java.sql.Types.BIGINT;
            case FLOAT: return java.sql.Types.REAL;
            case DOUBLE: return java.sql.Types.DOUBLE;
            case CHAR:
            case VARCHAR: return java.sql.Types.VARCHAR;
            case BINARY:
            case VARBINARY: return java.sql.Types.VARBINARY;
            case DECIMAL: return java.sql.Types.DECIMAL;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return java.sql.Types.TIMESTAMP;
            default: return java.sql.Types.OTHER;
        }
    }
}
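
The buildInsertSQL and buildDeleteSQL methods interpolate table and column names into the statement as-is, which breaks for names that are reserved words or contain special characters. A minimal sketch of the same templates with quoted identifiers follows. It assumes ANSI-style double quotes; some databases use other quoting characters (MySQL uses backticks by default), which is why production connectors usually delegate this to a dialect abstraction. SqlTemplateSketch and its methods are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: INSERT and DELETE templates with ANSI-quoted identifiers. */
final class SqlTemplateSketch {

    /** Quotes an identifier ANSI-style, doubling any embedded double quotes. */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /** Builds a parameterized INSERT with one placeholder per column. */
    static String insertSql(String table, List<String> columns) {
        String cols = columns.stream()
                .map(SqlTemplateSketch::quote)
                .collect(Collectors.joining(", "));
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + quote(table) + " (" + cols + ") VALUES (" + placeholders + ")";
    }

    /** Builds a parameterized DELETE keyed on a single column. */
    static String deleteSql(String table, String keyColumn) {
        return "DELETE FROM " + quote(table) + " WHERE " + quote(keyColumn) + " = ?";
    }

    public static void main(String[] args) {
        // prints: INSERT INTO "orders" ("id", "amount") VALUES (?, ?)
        System.out.println(insertSql("orders", List.of("id", "amount")));
        // prints: DELETE FROM "orders" WHERE "id" = ?
        System.out.println(deleteSql("orders", "id"));
    }
}
```

Only identifiers are quoted here; row values still travel through PreparedStatement parameters, exactly as in JdbcDataWriter.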

Compared with a naive writer that opens a connection and issues one statement per row, this JdbcDataWriter has several advantages:

  • **Connection Management:** The Connection is passed in, implying it's managed by the calling SinkFunction (opened once per task, closed once per task), avoiding per-row connection overhead.
  • **Prepared Statements:** SQL statements are prepared once upon initialization, significantly reducing overhead for repeated operations.
  • **Batching:** Rows are buffered by the sink function and handed to executeBatch, which sends them to the database as JDBC batches for both inserts and deletes rather than one statement per row. This is crucial for performance.
  • **RowKind Handling:** Differentiates between INSERT/UPDATE_AFTER and DELETE/UPDATE_BEFORE to add to the respective batch.
  • **Type Binding:** More robust parameter binding based on Flink's LogicalType system, handling various data types.
  • **Primary Key Identification:** The primary key column is configurable, and its index is dynamically looked up in the schema.
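
Because an update arrives as an UPDATE_BEFORE/UPDATE_AFTER pair that targets the same key, the order in which the resulting delete and insert reach the database matters. The Flink-free simulation below illustrates this under simplifying assumptions: the enum Kind mirrors the values of Flink's RowKind, a HashMap stands in for the database table, and all class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free simulation of applying a changelog to a keyed table. */
final class ChangelogOrderSketch {

    /** Mirrors the four values of Flink's RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One changelog entry: change kind, primary key, and row payload. */
    static final class Change {
        final Kind kind;
        final int key;
        final String value;

        Change(Kind kind, int key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        boolean writesRow() {
            return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
        }
    }

    private static void apply(Map<Integer, String> table, Change c) {
        if (c.writesRow()) {
            table.put(c.key, c.value); // stands in for INSERT
        } else {
            table.remove(c.key);       // stands in for DELETE ... WHERE pk = ?
        }
    }

    /** Applies every change in arrival order. */
    static Map<Integer, String> applyInArrivalOrder(
            Map<Integer, String> initial, List<Change> changes) {
        Map<Integer, String> table = new HashMap<>(initial);
        for (Change c : changes) {
            apply(table, c);
        }
        return table;
    }

    /** Applies all row-writing changes first and all deletions afterwards, like two separate batches. */
    static Map<Integer, String> applyWritesThenDeletes(
            Map<Integer, String> initial, List<Change> changes) {
        Map<Integer, String> table = new HashMap<>(initial);
        for (Change c : changes) {
            if (c.writesRow()) apply(table, c);
        }
        for (Change c : changes) {
            if (!c.writesRow()) apply(table, c);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, String> initial = Map.of(1, "old");
        List<Change> update = List.of(
                new Change(Kind.UPDATE_BEFORE, 1, "old"),
                new Change(Kind.UPDATE_AFTER, 1, "new"));
        System.out.println(applyInArrivalOrder(initial, update));    // prints: {1=new}
        System.out.println(applyWritesThenDeletes(initial, update)); // prints: {}
    }
}
```

In the second variant the updated row disappears, because the delete derived from UPDATE_BEFORE runs after the insert derived from UPDATE_AFTER. A real database would typically reject the early insert with a primary key violation instead of silently overwriting the row, but the conclusion is the same: batches should preserve the changelog order per key.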

Conclusion

Developing custom table connectors in Apache Flink, particularly for recent versions, involves a structured approach beyond simple interface implementations. The shift towards explicit factory patterns and structured runtime providers, rather than older lambda-style or simple OutputFormat implementations, highlights Flink's emphasis on robustness, debuggability, and performance optimization. While requiring more boilerplate code, this framework allows developers to integrate Flink seamlessly with diverse data ecosystems and implement highly tailored data processing logic.

Tags: apache-flink flink-table-api custom-connector dynamic-table-sink JDBC

Posted on Wed, 17 Jun 2026 16:50:09 +0000 by dsinghldh