diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ClientCalls.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ClientCalls.java index 54b8d2ec..b95d3a2b 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ClientCalls.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ClientCalls.java @@ -8,7 +8,6 @@ package com.salesforce.reactorgrpc.stub; import io.grpc.CallOptions; -import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import java.util.function.BiConsumer; import java.util.function.Function; @@ -34,9 +33,8 @@ public static Mono oneToOne( BiConsumer> delegate, CallOptions options) { try { - return Mono - .create(emitter -> monoSource.subscribe( - request -> delegate.accept(request, new StreamObserver() { + return monoSource.flatMap(r -> + Mono.create(emitter -> delegate.accept(r, new StreamObserver() { @Override public void onNext(TResponse tResponse) { emitter.success(tResponse); @@ -51,10 +49,10 @@ public void onError(Throwable throwable) { public void onCompleted() { // Do nothing } - }), - emitter::error - )) - .transform(Operators.lift(new SubscribeOnlyOnceLifter())); + }) + ) + .transform(Operators.lift(new SubscribeOnlyOnceLifter())) + ); } catch (Throwable throwable) { return Mono.error(throwable); } @@ -97,17 +95,8 @@ public static Mono manyToOne( Function, StreamObserver> delegate, CallOptions options) { try { - ReactorSubscriberAndClientProducer subscriberAndGRPCProducer = - fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>()); - ReactorClientStreamObserverAndPublisher observerAndPublisher = - new ReactorClientStreamObserverAndPublisher<>( - s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver) s), - subscriberAndGRPCProducer::cancel - ); - - return Flux.from(observerAndPublisher) - .doOnSubscribe(s -> delegate.apply(observerAndPublisher)) - .singleOrEmpty(); + ReactorGrpcClientCallFlux operator = new ReactorGrpcClientCallFlux<>(fluxSource, delegate); + return operator.doOnSubscribe(operator.onSubscribeHook()).singleOrEmpty(); } catch (Throwable throwable) { return Mono.error(throwable); } @@ -123,19 +112,11 @@ public static Flux manyToMany( Function, StreamObserver> delegate, CallOptions options) { try { - final int prefetch = ReactorCallOptions.getPrefetch(options); final int lowTide = ReactorCallOptions.getLowTide(options); - ReactorSubscriberAndClientProducer subscriberAndGRPCProducer = - fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>()); - ReactorClientStreamObserverAndPublisher observerAndPublisher = - new ReactorClientStreamObserverAndPublisher<>( - s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver) s), - subscriberAndGRPCProducer::cancel, prefetch, lowTide - ); - - return Flux.from(observerAndPublisher).doOnSubscribe(s -> delegate.apply(observerAndPublisher)); + ReactorGrpcClientCallFlux operator = new ReactorGrpcClientCallFlux<>(fluxSource, delegate, prefetch, lowTide); + return operator.doOnSubscribe(operator.onSubscribeHook()); } catch (Throwable throwable) { return Flux.error(throwable); } diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorGrpcClientCallFlux.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorGrpcClientCallFlux.java new file mode 100644 index 00000000..7fce36fa --- /dev/null +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ReactorGrpcClientCallFlux.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2019, Salesforce.com, Inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ +package com.salesforce.reactorgrpc.stub; + +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxOperator; +import reactor.util.context.Context; + +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Create a {@link Flux} that allows for correct context propagation in client calls. + * + * @param + * @param + */ +public class ReactorGrpcClientCallFlux extends FluxOperator { + + private final ReactorSubscriberAndClientProducer requestConsumer; + private final ReactorClientStreamObserverAndPublisher responsePublisher; + private final Function, StreamObserver> delegate; + + ReactorGrpcClientCallFlux(Flux in, Function, StreamObserver> delegate) { + super(in); + this.delegate = delegate; + this.requestConsumer = new ReactorSubscriberAndClientProducer<>(); + this.responsePublisher = new ReactorClientStreamObserverAndPublisher<>(s -> requestConsumer.subscribe((CallStreamObserver) s), requestConsumer::cancel); + } + + public ReactorGrpcClientCallFlux(Flux in, Function, StreamObserver> delegate, int prefetch, int lowTide) { + super(in); + this.delegate = delegate; + this.requestConsumer = new ReactorSubscriberAndClientProducer<>(); + this.responsePublisher = new ReactorClientStreamObserverAndPublisher<>(s -> requestConsumer.subscribe((CallStreamObserver) s), requestConsumer::cancel, prefetch, lowTide); + } + + public Consumer onSubscribeHook() { + return s -> this.delegate.apply(responsePublisher); + } + + @Override + public void subscribe(CoreSubscriber actual) { + responsePublisher.subscribe(actual); + source.subscribe(new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + requestConsumer.onSubscribe(s); + } + + @Override + public void onNext(TRequest tRequest) { + requestConsumer.onNext(tRequest); + } + + @Override + public void onError(Throwable throwable) { + requestConsumer.onError(throwable); + } + + @Override + public void onComplete() { + requestConsumer.onComplete(); + } + + @Override + public Context currentContext() { + return actual.currentContext(); + } + }); + } +} diff --git a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/SubscribeOnlyOnceLifter.java b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/SubscribeOnlyOnceLifter.java index 7d93bfce..ceafa470 100644 --- a/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/SubscribeOnlyOnceLifter.java +++ b/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/SubscribeOnlyOnceLifter.java @@ -10,6 +10,7 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; +import reactor.util.context.Context; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; @@ -25,6 +26,11 @@ public class SubscribeOnlyOnceLifter extends AtomicBoolean implements BiFunct @Override public CoreSubscriber apply(Scannable scannable, CoreSubscriber coreSubscriber) { return new CoreSubscriber() { + @Override + public Context currentContext() { + return coreSubscriber.currentContext(); + } + @Override public void onSubscribe(Subscription subscription) { if (!compareAndSet(false, true)) { diff --git a/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/ReactorContextPropagationTest.java b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/ReactorContextPropagationTest.java new file mode 100644 index 00000000..fcca79a1 --- /dev/null +++ b/reactor/reactor-grpc-test/src/test/java/com/salesforce/reactorgrpc/ReactorContextPropagationTest.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2019, Salesforce.com, Inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.reactorgrpc; + +import io.grpc.testing.GrpcServerRule; +import org.junit.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ReactorContextPropagationTest { + + @Rule + public GrpcServerRule serverRule = new GrpcServerRule(); + + private static class SimpleGreeter extends ReactorGreeterGrpc.GreeterImplBase { + @Override + public Mono sayHello(Mono request) { + return request.map(HelloRequest::getName) + .map(name -> HelloResponse.newBuilder().setMessage("Hello " + name).build()); + } + + @Override + public Mono sayHelloReqStream(Flux request) { + return request.transformDeferredContextual((f, ctx) -> f.map(HelloRequest::getName)) + .collect(Collectors.joining("and")) + .map(names -> HelloResponse.newBuilder().setMessage("Hello " + names).build()); + } + + @Override + public Flux sayHelloRespStream(Mono request) { + return request.repeat(2) + .map(HelloRequest::getName) + .zipWith(Flux.just("Hello ", "Hi ", "Greetings "), String::join) + .map(greeting -> HelloResponse.newBuilder().setMessage(greeting).build()); + } + + @Override + public Flux sayHelloBothStream(Flux request) { + return request.map(HelloRequest::getName) + .map(name -> HelloResponse.newBuilder().setMessage("Hello " + name).build()); + } + } + + @BeforeClass + public static void beforeAll(){ + Hooks.enableContextLossTracking(); + Hooks.onOperatorDebug(); + } + + @AfterClass + public static void afterAll(){ + Hooks.disableContextLossTracking(); + Hooks.resetOnOperatorDebug(); + } + + @Before + public void setup() { + serverRule.getServiceRegistry().addService(new SimpleGreeter()); + } + + @Test + public void oneToOne() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); + Mono req = Mono.just(HelloRequest.newBuilder().setName("reactor").build()); + + Mono resp = req + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .transform(stub::sayHello) + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .contextWrite(ctx -> ctx.put("name", "context")); + + StepVerifier.create(resp) + .expectNextCount(1) + .verifyComplete(); + } + + @Test + public void oneToMany() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); + Mono req = Mono.just(HelloRequest.newBuilder().setName("reactor").build()); + + Flux resp = req + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .as(stub::sayHelloRespStream) + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .contextWrite(ctx -> ctx.put("name", "context")); + + StepVerifier.create(resp) + .expectNextCount(3) + .verifyComplete(); + } + + @Test + public void manyToOne() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); + Flux req = Mono.deferContextual(ctx -> Mono.just(HelloRequest.newBuilder().setName(ctx.get("name")).build())).repeat(2); + + Mono resp = req + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .as(stub::sayHelloReqStream) + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .contextWrite(ctx -> ctx.put("name", "context")); + + StepVerifier.create(resp) + .expectAccessibleContext() + .contains("name", "context") + .then() + .expectNextCount(1) + .verifyComplete(); + } + + @Test + public void manyToMany() { + ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel()); + Flux req = Mono.just(HelloRequest.newBuilder().setName("reactor").build()).repeat(2).contextWrite(c -> c.put("name", "boom")); + + Flux resp = req + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .transform(stub::sayHelloBothStream) + .doOnEach(signal -> assertThat(signal.getContextView().getOrEmpty("name")).isNotEmpty()) + .contextWrite(ctx -> ctx.put("name", "context")); + + StepVerifier.create(resp) + .expectNextCount(3) + .verifyComplete(); + } +} \ No newline at end of file