Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent duplicate Observation.stop with Micrometer #3976

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,14 +20,14 @@
import io.micrometer.observation.ObservationRegistry;

import reactor.core.observability.SignalListener;
import reactor.core.observability.micrometer.MicrometerObservationListenerDocumentation.ObservationTags;
import reactor.core.publisher.SignalType;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;

import static reactor.core.observability.micrometer.MicrometerObservationListenerDocumentation.ObservationTags.STATUS;
Expand Down Expand Up @@ -57,12 +57,6 @@ final class MicrometerObservationListener<T> implements SignalListener<T> {
*/
static final String CONTEXT_KEY_OBSERVATION = "micrometer.observation";

/**
* A value for the status tag, to be used when a Mono completes from onNext.
* In production, this is set to {@link ObservationTags#TAG_STATUS_COMPLETED}.
* In some tests, this can be overridden as a way to assert {@link #doOnComplete()} is no-op.
*/
final String completedOnNextStatus;
final MicrometerObservationListenerConfiguration configuration;
final ContextView originalContext;
final Observation tapObservation;
Expand All @@ -72,26 +66,26 @@ final class MicrometerObservationListener<T> implements SignalListener<T> {

boolean valued;

volatile int status = STATUS_INCOMPLETE;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MicrometerObservationListener> STATUS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(MicrometerObservationListener.class, "status");

static final int STATUS_INCOMPLETE = 0;
static final int STATUS_CANCELLED = 1;
static final int STATUS_COMPLETED_IN_ON_NEXT = 2;
static final int STATUS_COMPLETED = 3;
static final int STATUS_ERROR = 4;

MicrometerObservationListener(ContextView subscriberContext, MicrometerObservationListenerConfiguration configuration) {
this(subscriberContext, configuration, null);
}

MicrometerObservationListener(ContextView subscriberContext,
MicrometerObservationListenerConfiguration configuration,
@Nullable Function<ObservationRegistry, Observation> observationSupplier) {
this(subscriberContext, configuration, TAG_STATUS_COMPLETED, observationSupplier);
}

//for test purposes, we can pass in a value for the status tag, to be used when a Mono completes from onNext
MicrometerObservationListener(ContextView subscriberContext,
MicrometerObservationListenerConfiguration configuration,
String completedOnNextStatus,
@Nullable Function<ObservationRegistry, Observation> observationSupplier) {
this.configuration = configuration;
this.originalContext = subscriberContext;
this.completedOnNextStatus = completedOnNextStatus;

this.valued = false;

//creation of the listener matches subscription (Publisher.subscribe(Subscriber) / doFirst)
//while doOnSubscription matches the moment where the Publisher acknowledges said subscription
Expand Down Expand Up @@ -179,48 +173,49 @@ public Context addToContext(Context originalContext) {

@Override
public void doOnCancel() {
Observation observation = tapObservation
.lowCardinalityKeyValue(STATUS.asString(), TAG_STATUS_CANCELLED);
if (STATUS_UPDATER.compareAndSet(this, STATUS_INCOMPLETE, STATUS_CANCELLED)) {
Observation observation =
tapObservation.lowCardinalityKeyValue(STATUS.asString(),
TAG_STATUS_CANCELLED);

observation.stop();
observation.stop();
}
}

@Override
public void doOnComplete() {
// We differentiate between empty completion and value completion via tags.
String status = null;
if (!valued) {
status = TAG_STATUS_COMPLETED_EMPTY;
}
else if (!configuration.isMono) {
status = TAG_STATUS_COMPLETED;
}

// if status == null, recording with OnComplete tag is done directly in onNext for the Mono(valued) case
if (status != null) {
// The comparison can fail if the Publisher was terminated by error,
// cancellation, or recording with OnComplete tag was done directly in onNext for
// the Mono(valued) case
if (STATUS_UPDATER.compareAndSet(this, STATUS_INCOMPLETE, STATUS_COMPLETED)) {
// We differentiate between empty completion and value completion via tags.
String status = valued ? TAG_STATUS_COMPLETED : TAG_STATUS_COMPLETED_EMPTY;
Observation completeObservation = tapObservation
.lowCardinalityKeyValue(STATUS.asString(), status);
.lowCardinalityKeyValue(STATUS.asString(), status);

completeObservation.stop();
}
}

@Override
public void doOnError(Throwable e) {
Observation errorObservation = tapObservation
.lowCardinalityKeyValue(STATUS.asString(), TAG_STATUS_ERROR)
.error(e);
if (STATUS_UPDATER.compareAndSet(this, STATUS_INCOMPLETE, STATUS_ERROR)) {
Observation errorObservation =
tapObservation.lowCardinalityKeyValue(STATUS.asString(), TAG_STATUS_ERROR)
.error(e);

errorObservation.stop();
errorObservation.stop();
}
}

@Override
public void doOnNext(T t) {
valued = true;
if (configuration.isMono) {
if (configuration.isMono
&& STATUS_UPDATER.compareAndSet(this, STATUS_INCOMPLETE, STATUS_COMPLETED_IN_ON_NEXT)) {
//record valued completion directly
Observation completeObservation = tapObservation
.lowCardinalityKeyValue(STATUS.asString(), completedOnNextStatus);
.lowCardinalityKeyValue(STATUS.asString(), TAG_STATUS_COMPLETED);

completeObservation.stop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,12 +27,14 @@
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.tck.TestObservationRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.ValueSource;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

Expand Down Expand Up @@ -246,6 +248,21 @@ void tapFromFluxWithTags(boolean automatic) {
.hasKeyValuesCount(4);
}

// see https://github.com/reactor/reactor-core/issues/3972
@ParameterizedTestWithName
@ValueSource(booleans = {true, false})
void tapMonoCancelAfterNext(boolean automatic) {
if (automatic) {
Hooks.enableAutomaticContextPropagation();
}
TestObservationRegistry observationRegistry = TestObservationRegistry.create();
Mono<Integer> mono = Mono.just(1).tap(Micrometer.observation(observationRegistry));
StepVerifier.create(Flux.from(mono).take(1).single())
.expectNext(1)
.expectComplete()
.verify();
}

@ParameterizedTestWithName
@ValueSource(booleans = {true, false})
void tapFromMonoWithTags(boolean automatic) {
Expand Down Expand Up @@ -476,10 +493,7 @@ void observationMonoStoppedByOnNext(boolean automatic) {
registry,
true);

final String expectedStatus = "completedOnNext";

//we use a test-oriented constructor to force the onNext completion case to have a different tag value
MicrometerObservationListener<Integer> listener = new MicrometerObservationListener<>(subscriberContext, configuration, expectedStatus, null);
MicrometerObservationListener<Integer> listener = new MicrometerObservationListener<>(subscriberContext, configuration, null);

listener.doFirst(); // forces observation start
listener.doOnNext(1); // emulates onNext, should stop observation
Expand All @@ -491,16 +505,14 @@ void observationMonoStoppedByOnNext(boolean automatic) {
.hasBeenStarted()
.hasBeenStopped()
.hasLowCardinalityKeyValue("forcedType", "Mono")
.hasLowCardinalityKeyValue("reactor.status", expectedStatus)
.hasLowCardinalityKeyValue("reactor.status", "completed")
.doesNotHaveError();

// Should this fail, an io.micrometer.observation.tck.InvalidObservationException
// would have been thrown with
// "Invalid stop: Observation 'valuedMono' has already been stopped" message
// (Starting from Micrometer 1.14.0)
listener.doOnComplete();

//if the test in doOnComplete was bugged, the status key would be associated with "completed" now
assertThat(registry)
.hasSingleObservationThat()
.as("post-doOnComplete")
.hasLowCardinalityKeyValue("reactor.status", expectedStatus);
}

@ParameterizedTestWithName
Expand Down
Loading