Cached delta log gets corrupted when dropping and recreating delta table with Trino #21451

Closed
sdaberdaku opened this issue Apr 8, 2024 · 15 comments · Fixed by #23408
Labels
bug (Something isn't working), delta-lake (Delta Lake connector)

Comments

@sdaberdaku
Member

sdaberdaku commented Apr 8, 2024

In Trino 444 with Alluxio cache enabled, when dropping and then recreating a delta table, I occasionally get the following error:

io.trino.spi.TrinoException: Error getting snapshot for my_schema.my_table
	at io.trino.plugin.deltalake.DeltaLakeMetadata.getSnapshot(DeltaLakeMetadata.java:485)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.getTableHandle(DeltaLakeMetadata.java:547)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.getTableHandle(DeltaLakeMetadata.java:319)
	at io.trino.spi.connector.ConnectorMetadata.getTableHandle(ConnectorMetadata.java:141)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.getTableHandle(ClassLoaderSafeConnectorMetadata.java:1243)
	at io.trino.tracing.TracingConnectorMetadata.getTableHandle(TracingConnectorMetadata.java:149)
	at io.trino.metadata.MetadataManager.lambda$getTableHandle$5(MetadataManager.java:283)
	at java.base/java.util.Optional.flatMap(Optional.java:289)
	at io.trino.metadata.MetadataManager.getTableHandle(MetadataManager.java:277)
	at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1884)
	at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1876)
	at io.trino.tracing.TracingMetadata.getRedirectionAwareTableHandle(TracingMetadata.java:1475)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.getTableHandle(StatementAnalyzer.java:5843)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:2288)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:522)
	at io.trino.sql.tree.Table.accept(Table.java:60)
	at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:541)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.analyzeFrom(StatementAnalyzer.java:4892)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:3085)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:522)
	at io.trino.sql.tree.QuerySpecification.accept(QuerySpecification.java:155)
	at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:541)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:549)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:1566)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:522)
	at io.trino.sql.tree.Query.accept(Query.java:118)
	at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
	at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:541)
	at io.trino.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:501)
	at io.trino.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:490)
	at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:97)
	at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:86)
	at io.trino.execution.SqlQueryExecution.analyze(SqlQueryExecution.java:285)
	at io.trino.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:218)
	at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:886)
	at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:153)
	at io.trino.$gen.Trino_444____20240405_162025_2.call(Unknown Source)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.UncheckedIOException: Could not parse JSON
	at io.trino.plugin.base.util.JsonUtils.parseJson(JsonUtils.java:120)
	at io.trino.plugin.base.util.JsonUtils.parseJson(JsonUtils.java:75)
	at io.trino.plugin.deltalake.transactionlog.TransactionLogParser.parseJson(TransactionLogParser.java:145)
	at io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson(TransactionLogTail.java:126)
	at io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson(TransactionLogTail.java:114)
	at io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail(TransactionLogTail.java:72)
	at io.trino.plugin.deltalake.transactionlog.TableSnapshot.load(TableSnapshot.java:100)
	at io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.lambda$loadSnapshot$1(TransactionLogAccess.java:166)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4938)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3576)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2318)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2191)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2081)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4019)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4933)
	at io.trino.cache.EvictableCache.get(EvictableCache.java:112)
	at io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.loadSnapshot(TransactionLogAccess.java:165)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.getSnapshot(DeltaLakeMetadata.java:478)
	... 44 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 583])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 584] (through reference chain: io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry["add"]->io.trino.plugin.deltalake.transactionlog.AddFileEntry["tags"])
	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1937)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:572)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:440)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1493)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:348)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:440)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1493)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:348)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4881)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3035)
	at io.trino.plugin.base.util.JsonUtils.parseJson(JsonUtils.java:115)
	... 61 more
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 583])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 584]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:585)
	at com.fasterxml.jackson.core.base.ParserBase._handleEOF(ParserBase.java:535)
	at com.fasterxml.jackson.core.base.ParserBase._eofAsNextChar(ParserBase.java:552)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2491)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:913)
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:596)
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:449)
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
	... 75 more

Unfortunately, the error is not easy to reproduce; I can drop and recreate the same table multiple times without issues.

The actual delta log files in object storage are not corrupted: they can be read by Spark, and by Trino if the coordinator is restarted.

@ebyhr
Member

ebyhr commented Apr 8, 2024

cc: @findinpath @wendigo @raunaqmorarka

@ebyhr added the bug and delta-lake labels on Apr 8, 2024
@sdaberdaku
Member Author

sdaberdaku commented Apr 24, 2024

The problem also persists in Trino 445.
We have two use cases where this issue emerges:
In the first case, we overwrite a Delta table with Spark and Trino can no longer read it. The issue is resolved if we restart the coordinator.
In the second case, we overwrite a Delta table with dbt through Trino itself. Again, restarting the coordinator makes the tables readable again.

@jojovem

jojovem commented Jul 31, 2024

Hi all.

I'm having the same issue. Every time I get these errors, I just delete the cache data directly on disk and that solves my problem.

My stack is Trino 450 + Delta Lake connector + AWS Glue.

The cache configuration for the catalog is pretty straightforward:

fs.cache.enabled=true
fs.cache.directories=/tmp/trino-cache/catalog
fs.cache.max-sizes=490GB

When I get the snapshot errors now and then, I just run
rm -rf /tmp/trino-cache/catalog
from within the coordinator pod and it works again.
The disk is an EBS volume.

@raunaqmorarka
Member

cc: @jkylling @Pluies

@jkylling
Contributor

jkylling commented Jul 31, 2024

Use cases where we can no longer assume that the commit and checkpoint files within the Delta log are immutable should likely just disable the cache on the coordinator. Having mutable commit and checkpoint files is not really addressed by the Delta protocol, and it makes any concurrent access of Delta tables brittle. So it's better to avoid this, and use a unique location every time a table is created.
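
For illustration, a fresh location per table creation could be derived like this (just a sketch; the bucket, prefix, and naming scheme are made up):

import java.util.UUID;

public class UniqueTableLocation
{
    public static void main(String[] args)
    {
        // Sketch only: derive a fresh location for each (re)creation of a table,
        // so a recreated table never reuses _delta_log paths that may still be cached.
        String tableName = "my_table"; // hypothetical table name
        String location = "s3://my-bucket/warehouse/" + tableName + "-" + UUID.randomUUID(); // hypothetical bucket/prefix
        System.out.println(location);
    }
}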

@sdaberdaku
Member Author

Hello @jkylling,

I am definitely using the same unique and deterministic location for a given Delta table (it simply depends on the table name). Maybe I misunderstood your response; could you please elaborate a bit more?
Also, is it possible to disable caching only on the coordinator and keep it enabled on workers?

@jkylling
Contributor

Hello @jkylling,

I am definitely using the same unique and deterministic location for a given Delta table (it simply depends on the table name). Maybe I misunderstood your response; could you please elaborate a bit more? Also, is it possible to disable caching only on the coordinator and keep it enabled on workers?

Sorry, I might be confusing this with another issue. Are you able to share a bit more about your environment? Which object store are you using? You mention that this happens when overwriting a table. What operation do you run on Spark or Trino when you do an overwrite? Can it ever happen that the content of a commit at path/to/my/table/_delta_log/00...00.json is different if I check it at different times?

Unfortunately, it looks like disabling the cache on the coordinator only is no longer possible.

@sdaberdaku
Member Author

sdaberdaku commented Jul 31, 2024

I am running Trino on AWS EKS using EC2 nodes with NVMe support, where I mount the cache folders (the EC2 nodes are provisioned by Karpenter, and the NVMe disks are automatically mounted as RAID0 ephemeral storage).

The object store is S3 and I am using Glue as a Data Catalog.

With Spark I overwrite the tables like so:

(df
 .write
 .format("delta")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .save(path))

Regarding the content of a commit, I really have no idea.

@jojovem

jojovem commented Jul 31, 2024

@jkylling It seems that the stack @sdaberdaku is using is pretty similar to mine.
In my case, this problem usually happens when I delete the entire S3 bucket (including the _delta_log) to start over with a new table. I also delete the table in AWS Glue and re-run the catalog crawler.

It seems the cache thinks the previously created table is still valid, so it keeps searching for a snapshot that no longer exists. Maybe it should flush the cache when this kind of error happens?

I also tried:
CALL system.flush_metadata_cache();

But it doesn't work; I have to manually delete the cache to make it work again.

@jkylling
Contributor

I discussed this issue with @raunaqmorarka, and we will add a configuration option to disable caching of files with /_delta_log/ in their path, to avoid issues with Delta tables that have mutable commits.

That said, having mutable files within /_delta_log/ is an anti-pattern that is likely to bite you in other subtle ways. It's better to avoid it, for instance by adding a UUID to the location of every table, or by deleting a table while preserving its history and data.
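
For context, a minimal sketch of what such a toggle could look like as an airlift-style configuration property (the class name and property name here are assumptions for illustration, not necessarily what the final change will use):

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

public class DeltaLakeCacheConfigSketch
{
    private boolean deltaLogCacheDisabled;

    public boolean isDeltaLogCacheDisabled()
    {
        return deltaLogCacheDisabled;
    }

    // Hypothetical property name; the final option may be named differently.
    @Config("delta.fs.cache.disable-transaction-log-caching")
    @ConfigDescription("Do not cache files under _delta_log, so recreated tables are always re-read from the object store")
    public DeltaLakeCacheConfigSketch setDeltaLogCacheDisabled(boolean deltaLogCacheDisabled)
    {
        this.deltaLogCacheDisabled = deltaLogCacheDisabled;
        return this;
    }
}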

@sdaberdaku
Member Author

Hello @jkylling,
Any news on the configuration option to disable delta log file caching?

Thanks,

Sebastian

@jkylling
Contributor

jkylling commented Sep 10, 2024

Hi @sdaberdaku,

No one has had time to look into this yet. If you want to contribute this yourself I'd be happy to give some guidance.

Basically, a new https://github.com/trinodb/trino/blob/c8568a9ccfcf2876ef441588a6040270d82f95b2/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java must be added which returns an empty cache key for any file within /_delta_log/. Enabling this CacheKeyProvider would be controlled by some new configuration option defined in the connector's configuration class, and the provider would be selected with

newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(DeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
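
For reference, a minimal sketch of such a provider, assuming the io.trino.filesystem.cache.CacheKeyProvider interface (the real implementation may handle paths and packaging slightly differently):

import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.cache.CacheKeyProvider;

import java.util.Optional;

public class DeltaLakeCacheKeyProvider
        implements CacheKeyProvider
{
    @Override
    public Optional<String> getCacheKey(TrinoInputFile inputFile)
    {
        String path = inputFile.location().path();
        // Return no cache key for transaction log files, so they are always read
        // from the object store and a recreated table cannot see stale commits.
        if (path.contains("/_delta_log/")) {
            return Optional.empty();
        }
        return Optional.of(path);
    }
}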

@Pluies
Contributor

Pluies commented Sep 12, 2024

Alternatively, you can reuse code from closed PR #21131 and use skip-paths to skip over file paths containing /_delta_log 👍

@sdaberdaku
Member Author

Thank you @jkylling and @Pluies for the valuable suggestions!
I will look into this and try my best to submit a PR!

@sdaberdaku
Member Author

Hello @jkylling,

I implemented your suggestions and submitted a PR (I also submitted the CLA).
Would you be willing to be a reviewer?

Best,

Sebastian
