diff --git a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java index d044746..e0824a7 100644 --- a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java +++ b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java @@ -251,11 +251,18 @@ void accept(String operation, Integer partition, String offset, Map selectedColumns, ObserveConsumer consumer) + throws Exception { List columns = getSchema() .getSchemas(0) .getTables(0) .getColumnsList(); + if (selectedColumns != null) { + columns = columns.stream() + .filter(column -> selectedColumns.contains(column.getName())) + .collect(Collectors.toList()); + } + List pkColumns = columns .stream() .filter(Column::getPrimaryKey) diff --git a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java index 4a42128..193d1ec 100644 --- a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java +++ b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java @@ -15,6 +15,10 @@ import fivetran_sdk.SchemaList; import fivetran_sdk.SchemaRequest; import fivetran_sdk.SchemaResponse; +import fivetran_sdk.SchemaSelection; +import fivetran_sdk.Selection; +import fivetran_sdk.TableSelection; +import fivetran_sdk.TablesWithSchema; import fivetran_sdk.TestRequest; import fivetran_sdk.TestResponse; import fivetran_sdk.TextField; @@ -22,6 +26,9 @@ import fivetran_sdk.UpdateResponse; import io.grpc.stub.StreamObserver; import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,6 +142,51 @@ public void schema(SchemaRequest request, StreamObserver } } + /** + * Returns a set of column names that should be selected. If no selection is provided - returns + * null. In this case all columns should be selected. If configured database and table are not + * selected - returns an empty list. In this case, no columns are populated. + * + * @param request an absolute URL giving the base location of the image + * @param conf the location of the image, relative to the url argument + * @return set of column names to select, null if select all columns + */ + private Set getSelectedColumns(UpdateRequest request, SingleStoreConfiguration conf) { + if (!request.hasSelection()) { + return null; + } + Selection sel = request.getSelection(); + + if (!sel.hasWithSchema()) { + return null; + } + TablesWithSchema tablesWithSchema = sel.getWithSchema(); + + for (SchemaSelection schemaSelection : tablesWithSchema.getSchemasList()) { + if (!schemaSelection.getIncluded() || !schemaSelection.getSchemaName() + .equals(conf.database())) { + continue; + } + + for (TableSelection tableSelection : schemaSelection.getTablesList()) { + if (!tableSelection.getIncluded() || !tableSelection.getTableName().equals(conf.table())) { + continue; + } + + Map columns = tableSelection.getColumnsMap(); + Set selectedColumns = new HashSet<>(); + for (String columnName : columns.keySet()) { + if (columns.get(columnName)) { + selectedColumns.add(columnName); + } + } + + return selectedColumns; + } + } + + return new HashSet<>(); + } @Override public void update(UpdateRequest request, StreamObserver @@ -142,6 +194,7 @@ public void update(UpdateRequest request, StreamObserver SingleStoreConfiguration configuration = new SingleStoreConfiguration( request.getConfigurationMap()); SingleStoreConnection conn = new SingleStoreConnection(configuration); + Set selectedColumns = getSelectedColumns(request, configuration); try { State state; @@ -158,7 +211,7 @@ public void update(UpdateRequest request, StreamObserver .build()) .build()); - conn.observe(state, (operation, partition, offset, row) -> { + conn.observe(state, selectedColumns, (operation, partition, offset, row) -> { switch (operation) { case "Insert": responseObserver.onNext( diff --git a/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java b/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java index 4fab16f..d94da97 100644 --- a/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java +++ b/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java @@ -20,8 +20,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -409,7 +411,7 @@ public void observe() throws Exception { State state = new State(8); Thread t = new Thread(() -> { try { - observeConn.observe(state, (operation, partition, offset, row) -> { + observeConn.observe(state, null, (operation, partition, offset, row) -> { if (operation.equals("Delete") || operation.equals("Update") || operation.equals( "Insert")) { records.add(new Record(operation, row)); @@ -470,7 +472,7 @@ public void observe() throws Exception { records.clear(); t = new Thread(() -> { try { - observeConn.observe(state, (operation, partition, offset, row) -> { + observeConn.observe(state, null, (operation, partition, offset, row) -> { if (operation.equals("Delete") || operation.equals("Update") || operation.equals( "Insert")) { records.add(new Record(operation, row)); @@ -592,7 +594,7 @@ public void observeAllTypes() throws Exception { List records = new ArrayList<>(); Thread t = new Thread(() -> { try { - observeConn.observe(new State(8), (operation, partition, offset, row) -> { + observeConn.observe(new State(8), null, (operation, partition, offset, row) -> { if (operation.equals("Delete") || operation.equals("Update") || operation.equals( "Insert")) { records.add(new Record(operation, row)); @@ -929,7 +931,7 @@ public void observeVectorJson() throws Exception { Thread t = new Thread(() -> { try { - observeConn.observe(new State(8), (operation, partition, offset, row) -> { + observeConn.observe(new State(8), null, (operation, partition, offset, row) -> { if (operation.equals("Delete") || operation.equals("Update") || operation.equals( "Insert")) { records.add(new Record(operation, row)); @@ -956,4 +958,52 @@ public void observeVectorJson() throws Exception { } } } + + @Test + public void observeFilter() throws Exception { + SingleStoreConfiguration conf = getConfig("observeFilter"); + SingleStoreConnection conn = new SingleStoreConnection(conf); + + try (Statement stmt = conn.getConnection().createStatement()) { + stmt.execute("DROP TABLE IF EXISTS observeFilter"); + stmt.execute( + "CREATE TABLE observeFilter (a INT, b INT, PRIMARY KEY(a));"); + } + + final Exception[] observeException = new Exception[1]; + SingleStoreConnection observeConn = new SingleStoreConnection(conf); + List records = new ArrayList<>(); + State state = new State(8); + Set selectedColumns = new HashSet<>(); + selectedColumns.add("a"); + Thread t = new Thread(() -> { + try { + observeConn.observe(state, selectedColumns, (operation, partition, offset, row) -> { + if (operation.equals("Delete") || operation.equals("Update") || operation.equals( + "Insert")) { + records.add(new Record(operation, row)); + state.setOffset(partition, offset); + } + }); + } catch (Exception e) { + observeException[0] = e; + } + }); + t.start(); + + try (Statement stmt = conn.getConnection().createStatement()) { + stmt.execute("INSERT INTO observeFilter VALUES(1, 1)"); + } + + Thread.sleep(1000); + ((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery(); + Thread.sleep(1000); + t.interrupt(); + + assertTrue(observeException[0].getMessage().contains("Query execution was interrupted")); + + assertEquals("Insert", records.get(0).operation); + assertEquals(1, records.get(0).row.get("a").getInt()); + assertNull(records.get(0).row.get("b")); + } }