diff --git a/fahrschein-opentelemetry/build.gradle b/fahrschein-opentelemetry/build.gradle index ba412153..de1211ce 100644 --- a/fahrschein-opentelemetry/build.gradle +++ b/fahrschein-opentelemetry/build.gradle @@ -5,7 +5,7 @@ plugins { dependencies { api project(':fahrschein') - implementation "io.opentelemetry:opentelemetry-api:${property('opentelemetry.version')}" + api "io.opentelemetry:opentelemetry-api:${property('opentelemetry.version')}" testImplementation "io.opentelemetry:opentelemetry-sdk-testing:${property('opentelemetry.version')}" testImplementation "io.opentelemetry:opentelemetry-extension-trace-propagators:${property('opentelemetry.version')}" testImplementation("org.assertj:assertj-core:3.24.2") diff --git a/fahrschein-spring-boot-starter/build.gradle b/fahrschein-spring-boot-starter/build.gradle index b0317f77..ecaf561d 100644 --- a/fahrschein-spring-boot-starter/build.gradle +++ b/fahrschein-spring-boot-starter/build.gradle @@ -9,6 +9,7 @@ dependencies { api project(':fahrschein') // optional dependency: will be used if it's found on the classpath. compileOnly project(':fahrschein-http-jdk11') + implementation project(':fahrschein-opentelemetry') implementation project(':fahrschein-http-spring') implementation project(':fahrschein-metrics-micrometer') implementation "org.springframework:spring-web:${property('spring.version')}" diff --git a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/config/Registry.java b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/config/Registry.java index d29a3498..dd273325 100644 --- a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/config/Registry.java +++ b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/config/Registry.java @@ -17,6 +17,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -110,10 +111,17 @@ public static List list(final T... elements) { return list; } + public Map getBeansOfType(Class clazz) { + if (registry instanceof DefaultListableBeanFactory factory) { + return factory.getBeansOfType(clazz); + } else { + LOG.warn("Unable to get beans of type {} from registry of type: {}", clazz.getName(), registry.getClass().getName()); + } + return Collections.emptyMap(); + } public void registerAliasesForNakadiListener() { - if (registry instanceof DefaultListableBeanFactory) { - DefaultListableBeanFactory factory = (DefaultListableBeanFactory) registry; + if (registry instanceof DefaultListableBeanFactory factory) { String[] beans = factory.getBeanNamesForAnnotation(NakadiEventListener.class); Arrays.asList(beans).forEach(bean -> { String alias = bean + "NakadiListener"; diff --git a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinNakadiClientFactory.java b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinNakadiClientFactory.java index 6a8d5aba..dc5336fa 100644 --- a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinNakadiClientFactory.java +++ b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinNakadiClientFactory.java @@ -3,12 +3,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.zalando.fahrschein.CursorManager; +import org.zalando.fahrschein.EventPublishingHandler; import org.zalando.fahrschein.NakadiClient; import org.zalando.fahrschein.NakadiClientBuilder; import org.zalando.fahrschein.http.api.RequestFactory; import org.zalando.spring.boot.fahrschein.nakadi.config.properties.AbstractConfig; import java.net.URI; +import java.util.List; @Slf4j class FahrscheinNakadiClientFactory { @@ -16,11 +18,14 @@ class FahrscheinNakadiClientFactory { private static final URI INVALID_NAKADI_URL = URI.create("https://nakadi-url.invalid/"); static NakadiClient create(AbstractConfig config, CursorManager cursorManager, ObjectMapper objectMapper, - RequestFactory requestFactory) { + RequestFactory requestFactory, List handlers) { URI nakadiUri = config.getNakadiUrl() != null ? URI.create(config.getNakadiUrl()) : INVALID_NAKADI_URL; - NakadiClientBuilder ncb = NakadiClient.builder(nakadiUri, requestFactory) - .withCursorManager(cursorManager).withObjectMapper(objectMapper); + NakadiClientBuilder ncb = NakadiClient + .builder(nakadiUri, requestFactory) + .withCursorManager(cursorManager) + .withObjectMapper(objectMapper) + .withRequestHandlers(handlers); if (config.getOauth().getEnabled()) { ncb = ncb.withAccessTokenProvider(OAuth.buildAccessTokenProvider(config.getOauth())); diff --git a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinRegistrar.java b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinRegistrar.java index cadfd233..828da274 100644 --- a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinRegistrar.java +++ b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/FahrscheinRegistrar.java @@ -6,6 +6,7 @@ import org.zalando.fahrschein.CursorManager; import org.zalando.fahrschein.NakadiClient; import org.zalando.fahrschein.http.api.RequestFactory; +import org.zalando.fahrschein.opentelemetry.InstrumentedPublishingHandler; import org.zalando.spring.boot.fahrschein.config.Registry; import org.zalando.spring.boot.fahrschein.nakadi.NakadiPublisher; import org.zalando.spring.boot.fahrschein.nakadi.config.properties.AbstractConfig; @@ -65,15 +66,16 @@ private String registerPublisher(PublisherConfig publisherConfig) { }); } - private String registerNakadiClient(AbstractConfig consumerConfig, String type) { - return registry.registerIfAbsent(consumerConfig.getId() + "-" + type, NakadiClient.class, () -> { - log.info(LOG_PREFIX + "NakadiClient ...", consumerConfig.getId()); - final String requestFactoryRef = registerRequestFactory(consumerConfig); + private String registerNakadiClient(AbstractConfig config, String type) { + return registry.registerIfAbsent(config.getId() + "-" + type, NakadiClient.class, () -> { + log.info(LOG_PREFIX + "NakadiClient ...", config.getId()); + final String requestFactoryRef = registerRequestFactory(config); return genericBeanDefinition(FahrscheinNakadiClientFactory.class) - .addConstructorArgValue(consumerConfig) - .addConstructorArgReference(registerCursorManager(consumerConfig, requestFactoryRef)) - .addConstructorArgReference(registerObjectMapper(consumerConfig)) + .addConstructorArgValue(config) + .addConstructorArgReference(registerCursorManager(config, requestFactoryRef)) + .addConstructorArgReference(registerObjectMapper(config)) .addConstructorArgReference(requestFactoryRef) + .addConstructorArgValue(registry.getBeansOfType(InstrumentedPublishingHandler.class).values().stream().toList()) .setFactoryMethod(CREATE); }); } diff --git a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/MeterRegistryAwareBeanPostProcessor.java b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/MeterRegistryAwareBeanPostProcessor.java index aa8cfad5..a6617e39 100644 --- a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/MeterRegistryAwareBeanPostProcessor.java +++ b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/MeterRegistryAwareBeanPostProcessor.java @@ -28,8 +28,4 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } - @Override - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - return bean; - } } diff --git a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/NakadiClientAutoConfiguration.java b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/NakadiClientAutoConfiguration.java index 517a897f..3c89ed27 100644 --- a/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/NakadiClientAutoConfiguration.java +++ b/fahrschein-spring-boot-starter/src/main/java/org/zalando/spring/boot/fahrschein/nakadi/config/NakadiClientAutoConfiguration.java @@ -2,12 +2,14 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.opentelemetry.api.trace.Tracer; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.zalando.fahrschein.opentelemetry.InstrumentedPublishingHandler; @Configuration @AutoConfigureAfter(name = { @@ -31,4 +33,11 @@ public static MeterRegistry consoleLoggingRegistry() { public static BeanPostProcessor meterRegistryAwareBeanPostProcessor() { return new MeterRegistryAwareBeanPostProcessor(); } + + @Bean + @ConditionalOnBean({Tracer.class}) + public static InstrumentedPublishingHandler instrumentedPublishingHandler(Tracer tracer) { + return new InstrumentedPublishingHandler(tracer); + } + } diff --git a/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ApplicationTest.java b/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ApplicationTest.java index 0d122940..53fda367 100644 --- a/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ApplicationTest.java +++ b/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ApplicationTest.java @@ -12,6 +12,7 @@ import org.springframework.core.convert.converter.Converter; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.zalando.fahrschein.AccessTokenProvider; +import org.zalando.fahrschein.EventPublishingHandler; import org.zalando.fahrschein.NakadiClient; import org.zalando.fahrschein.http.api.ContentEncoding; import org.zalando.spring.boot.fahrschein.config.TimeSpan; @@ -46,6 +47,9 @@ public class ApplicationTest { @Autowired private MeterRegistry meterRegistry; + @Autowired + private List publisherHandlers; + @Autowired @Qualifier("fahrscheinConfigProperties") private FahrscheinConfigProperties configProperties; @@ -54,6 +58,7 @@ public class ApplicationTest { public void contextLoads() { assertThat(meterRegistry).isNotNull(); assertThat(publisher).isNotNull(); + assertThat(publisherHandlers).hasSize(1); Map clientBeans = aac.getBeansOfType(NakadiClient.class); assertThat(clientBeans).hasSize(2); diff --git a/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ExampleApplication.java b/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ExampleApplication.java index e130e01e..86651e98 100644 --- a/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ExampleApplication.java +++ b/fahrschein-spring-boot-starter/src/test/java/org/zalando/spring/boot/fahrschein/nakadi/ExampleApplication.java @@ -1,5 +1,7 @@ package org.zalando.spring.boot.fahrschein.nakadi; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -24,6 +26,11 @@ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { } + @Bean(name = "tracer") + public Tracer tracer() { + return TracerProvider.noop().get("testing"); + } + @Bean(destroyMethod = "shutdown") public Executor taskScheduler() { return Executors.newScheduledThreadPool(6);