Skip to content

Commit

Permalink
Merge pull request #4 from arenadata/feature/ADH-5241
Browse files Browse the repository at this point in the history
[ADH-5241]: Implement the trino-adb connector reading function
  • Loading branch information
VitekArkhipov authored Nov 28, 2024
2 parents 3959401 + f5b0722 commit bd1ddec
Show file tree
Hide file tree
Showing 89 changed files with 4,652 additions and 1,734 deletions.
2 changes: 1 addition & 1 deletion core/docker/arenadata/coordinator/etc/log.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Enable verbose logging from Trino
#io.trino=DEBUG
io.trino=DEBUG
2 changes: 1 addition & 1 deletion core/docker/arenadata/worker/etc/log.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Enable verbose logging from Trino
#io.trino=DEBUG
io.trino=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,41 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDataSize;
import io.trino.plugin.adb.connector.protocol.TransferDataProtocol;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

public class AdbPluginConfig
{
public static final String IDENTIFIER_QUOTE = "\"";
private AdbPluginConfig.ArrayMapping arrayMapping = AdbPluginConfig.ArrayMapping.DISABLED;
private int maxScanParallelism = 1;
private boolean includeSystemTables;
private DataSize writeBufferSize = DataSize.of(16L, DataSize.Unit.MEGABYTE);
private Integer fetchSize;
private DataSize writeBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private DataSize readBufferSize = DataSize.of(64L, DataSize.Unit.MEGABYTE);
private final TransferDataProtocol dataProtocol = TransferDataProtocol.GPFDIST;
private Duration gpfdistRetryTimeout;

public TransferDataProtocol getDataProtocol()
{
return dataProtocol;
}

public Integer getFetchSize()
{
return fetchSize;
}

@Config("adb.fetch-size")
public AdbPluginConfig setFetchSize(int fetchSize)
{
this.fetchSize = fetchSize;
return this;
}

@NotNull
public AdbPluginConfig.ArrayMapping getArrayMapping()
{
Expand Down Expand Up @@ -81,13 +98,41 @@ public DataSize getWriteBufferSize()
}

@Config("adb.connector.write-buffer-size")
@ConfigDescription("Maximum amount of memory that could be allocated per sink when executing write queries. Defaults to 16MB")
@ConfigDescription("Maximum amount of memory that could be allocated per sink when executing write queries. Defaults to 64MB")
public AdbPluginConfig setWriteBufferSize(DataSize writeBufferSize)
{
this.writeBufferSize = writeBufferSize;
return this;
}

@MinDataSize("1kB")
@NotNull
public DataSize getReadBufferSize()
{
return readBufferSize;
}

@Config("adb.connector.read-buffer-size")
@ConfigDescription("Maximum amount of memory that could be allocated per record cursor when executing read queries. Defaults to 64MB")
public AdbPluginConfig setReadBufferSize(DataSize readBufferSize)
{
this.readBufferSize = readBufferSize;
return this;
}

public Duration getGpfdistRetryTimeout()
{
return this.gpfdistRetryTimeout;
}

@Config("adb.gpfdist.retry-timeout")
@ConfigDescription("Value of adb gpfdist_retry_timeout property. Defaults to null (use adb defaults)")
public AdbPluginConfig setGpfdistRetryTimeout(Duration gpfdistRetryTimeout)
{
this.gpfdistRetryTimeout = gpfdistRetryTimeout;
return this;
}

public static enum ArrayMapping
{
DISABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import io.airlift.slice.Slice;
import io.trino.plugin.adb.connector.AdbSqlClient;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -131,7 +131,7 @@ public static Object[] getJdbcObjectArray(ConnectorSession session, Type element
return valuesArray;
}

public static String getArrayElementPgTypeName(ConnectorSession session, AdbSqlClient client, Type elementType)
public static String getArrayElementPgTypeName(ConnectorSession session, DataTypeMapper typeMapper, Type elementType)
{
if (DOUBLE.equals(elementType)) {
return "float8";
Expand Down Expand Up @@ -164,10 +164,10 @@ public static String getArrayElementPgTypeName(ConnectorSession session, AdbSqlC
}

if (elementType instanceof ArrayType) {
return getArrayElementPgTypeName(session, client, ((ArrayType) elementType).getElementType());
return getArrayElementPgTypeName(session, typeMapper, ((ArrayType) elementType).getElementType());
}

return client.toWriteMapping(session, elementType).getDataType();
return typeMapper.toWriteMapping(session, elementType).getDataType();
}

private static Object trinoNativeToJdbcObject(ConnectorSession session, Type trinoType, Object trinoNative)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.configuration.ConfigBinder;
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.adb.connector.encode.EncoderModule;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapper;
import io.trino.plugin.adb.connector.datatype.mapper.DataTypeMapperImpl;
import io.trino.plugin.adb.connector.encode.DataFormatModule;
import io.trino.plugin.adb.connector.metadata.AdbMetadataDao;
import io.trino.plugin.adb.connector.metadata.impl.AdbMetadataDaoImpl;
import io.trino.plugin.adb.connector.protocol.TransferDataProtocol;
import io.trino.plugin.adb.connector.protocol.gpfdist.GpfdistModule;
import io.trino.plugin.adb.connector.table.AdbCreateTableStorageConfig;
import io.trino.plugin.adb.connector.table.AdbTableProperties;
import io.trino.plugin.adb.connector.table.SplitSourceManager;
import io.trino.plugin.adb.connector.table.SplitSourceManagerImpl;
import io.trino.plugin.adb.connector.table.StatisticsManager;
import io.trino.plugin.adb.connector.table.StatisticsManagerImpl;
import io.trino.plugin.jdbc.DecimalModule;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.JdbcClient;
Expand All @@ -44,21 +50,26 @@ public class AdbClientModule
@Override
protected void setup(Binder binder)
{
install(new EncoderModule());
binder.bind(AdbMetadataDao.class).to(AdbMetadataDaoImpl.class).in(Scopes.SINGLETON);
binder.bind(DataTypeMapper.class).to(DataTypeMapperImpl.class).in(Scopes.SINGLETON);
binder.bind(StatisticsManager.class).to(StatisticsManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(SplitSourceManager.class).to(SplitSourceManagerImpl.class).in(Scopes.SINGLETON);
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(AdbSqlClient.class).in(Scopes.SINGLETON);
ConfigBinder.configBinder(binder).bindConfig(AdbCreateTableStorageConfig.class);
ConfigBinder.configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
JdbcModule.bindSessionPropertiesProvider(binder, AdbSessionProperties.class);
JdbcModule.bindTablePropertiesProvider(binder, AdbTableProperties.class);
OptionalBinder.newOptionalBinder(binder, QueryBuilder.class).setBinding().to(CollationAwareQueryBuilder.class).in(Scopes.SINGLETON);
this.install(new DecimalModule());
this.install(new JdbcJoinPushdownSupportModule());
this.install(new RemoteQueryCancellationModule());
Multibinder.newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
AdbPluginConfig pluginConfig = this.buildConfigObject(AdbPluginConfig.class);

install(new DataFormatModule());
install(new DecimalModule());
install(new JdbcJoinPushdownSupportModule());
install(new RemoteQueryCancellationModule());

if (pluginConfig.getDataProtocol() == TransferDataProtocol.GPFDIST) {
this.install(new GpfdistModule());
install(new GpfdistModule());
}
else {
throw new UnsupportedOperationException("Unsupported data protocol: " + pluginConfig.getDataProtocol());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.base.session.PropertyMetadataUtil;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import java.util.List;
import java.util.Optional;

public class AdbSessionProperties
implements SessionPropertiesProvider
Expand All @@ -33,7 +36,9 @@ public AdbSessionProperties(AdbPluginConfig config)
this.sessionProperties = ImmutableList.of(
PropertyMetadata.enumProperty("array_mapping", "Handling of PostgreSql arrays", AdbPluginConfig.ArrayMapping.class, config.getArrayMapping(), false),
PropertyMetadata.integerProperty(
"max_scan_parallelism", "Maximum degree of parallelism when scanning tables. Defaults to 1.", config.getMaxScanParallelism(), false));
"max_scan_parallelism", "Maximum degree of parallelism when scanning tables. Defaults to 1.", config.getMaxScanParallelism(), false),
PropertyMetadataUtil.durationProperty(
"gpfdist_retry_timeout", "Value of adb gpfdist_retry_timeout property", config.getGpfdistRetryTimeout(), false));
}

@Override
Expand All @@ -56,4 +61,9 @@ public static int getMaxScanParallelism(ConnectorSession session)
{
return session.getProperty("max_scan_parallelism", Integer.class);
}

public static Optional<Duration> getGpfdistRetryTimeout(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty("gpfdist_retry_timeout", Duration.class));
}
}
Loading

0 comments on commit bd1ddec

Please sign in to comment.