
Flaky TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameVersionedTable #21725

Open
ebyhr opened this issue Apr 26, 2024 · 6 comments

@ebyhr
Member

ebyhr commented Apr 26, 2024

Error:  io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameVersionedTable -- Time elapsed: 3.660 s <<< ERROR!
io.trino.testing.QueryFailedException: Failed to write Delta Lake transaction log entry
	at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:133)
	at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:501)
	at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:484)
	at io.trino.testing.QueryRunner.execute(QueryRunner.java:82)
	at io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.lambda$testConcurrentInsertsSelectingFromTheSameVersionedTable$9(TestDeltaLakeLocalConcurrentWritesTest.java:230)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
	Suppressed: java.lang.Exception: SQL: INSERT INTO test_concurrent_inserts_select_from_same_versioned_table_gu3yeqy900 SELECT 2, 'c' AS part FROM test_concurrent_inserts_select_from_same_versioned_table_gu3yeqy900 FOR VERSION AS OF 0
		at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:508)
		... 7 more
Caused by: io.trino.spi.TrinoException: Failed to write Delta Lake transaction log entry
	at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2028)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:633)
	at io.trino.tracing.TracingConnectorMetadata.finishInsert(TracingConnectorMetadata.java:718)
	at io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:1169)
	at io.trino.tracing.TracingMetadata.finishInsert(TracingMetadata.java:706)
	at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4113)
	at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
	at io.trino.operator.Driver.processInternal(Driver.java:403)
	at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
	at io.trino.operator.Driver.tryWithLock(Driver.java:709)
	at io.trino.operator.Driver.process(Driver.java:298)
	at io.trino.operator.Driver.processForDuration(Driver.java:269)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
	at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:76)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
	at io.trino.$gen.Trino_testversion____20240426_081326_177.run(Unknown Source)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
	at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:174)
	at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:161)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: io.trino.spi.TrinoException: Error reading statistics from cache
	at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:67)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.updateTableStatistics(DeltaLakeMetadata.java:3592)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2010)
	... 25 more
Caused by: java.lang.IllegalArgumentException: Invalid JSON bytes for [simple type, class io.trino.plugin.deltalake.statistics.ExtendedStatistics]
	at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:196)
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:75)
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:65)
	at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.lambda$readExtendedStatistics$0(CachingExtendedStatisticsAccess.java:63)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4938)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3576)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2318)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2191)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2081)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4019)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4933)
	at io.trino.cache.EvictableCache.get(EvictableCache.java:112)
	at io.trino.cache.CacheUtils.uncheckedCacheGet(CacheUtils.java:37)
	at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:63)
	... 27 more
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1]
	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1767)
	at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:360)
	at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2115)
	at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1603)
	at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:193)
	... 40 more

https://github.com/trinodb/trino/actions/runs/8845096149/job/24288361644

@findinpath
Contributor

findinpath commented Apr 26, 2024

The following stack trace shows that the JSON content read from storage is corrupted:

Caused by: java.lang.IllegalArgumentException: Invalid JSON bytes for [simple type, class io.trino.plugin.deltalake.statistics.ExtendedStatistics]
	at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:196)
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:75)
	at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:65)
	at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.lambda$readExtendedStatistics$0(CachingExtendedStatisticsAccess.java:63)

The issue reported there seems to be related to the way LocalOutputFile writes content to storage:

OutputStream stream = Files.newOutputStream(path);
try (OutputStream out = new LocalOutputStream(location, stream)) {
    out.write(data);
}

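The content is not overwritten atomically: Files.newOutputStream(path) truncates the existing file as soon as it is opened, so a concurrent reader can see an empty file before out.write(data) completes.
We need to rethink the implementation of the createOrOverwrite() method in LocalOutputFile, which tests use to interact with the local file storage.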
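
A minimal standalone sketch (JDK only, not Trino code) of the window described above: with no options, Files.newOutputStream truncates an existing file at open time, so anything that looks at the file between the open and the write sees zero bytes, which is consistent with Jackson's "No content to map due to end-of-input". The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class TruncationWindowDemo {
    // Overwrites `path` the same way as the snippet above and returns the file size
    // observed after the stream is opened but before any bytes are written.
    static long sizeSeenBeforeWrite(Path path, byte[] data) throws IOException {
        long observed;
        // With no options, Files.newOutputStream uses CREATE, TRUNCATE_EXISTING and
        // WRITE, so the previous content is discarded at open time.
        try (OutputStream out = Files.newOutputStream(path)) {
            observed = Files.size(path); // what a concurrent reader could see here
            out.write(data);
        }
        return observed;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("stats", ".json");
        Files.writeString(file, "{\"version\":1}");
        long size = sizeSeenBeforeWrite(file, "{\"version\":2}".getBytes(StandardCharsets.UTF_8));
        System.out.println("size between open and write: " + size);
        System.out.println("final content: " + Files.readString(file));
        Files.delete(file);
    }
}
```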

cc @electrum

@ebyhr
Member Author

ebyhr commented May 8, 2024

@electrum
Member

electrum commented May 8, 2024

We could change this to write to a temporary file and then rename it to the target name. The temporary file could be named .tmp.$RANDOM.$ORIGINAL.
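
A minimal sketch of the temp-file-and-rename idea using only java.nio. It is not the actual LocalOutputFile change; AtomicOverwriteSketch, its createOrOverwrite(Path, byte[]) signature, and the extended_stats.json file name are hypothetical. A reader that opens the target sees either the previous file or the fully written new one, never a truncated one.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

final class AtomicOverwriteSketch {
    // Hypothetical replacement for a non-atomic createOrOverwrite(): write the new
    // bytes to a sibling temp file, then rename it over the target.
    static void createOrOverwrite(Path target, byte[] data) throws IOException {
        Path absolute = target.toAbsolutePath();
        // Temp name follows the suggested pattern .tmp.$RANDOM.$ORIGINAL and lives in
        // the same directory, so the rename stays within one file system.
        Path temp = absolute.resolveSibling(".tmp." + UUID.randomUUID() + "." + absolute.getFileName());
        try {
            try (OutputStream out = Files.newOutputStream(temp)) {
                out.write(data);
            }
            // With ATOMIC_MOVE, replacing an existing target is implementation-specific
            // per the Files.move Javadoc; on Linux and macOS it maps to rename(2),
            // which replaces the target in one step.
            Files.move(temp, absolute, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        }
        finally {
            // Cleans up after a failed write or move; a no-op after a successful move.
            Files.deleteIfExists(temp);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("delta-meta");
        Path stats = dir.resolve("extended_stats.json"); // hypothetical file name
        createOrOverwrite(stats, "{\"version\":1}".getBytes(StandardCharsets.UTF_8));
        createOrOverwrite(stats, "{\"version\":2}".getBytes(StandardCharsets.UTF_8));
        System.out.println(Files.readString(stats));
        try (var entries = Files.list(dir)) {
            System.out.println("files in directory: " + entries.count());
        }
    }
}
```

The sketch does not fsync the file or its directory, so it addresses visibility to concurrent readers rather than crash durability.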

@ebyhr
Member Author

ebyhr commented Jun 3, 2024

Reoccurred on master: https://github.com/trinodb/trino/actions/runs/9357870915/job/25758647303

@findinpath Do you have any updates?

@ebyhr
Member Author

ebyhr commented Sep 19, 2024

@ebyhr
Member Author

ebyhr commented Sep 22, 2024

Development

No branches or pull requests

3 participants