Skip to content

Commit

Permalink
Set httpfs config once per connection
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Jan 29, 2024
1 parent 3410e33 commit f62127a
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.accio.cache;
package io.accio.base.client.duckdb;

public interface CacheStorageConfig
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@
import java.sql.SQLException;
import java.sql.Statement;

import static java.lang.String.format;

public class DuckDBDataSource
extends BaseDataSource
implements DataSource, Serializable
{
private final DuckDBConnection duckDBConnection;
private final DuckdbS3StyleStorageConfig duckdbS3StyleStorageConfig;

public DuckDBDataSource(DuckDBConnection duckDBConnection)
/**
 * Creates a data source that keeps the given DuckDB connection and S3-style storage settings.
 *
 * @param duckDBConnection base connection; {@link #getConnection()} duplicates it for each caller
 * @param duckdbS3StyleStorageConfig supplies the S3 endpoint and URL style that
 *         {@link #getConnection()} sets on every duplicated connection
 */
public DuckDBDataSource(DuckDBConnection duckDBConnection, DuckdbS3StyleStorageConfig duckdbS3StyleStorageConfig)
{
    this.duckDBConnection = duckDBConnection;
    this.duckdbS3StyleStorageConfig = duckdbS3StyleStorageConfig;
}

@Override
Expand All @@ -51,11 +55,14 @@ public Connection getConnection()
// Per the official docs, to create multiple read-write connections
// to the same in-memory database instance, use the custom `duplicate()` method.
// https://duckdb.org/docs/api/java
Connection connection = ((DuckDBConnection) duckDBConnection).duplicate();
Connection connection = duckDBConnection.duplicate();
Statement statement = connection.createStatement();
statement.execute("set search_path = 'main'");
// install extensions from stable repository
statement.execute("SET custom_extension_repository = 'http://extensions.duckdb.org'");
// init httpfs settings
statement.execute(format("SET s3_endpoint='%s'\n", duckdbS3StyleStorageConfig.getEndpoint()));
statement.execute(format("SET s3_url_style='%s'\n", duckdbS3StyleStorageConfig.getUrlStyle()));
return connection;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ public final class DuckdbClient
private final HikariDataSource ds;

@Inject
public DuckdbClient(DuckDBConfig duckDBConfig)
public DuckdbClient(DuckDBConfig duckDBConfig, DuckdbS3StyleStorageConfig duckdbS3StyleStorageConfig)
{
try {
// The instance will be cleared after the process ends, so we don't need to
// close this connection.
Class.forName("org.duckdb.DuckDBDriver");
DuckDBConnection duckDBConnection = (DuckDBConnection) DriverManager.getConnection("jdbc:duckdb:");
this.duckDBConfig = duckDBConfig;
DuckDBDataSource dataSource = new DuckDBDataSource(duckDBConnection);
DuckDBDataSource dataSource = new DuckDBDataSource(duckDBConnection, duckdbS3StyleStorageConfig);
HikariConfig config = new HikariConfig();
config.setDataSource(dataSource);
config.setPoolName("MY_POOL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* limitations under the License.
*/

package io.accio.cache;
package io.accio.base.client.duckdb;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
Expand Down Expand Up @@ -103,12 +103,10 @@ public String getUrlStyle()
public String generateDuckdbParquetStatement(String path, String tableName)
{
// ref: https://github.com/duckdb/duckdb/issues/1403
StringBuilder sb = new StringBuilder("INSTALL httpfs;\n" +
"LOAD httpfs;\n");
sb.append(format("SET s3_endpoint='%s';\n", endpoint));
StringBuilder sb = new StringBuilder();
// TODO: find out why the S3 access key and secret key can't be set in the data source
accessKey.ifPresent(accessKey -> sb.append(format("SET s3_access_key_id='%s';\n", accessKey)));
secretKey.ifPresent(secretKey -> sb.append(format("SET s3_secret_access_key='%s';\n", secretKey)));
sb.append(format("SET s3_url_style='%s';\n", urlStyle));
sb.append("BEGIN TRANSACTION;\n");
sb.append(format("CREATE TABLE \"%s\" AS SELECT * FROM read_parquet('s3://%s');", tableName, path));
sb.append("COMMIT;\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.accio.base.client.AutoCloseableIterator;
import io.accio.base.client.duckdb.DuckDBConfig;
import io.accio.base.client.duckdb.DuckdbClient;
import io.accio.base.client.duckdb.DuckdbS3StyleStorageConfig;
import io.accio.base.dto.Column;
import io.accio.base.dto.Manifest;
import io.accio.base.dto.Model;
Expand Down Expand Up @@ -65,7 +66,7 @@ public static Model addColumnsToModel(Model model, Column... columns)
@BeforeClass
public void init()
{
duckdbClient = new DuckdbClient(new DuckDBConfig());
duckdbClient = new DuckdbClient(new DuckDBConfig(), new DuckdbS3StyleStorageConfig());
prepareData();
}

Expand Down
1 change: 1 addition & 0 deletions accio-cache/src/main/java/io/accio/cache/CacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.accio.base.ConnectorRecordIterator;
import io.accio.base.Parameter;
import io.accio.base.SessionContext;
import io.accio.base.client.duckdb.CacheStorageConfig;
import io.accio.base.client.duckdb.DuckDBConfig;
import io.accio.base.client.duckdb.DuckdbClient;
import io.accio.base.dto.CacheInfo;
Expand Down
2 changes: 2 additions & 0 deletions accio-cache/src/main/java/io/accio/cache/CacheModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.accio.base.client.duckdb.CacheStorageConfig;
import io.accio.base.client.duckdb.DuckDBConfig;
import io.accio.base.client.duckdb.DuckdbClient;
import io.accio.base.client.duckdb.DuckdbS3StyleStorageConfig;
import io.airlift.configuration.AbstractConfigurationAwareModule;

import static io.airlift.configuration.ConfigBinder.configBinder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.accio.base.client.AutoCloseableIterator;
import io.accio.base.client.duckdb.DuckDBConfig;
import io.accio.base.client.duckdb.DuckdbClient;
import io.accio.base.client.duckdb.DuckdbS3StyleStorageConfig;
import io.accio.base.dto.Column;
import io.accio.base.dto.Manifest;
import io.accio.base.dto.Model;
Expand Down Expand Up @@ -65,7 +66,7 @@ public static Model addColumnsToModel(Model model, Column... columns)
@BeforeClass
public void init()
{
duckdbClient = new DuckdbClient(new DuckDBConfig());
duckdbClient = new DuckdbClient(new DuckDBConfig(), new DuckdbS3StyleStorageConfig());
prepareData();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.accio.base.CatalogSchemaTableName;
import io.accio.base.client.duckdb.DuckDBConfig;
import io.accio.base.client.duckdb.DuckdbClient;
import io.accio.base.client.duckdb.DuckdbS3StyleStorageConfig;
import io.accio.base.dto.Column;
import io.accio.base.dto.Model;
import io.accio.cache.CacheInfoPair;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void testCheckCacheMemoryLimit()

DuckDBConfig duckDBConfig = new DuckDBConfig();
duckDBConfig.setMaxCacheTableSizeRatio(0);
try (DuckdbTaskManager taskManager = new DuckdbTaskManager(duckDBConfig, new DuckdbClient(duckDBConfig))) {
try (DuckdbTaskManager taskManager = new DuckdbTaskManager(duckDBConfig, new DuckdbClient(duckDBConfig, new DuckdbS3StyleStorageConfig()))) {
assertThatCode(taskManager::checkCacheMemoryLimit).hasMessageMatching("Cache memory limit exceeded. Usage: .* bytes, Limit: 0.0 bytes");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.accio.base.client.Client;
import io.accio.base.client.duckdb.DuckDBConfig;
import io.accio.base.client.duckdb.DuckdbClient;
import io.accio.base.client.duckdb.DuckdbS3StyleStorageConfig;
import io.accio.base.dto.Column;
import io.accio.base.dto.EnumDefinition;
import io.accio.base.dto.JoinType;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class TestMetricValidation

public TestMetricValidation()
{
client = new DuckdbClient(new DuckDBConfig());
client = new DuckdbClient(new DuckDBConfig(), new DuckdbS3StyleStorageConfig());
sample = AccioMDL.fromManifest(withDefaultCatalogSchema()
.setModels(List.of(
Model.model("Flight",
Expand Down

0 comments on commit f62127a

Please sign in to comment.