Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Jan 21, 2025
1 parent dc277e1 commit 4b4af2d
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
Expand Down Expand Up @@ -75,9 +76,13 @@ public StreamsBuilderFactoryBean apiGenKafkaStreamsBuilder()
{
waitForTopics();

Map<String, Object> paymentStreamsConfigProperties = commonStreamsConfigProperties();
paymentStreamsConfigProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, API_GEN_STREAMS_BUILDER_BEAN_NAME);
return new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(paymentStreamsConfigProperties));
Map<String, Object> streamsConfig = commonStreamsConfigProperties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, API_GEN_STREAMS_BUILDER_BEAN_NAME);
StreamsBuilderFactoryBean factoryBean =
new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(streamsConfig));
factoryBean.setCleanupConfig(new CleanupConfig(false, false));

return factoryBean;
}

private Map<String, Object> commonStreamsConfigProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public record ApiGenEvent(
ApiGenEventType type,
String kafkaVersion,
String httpVersion)
String httpVersion,
String message)
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ public enum ApiGenEventType
KAFKA_ASYNC_API_ERRORED,
HTTP_ASYNC_API_PUBLISHED,
HTTP_ASYNC_API_ERRORED,
ZILL_CONFIG_ERRORED,
ZILLA_CONFIG_ERRORED,
ZILLA_CONFIG_PUBLISHED;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zillabase.service.api.gen.internal.processor;

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
Expand All @@ -39,6 +40,7 @@
public class ApiGenProcessor
{
private final Serde<String> stringSerde = Serdes.String();
private final Serde<byte[]> byteSerde = Serdes.ByteArray();
private final Serde<ApiGenEvent> eventSerde = new ApiGenEventSerde();

private final ApiGenConfig config;
Expand All @@ -65,8 +67,19 @@ public void buildPipeline(
String zcatalogsTopic = config.zcatalogsTopic();
String eventsTopic = config.eventsTopic();

streamsBuilder.stream(zcatalogsTopic, Consumed.with(stringSerde, stringSerde))
.mapValues(e -> new ApiGenEvent(ApiGenEventType.CATALOG_UPDATED, "", ""))
streamsBuilder.stream(zcatalogsTopic, Consumed.with(stringSerde, byteSerde))
.mapValues(e ->
{
try
{
System.out.println("Catalog updated: " + new String(e, "UTF-8"));
}
catch (UnsupportedEncodingException ex)
{
throw new RuntimeException(ex);
}
return new ApiGenEvent(ApiGenEventType.CATALOG_UPDATED, null, null, null);
})
.to(eventsTopic, Produced.with(stringSerde, eventSerde));

KStream<String, ApiGenEvent> eventsStream = streamsBuilder.stream(eventsTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public ApiGenEvent generate(
{
ApiGenEventType eventType;
String httpSpecVersion = null;
String message = null;

try
{
Expand All @@ -108,11 +109,11 @@ public ApiGenEvent generate(
}
catch (Exception ex)
{
ex.printStackTrace();
eventType = ApiGenEventType.HTTP_ASYNC_API_ERRORED;
message = ex.getMessage();
}

return new ApiGenEvent(eventType, event.kafkaVersion(), httpSpecVersion);
return new ApiGenEvent(eventType, event.kafkaVersion(), httpSpecVersion, message);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public ApiGenEvent generate(
{
ApiGenEventType eventType;
String specVersion = null;
String message = null;

try
{
Expand All @@ -93,11 +94,11 @@ public ApiGenEvent generate(
}
catch (Exception ex)
{
ex.printStackTrace();
eventType = ApiGenEventType.KAFKA_ASYNC_API_ERRORED;
message = ex.getMessage();
}

return new ApiGenEvent(eventType, specVersion, null);
return new ApiGenEvent(eventType, specVersion, null, message);
}

private String generateKafkaAsyncApiSpecs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,22 @@ public ApiGenEvent publish(
ApiGenEvent event)
{
ApiGenEventType newState;
String message = null;

try
{
String zillaConfig = generateConfig(event);
boolean published = specService.publishConfig(zillaConfig);

newState = published ? ApiGenEventType.ZILLA_CONFIG_PUBLISHED : ApiGenEventType.ZILL_CONFIG_ERRORED;
newState = published ? ApiGenEventType.ZILLA_CONFIG_PUBLISHED : ApiGenEventType.ZILLA_CONFIG_ERRORED;
}
catch (Exception ex)
{
ex.printStackTrace();
newState = ApiGenEventType.ZILL_CONFIG_ERRORED;
newState = ApiGenEventType.ZILLA_CONFIG_ERRORED;
message = ex.getMessage();
}

return new ApiGenEvent(newState, event.kafkaVersion(), event.httpVersion());
return new ApiGenEvent(newState, event.kafkaVersion(), event.httpVersion(), message);
}

private String generateConfig(
Expand Down
1 change: 1 addition & 0 deletions services/api-gen/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
#

server.port=8085
spring.kafka.streams.properties.auto.offset.reset: latest
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void shouldGenerateHttpAsyncApiEvent()
{
String kafkaSpecVersion = "1";
String httpSpecVersion = "1";
ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, kafkaSpecVersion, null);
ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, kafkaSpecVersion, null, null);

when(specHelper.fetchSpec(KAFKA_ASYNCAPI_ARTIFACT_ID, kafkaSpecVersion)).thenReturn(kafkaSpec);
when(specHelper.publishSpec(anyString(), anyString())).thenReturn(httpSpecVersion);
Expand All @@ -89,7 +89,7 @@ public void shouldGenerateHttpAsyncApiEvent()
@Test
public void shouldHandleExceptionDuringGeneration()
{
ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, "kafkaVersion", null);
ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, "kafkaVersion", null, null);

when(specHelper.fetchSpec(anyString(), anyString())).thenThrow(new RuntimeException("Error"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void shouldGenerateKafkaAsyncApiEvent() throws Exception
when(kafkaHelper.resolve()).thenReturn(schemaRecords);
when(specHelper.publishSpec(anyString(), anyString())).thenReturn(specVersion);

ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, null, null);
ApiGenEvent inputEvent = new ApiGenEvent(ApiGenEventType.KAFKA_ASYNC_API_PUBLISHED, null, null, null);

ApiGenEvent resultEvent = kafkaAsyncApiService.generate(inputEvent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void setUp()
@Test
public void shouldPublishConfigSuccessfully()
{
ApiGenEvent event = new ApiGenEvent(ApiGenEventType.HTTP_ASYNC_API_PUBLISHED, "2.8.0", "1.1");
ApiGenEvent event = new ApiGenEvent(ApiGenEventType.HTTP_ASYNC_API_PUBLISHED, "2.8.0", "1.1", null);

when(specService.publishConfig(anyString())).thenReturn(true);

Expand Down

0 comments on commit 4b4af2d

Please sign in to comment.