Skip to content

Commit

Permalink
Merge pull request #107 from meroxa/multi-row-inserts-pg
Browse files Browse the repository at this point in the history
Support multi-row inserts

#107
  • Loading branch information
juha-aiven authored Mar 2, 2022
2 parents e92146f + d3fcb30 commit e5b2b14
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 95 deletions.
6 changes: 5 additions & 1 deletion docs/sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ Writes

Use standard SQL ``INSERT`` statements.

``multi``

Use multi-row inserts, e.g. ``INSERT INTO table_name (column_list) VALUES (value_list_1), (value_list_2), ... (value_list_n);``

``upsert``

Use the appropriate upsert semantics for the target database if it is supported by the connector, e.g. ``INSERT .. ON CONFLICT .. DO UPDATE SET ..``.
Expand All @@ -68,7 +72,7 @@ Writes

* Type: string
* Default: insert
* Valid Values: [insert, upsert, update]
* Valid Values: [insert, multi, upsert, update]
* Importance: high

``batch.size``
Expand Down
8 changes: 8 additions & 0 deletions docs/sink-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ from Kafka.
This mode is used by default. To enable it explicitly, set
`insert.mode=insert`.

### Multi Mode

In this mode, the connector executes an `INSERT` SQL query with multiple
values (effectively inserting multiple row/records per query).
Supported in `SqliteDatabaseDialect` and `PostgreSqlDatabaseDialect`.

To use this mode, set `insert.mode=multi`

### Update Mode

In this mode, the connector executes `UPDATE` SQL query on each record
Expand Down
31 changes: 30 additions & 1 deletion src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,24 @@ String buildInsertStatement(
Collection<ColumnId> nonKeyColumns
);

/**
* Build an INSERT statement for multiple rows.
*
* @param table the identifier of the table; may not be null
* @param records number of rows which will be inserted; must be a positive number
* @param keyColumns the identifiers of the columns in the primary/unique key; may not be null
* but may be empty
* @param nonKeyColumns the identifiers of the other columns in the table; may not be null but may
* be empty
* @return the INSERT statement; may not be null
*/
String buildMultiInsertStatement(
TableId table,
int records,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns
);

/**
* Build the INSERT prepared statement expression for the given table and its columns.
*
Expand Down Expand Up @@ -494,7 +512,18 @@ interface StatementBinder {
* @param record the sink record with values to be bound into the statement; never null
* @throws SQLException if there is a problem binding values into the statement
*/
void bindRecord(SinkRecord record) throws SQLException;
default void bindRecord(SinkRecord record) throws SQLException {
bindRecord(1, record);
}

/**
* Bind the values in the supplied record, starting at the specified index.
*
* @param index the index at which binding starts; must be positive
* @param record the sink record with values to be bound into the statement; never null
* @throws SQLException if there is a problem binding values into the statement
*/
int bindRecord(int index, SinkRecord record) throws SQLException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.data.Date;
Expand Down Expand Up @@ -85,6 +86,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.connect.jdbc.util.CollectionUtils.isEmpty;
import static java.util.Objects.requireNonNull;
import static java.util.stream.IntStream.range;

/**
* A {@link DatabaseDialect} implementation that provides functionality based upon JDBC and SQL.
*
Expand Down Expand Up @@ -1350,6 +1355,44 @@ public String buildInsertStatement(
return builder.toString();
}

@Override
public String buildMultiInsertStatement(final TableId table,
final int records,
final Collection<ColumnId> keyColumns,
final Collection<ColumnId> nonKeyColumns) {

if (records < 1) {
throw new IllegalArgumentException("number of records must be a positive number, but got: " + records);
}
if (isEmpty(keyColumns) && isEmpty(nonKeyColumns)) {
throw new IllegalArgumentException("no columns specified");
}
requireNonNull(table, "table must not be null");

final String insertStatement = expressionBuilder()
.append("INSERT INTO ")
.append(table)
.append("(")
.appendList()
.delimitedBy(",")
.transformedBy(ExpressionBuilder.columnNames())
.of(keyColumns, nonKeyColumns)
.append(") VALUES ")
.toString();

final String singleRowPlaceholder = expressionBuilder()
.append("(")
.appendMultiple(",", "?", keyColumns.size() + nonKeyColumns.size())
.append(")")
.toString();

final String allRowsPlaceholder = range(1, records + 1)
.mapToObj(i -> singleRowPlaceholder)
.collect(Collectors.joining(","));

return insertStatement + allRowsPlaceholder;
}

@Override
public String buildUpdateStatement(
final TableId table,
Expand Down
144 changes: 106 additions & 38 deletions src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.connect.jdbc.sink.JdbcSinkConfig.InsertMode.MULTI;

public class BufferedRecords {
private static final Logger log = LoggerFactory.getLogger(BufferedRecords.class);

Expand All @@ -53,6 +55,7 @@ public class BufferedRecords {
private List<SinkRecord> records = new ArrayList<>();
private SchemaPair currentSchemaPair;
private FieldsMetadata fieldsMetadata;
private TableDefinition tableDefinition;
private PreparedStatement preparedStatement;
private StatementBinder preparedStatementBinder;

Expand All @@ -76,39 +79,10 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
record.valueSchema()
);

if (currentSchemaPair == null) {
currentSchemaPair = schemaPair;
// re-initialize everything that depends on the record schema
fieldsMetadata = FieldsMetadata.extract(
tableId.tableName(),
config.pkMode,
config.pkFields,
config.fieldsWhitelist,
currentSchemaPair
);
dbStructure.createOrAmendIfNecessary(
config,
connection,
tableId,
fieldsMetadata
);
log.debug("buffered records in list {}", records.size());

final TableDefinition tableDefinition = dbStructure.tableDefinitionFor(tableId, connection);
final String sql = getInsertSql(tableDefinition);
log.debug(
"{} sql: {}",
config.insertMode,
sql
);
close();
preparedStatement = connection.prepareStatement(sql);
preparedStatementBinder = dbDialect.statementBinder(
preparedStatement,
config.pkMode,
schemaPair,
fieldsMetadata,
config.insertMode
);
if (currentSchemaPair == null) {
reInitialize(schemaPair);
}

final List<SinkRecord> flushed;
Expand All @@ -134,27 +108,74 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
return flushed;
}

private void prepareStatement() throws SQLException {
final String sql;
log.debug("Generating query for insert mode {} and {} records", config.insertMode, records.size());
if (config.insertMode == MULTI) {
sql = getMultiInsertSql();
} else {
sql = getInsertSql();
}

log.debug("Prepared SQL {} for insert mode {}", sql, config.insertMode);

close();
preparedStatement = connection.prepareStatement(sql);
preparedStatementBinder = dbDialect.statementBinder(
preparedStatement,
config.pkMode,
currentSchemaPair,
fieldsMetadata,
config.insertMode
);
}

/**
* Re-initialize everything that depends on the record schema
*/
private void reInitialize(final SchemaPair schemaPair) throws SQLException {
currentSchemaPair = schemaPair;
fieldsMetadata = FieldsMetadata.extract(
tableId.tableName(),
config.pkMode,
config.pkFields,
config.fieldsWhitelist,
currentSchemaPair
);
dbStructure.createOrAmendIfNecessary(
config,
connection,
tableId,
fieldsMetadata
);

tableDefinition = dbStructure.tableDefinitionFor(tableId, connection);
}

public List<SinkRecord> flush() throws SQLException {
if (records.isEmpty()) {
log.debug("Records is empty");
return new ArrayList<>();
}
log.debug("Flushing {} buffered records", records.size());
for (final SinkRecord record : records) {
preparedStatementBinder.bindRecord(record);
}
prepareStatement();
bindRecords();

int totalUpdateCount = 0;
boolean successNoInfo = false;
for (final int updateCount : preparedStatement.executeBatch()) {

log.debug("Executing batch...");
for (final int updateCount : executeBatch()) {
if (updateCount == Statement.SUCCESS_NO_INFO) {
successNoInfo = true;
continue;
}
totalUpdateCount += updateCount;
}
log.debug("Done executing batch.");
if (totalUpdateCount != records.size() && !successNoInfo) {
switch (config.insertMode) {
case INSERT:
case MULTI:
throw new ConnectException(String.format(
"Update count (%d) did not sum up to total number of records inserted (%d)",
totalUpdateCount,
Expand Down Expand Up @@ -186,6 +207,30 @@ public List<SinkRecord> flush() throws SQLException {
return flushedRecords;
}

private int[] executeBatch() throws SQLException {
if (config.insertMode == MULTI) {
preparedStatement.addBatch();
}
log.debug("Executing batch with insert mode {}", config.insertMode);
return preparedStatement.executeBatch();
}

private void bindRecords() throws SQLException {
log.debug("Binding {} buffered records", records.size());
int index = 1;
for (final SinkRecord record : records) {
if (config.insertMode == MULTI) {
// All records are bound to the same prepared statement,
// so when binding fields for record N (N > 0)
// we need to start at the index where binding fields for record N - 1 stopped.
index = preparedStatementBinder.bindRecord(index, record);
} else {
preparedStatementBinder.bindRecord(record);
}
}
log.debug("Done binding records.");
}

public void close() throws SQLException {
log.info("Closing BufferedRecords with preparedStatement: {}", preparedStatement);
if (preparedStatement != null) {
Expand All @@ -194,7 +239,30 @@ public void close() throws SQLException {
}
}

private String getInsertSql(final TableDefinition tableDefinition) {
private String getMultiInsertSql() {
if (config.insertMode != MULTI) {
throw new ConnectException(String.format(
"Multi-row first insert SQL unsupported by insert mode %s",
config.insertMode
));
}
try {
return dbDialect.buildMultiInsertStatement(
tableId,
records.size(),
asColumns(fieldsMetadata.keyFieldNames),
asColumns(fieldsMetadata.nonKeyFieldNames)
);
} catch (final UnsupportedOperationException e) {
throw new ConnectException(String.format(
"Write to table '%s' in MULTI mode is not supported with the %s dialect.",
tableId,
dbDialect.name()
));
}
}

private String getInsertSql() {
switch (config.insertMode) {
case INSERT:
return dbDialect.buildInsertStatement(
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class JdbcSinkConfig extends JdbcConfig {

public enum InsertMode {
INSERT,
MULTI,
UPSERT,
UPDATE;
}
Expand Down Expand Up @@ -122,6 +123,8 @@ public enum PrimaryKeyMode {
"The insertion mode to use. Supported modes are:\n"
+ "``insert``\n"
+ " Use standard SQL ``INSERT`` statements.\n"
+ "``multi``\n"
+ " Use multi-row ``INSERT`` statements.\n"
+ "``upsert``\n"
+ " Use the appropriate upsert semantics for the target database if it is supported by "
+ "the connector, e.g. ``INSERT .. ON CONFLICT .. DO UPDATE SET ..``.\n"
Expand Down
Loading

0 comments on commit e5b2b14

Please sign in to comment.