Skip to content

Commit

Permalink
[FLINK-30371] Fixed a Problem in the Exception Handling of the Close …
Browse files Browse the repository at this point in the history
…Method.

This Problem leads to Connection Leaks if some problem occurs.
  • Loading branch information
KevDi committed Oct 27, 2023
1 parent e3c52a4 commit 360c0cb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public synchronized void close() {
flush();
} catch (Exception e) {
LOG.warn("Writing records to JDBC failed.", e);
throw new RuntimeException("Writing records to JDBC failed.", e);
flushException = e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class JdbcTestFixture implements DerbyTestBase {
public static final String OUTPUT_TABLE = "newbooks";
public static final String OUTPUT_TABLE_2 = "newbooks2";
public static final String OUTPUT_TABLE_3 = "newbooks3";
public static final String OUTPUT_TABLE_4 = "newbooks4";
public static final String WORDS_TABLE = "words";
public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
public static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE;
Expand Down Expand Up @@ -189,6 +190,7 @@ public static void initSchema(DatabaseMetadata metadata) throws SQLException {
createTable(conn, OUTPUT_TABLE);
createTable(conn, OUTPUT_TABLE_2);
createTable(conn, OUTPUT_TABLE_3);
createTable(conn, OUTPUT_TABLE_4);
createWordsTable(conn);
}
}
Expand Down Expand Up @@ -232,6 +234,7 @@ public static void cleanUpDatabasesStatic(DatabaseMetadata dbMetadata) throws SQ
stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2);
stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_3);
stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_4);
stat.executeUpdate("DROP TABLE " + WORDS_TABLE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcDataTestBase;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcRowOutputFormat;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
Expand All @@ -34,6 +35,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
Expand All @@ -44,9 +46,7 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;

Expand Down Expand Up @@ -221,6 +221,30 @@ void testJdbcOutputFormat() throws Exception {
check(expected);
}

@Test
public void testExceptionOnFlush() {
JdbcRowOutputFormat jdbcOutputFormat =
JdbcRowOutputFormat.buildJdbcOutputFormat()
.setDrivername(getMetadata().getDriverClass())
.setDBUrl(getMetadata().getJdbcUrl())
.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_4))
.setBatchSize(2)
.finish();
setRuntimeContext(jdbcOutputFormat, true);
try {
jdbcOutputFormat.open(0, 1);

jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1]));
jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1]));
} catch (IOException e) {
try {
jdbcOutputFormat.close();
} catch (Exception e1) {
assertThat(jdbcOutputFormat.getConnection()).isEqualTo(null);
}
}
}

private void check(Row[] rows) throws SQLException {
check(rows, getMetadata().getJdbcUrl(), OUTPUT_TABLE, fieldNames);
}
Expand Down

0 comments on commit 360c0cb

Please sign in to comment.