Skip to content

Commit

Permalink
Adds basic support for metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas committed Jan 9, 2025
1 parent 524622a commit e1444ed
Show file tree
Hide file tree
Showing 20 changed files with 792 additions and 119 deletions.
18 changes: 18 additions & 0 deletions fault-tolerance/fault-tolerance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@
<groupId>io.helidon.builder</groupId>
<artifactId>helidon-builder-api</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.metrics</groupId>
<artifactId>helidon-metrics-api</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.service</groupId>
<artifactId>helidon-service-registry</artifactId>
</dependency>
<dependency>
<!--
Used to declare Features in module-info.java
Expand All @@ -71,6 +79,16 @@
<artifactId>helidon-common-testing-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.metrics</groupId>
<artifactId>helidon-metrics</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.config</groupId>
<artifactId>helidon-config-yaml</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.logging</groupId>
<artifactId>helidon-logging-jul</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2024 Oracle and/or its affiliates.
* Copyright (c) 2020, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,27 @@
*/
@RuntimeType.PrototypedBy(BulkheadConfig.class)
public interface Bulkhead extends FtHandler, RuntimeType.Api<BulkheadConfig> {

/**
* Counter for all the calls in a bulkhead.
*/
String FT_BULKHEAD_CALLS_TOTAL = "ft.bulkhead.calls.total";

/**
* Histogram of waiting time to enter a bulkhead.
*/
String FT_BULKHEAD_WAITINGDURATION = "ft.bulkhead.waitingDuration";

/**
* Gauge of number of executions running at a certain time.
*/
String FT_BULKHEAD_EXECUTIONSRUNNING = "ft.bulkhead.executionsRunning";

/**
* Gauge of number of executions waiting at a certain time.
*/
String FT_BULKHEAD_EXECUTIONSWAITING = "ft.bulkhead.executionsWaiting";

/**
* Create {@link Bulkhead} from its configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2024 Oracle and/or its affiliates.
* Copyright (c) 2020, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,11 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import io.helidon.metrics.api.Counter;
import io.helidon.metrics.api.Gauge;
import io.helidon.metrics.api.Tag;
import io.helidon.metrics.api.Timer;

class BulkheadImpl implements Bulkhead {
private static final System.Logger LOGGER = System.getLogger(BulkheadImpl.class.getName());

Expand All @@ -46,6 +51,11 @@ class BulkheadImpl implements Bulkhead {
private final Set<Supplier<?>> cancelledSuppliers = new CopyOnWriteArraySet<>();
private final BulkheadConfig config;

private Counter callsCounterMetric;
private Timer waitingDurationMetric;
private Gauge<Long> executionsRunningMetric;
private Gauge<Long> executionsWaitingMetric;

BulkheadImpl(BulkheadConfig config) {
this.inProgress = new Semaphore(config.limit(), true);
this.name = config.name().orElseGet(() -> "bulkhead-" + System.identityHashCode(config));
Expand All @@ -55,6 +65,18 @@ class BulkheadImpl implements Bulkhead {
: new ZeroCapacityQueue();
this.inProgressLock = new ReentrantLock(true);
this.config = config;

// Meters are registered only when FT metrics are enabled; otherwise the metric fields are not assigned here.
if (MetricsUtils.metricsEnabled()) {
    // every meter of this bulkhead carries the bulkhead name as a tag
    Tag nameTag = Tag.create("name", name);
    callsCounterMetric = MetricsUtils.counterBuilder(FT_BULKHEAD_CALLS_TOTAL, nameTag);
    waitingDurationMetric = MetricsUtils.timerBuilder(FT_BULKHEAD_WAITINGDURATION, nameTag);
    executionsRunningMetric = MetricsUtils.gaugeBuilder(FT_BULKHEAD_EXECUTIONSRUNNING,
            concurrentExecutions::get,
            nameTag);
    // The waiting gauge needs its own meter name: with the running gauge's name and the same tag,
    // both fields would refer to the same meter.
    executionsWaitingMetric = MetricsUtils.gaugeBuilder(FT_BULKHEAD_EXECUTIONSWAITING,
            () -> 0L, // todo
            nameTag);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2024 Oracle and/or its affiliates.
* Copyright (c) 2020, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,18 @@
*/
@RuntimeType.PrototypedBy(CircuitBreakerConfig.class)
public interface CircuitBreaker extends FtHandler, RuntimeType.Api<CircuitBreakerConfig> {

/**
* Counter for all the calls in a circuit breaker.
*/
String FT_CIRCUITBREAKER_CALLS_TOTAL = "ft.circuitbreaker.calls.total";

/**
* Counter for the number of times a circuit breaker has moved from
* {@link State#CLOSED} to {@link State#OPEN}.
*/
String FT_CIRCUITBREAKER_OPENED_TOTAL = "ft.circuitbreaker.opened.total";

/**
* Create a new circuit builder based on its configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2024 Oracle and/or its affiliates.
* Copyright (c) 2020, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.helidon.metrics.api.Counter;
import io.helidon.metrics.api.Tag;

class CircuitBreakerImpl implements CircuitBreaker {
/*
Configuration options
Expand All @@ -47,6 +50,9 @@ class CircuitBreakerImpl implements CircuitBreaker {
private final String name;
private final CircuitBreakerConfig config;

private Counter callsCounterMetric;
private Counter openedCounterMetric;

CircuitBreakerImpl(CircuitBreakerConfig config) {
this.delayMillis = config.delay().toMillis();
this.successThreshold = config.successThreshold();
Expand All @@ -55,6 +61,12 @@ class CircuitBreakerImpl implements CircuitBreaker {
this.errorChecker = ErrorChecker.create(config.skipOn(), config.applyOn());
this.name = config.name().orElseGet(() -> "circuit-breaker-" + System.identityHashCode(config));
this.config = config;

if (MetricsUtils.metricsEnabled()) {
Tag nameTag = Tag.create("name", name);
callsCounterMetric = MetricsUtils.counterBuilder(FT_CIRCUITBREAKER_CALLS_TOTAL, nameTag);
openedCounterMetric = MetricsUtils.counterBuilder(FT_CIRCUITBREAKER_OPENED_TOTAL, nameTag);
}
}

@Override
Expand All @@ -69,6 +81,9 @@ public String name() {

@Override
public <T> T invoke(Supplier<? extends T> supplier) {
if (MetricsUtils.metricsEnabled()) {
callsCounterMetric.increment();
}
return switch (state.get()) {
case CLOSED -> executeTask(supplier);
case HALF_OPEN -> halfOpenTask(supplier);
Expand Down Expand Up @@ -131,6 +146,10 @@ private <U> U executeTask(Supplier<? extends U> supplier) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
// update metrics for this transition
if (MetricsUtils.metricsEnabled()) {
openedCounterMetric.increment();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.faulttolerance;

import java.util.List;
import java.util.function.Supplier;

import io.helidon.common.config.Config;
import io.helidon.metrics.api.Counter;
import io.helidon.metrics.api.Gauge;
import io.helidon.metrics.api.MeterRegistry;
import io.helidon.metrics.api.Metrics;
import io.helidon.metrics.api.MetricsFactory;
import io.helidon.metrics.api.Tag;
import io.helidon.metrics.api.Timer;
import io.helidon.service.registry.ServiceRegistry;
import io.helidon.service.registry.ServiceRegistryManager;

import static io.helidon.metrics.api.Meter.Scope.VENDOR;

/**
 * Static helpers used by the fault tolerance handlers to register meters in the
 * global meter registry and to look them up again by name and tags.
 * Every meter built here is created in the {@code VENDOR} scope.
 */
final class MetricsUtils {

    private static final String FT_METRICS_ENABLED = "ft.metrics.enabled";
    private static final MetricsFactory METRICS_FACTORY = MetricsFactory.getInstance();
    private static final MeterRegistry METRICS_REGISTRY = Metrics.globalRegistry();

    // null until the flag has been read from config for the first time
    private static volatile Boolean metricsEnabled;

    private MetricsUtils() {
    }

    /**
     * Looks for the metrics enabled flag in config and caches result. FT metrics
     * are disabled by default.
     * <p>
     * The first lookup is not synchronized, so concurrent first callers may each
     * read the config before the cached value becomes visible.
     *
     * @return value of metrics flag
     */
    static boolean metricsEnabled() {
        // single volatile read; the local is what gets tested and returned
        Boolean enabled = metricsEnabled;
        if (enabled == null) {
            ServiceRegistry registry = ServiceRegistryManager.create().registry();
            Config config = registry.get(Config.class);
            enabled = config.get(FT_METRICS_ENABLED).asBoolean().orElse(false);
            metricsEnabled = enabled;
        }
        return enabled;
    }

    /**
     * Registers (or reuses) a gauge with the given name and tags, then returns the
     * gauge found in the registry under that name and those tags.
     *
     * @param name meter name
     * @param supplier source of the gauge value
     * @param tags tags to attach to the gauge
     * @param <T> numeric type of the gauge value
     * @return the registered gauge; {@code orElseThrow()} fails if the lookup comes back empty
     */
    @SuppressWarnings("unchecked") // the registry lookup does not carry the gauge's value type
    static <T extends Number> Gauge<T> gaugeBuilder(String name, Supplier<T> supplier, Tag... tags) {
        Gauge.Builder<T> builder = METRICS_FACTORY.gaugeBuilder(name, supplier).scope(VENDOR);
        List<Tag> tagList = List.of(tags);
        builder.tags(tagList);
        METRICS_REGISTRY.getOrCreate(builder);
        return METRICS_REGISTRY.gauge(name, tagList).orElseThrow();
    }

    /**
     * Registers (or reuses) a counter with the given name and tags, then returns the
     * counter found in the registry under that name and those tags.
     *
     * @param name meter name
     * @param tags tags to attach to the counter
     * @return the registered counter; {@code orElseThrow()} fails if the lookup comes back empty
     */
    static Counter counterBuilder(String name, Tag... tags) {
        Counter.Builder builder = METRICS_FACTORY.counterBuilder(name).scope(VENDOR);
        List<Tag> tagList = List.of(tags);
        builder.tags(tagList);
        METRICS_REGISTRY.getOrCreate(builder);
        return METRICS_REGISTRY.counter(name, tagList).orElseThrow();
    }

    /**
     * Registers (or reuses) a timer with the given name and tags, then returns the
     * timer found in the registry under that name and those tags.
     *
     * @param name meter name
     * @param tags tags to attach to the timer
     * @return the registered timer; {@code orElseThrow()} fails if the lookup comes back empty
     */
    static Timer timerBuilder(String name, Tag... tags) {
        Timer.Builder builder = METRICS_FACTORY.timerBuilder(name).scope(VENDOR);
        List<Tag> tagList = List.of(tags);
        builder.tags(tagList);
        METRICS_REGISTRY.getOrCreate(builder);
        return METRICS_REGISTRY.timer(name, tagList).orElseThrow();
    }

    /**
     * Looks up an already registered gauge by name and tags.
     *
     * @param name meter name
     * @param tags tags the gauge was registered with
     * @param <T> numeric type of the gauge value
     * @return the gauge; {@code orElseThrow()} fails if none is found
     */
    @SuppressWarnings("unchecked") // the registry lookup does not carry the gauge's value type
    static <T extends Number> Gauge<T> gauge(String name, Tag... tags) {
        return METRICS_REGISTRY.gauge(name, List.of(tags)).orElseThrow();
    }

    /**
     * Looks up an already registered counter by name and tags.
     *
     * @param name meter name
     * @param tags tags the counter was registered with
     * @return the counter; {@code orElseThrow()} fails if none is found
     */
    static Counter counter(String name, Tag... tags) {
        return METRICS_REGISTRY.counter(name, List.of(tags)).orElseThrow();
    }

    /**
     * Looks up an already registered timer by name and tags.
     *
     * @param name meter name
     * @param tags tags the timer was registered with
     * @return the timer; {@code orElseThrow()} fails if none is found
     */
    static Timer timer(String name, Tag... tags) {
        return METRICS_REGISTRY.timer(name, List.of(tags)).orElseThrow();
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2024 Oracle and/or its affiliates.
* Copyright (c) 2020, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,18 @@
*/
@RuntimeType.PrototypedBy(RetryConfig.class)
public interface Retry extends FtHandler, RuntimeType.Api<RetryConfig> {

/**
* Counter for all the calls in a retry. Will always be greater than
* {@link #FT_RETRY_RETRIES_TOTAL} that only counts actual retries.
*/
String FT_RETRY_CALLS_TOTAL = "ft.retry.calls.total";

/**
* Counter for all retry calls, excluding the initial call.
*/
String FT_RETRY_RETRIES_TOTAL = "ft.retry.retries.total";

/**
* Create a new retry from its configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2024 Oracle and/or its affiliates.
* Copyright (c) 2020, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,20 +26,33 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import io.helidon.metrics.api.Counter;
import io.helidon.metrics.api.Tag;

class RetryImpl implements Retry {

private final ErrorChecker errorChecker;
private final long maxTimeNanos;
private final RetryPolicy retryPolicy;
private final RetryConfig retryConfig;
private final AtomicLong retryCounter = new AtomicLong(0L);
private final String name;

private Counter callsCounterMetric;
private Counter retryCounterMetric;

RetryImpl(RetryConfig retryConfig) {
this.name = retryConfig.name().orElseGet(() -> "retry-" + System.identityHashCode(retryConfig));
this.errorChecker = ErrorChecker.create(retryConfig.skipOn(), retryConfig.applyOn());
this.maxTimeNanos = retryConfig.overallTimeout().toNanos();
this.retryPolicy = retryConfig.retryPolicy().orElseThrow();
this.retryConfig = retryConfig;

if (MetricsUtils.metricsEnabled()) {
Tag nameTag = Tag.create("name", name);
callsCounterMetric = MetricsUtils.counterBuilder(FT_RETRY_CALLS_TOTAL, nameTag);
retryCounterMetric = MetricsUtils.counterBuilder(FT_RETRY_RETRIES_TOTAL, nameTag);
}
}

@Override
Expand All @@ -57,6 +70,9 @@ public <T> T invoke(Supplier<? extends T> supplier) {
RetryContext<? extends T> context = new RetryContext<>();
while (true) {
try {
if (MetricsUtils.metricsEnabled()) {
callsCounterMetric.increment();
}
return supplier.get();
} catch (Throwable t) {
Throwable throwable = SupplierHelper.unwrapThrowable(t);
Expand Down Expand Up @@ -87,6 +103,10 @@ public <T> T invoke(Supplier<? extends T> supplier) {

// now we are retrying for sure
retryCounter.getAndIncrement();
if (MetricsUtils.metricsEnabled()) {
retryCounterMetric.increment();
}

// just block current thread, we are expected to run in Virtual threads with Loom
try {
Thread.sleep(Duration.ofMillis(delayMillis));
Expand Down
Loading

0 comments on commit e1444ed

Please sign in to comment.