Skip to content

Commit

Permalink
Implemented selection
Browse files Browse the repository at this point in the history
  • Loading branch information
AdalbertMemSQL committed Oct 22, 2024
1 parent 1ceb998 commit b0937ca
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,18 @@ void accept(String operation, Integer partition, String offset, Map<String, Valu
throws JsonProcessingException;
}

public void observe(State state, ObserveConsumer consumer) throws Exception {
public void observe(State state, Set<String> selectedColumns, ObserveConsumer consumer)
throws Exception {
List<Column> columns = getSchema()
.getSchemas(0)
.getTables(0)
.getColumnsList();
if (selectedColumns != null) {
columns = columns.stream()
.filter(column -> selectedColumns.contains(column.getName()))
.collect(Collectors.toList());
}

List<Column> pkColumns = columns
.stream()
.filter(Column::getPrimaryKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@
import fivetran_sdk.SchemaList;
import fivetran_sdk.SchemaRequest;
import fivetran_sdk.SchemaResponse;
import fivetran_sdk.SchemaSelection;
import fivetran_sdk.Selection;
import fivetran_sdk.TableSelection;
import fivetran_sdk.TablesWithSchema;
import fivetran_sdk.TestRequest;
import fivetran_sdk.TestResponse;
import fivetran_sdk.TextField;
import fivetran_sdk.UpdateRequest;
import fivetran_sdk.UpdateResponse;
import io.grpc.stub.StreamObserver;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -135,13 +142,59 @@ public void schema(SchemaRequest request, StreamObserver<SchemaResponse>
}
}

/**
* Returns a set of column names that should be selected. If no selection is provided - returns
* null. In this case all columns should be selected. If configured database and table are not
* selected - returns an empty list. In this case, no columns are populated.
*
* @param request an absolute URL giving the base location of the image
* @param conf the location of the image, relative to the url argument
* @return set of column names to select, null if select all columns
*/
private Set<String> getSelectedColumns(UpdateRequest request, SingleStoreConfiguration conf) {
if (!request.hasSelection()) {
return null;
}
Selection sel = request.getSelection();

if (!sel.hasWithSchema()) {
return null;
}
TablesWithSchema tablesWithSchema = sel.getWithSchema();

for (SchemaSelection schemaSelection : tablesWithSchema.getSchemasList()) {
if (!schemaSelection.getIncluded() || !schemaSelection.getSchemaName()
.equals(conf.database())) {
continue;
}

for (TableSelection tableSelection : schemaSelection.getTablesList()) {
if (!tableSelection.getIncluded() || !tableSelection.getTableName().equals(conf.table())) {
continue;
}

Map<String, Boolean> columns = tableSelection.getColumnsMap();
Set<String> selectedColumns = new HashSet<>();
for (String columnName : columns.keySet()) {
if (columns.get(columnName)) {
selectedColumns.add(columnName);
}
}

return selectedColumns;
}
}

return new HashSet<>();
}

@Override
public void update(UpdateRequest request, StreamObserver<UpdateResponse>
responseObserver) {
SingleStoreConfiguration configuration = new SingleStoreConfiguration(
request.getConfigurationMap());
SingleStoreConnection conn = new SingleStoreConnection(configuration);
Set<String> selectedColumns = getSelectedColumns(request, configuration);

try {
State state;
Expand All @@ -158,7 +211,7 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse>
.build())
.build());

conn.observe(state, (operation, partition, offset, row) -> {
conn.observe(state, selectedColumns, (operation, partition, offset, row) -> {
switch (operation) {
case "Insert":
responseObserver.onNext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -409,7 +411,7 @@ public void observe() throws Exception {
State state = new State(8);
Thread t = new Thread(() -> {
try {
observeConn.observe(state, (operation, partition, offset, row) -> {
observeConn.observe(state, null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
Expand Down Expand Up @@ -470,7 +472,7 @@ public void observe() throws Exception {
records.clear();
t = new Thread(() -> {
try {
observeConn.observe(state, (operation, partition, offset, row) -> {
observeConn.observe(state, null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
Expand Down Expand Up @@ -592,7 +594,7 @@ public void observeAllTypes() throws Exception {
List<Record> records = new ArrayList<>();
Thread t = new Thread(() -> {
try {
observeConn.observe(new State(8), (operation, partition, offset, row) -> {
observeConn.observe(new State(8), null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
Expand Down Expand Up @@ -929,7 +931,7 @@ public void observeVectorJson() throws Exception {

Thread t = new Thread(() -> {
try {
observeConn.observe(new State(8), (operation, partition, offset, row) -> {
observeConn.observe(new State(8), null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
Expand All @@ -956,4 +958,52 @@ public void observeVectorJson() throws Exception {
}
}
}

@Test
public void observeFilter() throws Exception {
SingleStoreConfiguration conf = getConfig("observeFilter");
SingleStoreConnection conn = new SingleStoreConnection(conf);

try (Statement stmt = conn.getConnection().createStatement()) {
stmt.execute("DROP TABLE IF EXISTS observeFilter");
stmt.execute(
"CREATE TABLE observeFilter (a INT, b INT, PRIMARY KEY(a));");
}

final Exception[] observeException = new Exception[1];
SingleStoreConnection observeConn = new SingleStoreConnection(conf);
List<Record> records = new ArrayList<>();
State state = new State(8);
Set<String> selectedColumns = new HashSet<>();
selectedColumns.add("a");
Thread t = new Thread(() -> {
try {
observeConn.observe(state, selectedColumns, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
state.setOffset(partition, offset);
}
});
} catch (Exception e) {
observeException[0] = e;
}
});
t.start();

try (Statement stmt = conn.getConnection().createStatement()) {
stmt.execute("INSERT INTO observeFilter VALUES(1, 1)");
}

Thread.sleep(1000);
((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery();
Thread.sleep(1000);
t.interrupt();

assertTrue(observeException[0].getMessage().contains("Query execution was interrupted"));

assertEquals("Insert", records.get(0).operation);
assertEquals(1, records.get(0).row.get("a").getInt());
assertNull(records.get(0).row.get("b"));
}
}

0 comments on commit b0937ca

Please sign in to comment.