From cef273d1692fbc81c96f854e7953b3f5e68c7c0f Mon Sep 17 00:00:00 2001 From: mikereiche Date: Mon, 9 Oct 2023 17:53:04 -0700 Subject: [PATCH] Fix ThreadLocal Issue with Repository Save. The issue was introduced when the Mono.deferContextual() was added to determine if the save() is in a transaction. It may be executing in a different thread when the PseudoArgs (scope, collection, and options) are retrieved ThreadLocal. This change ensures scope and collection are retrieved, but options are ignored and discarded. Closes #1838. --- .../core/ReactiveCouchbaseTemplate.java | 10 +------- .../support/CouchbaseRepositoryBase.java | 23 ++++++++++++++----- .../support/SimpleCouchbaseRepository.java | 17 ++++++++++---- .../SimpleReactiveCouchbaseRepository.java | 17 ++++++++++---- .../ReactiveAirportMustScopeRepository.java | 9 ++++++++ ...sitoryQueryCollectionIntegrationTests.java | 21 +++++++++++++++++ 6 files changed, 74 insertions(+), 23 deletions(-) create mode 100644 src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportMustScopeRepository.java diff --git a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java index 8190c93c6..74ddb3f08 100644 --- a/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java +++ b/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java @@ -53,7 +53,7 @@ public class ReactiveCouchbaseTemplate implements ReactiveCouchbaseOperations, A private final CouchbaseConverter converter; private final PersistenceExceptionTranslator exceptionTranslator; private final ReactiveCouchbaseTemplateSupport templateSupport; - private ThreadLocal> threadLocalArgs = new ThreadLocal<>(); + private final ThreadLocal> threadLocalArgs = new ThreadLocal<>(); private final QueryScanConsistency scanConsistency; public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter) { @@ -257,14 +257,6 @@ public PseudoArgs getPseudoArgs() { * set the ThreadLocal field */ public void setPseudoArgs(PseudoArgs threadLocalArgs) { - if (this.threadLocalArgs == null) { - synchronized (this) { - if (this.threadLocalArgs == null) { - this.threadLocalArgs = new ThreadLocal<>(); - } - } - } - this.threadLocalArgs.set(threadLocalArgs); } diff --git a/src/main/java/org/springframework/data/couchbase/repository/support/CouchbaseRepositoryBase.java b/src/main/java/org/springframework/data/couchbase/repository/support/CouchbaseRepositoryBase.java index f93fde619..79bacb2b5 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/support/CouchbaseRepositoryBase.java +++ b/src/main/java/org/springframework/data/couchbase/repository/support/CouchbaseRepositoryBase.java @@ -18,7 +18,9 @@ import java.lang.reflect.AnnotatedElement; +import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; import org.springframework.data.couchbase.core.query.OptionsBuilder; +import org.springframework.data.couchbase.core.support.PseudoArgs; import org.springframework.data.couchbase.repository.Collection; import org.springframework.data.couchbase.repository.ScanConsistency; import org.springframework.data.couchbase.repository.Scope; @@ -35,7 +37,7 @@ * * @author Michael Reiche */ -public class CouchbaseRepositoryBase { +public abstract class CouchbaseRepositoryBase { /** * Contains information about the entity being used in this repository. @@ -82,9 +84,11 @@ String getId(S entity) { protected String getScope() { String fromAnnotation = OptionsBuilder.annotationString(Scope.class, CollectionIdentifier.DEFAULT_SCOPE, - new AnnotatedElement[] { getJavaType(), repositoryInterface }); + new AnnotatedElement[] { getJavaType(), getRepositoryInterface() }); String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getScope() : null; - return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromMetadata, fromAnnotation); + PseudoArgs pa = getReactiveTemplate().getPseudoArgs(); + String fromThreadLocal = pa != null ? pa.getScope() : null; + return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromThreadLocal, fromMetadata, fromAnnotation); } /** @@ -96,12 +100,18 @@ protected String getScope() { * 1. repository.withCollection() */ protected String getCollection() { - String fromAnnotation = OptionsBuilder.annotationString(Collection.class, CollectionIdentifier.DEFAULT_COLLECTION, - new AnnotatedElement[] { getJavaType(), repositoryInterface }); + String fromAnnotation = OptionsBuilder.annotationString(Collection.class, + CollectionIdentifier.DEFAULT_COLLECTION, + new AnnotatedElement[] { getJavaType(), getRepositoryInterface() }); String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getCollection() : null; - return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromMetadata, fromAnnotation); + PseudoArgs pa = getReactiveTemplate().getPseudoArgs(); + String fromThreadLocal = pa != null ? pa.getCollection() : null; + return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromThreadLocal, fromMetadata, + fromAnnotation); } + protected abstract ReactiveCouchbaseTemplate getReactiveTemplate(); + /** * Get the QueryScanConsistency from
* 1. The method annotation (method *could* be available from crudMethodMetadata)
@@ -132,4 +142,5 @@ QueryScanConsistency getQueryScanConsistency() { void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) { this.crudMethodMetadata = crudMethodMetadata; } + } diff --git a/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java b/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java index b8c3b89a8..70e6590cb 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java +++ b/src/main/java/org/springframework/data/couchbase/repository/support/SimpleCouchbaseRepository.java @@ -23,8 +23,8 @@ import java.util.stream.Collectors; import org.springframework.data.couchbase.core.CouchbaseOperations; -import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity; -import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty; +import org.springframework.data.couchbase.core.CouchbaseTemplate; +import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; import org.springframework.data.couchbase.core.query.Query; import org.springframework.data.couchbase.repository.CouchbaseRepository; import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation; @@ -37,7 +37,6 @@ import org.springframework.util.Assert; import com.couchbase.client.java.query.QueryScanConsistency; -import org.springframework.util.ReflectionUtils; /** * Repository base implementation for Couchbase. @@ -71,7 +70,13 @@ public SimpleCouchbaseRepository(CouchbaseEntityInformation entityInf @Override @SuppressWarnings("unchecked") public S save(S entity) { - return operations.save(entity, getScope(), getCollection()); + String scopeName = getScope(); + String collectionName = getCollection(); + // clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread. + // not that this will also clear out Options, but that's ok as any options would not work + // with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById + getReactiveTemplate().setPseudoArgs(null); + return operations.save(entity, scopeName, collectionName); } @Override @@ -177,4 +182,8 @@ public CouchbaseOperations getOperations() { return operations; } + @Override + protected ReactiveCouchbaseTemplate getReactiveTemplate() { + return ((CouchbaseTemplate) getOperations()).reactive(); + } } diff --git a/src/main/java/org/springframework/data/couchbase/repository/support/SimpleReactiveCouchbaseRepository.java b/src/main/java/org/springframework/data/couchbase/repository/support/SimpleReactiveCouchbaseRepository.java index 658843563..3938515e5 100644 --- a/src/main/java/org/springframework/data/couchbase/repository/support/SimpleReactiveCouchbaseRepository.java +++ b/src/main/java/org/springframework/data/couchbase/repository/support/SimpleReactiveCouchbaseRepository.java @@ -26,15 +26,13 @@ import org.reactivestreams.Publisher; import org.springframework.data.couchbase.core.CouchbaseOperations; import org.springframework.data.couchbase.core.ReactiveCouchbaseOperations; -import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity; -import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty; +import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; import org.springframework.data.couchbase.core.query.Query; import org.springframework.data.couchbase.repository.ReactiveCouchbaseRepository; import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation; import org.springframework.data.domain.Sort; import org.springframework.data.util.Streamable; import org.springframework.util.Assert; -import org.springframework.util.ReflectionUtils; /** * Reactive repository base implementation for Couchbase. @@ -76,7 +74,13 @@ public Flux findAll(Sort sort) { @SuppressWarnings("unchecked") @Override public Mono save(S entity) { - return save(entity, getScope(), getCollection()); + String scopeName = getScope(); + String collectionName = getCollection(); + // clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread. + // not that this will also clear out Options, but that's ok as any options would not work + // with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById + getReactiveTemplate().setPseudoArgs(null); + return operations.save(entity, scopeName, collectionName); } @Override @@ -227,4 +231,9 @@ public ReactiveCouchbaseOperations getOperations() { return operations; } + @Override + protected ReactiveCouchbaseTemplate getReactiveTemplate() { + return (ReactiveCouchbaseTemplate) getOperations(); + } + } diff --git a/src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportMustScopeRepository.java b/src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportMustScopeRepository.java new file mode 100644 index 000000000..225c4d171 --- /dev/null +++ b/src/test/java/org/springframework/data/couchbase/domain/ReactiveAirportMustScopeRepository.java @@ -0,0 +1,9 @@ +package org.springframework.data.couchbase.domain; + +import org.springframework.data.couchbase.repository.Collection; +import org.springframework.data.couchbase.repository.Scope; + +@Scope("must set scope name") +@Collection("my_collection") +public interface ReactiveAirportMustScopeRepository extends ReactiveAirportRepository { +} diff --git a/src/test/java/org/springframework/data/couchbase/repository/query/ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/repository/query/ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests.java index 44faad383..afd77875c 100644 --- a/src/test/java/org/springframework/data/couchbase/repository/query/ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/repository/query/ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests.java @@ -18,7 +18,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import reactor.core.Disposable; + import java.util.List; +import java.util.Random; +import java.util.UUID; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -31,6 +35,7 @@ import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate; import org.springframework.data.couchbase.domain.Airport; import org.springframework.data.couchbase.domain.ConfigScoped; +import org.springframework.data.couchbase.domain.ReactiveAirportMustScopeRepository; import org.springframework.data.couchbase.domain.ReactiveAirportRepository; import org.springframework.data.couchbase.domain.ReactiveAirportRepositoryAnnotated; import org.springframework.data.couchbase.domain.ReactiveUserColRepository; @@ -61,6 +66,7 @@ public class ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests extends @Autowired ReactiveAirportRepository reactiveAirportRepository; @Autowired ReactiveAirportRepositoryAnnotated reactiveAirportRepositoryAnnotated; + @Autowired ReactiveAirportMustScopeRepository reactiveAirportMustScopeRepository; @Autowired ReactiveUserColRepository userColRepository; @Autowired public CouchbaseTemplate couchbaseTemplate; @Autowired public ReactiveCouchbaseTemplate reactiveCouchbaseTemplate; @@ -116,6 +122,21 @@ public void myTest() { } + @Test + void testThreadLocal() throws InterruptedException { + + String scopeName = "my_scope"; + String id = UUID.randomUUID().toString(); + + Airport airport = new Airport(id, "testThreadLocal", "icao"); + Disposable s = reactiveAirportMustScopeRepository.withScope(scopeName).findById(airport.getId()).doOnNext(u -> { + throw new RuntimeException("User already Exists! " + u); + }).then(reactiveAirportMustScopeRepository.withScope(scopeName).save(airport)) + .subscribe(u -> LOGGER.info("User Persisted Successfully! {}", u)); + + reactiveAirportMustScopeRepository.withScope(scopeName).deleteById(id).block(); + } + /** * can test against _default._default without setting up additional scope/collection and also test for collections and * scopes that do not exist These same tests should be repeated on non-default scope and collection in a test that