Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ThreadLocal Issue with Repository Save. #1840

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ReactiveCouchbaseTemplate implements ReactiveCouchbaseOperations, A
private final CouchbaseConverter converter;
private final PersistenceExceptionTranslator exceptionTranslator;
private final ReactiveCouchbaseTemplateSupport templateSupport;
private ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
private final ThreadLocal<PseudoArgs<?>> threadLocalArgs = new ThreadLocal<>();
private final QueryScanConsistency scanConsistency;

public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter) {
Expand Down Expand Up @@ -257,14 +257,6 @@ public PseudoArgs<?> getPseudoArgs() {
* set the ThreadLocal field
*/
public void setPseudoArgs(PseudoArgs<?> threadLocalArgs) {
if (this.threadLocalArgs == null) {
synchronized (this) {
if (this.threadLocalArgs == null) {
this.threadLocalArgs = new ThreadLocal<>();
}
}
}

this.threadLocalArgs.set(threadLocalArgs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.lang.reflect.AnnotatedElement;

import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.OptionsBuilder;
import org.springframework.data.couchbase.core.support.PseudoArgs;
import org.springframework.data.couchbase.repository.Collection;
import org.springframework.data.couchbase.repository.ScanConsistency;
import org.springframework.data.couchbase.repository.Scope;
Expand All @@ -35,7 +37,7 @@
*
* @author Michael Reiche
*/
public class CouchbaseRepositoryBase<T, ID> {
public abstract class CouchbaseRepositoryBase<T, ID> {

/**
* Contains information about the entity being used in this repository.
Expand Down Expand Up @@ -82,9 +84,11 @@ <S extends T> String getId(S entity) {

protected String getScope() {
String fromAnnotation = OptionsBuilder.annotationString(Scope.class, CollectionIdentifier.DEFAULT_SCOPE,
new AnnotatedElement[] { getJavaType(), repositoryInterface });
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getScope() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromMetadata, fromAnnotation);
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
String fromThreadLocal = pa != null ? pa.getScope() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_SCOPE, fromThreadLocal, fromMetadata, fromAnnotation);
}

/**
Expand All @@ -96,12 +100,18 @@ protected String getScope() {
* 1. repository.withCollection()
*/
protected String getCollection() {
String fromAnnotation = OptionsBuilder.annotationString(Collection.class, CollectionIdentifier.DEFAULT_COLLECTION,
new AnnotatedElement[] { getJavaType(), repositoryInterface });
String fromAnnotation = OptionsBuilder.annotationString(Collection.class,
CollectionIdentifier.DEFAULT_COLLECTION,
new AnnotatedElement[] { getJavaType(), getRepositoryInterface() });
String fromMetadata = crudMethodMetadata != null ? crudMethodMetadata.getCollection() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromMetadata, fromAnnotation);
PseudoArgs<?> pa = getReactiveTemplate().getPseudoArgs();
String fromThreadLocal = pa != null ? pa.getCollection() : null;
return OptionsBuilder.fromFirst(CollectionIdentifier.DEFAULT_COLLECTION, fromThreadLocal, fromMetadata,
fromAnnotation);
}

protected abstract ReactiveCouchbaseTemplate getReactiveTemplate();

/**
* Get the QueryScanConsistency from <br>
* 1. The method annotation (method *could* be available from crudMethodMetadata)<br>
Expand Down Expand Up @@ -132,4 +142,5 @@ QueryScanConsistency getQueryScanConsistency() {
void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
this.crudMethodMetadata = crudMethodMetadata;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.stream.Collectors;

import org.springframework.data.couchbase.core.CouchbaseOperations;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
import org.springframework.data.couchbase.core.CouchbaseTemplate;
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.Query;
import org.springframework.data.couchbase.repository.CouchbaseRepository;
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
Expand All @@ -37,7 +37,6 @@
import org.springframework.util.Assert;

import com.couchbase.client.java.query.QueryScanConsistency;
import org.springframework.util.ReflectionUtils;

/**
* Repository base implementation for Couchbase.
Expand Down Expand Up @@ -71,7 +70,13 @@ public SimpleCouchbaseRepository(CouchbaseEntityInformation<T, String> entityInf
@Override
@SuppressWarnings("unchecked")
public <S extends T> S save(S entity) {
return operations.save(entity, getScope(), getCollection());
String scopeName = getScope();
String collectionName = getCollection();
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
// not that this will also clear out Options, but that's ok as any options would not work
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
getReactiveTemplate().setPseudoArgs(null);
return operations.save(entity, scopeName, collectionName);
}

@Override
Expand Down Expand Up @@ -177,4 +182,8 @@ public CouchbaseOperations getOperations() {
return operations;
}

@Override
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
return ((CouchbaseTemplate) getOperations()).reactive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@
import org.reactivestreams.Publisher;
import org.springframework.data.couchbase.core.CouchbaseOperations;
import org.springframework.data.couchbase.core.ReactiveCouchbaseOperations;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.core.query.Query;
import org.springframework.data.couchbase.repository.ReactiveCouchbaseRepository;
import org.springframework.data.couchbase.repository.query.CouchbaseEntityInformation;
import org.springframework.data.domain.Sort;
import org.springframework.data.util.Streamable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/**
* Reactive repository base implementation for Couchbase.
Expand Down Expand Up @@ -76,7 +74,13 @@ public Flux<T> findAll(Sort sort) {
@SuppressWarnings("unchecked")
@Override
public <S extends T> Mono<S> save(S entity) {
return save(entity, getScope(), getCollection());
String scopeName = getScope();
String collectionName = getCollection();
// clear out the PseudoArgs here as whatever is called by operations.save() could be in a different thread.
// not that this will also clear out Options, but that's ok as any options would not work
// with all of insert/upsert/replace. If Options are needed, use template.insertById/upsertById/replaceById
getReactiveTemplate().setPseudoArgs(null);
return operations.save(entity, scopeName, collectionName);
}

@Override
Expand Down Expand Up @@ -227,4 +231,9 @@ public ReactiveCouchbaseOperations getOperations() {
return operations;
}

@Override
protected ReactiveCouchbaseTemplate getReactiveTemplate() {
return (ReactiveCouchbaseTemplate) getOperations();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.springframework.data.couchbase.domain;

import org.springframework.data.couchbase.repository.Collection;
import org.springframework.data.couchbase.repository.Scope;

@Scope("must set scope name")
@Collection("my_collection")
public interface ReactiveAirportMustScopeRepository extends ReactiveAirportRepository {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import reactor.core.Disposable;

import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -31,6 +35,7 @@
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
import org.springframework.data.couchbase.domain.Airport;
import org.springframework.data.couchbase.domain.ConfigScoped;
import org.springframework.data.couchbase.domain.ReactiveAirportMustScopeRepository;
import org.springframework.data.couchbase.domain.ReactiveAirportRepository;
import org.springframework.data.couchbase.domain.ReactiveAirportRepositoryAnnotated;
import org.springframework.data.couchbase.domain.ReactiveUserColRepository;
Expand Down Expand Up @@ -61,6 +66,7 @@ public class ReactiveCouchbaseRepositoryQueryCollectionIntegrationTests extends

@Autowired ReactiveAirportRepository reactiveAirportRepository;
@Autowired ReactiveAirportRepositoryAnnotated reactiveAirportRepositoryAnnotated;
@Autowired ReactiveAirportMustScopeRepository reactiveAirportMustScopeRepository;
@Autowired ReactiveUserColRepository userColRepository;
@Autowired public CouchbaseTemplate couchbaseTemplate;
@Autowired public ReactiveCouchbaseTemplate reactiveCouchbaseTemplate;
Expand Down Expand Up @@ -116,6 +122,21 @@ public void myTest() {

}

@Test
void testThreadLocal() throws InterruptedException {

String scopeName = "my_scope";
String id = UUID.randomUUID().toString();

Airport airport = new Airport(id, "testThreadLocal", "icao");
Disposable s = reactiveAirportMustScopeRepository.withScope(scopeName).findById(airport.getId()).doOnNext(u -> {
throw new RuntimeException("User already Exists! " + u);
}).then(reactiveAirportMustScopeRepository.withScope(scopeName).save(airport))
.subscribe(u -> LOGGER.info("User Persisted Successfully! {}", u));

reactiveAirportMustScopeRepository.withScope(scopeName).deleteById(id).block();
}

/**
* can test against _default._default without setting up additional scope/collection and also test for collections and
* scopes that do not exist These same tests should be repeated on non-default scope and collection in a test that
Expand Down