Introduction to Custom Table Connectors
When working with Apache Flink's Table API and SQL, users often interact with data through predefined connectors. While Flink provides a rich set of built-in connectors capable of handling a wide array of data sources and sinks, specific scenarios frequently arise where these standard implementations fall short. The necessity for custom connectors emerges in situations such as:
- Integrating with unique data formats (e.g., parsing proprietary log files, custom binary formats).
- Implementing specific pre-processing or post-processing logic during data ingestion or emission (e.g., data cleansing before writing, schema transformation, or performing DDL operations like `TRUNCATE` before an insert).
- Connecting to specialized or enterprise-specific databases and data systems not supported out-of-the-box.
Flink's standard connectors cover many common systems, as shown below:
| Name | Version | Source | Sink |
|---|---|---|---|
| Filesystem | | Bounded and Unbounded Scan | Streaming Sink, Batch Sink |
| Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
| Opensearch | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink |
| Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
| Amazon DynamoDB | | Not supported | Streaming Sink, Batch Sink |
| Amazon Kinesis Data Streams | | Unbounded Scan | Streaming Sink |
| Amazon Kinesis Data Firehose | | Not supported | Streaming Sink |
| JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache Hive | Supported Versions | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| MongoDB | 3.6.x & 4.x & 5.x & 6.0.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
Despite this comprehensive list, custom implementations are often the solution for niche requirements.
Core Components of a Custom Dynamic Table Connector
A custom Flink table connector generally requires implementing a factory that can create both a dynamic table source and a dynamic table sink. The primary interfaces for this are:
- `org.apache.flink.table.factories.DynamicTableSourceFactory`: Responsible for creating a `DynamicTableSource`, which defines how Flink reads data from an external system. Its key method is `createDynamicTableSource(Context context)`.
- `org.apache.flink.table.factories.DynamicTableSinkFactory`: Responsible for creating a `DynamicTableSink`, which defines how Flink writes data to an external system. Its key method is `createDynamicTableSink(Context context)`.
However, building a functional custom connector involves more than just these two factory interfaces. It entails a hierarchical structure of classes to handle configuration, logical planning, and runtime execution.
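Before any of these classes run, Flink has to find the factory. It loads implementations of `org.apache.flink.table.factories.Factory` through Java's `ServiceLoader`, so the connector JAR must list the factory's fully qualified class name in a `META-INF/services/org.apache.flink.table.factories.Factory` file; Flink then picks the factory whose `factoryIdentifier()` equals the table's `'connector'` option. The plain-Java sketch below imitates only that matching step. `FactoryDiscovery` and `ConnectorFactory` are hypothetical stand-ins, not Flink API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class FactoryDiscovery {
    // Hypothetical stand-in for Flink's Factory interface; only the identifier matters here.
    interface ConnectorFactory {
        String factoryIdentifier();
    }

    // Returns the single factory whose identifier equals the table's 'connector' option.
    static ConnectorFactory discover(List<ConnectorFactory> available, Map<String, String> tableOptions) {
        String connector = tableOptions.get("connector");
        if (connector == null) {
            throw new IllegalArgumentException("Table options must contain 'connector'.");
        }
        List<ConnectorFactory> matches = available.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            // List the known identifiers so a typo is easy to spot.
            String known = available.stream()
                    .map(ConnectorFactory::factoryIdentifier)
                    .sorted()
                    .collect(Collectors.joining(", "));
            throw new IllegalArgumentException(
                    "No factory for identifier '" + connector + "'. Available: " + known);
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Ambiguous factories for identifier '" + connector + "'.");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        ConnectorFactory jdbc = () -> "custom-jdbc";
        ConnectorFactory print = () -> "print";
        Map<String, String> options = Map.of("connector", "custom-jdbc", "url", "jdbc:h2:mem:demo");
        System.out.println(discover(List.of(jdbc, print), options).factoryIdentifier()); // prints custom-jdbc
    }
}
```

Listing the available identifiers in the failure message mirrors what Flink does when no factory matches, which makes a misspelled `'connector'` value easy to diagnose.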
Illustrative Example: A Basic Custom JDBC Sink Connector
Let's consider creating a simplified custom JDBC sink connector that supports basic insert and row-level delete operations. This example highlights the typical class structure and interactions involved.
Key Classes and Their Functions
A custom JDBC connector, especially for a sink, will typically involve several components working together:
| Class | Primary Function | Notes |
|---|---|---|
| CustomJdbcTableFactory | Implements `DynamicTableSourceFactory` and `DynamicTableSinkFactory`. Acts as the entry point for Flink to discover and instantiate the connector. | Parses table properties and creates the appropriate `DynamicTableSource` or `DynamicTableSink`. |
| CustomJdbcDynamicSink | Implements `DynamicTableSink` and, optionally, `SupportsRowLevelDelete`. Describes the capabilities of the sink. | Provides the runtime implementation through a `SinkRuntimeProvider` (here a `SinkFunctionProvider`). |
| CustomJdbcSinkFunction | Extends `RichSinkFunction<RowData>`. Handles the lifecycle of data writing in each parallel task. | Manages the connection, buffers rows, and flushes them through the `JdbcDataWriter` with retries. An `OutputFormatProvider` wrapping an `OutputFormat<RowData>` is an alternative runtime design. |
| JdbcDataWriter | Contains the core logic for database interaction (building SQL statements, binding parameters, executing statements). | Performs the actual `INSERT` or `DELETE` operations against the database, batching them for efficiency. |
Implementation Details
1. The Connector Factory: CustomJdbcTableFactory
This factory class is the primary entry point for Flink. It defines the identifier for the connector and specifies the required and optional configuration options that users can provide in their DDL statements (e.g., CREATE TABLE ... WITH ('connector' = 'custom-jdbc', ...)).
For a custom sink, it parses these options and instantiates CustomJdbcDynamicSink.
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.HashSet;
import java.util.Set;

public class CustomJdbcTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    public static final String IDENTIFIER = "custom-jdbc";

    private static final ConfigOption<String> DRIVER_CLASS = ConfigOptions.key("driver-class")
            .stringType().noDefaultValue().withDescription("JDBC driver class.");
    private static final ConfigOption<String> URL = ConfigOptions.key("url")
            .stringType().noDefaultValue().withDescription("JDBC URL.");
    private static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
            .stringType().noDefaultValue().withDescription("Target database table name.");
    private static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType().noDefaultValue().withDescription("Database username.");
    private static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType().noDefaultValue().withDescription("Database password.");
    private static final ConfigOption<String> PRIMARY_KEY_COL = ConfigOptions.key("primary-key-column")
            .stringType().noDefaultValue().withDescription("Column used as primary key for deletes.");
    private static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions.key("sink.batch-size")
            .intType().defaultValue(200).withDescription("Batch size for JDBC sink operations.");
    private static final ConfigOption<Integer> MAX_RETRIES = ConfigOptions.key("sink.max-retries")
            .intType().defaultValue(3).withDescription("Max retries for JDBC sink operations.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        options.add(TABLE_NAME);
        options.add(DRIVER_CLASS);
        options.add(PRIMARY_KEY_COL); // Required for row-level deletes
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(BATCH_SIZE);
        options.add(MAX_RETRIES);
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        // Fails for missing required options and for unknown option keys.
        helper.validate();

        // Extract connection options
        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions(
                config.get(DRIVER_CLASS),
                config.get(URL),
                config.get(TABLE_NAME),
                config.getOptional(USERNAME).orElse(null),
                config.getOptional(PASSWORD).orElse(null),
                config.get(PRIMARY_KEY_COL));

        // Extract sink-specific options
        JdbcSinkOptions sinkOptions = new JdbcSinkOptions(
                config.get(BATCH_SIZE),
                config.get(MAX_RETRIES));

        DataType physicalDataType = context.getPhysicalRowDataType();
        return new CustomJdbcDynamicSink(connectionOptions, sinkOptions, physicalDataType);
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // For brevity, we assume a custom source implementation would mirror
        // the complexity of Flink's built-in JDBC source factory, handling
        // connection options, read options, lookup options, etc.
        // A full implementation would parse options and return a CustomJdbcDynamicSource instance.
        throw new UnsupportedOperationException("Custom JDBC Table Source not implemented for this example.");
    }
}
```
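The `helper.validate()` call is what rejects a missing or misspelled option while the statement is planned, instead of failing later inside a running task. The plain-Java sketch below approximates that check for the option keys declared above: every required key must be present, and any key that is neither required nor optional (other than `connector` itself) is reported as unsupported. `OptionValidator` is a hypothetical helper; Flink's real `TableFactoryHelper` covers more cases than this sketch.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class OptionValidator {
    // Returns normally when the options are valid; otherwise throws listing every problem at once.
    static void validate(Map<String, String> options, Set<String> required, Set<String> optional) {
        Set<String> missing = new TreeSet<>(required); // TreeSet gives a stable, sorted message
        missing.removeAll(options.keySet());

        Set<String> unsupported = new TreeSet<>(options.keySet());
        unsupported.removeAll(required);
        unsupported.removeAll(optional);
        unsupported.remove("connector"); // consumed by factory discovery, not by the factory

        if (!missing.isEmpty() || !unsupported.isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required options: " + missing + "; unsupported options: " + unsupported);
        }
    }

    public static void main(String[] args) {
        Set<String> required = Set.of("url", "table-name", "driver-class", "primary-key-column");
        Set<String> optional = Set.of("username", "password", "sink.batch-size", "sink.max-retries");
        try {
            validate(Map.of(
                    "connector", "custom-jdbc",
                    "url", "jdbc:postgresql://localhost/db",
                    "table-name", "orders",
                    "driver-class", "org.postgresql.Driver",
                    "sink.batchsize", "500"), // typo for sink.batch-size
                    required, optional);
        } catch (IllegalArgumentException e) {
            // prints: Missing required options: [primary-key-column]; unsupported options: [sink.batchsize]
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting unknown keys matters as much as checking required ones: without it, the typo `sink.batchsize` would be silently ignored and the sink would run with the default batch size.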
The JdbcConnectionOptions and JdbcSinkOptions are simple data classes to hold the parsed configurations:
```java
// JdbcConnectionOptions.java
import java.io.Serializable;

public class JdbcConnectionOptions implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String driverClass;
    private final String dbURL;
    private final String tableName;
    private final String username;
    private final String password;
    private final String primaryKeyColumn;

    public JdbcConnectionOptions(String driverClass, String dbURL, String tableName,
                                 String username, String password, String primaryKeyColumn) {
        this.driverClass = driverClass;
        this.dbURL = dbURL;
        this.tableName = tableName;
        this.username = username;
        this.password = password;
        this.primaryKeyColumn = primaryKeyColumn;
    }

    public String getDriverClass() { return driverClass; }
    public String getDbURL() { return dbURL; }
    public String getTableName() { return tableName; }
    public String getUsername() { return username; }
    public String getPassword() { return password; }
    public String getPrimaryKeyColumn() { return primaryKeyColumn; }

    // equals() and hashCode() omitted for brevity
}

// JdbcSinkOptions.java
import java.io.Serializable;

public class JdbcSinkOptions implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int batchSize;
    private final int maxRetries;

    public JdbcSinkOptions(int batchSize, int maxRetries) {
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
    }

    public int getBatchSize() { return batchSize; }
    public int getMaxRetries() { return maxRetries; }
}
```
2. The Dynamic Table Sink: CustomJdbcDynamicSink
This class bridges the logical plan with the physical execution. It holds the parsed options, declares which kinds of changes the sink accepts, and, crucially, provides the runtime provider that wraps the SinkFunction executed by each parallel task.
```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
import org.apache.flink.table.types.DataType;

public class CustomJdbcDynamicSink implements DynamicTableSink, SupportsRowLevelDelete {

    private final JdbcConnectionOptions connectionOptions;
    private final JdbcSinkOptions sinkOptions;
    private final DataType physicalDataType;

    public CustomJdbcDynamicSink(
            JdbcConnectionOptions connectionOptions,
            JdbcSinkOptions sinkOptions,
            DataType physicalDataType) {
        this.connectionOptions = connectionOptions;
        this.sinkOptions = sinkOptions;
        this.physicalDataType = physicalDataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Updates are written as a DELETE (for UPDATE_BEFORE) followed by an INSERT (for UPDATE_AFTER).
        // Declaring the full changelog mode, which includes UPDATE_BEFORE, makes sure the old row
        // is always delivered; accepting an upsert-style stream without it would duplicate keys.
        return ChangelogMode.all();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // SinkFunctionProvider is an interface; instances are created through its static of(...) method.
        return SinkFunctionProvider.of(
                new CustomJdbcSinkFunction(connectionOptions, sinkOptions, physicalDataType));
    }

    @Override
    public DynamicTableSink copy() {
        return new CustomJdbcDynamicSink(connectionOptions, sinkOptions, physicalDataType);
    }

    @Override
    public String asSummaryString() {
        return "CustomJdbcSink";
    }

    @Override
    public RowLevelDeleteInfo applyRowLevelDelete(RowLevelModificationScanContext context) {
        // Called by the planner for batch-mode DELETE statements. The anonymous RowLevelDeleteInfo
        // keeps the interface defaults: the sink receives the rows to delete (DELETED_ROWS mode)
        // and requiredColumns() does not narrow the columns.
        return new RowLevelDeleteInfo() {};
    }
}
```
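Note: Flink's SupportsRowLevelDelete ability is implemented through a single method, applyRowLevelDelete(RowLevelModificationScanContext), which the planner calls for batch-mode DELETE statements and which returns a RowLevelDeleteInfo. Relying on the interface defaults means the sink receives the rows to be deleted (RowLevelDeleteMode.DELETED_ROWS) with all columns. A real implementation could override requiredColumns() to request only the column named by the primary-key-column option, in which case the runtime writer must expect rows containing just that column. For brevity, the defaults are used here.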
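Resolving the configured primary-key column against the table schema is something the sink can do once, up front, rather than for every row it deletes. The sketch below shows that step in plain Java under two simplifying assumptions: the schema is a list of column names and a row is an `Object[]`. `KeyProjection` and its methods are hypothetical, and the sketch also allows a composite key, which this connector's single `primary-key-column` option does not.

```java
import java.util.Arrays;
import java.util.List;

final class KeyProjection {
    // Resolves each key column name to its position in the (simplified) physical schema.
    static int[] resolve(List<String> schemaColumns, List<String> keyColumns) {
        int[] positions = new int[keyColumns.size()];
        for (int i = 0; i < keyColumns.size(); i++) {
            int pos = schemaColumns.indexOf(keyColumns.get(i));
            if (pos < 0) {
                // Fail fast, naming the schema, instead of failing on the first deleted row.
                throw new IllegalArgumentException("Key column '" + keyColumns.get(i)
                        + "' not found in schema " + schemaColumns);
            }
            positions[i] = pos;
        }
        return positions;
    }

    // Extracts the key values from a full row, in key order, e.g. to bind DELETE ... WHERE parameters.
    static Object[] project(Object[] row, int[] keyPositions) {
        Object[] key = new Object[keyPositions.length];
        for (int i = 0; i < keyPositions.length; i++) {
            key[i] = row[keyPositions[i]];
        }
        return key;
    }

    public static void main(String[] args) {
        List<String> schema = List.of("id", "customer", "amount");
        int[] pk = resolve(schema, List.of("id"));
        Object[] row = {42L, "alice", 9.5};
        System.out.println(Arrays.toString(project(row, pk))); // prints [42]
    }
}
```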
3. The Runtime Implementation: CustomJdbcSinkFunction and JdbcDataWriter
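Rather than implementing an OutputFormat and exposing it through an OutputFormatProvider, this example uses a SinkFunction, which internally uses the JdbcDataWriter for efficient batching. Be aware that SinkFunction is itself a legacy API in recent Flink releases; new connectors should generally target the unified Sink V2 API (org.apache.flink.api.connector.sink2.Sink), exposed to the Table API through SinkV2Provider.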
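Size-or-time batching has one subtle requirement: a timer thread and the task thread both touch the buffer, so every access must happen under the same lock, and a failure on the timer thread has to be surfaced to the task thread. `ScheduledExecutorService` silently suppresses all further runs of a periodic task once it throws, so an unhandled flush error would otherwise just stop the timer. The Flink-independent sketch below shows the pattern; `BatchBuffer` is a hypothetical helper, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: flushes when the buffer is full or when a periodic timer fires.
final class BatchBuffer<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>(); // guarded by "this"
    private final ScheduledExecutorService timer;
    private volatile RuntimeException backgroundFailure; // written by the timer thread

    BatchBuffer(int batchSize, long intervalMillis, Consumer<List<T>> flushAction) {
        this.batchSize = batchSize;
        this.flushAction = flushAction;
        this.timer = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "batch-flusher");
            thread.setDaemon(true); // never keeps the JVM alive on its own
            return thread;
        });
        timer.scheduleWithFixedDelay(() -> {
            try {
                flush();
            } catch (RuntimeException e) {
                // A periodic task that throws is cancelled silently, so remember the failure.
                backgroundFailure = e;
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void add(T item) {
        RuntimeException failure = backgroundFailure;
        if (failure != null) {
            throw new IllegalStateException("Background flush failed", failure);
        }
        synchronized (this) {
            buffer.add(item);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }
    }

    // Hands a copy of the buffered items to the flush action; items are dropped only on success.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    @Override
    public void close() {
        timer.shutdown(); // no new timer runs; a run already in progress finishes under the lock
        flush();
    }

    public static void main(String[] args) {
        List<List<String>> flushed = new ArrayList<>();
        try (BatchBuffer<String> buf = new BatchBuffer<>(2, 10_000, flushed::add)) {
            buf.add("a");
            buf.add("b"); // reaches batchSize: flushes [a, b]
            buf.add("c"); // flushed by close() (or by the timer after 10 s)
        }
        System.out.println(flushed); // prints [[a, b], [c]]
    }
}
```

Clearing the buffer only after the flush action returns means a failed flush keeps its rows, so a later attempt (or a retry inside the flush action) can write them again.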
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CustomJdbcSinkFunction extends RichSinkFunction<RowData> implements CheckpointedFunction {

    private static final Logger LOG = LoggerFactory.getLogger(CustomJdbcSinkFunction.class);

    private final JdbcConnectionOptions connectionOptions;
    private final JdbcSinkOptions sinkOptions;
    private final RowType rowType; // Logical row type for schema information

    private transient Connection connection;
    private transient JdbcDataWriter dataWriter;
    private transient List<RowData> batchBuffer; // Guarded by "this"
    private transient ScheduledExecutorService scheduler;
    // Failure raised on the background flush thread; rethrown on the task thread.
    private transient volatile Exception flushException;

    public CustomJdbcSinkFunction(
            JdbcConnectionOptions connectionOptions,
            JdbcSinkOptions sinkOptions,
            DataType physicalDataType) {
        this.connectionOptions = connectionOptions;
        this.sinkOptions = sinkOptions;
        this.rowType = (RowType) physicalDataType.getLogicalType();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No operator state is needed: the buffer is flushed on every checkpoint (see snapshotState).
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        LOG.info("Opening CustomJdbcSinkFunction for task {}", getRuntimeContext().getTaskNameWithSubtasks());
        try {
            // Load the configured driver explicitly (older drivers do not self-register).
            Class.forName(connectionOptions.getDriverClass());
            // Establish one connection per parallel task instance.
            this.connection = DriverManager.getConnection(
                    connectionOptions.getDbURL(),
                    connectionOptions.getUsername(),
                    connectionOptions.getPassword());
            this.connection.setAutoCommit(false); // Enable manual transaction control
            // Initialize data writer with connection and schema info
            this.dataWriter = new JdbcDataWriter(connection, connectionOptions, rowType);
            this.batchBuffer = new ArrayList<>(sinkOptions.getBatchSize());
        } catch (ClassNotFoundException | SQLException e) {
            LOG.error("Failed to open JDBC connection or initialize writer", e);
            throw new IOException("Failed to initialize JDBC sink.", e);
        }
        // Periodically flush partially filled batches so rows do not wait indefinitely.
        if (sinkOptions.getBatchSize() > 1) {
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleWithFixedDelay(() -> {
                synchronized (this) { // Same lock as invoke(): the buffer is never accessed concurrently
                    try {
                        flush();
                    } catch (Exception e) {
                        // An exception escaping a periodic task silently cancels it; keep it instead.
                        flushException = e;
                    }
                }
            }, 1, 1, TimeUnit.SECONDS);
        }
    }

    @Override
    public void invoke(RowData row, Context context) throws Exception {
        checkFlushException();
        synchronized (this) {
            // If object reuse is enabled, the runtime may reuse RowData instances; copy the row
            // (for example with a RowDataSerializer) before buffering it.
            batchBuffer.add(row);
            if (batchBuffer.size() >= sinkOptions.getBatchSize()) {
                flush();
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkFlushException();
        // Write buffered rows before the checkpoint completes, giving at-least-once delivery.
        synchronized (this) {
            flush();
        }
    }

    // Callers must hold the lock on this instance.
    private void flush() {
        if (batchBuffer == null || batchBuffer.isEmpty()) {
            return;
        }
        int maxAttempts = sinkOptions.getMaxRetries() + 1; // first attempt plus retries
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                dataWriter.executeBatch(batchBuffer);
                connection.commit();
                int flushed = batchBuffer.size();
                batchBuffer.clear();
                LOG.debug("Successfully flushed batch of {} rows.", flushed);
                return; // Batch committed successfully
            } catch (SQLException e) {
                try {
                    connection.rollback(); // Roll back the partially executed batch
                } catch (SQLException ex) {
                    LOG.error("Failed to rollback transaction", ex);
                }
                if (attempt == maxAttempts) {
                    LOG.error("Exceeded max retries ({}) for batch. Failing task.", sinkOptions.getMaxRetries(), e);
                    throw new RuntimeException("Failed to flush batch after " + maxAttempts + " attempts.", e);
                }
                LOG.warn("Failed to flush batch, attempt {}/{}. Retrying...", attempt, maxAttempts, e);
                try {
                    Thread.sleep(1000L * attempt); // Linear backoff: 1 s, 2 s, 3 s, ...
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted during retry backoff.", ie);
                }
            }
        }
    }

    private void checkFlushException() {
        if (flushException != null) {
            throw new RuntimeException("Background flush of the JDBC sink failed.", flushException);
        }
    }

    @Override
    public void close() throws Exception {
        LOG.info("Closing CustomJdbcSinkFunction for task {}", getRuntimeContext().getTaskNameWithSubtasks());
        try {
            // Stop the background flusher first so it cannot race with the final flush.
            if (scheduler != null) {
                scheduler.shutdown();
                try {
                    if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                        scheduler.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    scheduler.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
            // Flush any remaining data
            synchronized (this) {
                flush();
            }
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    LOG.error("Failed to close JDBC connection", e);
                }
            }
            super.close();
        }
    }
}
```
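The flush loop above sleeps for 1000 ms multiplied by the attempt number, which is a linear backoff. A common alternative is exponential backoff with an upper bound, so transient errors are retried quickly while a struggling database is not hammered. The helper below is a hypothetical, Flink-independent sketch; `Retry`, the base delay, and the cap are illustrative assumptions, not options of this connector.

```java
import java.util.StringJoiner;
import java.util.concurrent.Callable;

final class Retry {
    // Delay before retry number `retry` (1-based): base * 2^(retry - 1), capped at maxDelayMillis.
    static long backoffMillis(int retry, long baseMillis, long maxDelayMillis) {
        long delay = baseMillis << Math.min(retry - 1, 30); // bounded shift avoids overflow
        return Math.min(delay, maxDelayMillis);
    }

    // Runs the action; after a failure, retries up to maxRetries more times with growing delays.
    static <T> T withRetries(Callable<T> action, int maxRetries, long baseMillis, long maxDelayMillis)
            throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: let the caller fail the task
                }
                Thread.sleep(backoffMillis(attempt + 1, baseMillis, maxDelayMillis));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StringJoiner delays = new StringJoiner(" ");
        for (int retry = 1; retry <= 5; retry++) {
            delays.add(Long.toString(backoffMillis(retry, 100, 1_000)));
        }
        System.out.println(delays); // prints 100 200 400 800 1000

        int[] calls = {0};
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "committed";
        }, 3, 10, 100);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints committed after 3 attempts
    }
}
```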
The JdbcDataWriter encapsulates the SQL generation and batch execution logic. This is where the actual database interaction takes place, optimized for performance.
```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Collections;
import java.util.List;

// Created on the task in open(), so it does not need to be Serializable.
public class JdbcDataWriter {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcDataWriter.class);

    private final JdbcConnectionOptions connectionOptions;
    private final RowType rowType; // Flink's logical RowType
    private final String[] fieldNames; // Field names used in the generated SQL
    private final int pkIndex; // Position of the primary-key column, resolved once
    private final PreparedStatement insertStatement;
    private final PreparedStatement deleteStatement;

    public JdbcDataWriter(Connection connection, JdbcConnectionOptions options, RowType rowType) throws SQLException {
        this.connectionOptions = options;
        this.rowType = rowType;
        this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
        this.pkIndex = rowType.getFieldIndex(options.getPrimaryKeyColumn());
        if (pkIndex == -1) {
            throw new IllegalArgumentException(
                    "Primary key column '" + options.getPrimaryKeyColumn() + "' not found in schema.");
        }
        String insertSql = buildInsertSQL();
        String deleteSql = buildDeleteSQL();
        LOG.debug("Prepared INSERT SQL: {}", insertSql);
        LOG.debug("Prepared DELETE SQL: {}", deleteSql);
        // Prepared once and reused; released when the owning connection is closed.
        this.insertStatement = connection.prepareStatement(insertSql);
        this.deleteStatement = connection.prepareStatement(deleteSql);
    }

    // Identifiers are inserted unquoted; a real implementation would quote them per database dialect.
    private String buildInsertSQL() {
        String columns = String.join(", ", fieldNames);
        String placeholders = String.join(", ", Collections.nCopies(fieldNames.length, "?"));
        return String.format("INSERT INTO %s (%s) VALUES (%s)",
                connectionOptions.getTableName(), columns, placeholders);
    }

    private String buildDeleteSQL() {
        return String.format("DELETE FROM %s WHERE %s = ?",
                connectionOptions.getTableName(), connectionOptions.getPrimaryKeyColumn());
    }

    /**
     * Adds the rows to the JDBC batches and executes them. Consecutive rows that use the same
     * statement share one batch; whenever the statement changes, the pending batch is executed
     * first, so the database sees changes in changelog order (e.g. the DELETE for UPDATE_BEFORE
     * runs before the INSERT for UPDATE_AFTER).
     */
    public void executeBatch(List<RowData> rows) throws SQLException {
        PreparedStatement pending = null; // Statement holding added but not yet executed rows
        try {
            for (RowData row : rows) {
                PreparedStatement target;
                switch (row.getRowKind()) {
                    case INSERT:
                    case UPDATE_AFTER:
                        target = insertStatement;
                        break;
                    case UPDATE_BEFORE: // Flink sends UPDATE_BEFORE + UPDATE_AFTER for updates
                    case DELETE:
                        target = deleteStatement;
                        break;
                    default:
                        LOG.warn("Unsupported RowKind: {}. Skipping row.", row.getRowKind());
                        continue;
                }
                if (pending != null && pending != target) {
                    pending.executeBatch(); // Preserve ordering across the two statements
                }
                if (target == insertStatement) {
                    bindParameters(insertStatement, row);
                } else {
                    bindPrimaryKeyParameter(deleteStatement, row);
                }
                target.addBatch();
                pending = target;
            }
            if (pending != null) {
                pending.executeBatch();
            }
        } catch (SQLException e) {
            // Drop any queued statements so a retry starts from a clean state.
            clearBatchQuietly(insertStatement);
            clearBatchQuietly(deleteStatement);
            throw e;
        }
    }

    private static void clearBatchQuietly(PreparedStatement stmt) {
        try {
            stmt.clearBatch();
        } catch (SQLException e) {
            LOG.warn("Failed to clear JDBC batch", e);
        }
    }

    private void bindParameters(PreparedStatement stmt, RowData row) throws SQLException {
        for (int i = 0; i < fieldNames.length; i++) {
            bindField(stmt, i + 1, row, i, rowType.getTypeAt(i)); // JDBC parameters are 1-based
        }
    }

    private void bindPrimaryKeyParameter(PreparedStatement stmt, RowData row) throws SQLException {
        bindField(stmt, 1, row, pkIndex, rowType.getTypeAt(pkIndex));
    }

    private void bindField(PreparedStatement stmt, int parameterIndex, RowData row, int fieldIndex,
                           LogicalType fieldType) throws SQLException {
        if (row.isNullAt(fieldIndex)) {
            stmt.setNull(parameterIndex, getSqlType(fieldType));
            return;
        }
        switch (fieldType.getTypeRoot()) {
            case BOOLEAN:
                stmt.setBoolean(parameterIndex, row.getBoolean(fieldIndex));
                break;
            case TINYINT:
                stmt.setByte(parameterIndex, row.getByte(fieldIndex));
                break;
            case SMALLINT:
                stmt.setShort(parameterIndex, row.getShort(fieldIndex));
                break;
            case INTEGER:
                stmt.setInt(parameterIndex, row.getInt(fieldIndex));
                break;
            case BIGINT:
                stmt.setLong(parameterIndex, row.getLong(fieldIndex));
                break;
            case FLOAT:
                stmt.setFloat(parameterIndex, row.getFloat(fieldIndex));
                break;
            case DOUBLE:
                stmt.setDouble(parameterIndex, row.getDouble(fieldIndex));
                break;
            case CHAR:
            case VARCHAR: {
                StringData strData = row.getString(fieldIndex);
                stmt.setString(parameterIndex, strData.toString());
                break;
            }
            case BINARY:
            case VARBINARY:
                stmt.setBytes(parameterIndex, row.getBinary(fieldIndex));
                break;
            case DECIMAL: {
                DecimalType decimalType = (DecimalType) fieldType;
                BigDecimal decimalValue = row.getDecimal(
                        fieldIndex, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();
                stmt.setBigDecimal(parameterIndex, decimalValue);
                break;
            }
            case TIMESTAMP_WITHOUT_TIME_ZONE: {
                TimestampType tsType = (TimestampType) fieldType;
                TimestampData tsData = row.getTimestamp(fieldIndex, tsType.getPrecision());
                stmt.setTimestamp(parameterIndex, tsData.toTimestamp());
                break;
            }
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
                // TIMESTAMP_LTZ is a LocalZonedTimestampType (not a TimestampType) and represents an instant.
                LocalZonedTimestampType ltzType = (LocalZonedTimestampType) fieldType;
                TimestampData tsData = row.getTimestamp(fieldIndex, ltzType.getPrecision());
                stmt.setTimestamp(parameterIndex, Timestamp.from(tsData.toInstant()));
                break;
            }
            // Add other type mappings (DATE, TIME, ...) as needed
            default: {
                // Fallback: passes Flink's internal representation (e.g. ArrayData), which many
                // JDBC drivers cannot handle; such types usually need an explicit conversion.
                LOG.warn("Unsupported Flink LogicalTypeRoot: {}. Attempting to set as Object.", fieldType.getTypeRoot());
                Object genericValue = RowData.createFieldGetter(fieldType, fieldIndex).getFieldOrNull(row);
                stmt.setObject(parameterIndex, genericValue);
                break;
            }
        }
    }

    // Helper to get the JDBC SQL type for setNull (simplified; a real implementation uses a dialect)
    private int getSqlType(LogicalType fieldType) {
        switch (fieldType.getTypeRoot()) {
            case BOOLEAN: return Types.BOOLEAN;
            case TINYINT: return Types.TINYINT;
            case SMALLINT: return Types.SMALLINT;
            case INTEGER: return Types.INTEGER;
            case BIGINT: return Types.BIGINT;
            case FLOAT: return Types.REAL;
            case DOUBLE: return Types.DOUBLE;
            case CHAR:
            case VARCHAR: return Types.VARCHAR;
            case BINARY:
            case VARBINARY: return Types.VARBINARY;
            case DECIMAL: return Types.DECIMAL;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return Types.TIMESTAMP;
            default: return Types.OTHER;
        }
    }
}
```
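buildInsertSQL and buildDeleteSQL interpolate the table and column names directly, which breaks for reserved words (a column named `order`) or case-sensitive identifiers, and is unsafe if those names ever come from untrusted input. The sketch below quotes identifiers using the ANSI SQL double-quote rule, as PostgreSQL does; this is an assumption about the target database, since MySQL uses backticks by default, so a real connector would take the quoting rule from a per-database dialect. `SqlBuilder` is a hypothetical helper, and a qualified name such as `schema.table` would need each part quoted separately.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class SqlBuilder {
    // ANSI SQL identifier quoting: wrap in double quotes and double any embedded quote.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    static String insert(String table, List<String> columns) {
        String cols = columns.stream().map(SqlBuilder::quote).collect(Collectors.joining(", "));
        String params = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + quote(table) + " (" + cols + ") VALUES (" + params + ")";
    }

    // One "col = ?" predicate per key column, joined with AND (supports composite keys).
    static String delete(String table, List<String> keyColumns) {
        String where = keyColumns.stream()
                .map(c -> quote(c) + " = ?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + quote(table) + " WHERE " + where;
    }

    public static void main(String[] args) {
        System.out.println(insert("orders", List.of("id", "order", "amount")));
        // prints INSERT INTO "orders" ("id", "order", "amount") VALUES (?, ?, ?)
        System.out.println(delete("orders", List.of("id")));
        // prints DELETE FROM "orders" WHERE "id" = ?
    }
}
```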
This JdbcDataWriter design has several properties that matter for performance and correctness:
- **Connection Management:** The `Connection` is passed in and managed by the calling `SinkFunction` (opened once per task, closed once per task), avoiding per-row connection overhead.
- **Prepared Statements:** SQL statements are prepared once upon initialization, significantly reducing overhead for repeated operations.
- **Batching:** The `executeBatch` method adds rows to JDBC batches for both inserts and deletes and executes them as batches rather than with one database round trip per row, which is crucial for performance.
- **RowKind Handling:** Differentiates between `INSERT`/`UPDATE_AFTER` and `DELETE`/`UPDATE_BEFORE` to add each row to the respective batch.
- **Type Binding:** Parameters are bound according to Flink's `LogicalType` system, covering the common primitive, string, binary, decimal, and timestamp types.
- **Primary Key Identification:** The primary key column is configurable, and its index is looked up in the schema rather than hard-coded.
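Splitting inserts and deletes into two JDBC batches needs one extra rule: the database must still see the changes in changelog order. An update arrives as `UPDATE_BEFORE` followed by `UPDATE_AFTER` for the same key; if the whole insert batch ran before the whole delete batch, the freshly inserted row would be deleted again. The plain-Java sketch below (with a hypothetical `Kind` enum mirroring Flink's `RowKind`) groups a changelog into runs of consecutive rows that target the same statement; executing the runs in order, one JDBC batch per run, keeps the changelog semantics while still batching.

```java
import java.util.ArrayList;
import java.util.List;

final class ChangelogRuns {
    // Hypothetical mirror of Flink's RowKind values.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum Op { INSERT, DELETE }

    // Maps each change to the statement that applies it: an update becomes delete + insert.
    static Op opFor(Kind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return Op.INSERT;
            default:
                return Op.DELETE; // UPDATE_BEFORE and DELETE
        }
    }

    // Groups consecutive changes that use the same statement; each run can be one JDBC batch.
    static List<String> runs(List<Kind> changelog) {
        List<String> result = new ArrayList<>();
        Op current = null;
        int count = 0;
        for (Kind kind : changelog) {
            Op op = opFor(kind);
            if (current != null && op != current) {
                result.add(current + " x" + count); // close the previous run
                count = 0;
            }
            current = op;
            count++;
        }
        if (current != null) {
            result.add(current + " x" + count);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Kind> changelog = List.of(
                Kind.INSERT, Kind.INSERT, Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER, Kind.DELETE);
        System.out.println(runs(changelog));
        // prints [INSERT x2, DELETE x1, INSERT x1, DELETE x1]
    }
}
```

The trade-off is batch size: a changelog that alternates kinds row by row degrades to one statement per batch, which is why Flink's own JDBC connector instead buffers changes per primary key; this sketch only shows the simpler ordering rule.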
Conclusion
Developing custom table connectors in Apache Flink, particularly for recent versions, involves a structured approach beyond simple interface implementations. The shift towards explicit factory patterns and structured runtime providers, rather than older lambda-style or simple OutputFormat implementations, highlights Flink's emphasis on robustness, debuggability, and performance optimization. While requiring more boilerplate code, this framework allows developers to integrate Flink seamlessly with diverse data ecosystems and implement highly tailored data processing logic.