From bf9c8a85a35dbe1dd963507bbdd19e1891743a9d Mon Sep 17 00:00:00 2001 From: Arun S M Date: Sun, 14 Jan 2024 12:25:09 -0600 Subject: [PATCH] Use Key for Kafka Publishing (#112) * Introduce a standard CC event and the SDK Signed-off-by: S m, Aruna * Generate SDK build alongside the jar for containers The SDK contains the connector's common models that the chaincode developers can utilize. If the connector's flag is set to parse the events, the events will be parsed for common structure and utilized for Kafka publishing. The SDK can be later reused for other purposes. Signed-off-by: S m, Aruna * Update mvn dependency for json-schema Dependency updates for package version Signed-off-by: S m, Aruna --------- Signed-off-by: S m, Aruna --- .github/workflows/release.yml | 5 + pom.xml | 148 ++++++++++++++++-- .../rest/client/config/FabricProperties.java | 1 + .../java/rest/client/sdk/StandardCCEvent.java | 24 +++ .../service/impl/EventPublishServiceImpl.java | 39 +++-- .../client/util/FabricEventParseUtil.java | 4 + src/main/resources/application.template | 1 + ...onfigHandlerControllerIntegrationTest.java | 2 +- .../integration/sample-application.yml | 2 + 9 files changed, 201 insertions(+), 25 deletions(-) create mode 100644 src/main/java/hlf/java/rest/client/sdk/StandardCCEvent.java diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f4fa70c9..04c9fd85 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -52,6 +52,11 @@ jobs: shell: bash env: USERNAME_OR_ORG: "${{ github.repository_owner }}" + - name: Release the SDK to GitHub + run: | + mvn -B jar:jar deploy:deploy -Dsdk + env: + GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" - name: Increment the pom version run: | mvn build-helper:parse-version help:effective-pom validate -D${type} diff --git a/pom.xml b/pom.xml index 9c273412..b53d147a 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 0.8.8 3.19.6 2021.0.3 + 2.2.2 @@ -168,6 +217,12 @@ org.springframework.boot spring-boot-starter-test test + + + org.skyscreamer + jsonassert + + org.mockito @@ -201,10 +256,53 @@ micrometer-registry-prometheus runtime - + + net.bytebuddy + byte-buddy + 1.14.11 + + + + + + github + GitHub Packages + https://maven.pkg.github.com/hyperledger-labs/hlf-connector + + + + hlf-connector-sdk + + + sdk + + + + + + org.apache.maven.plugins + maven-jar-plugin + + sdk-only + + **/hlf/java/rest/client/sdk/* + + + + + + jar + + prepare-package + + + + + + patch-release @@ -229,6 +327,17 @@ + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + @@ -256,6 +365,17 @@ + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + @@ -283,6 +403,17 @@ + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + @@ -290,19 +421,6 @@ hlf-connector - - - org.springframework.boot - spring-boot-maven-plugin - - - - repackage - - - - - diff --git a/src/main/java/hlf/java/rest/client/config/FabricProperties.java b/src/main/java/hlf/java/rest/client/config/FabricProperties.java index f68c17b1..cb34d651 100644 --- a/src/main/java/hlf/java/rest/client/config/FabricProperties.java +++ b/src/main/java/hlf/java/rest/client/config/FabricProperties.java @@ -67,6 +67,7 @@ public static class Events { // TODO: This will be removed or deprecated and the property 'chaincodeDetails' will be // preferred for providing Chaincode details for Event subscription private List chaincode; + private boolean standardCCEventEnabled; private List block; private List chaincodeDetails; } diff --git a/src/main/java/hlf/java/rest/client/sdk/StandardCCEvent.java b/src/main/java/hlf/java/rest/client/sdk/StandardCCEvent.java new file mode 100644 index 00000000..c2467539 --- /dev/null +++ b/src/main/java/hlf/java/rest/client/sdk/StandardCCEvent.java @@ -0,0 +1,24 @@ +package hlf.java.rest.client.sdk; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import org.hyperledger.fabric.contract.annotation.DataType; +import org.hyperledger.fabric.contract.annotation.Property; + +/** + * StandardCCEvent can be used by smart contract developers to send a commonly wrapped event that + * the hlf-connector decodes. The decoded event can be used to publish to Kafka. + */ +@Data +@DataType +public class StandardCCEvent implements Serializable { + @Property() + @JsonProperty("key") + private String key; + + @Property + @JsonProperty("event") + private String event; +} diff --git a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java index 7b2d4d83..ab52217b 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java @@ -1,12 +1,15 @@ package hlf.java.rest.client.service.impl; +import hlf.java.rest.client.config.FabricProperties; +import hlf.java.rest.client.config.KafkaProperties; +import hlf.java.rest.client.sdk.StandardCCEvent; import hlf.java.rest.client.service.EventPublishService; import hlf.java.rest.client.util.FabricClientConstants; +import hlf.java.rest.client.util.FabricEventParseUtil; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; @@ -19,8 +22,9 @@ @ConditionalOnProperty("kafka.event-listener.topic") public class EventPublishServiceImpl implements EventPublishService { - @Value("${kafka.event-listener.topic}") - private String topicName; + @Autowired private KafkaProperties kafkaProperties; + + @Autowired private FabricProperties fabricProperties; @Autowired private KafkaTemplate kafkaTemplate; @@ -34,7 +38,8 @@ public boolean sendMessage(String msg, String fabricTxId, String eventName, Stri try { ProducerRecord producerRecord = - new ProducerRecord(topicName, String.valueOf(msg.hashCode()), msg); + new ProducerRecord( + kafkaProperties.getEventListener().getTopic(), String.valueOf(msg.hashCode()), msg); producerRecord .headers() @@ -86,9 +91,18 @@ public boolean publishChaincodeEvents( boolean status = true; try { - + String key = String.valueOf(payload.hashCode()); + if (fabricProperties.getEvents().isStandardCCEventEnabled()) { + // Fetch the key information for chaincode events, + // but only if the feature is enabled. + + // Parse the payload and use the key. + StandardCCEvent standardCCEvent = + FabricEventParseUtil.parseString(payload, StandardCCEvent.class); + key = standardCCEvent.getKey(); + } ProducerRecord producerRecord = - new ProducerRecord<>(topicName, String.valueOf(payload.hashCode()), payload); + new ProducerRecord<>(kafkaProperties.getEventListener().getTopic(), key, payload); producerRecord .headers() @@ -113,7 +127,9 @@ public boolean publishChaincodeEvents( FabricClientConstants.FABRIC_EVENT_TYPE, FabricClientConstants.FABRIC_EVENT_TYPE_CHAINCODE.getBytes())); - log.info("Publishing Chaincode event to outbound topic {}", topicName); + log.info( + "Publishing Chaincode event to outbound topic {}", + kafkaProperties.getEventListener().getTopic()); ListenableFuture> future = kafkaTemplate.send(producerRecord); @@ -157,7 +173,10 @@ public boolean publishBlockEvents( try { ProducerRecord producerRecord = - new ProducerRecord<>(topicName, String.valueOf(payload.hashCode()), payload); + new ProducerRecord<>( + kafkaProperties.getEventListener().getTopic(), + String.valueOf(payload.hashCode()), + payload); producerRecord .headers() @@ -192,7 +211,9 @@ public boolean publishBlockEvents( FabricClientConstants.IS_PRIVATE_DATA_PRESENT, isPrivateDataPresent.toString().getBytes())); - log.info("Publishing Block event to outbound topic {}", topicName); + log.info( + "Publishing Block event to outbound topic {}", + kafkaProperties.getEventListener().getTopic()); ListenableFuture> future = kafkaTemplate.send(producerRecord); diff --git a/src/main/java/hlf/java/rest/client/util/FabricEventParseUtil.java b/src/main/java/hlf/java/rest/client/util/FabricEventParseUtil.java index 15813575..5da83d9c 100644 --- a/src/main/java/hlf/java/rest/client/util/FabricEventParseUtil.java +++ b/src/main/java/hlf/java/rest/client/util/FabricEventParseUtil.java @@ -198,4 +198,8 @@ public static String createEventStructure( } return message; } + + public static T parseString(String string, Class toClass) throws JsonProcessingException { + return mapper.readValue(string, toClass); + } } diff --git a/src/main/resources/application.template b/src/main/resources/application.template index cd50d739..49e735fe 100644 --- a/src/main/resources/application.template +++ b/src/main/resources/application.template @@ -30,6 +30,7 @@ fabric: apikey: ePVYHwAaQ0V1XOTX6U events: enable: true + standardCCEventEnabled: boolean (if set to true then the chaincode event is attempted at deserializing in the connector) chaincode: (Note : Will soon be deprecated / removed) block: chaincodeDetails: diff --git a/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java b/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java index 49eff10a..dac61a3a 100644 --- a/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java +++ b/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java @@ -81,7 +81,7 @@ private static class TestConfiguration { static String FABRIC_PROPERTIES_CLIENT = "FabricProperties.Client(rest=FabricProperties.Client.Rest(apikey=expected-key))"; static String FABRIC_PROPERTIES_EVENTS = - "FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], block=[block111, block2], chaincodeDetails=null)"; + "FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, block=[block111, block2], chaincodeDetails=null)"; static String KAFKA_PROPERTIES_PRODUCER = "Producer{brokerHost='localhost:8087', topic='hlf-offchain-topic1', saslJaasConfig='null'}"; static String KAFKA_CONSUMER_PROPERTIES = diff --git a/src/test/resources/integration/sample-application.yml b/src/test/resources/integration/sample-application.yml index 2535ba80..7544a1e5 100644 --- a/src/test/resources/integration/sample-application.yml +++ b/src/test/resources/integration/sample-application.yml @@ -36,6 +36,7 @@ fabric: apikey: expected-key events: enable: true + standardCCEventEnabled: false chaincode: chaincode12, chaincode2 block: block111, block2 kafka: @@ -69,6 +70,7 @@ fabric: filename: connection.yaml events: enable: false + standardCCEventEnabled: false chaincode: channel1 block: channel1 client: