Skip to content

Commit

Permalink
fixup! Add option to disable filesystem caching of /_delta_log/ direc…
Browse files Browse the repository at this point in the history
…tory
  • Loading branch information
sdaberdaku committed Sep 16, 2024
1 parent 5d9a699 commit d005bba
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 69 deletions.
6 changes: 3 additions & 3 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -1235,11 +1235,11 @@ the Delta Lake connector.
* - Property name
- Description
- Default
* - `delta.fs.cache.delta-log-caching-enabled`
- Set to `false` to disable caching of the `_delta_log` directory of
* - `delta.fs.cache.disable-transaction-log-caching`
- Set to `true` to disable caching of the `_delta_log` directory of
Delta Tables. This is useful in those cases when Delta Tables are
destroyed and recreated, and the files inside the transaction log
directory get overwritten and cannot be safely cached. Effective
only when `fs.cache.enabled=true`.
- `true`
- `false`
:::
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class DeltaLakeConfig
private boolean projectionPushdownEnabled = true;
private boolean queryPartitionFilterRequired;
private boolean deletionVectorsEnabled;
private boolean deltaLogFileSystemCacheEnabled = true;
private boolean deltaLogFileSystemCacheDisabled;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -534,16 +534,16 @@ public DeltaLakeConfig setDeletionVectorsEnabled(boolean deletionVectorsEnabled)
return this;
}

public boolean isDeltaLogFileSystemCacheEnabled()
public boolean isDeltaLogFileSystemCacheDisabled()
{
return deltaLogFileSystemCacheEnabled;
return deltaLogFileSystemCacheDisabled;
}

@Config("delta.fs.cache.delta-log-caching-enable")
@ConfigDescription("Enable object storage caching of the _delta_log directory (effective only when fs.cache.enabled=true)")
public DeltaLakeConfig setDeltaLogFileSystemCacheEnabled(boolean deltaLogFileSystemCacheEnabled)
@Config("delta.fs.cache.disable-transaction-log-caching")
@ConfigDescription("Disable filesystem caching of the _delta_log directory (effective only when fs.cache.enabled=true)")
public DeltaLakeConfig setDeltaLogFileSystemCacheDisabled(boolean deltaLogFileSystemCacheDisabled)
{
this.deltaLogFileSystemCacheEnabled = deltaLogFileSystemCacheEnabled;
this.deltaLogFileSystemCacheDisabled = deltaLogFileSystemCacheDisabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.trino.plugin.base.security.ConnectorAccessControlModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.deltalake.cache.DeltaLakeCacheKeyProvider;
import io.trino.plugin.deltalake.cache.MutableDeltaLogDeltaLakeCacheKeyProvider;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunctionProvider;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
Expand Down Expand Up @@ -152,12 +151,7 @@ public void setup(Binder binder)
binder.bind(FunctionProvider.class).to(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON);

if (buildConfigObject(DeltaLakeConfig.class).isDeltaLogFileSystemCacheEnabled()) {
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(DeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
}
else {
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(MutableDeltaLogDeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
}
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(DeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);

closingBinder(binder).registerExecutor(ExecutorService.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,42 @@
*/
package io.trino.plugin.deltalake.cache;

import com.google.inject.Inject;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.cache.CacheKeyProvider;
import io.trino.plugin.deltalake.DeltaLakeConfig;

import java.util.Optional;

import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STARBURST_META_DIR;
import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STATISTICS_META_DIR;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;

public class DeltaLakeCacheKeyProvider
implements CacheKeyProvider
{
private final boolean deltaLogFileSystemCacheDisabled;

@Inject
public DeltaLakeCacheKeyProvider(DeltaLakeConfig deltaLakeConfig)
{
// Disabling the delta log folder caching is useful in those scenarios when Delta Tables are deleted and re-created,
// and caching their _delta_log directories should be avoided.
deltaLogFileSystemCacheDisabled = deltaLakeConfig.isDeltaLogFileSystemCacheDisabled();
}

/**
* Get the cache key of a TrinoInputFile. Returns Optional.empty() if the file is not cacheable.
*/
@Override
public Optional<String> getCacheKey(TrinoInputFile inputFile)
{
String path = inputFile.location().path();
// Explicitly exclude the files in the _delta_log directory when deltaLogFileSystemCacheDisabled is set to true,
// as they can change when the Delta Table is overwritten, https://github.com/trinodb/trino/issues/21451
if (this.deltaLogFileSystemCacheDisabled && path.contains("/" + TRANSACTION_LOG_DIRECTORY + "/")) {
return Optional.empty();
}
if (path.endsWith(".trinoSchema") || path.contains("/.trinoPermissions/")) {
// Needed to avoid caching files from FileHiveMetastore on coordinator during tests
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ protected DistributedQueryRunner createQueryRunner()
.build();
}

private URL getResourceLocation(String resourcePath)
protected URL getResourceLocation(String resourcePath)
{
return getClass().getClassLoader().getResource(resourcePath);
}

private void registerTable(String name, String resourcePath)
protected void registerTable(String name, String resourcePath)
{
String dataPath = getResourceLocation(resourcePath).toExternalForm();
getQueryRunner().execute(format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", name, dataPath));
Expand Down Expand Up @@ -545,7 +545,7 @@ public void testCreateOrReplaceTableAsSelect()
assertUpdate("DROP TABLE test_create_or_replace_as_select");
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<CacheOperation> expectedCacheAccesses)
protected void assertFileSystemAccesses(@Language("SQL") String query, Multiset<CacheOperation> expectedCacheAccesses)
{
assertUpdate("CALL system.flush_metadata_cache()");
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
Expand Down
Loading

0 comments on commit d005bba

Please sign in to comment.