From 4b4af2d7b1b9fe1366e9114579cb53ac28f59cfe Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 21 Jan 2025 10:39:06 -0800 Subject: [PATCH] WIP --- .../api/gen/internal/config/KafkaConfig.java | 11 ++++++++--- .../api/gen/internal/model/ApiGenEvent.java | 3 ++- .../api/gen/internal/model/ApiGenEventType.java | 2 +- .../gen/internal/processor/ApiGenProcessor.java | 17 +++++++++++++++-- .../internal/service/HttpAsyncApiService.java | 5 +++-- .../internal/service/KafkaAsyncApiService.java | 5 +++-- .../internal/service/PublishConfigService.java | 9 +++++---- .../src/main/resources/application.properties | 1 + .../service/HttpAsyncApiServiceTest.java | 4 ++-- .../service/KafkaAsyncApiServiceTest.java | 2 +- .../service/PublishConfigServiceTest.java | 2 +- 11 files changed, 42 insertions(+), 19 deletions(-) diff --git a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/config/KafkaConfig.java b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/config/KafkaConfig.java index 1398ca5..3b2fb02 100644 --- a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/config/KafkaConfig.java +++ b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/config/KafkaConfig.java @@ -39,6 +39,7 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.support.serializer.JsonSerde; @Configuration @@ -75,9 +76,13 @@ public StreamsBuilderFactoryBean apiGenKafkaStreamsBuilder() { waitForTopics(); - Map paymentStreamsConfigProperties = commonStreamsConfigProperties(); - paymentStreamsConfigProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, API_GEN_STREAMS_BUILDER_BEAN_NAME); - return new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(paymentStreamsConfigProperties)); + Map streamsConfig = commonStreamsConfigProperties(); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, API_GEN_STREAMS_BUILDER_BEAN_NAME); + StreamsBuilderFactoryBean factoryBean = + new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(streamsConfig)); + factoryBean.setCleanupConfig(new CleanupConfig(false, false)); + + return factoryBean; } private Map commonStreamsConfigProperties() diff --git a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEvent.java b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEvent.java index 4b95c68..869715f 100644 --- a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEvent.java +++ b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEvent.java @@ -17,6 +17,7 @@ public record ApiGenEvent( ApiGenEventType type, String kafkaVersion, - String httpVersion) + String httpVersion, + String message) { } diff --git a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEventType.java b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEventType.java index a603f13..fe05828 100644 --- a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEventType.java +++ b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/model/ApiGenEventType.java @@ -21,6 +21,6 @@ public enum ApiGenEventType KAFKA_ASYNC_API_ERRORED, HTTP_ASYNC_API_PUBLISHED, HTTP_ASYNC_API_ERRORED, - ZILL_CONFIG_ERRORED, + ZILLA_CONFIG_ERRORED, ZILLA_CONFIG_PUBLISHED; } diff --git a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/processor/ApiGenProcessor.java b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/processor/ApiGenProcessor.java index dddcaa0..2ea5f2d 100644 --- a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/processor/ApiGenProcessor.java +++ b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/processor/ApiGenProcessor.java @@ -14,6 +14,7 @@ */ package io.aklivity.zillabase.service.api.gen.internal.processor; +import java.io.UnsupportedEncodingException; import java.util.Map; import org.apache.kafka.common.serialization.Serde; @@ -39,6 +40,7 @@ public class ApiGenProcessor { private final Serde stringSerde = Serdes.String(); + private final Serde byteSerde = Serdes.ByteArray(); private final Serde eventSerde = new ApiGenEventSerde(); private final ApiGenConfig config; @@ -65,8 +67,19 @@ public void buildPipeline( String zcatalogsTopic = config.zcatalogsTopic(); String eventsTopic = config.eventsTopic(); - streamsBuilder.stream(zcatalogsTopic, Consumed.with(stringSerde, stringSerde)) - .mapValues(e -> new ApiGenEvent(ApiGenEventType.CATALOG_UPDATED, "", "")) + streamsBuilder.stream(zcatalogsTopic, Consumed.with(stringSerde, byteSerde)) + .mapValues(e -> + { + try + { + System.out.println("Catalog updated: " + new String(e, "UTF-8")); + } + catch (UnsupportedEncodingException ex) + { + throw new RuntimeException(ex); + } + return new ApiGenEvent(ApiGenEventType.CATALOG_UPDATED, null, null, null); + }) .to(eventsTopic, Produced.with(stringSerde, eventSerde)); KStream eventsStream = streamsBuilder.stream(eventsTopic, diff --git a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiService.java b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiService.java index 03cfc44..481ee90 100644 --- a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiService.java +++ b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiService.java @@ -97,6 +97,7 @@ public ApiGenEvent generate( { ApiGenEventType eventType; String httpSpecVersion = null; + String message = null; try { @@ -108,11 +109,11 @@ public ApiGenEvent generate( } catch (Exception ex) { - ex.printStackTrace(); eventType = ApiGenEventType.HTTP_ASYNC_API_ERRORED; + message = ex.getMessage(); } - return new ApiGenEvent(eventType, event.kafkaVersion(), httpSpecVersion); + return new ApiGenEvent(eventType, event.kafkaVersion(), httpSpecVersion, message); } diff --git a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiService.java b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiService.java index f2cd804..54f98c8 100644 --- a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiService.java +++ b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiService.java @@ -75,6 +75,7 @@ public ApiGenEvent generate( { ApiGenEventType eventType; String specVersion = null; + String message = null; try { @@ -93,11 +94,11 @@ public ApiGenEvent generate( } catch (Exception ex) { - ex.printStackTrace(); eventType = ApiGenEventType.KAFKA_ASYNC_API_ERRORED; + message = ex.getMessage(); } - return new ApiGenEvent(eventType, specVersion, null); + return new ApiGenEvent(eventType, specVersion, null, message); } private String generateKafkaAsyncApiSpecs( diff --git a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigService.java b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigService.java index 87b599e..4acd027 100644 --- a/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigService.java +++ b/services/api-gen/src/main/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigService.java @@ -69,21 +69,22 @@ public ApiGenEvent publish( ApiGenEvent event) { ApiGenEventType newState; + String message = null; try { String zillaConfig = generateConfig(event); boolean published = specService.publishConfig(zillaConfig); - newState = published ? ApiGenEventType.ZILLA_CONFIG_PUBLISHED : ApiGenEventType.ZILL_CONFIG_ERRORED; + newState = published ? ApiGenEventType.ZILLA_CONFIG_PUBLISHED : ApiGenEventType.ZILLA_CONFIG_ERRORED; } catch (Exception ex) { - ex.printStackTrace(); - newState = ApiGenEventType.ZILL_CONFIG_ERRORED; + newState = ApiGenEventType.ZILLA_CONFIG_ERRORED; + message = ex.getMessage(); } - return new ApiGenEvent(newState, event.kafkaVersion(), event.httpVersion()); + return new ApiGenEvent(newState, event.kafkaVersion(), event.httpVersion(), message); } private String generateConfig( diff --git a/services/api-gen/src/main/resources/application.properties b/services/api-gen/src/main/resources/application.properties index 935a5ef..ad42f56 100644 --- a/services/api-gen/src/main/resources/application.properties +++ b/services/api-gen/src/main/resources/application.properties @@ -14,3 +14,4 @@ # server.port=8085 +spring.kafka.streams.properties.auto.offset.reset: latest diff --git a/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiServiceTest.java b/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiServiceTest.java index b500f6d..aef579e 100644 --- a/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiServiceTest.java +++ b/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/HttpAsyncApiServiceTest.java @@ -71,7 +71,7 @@ public void shouldGenerateHttpAsyncApiEvent() { String kafkaSpecVersion = "1"; String httpSpecVersion = "1"; - ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, kafkaSpecVersion, null); + ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, kafkaSpecVersion, null, null); when(specHelper.fetchSpec(KAFKA_ASYNCAPI_ARTIFACT_ID, kafkaSpecVersion)).thenReturn(kafkaSpec); when(specHelper.publishSpec(anyString(), anyString())).thenReturn(httpSpecVersion); @@ -89,7 +89,7 @@ public void shouldGenerateHttpAsyncApiEvent() @Test public void shouldHandleExceptionDuringGeneration() { - ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, "kafkaVersion", null); + ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, "kafkaVersion", null, null); when(specHelper.fetchSpec(anyString(), anyString())).thenThrow(new RuntimeException("Error")); diff --git a/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiServiceTest.java b/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiServiceTest.java index d978aec..fb74108 100644 --- a/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiServiceTest.java +++ b/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/KafkaAsyncApiServiceTest.java @@ -76,7 +76,7 @@ public void shouldGenerateKafkaAsyncApiEvent() throws Exception when(kafkaHelper.resolve()).thenReturn(schemaRecords); when(specHelper.publishSpec(anyString(), anyString())).thenReturn(specVersion); - ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, null, null); + ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, null, null, null); ApiGenEvent resultEvent = kafkaAsyncApiService.generate(inputEvent); diff --git a/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigServiceTest.java b/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigServiceTest.java index 5fd9c4a..df65218 100644 --- a/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigServiceTest.java +++ b/services/api-gen/src/test/java/io/aklivity/zillabase/service/api/gen/internal/service/PublishConfigServiceTest.java @@ -58,7 +58,7 @@ public void setUp() @Test public void shouldPublishConfigSuccessfully() { - ApiGenEvent event = new ApiGenEvent(ApiGenEventType.HTTP_ASYNC_API_PUBLISHED, "2.8.0", "1.1"); + ApiGenEvent event = new ApiGenEvent(ApiGenEventType.HTTP_ASYNC_API_PUBLISHED, "2.8.0", "1.1", null); when(specService.publishConfig(anyString())).thenReturn(true);