Skip to content

Commit

Permalink
Use Key for Kafka Publishing (#112)
Browse files Browse the repository at this point in the history
* Introduce a standard CC event and the SDK

Signed-off-by: S m, Aruna <[email protected]>

* Generate SDK build alongside the jar for containers

The SDK contains the connector's common models that
the chaincode developers can utilize. If the connector's
flag is set to parse the events, the events will be
parsed for common structure and utilized for Kafka
publishing.

The SDK can be later reused for other purposes.

Signed-off-by: S m, Aruna <[email protected]>

* Update mvn dependency for json-schema

Dependency updates for package version

Signed-off-by: S m, Aruna <[email protected]>

---------

Signed-off-by: S m, Aruna <[email protected]>
  • Loading branch information
arsulegai authored Jan 14, 2024
1 parent 0f13027 commit bf9c8a8
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ jobs:
shell: bash
env:
USERNAME_OR_ORG: "${{ github.repository_owner }}"
- name: Release the SDK to GitHub
run: |
mvn -B jar:jar deploy:deploy -Dsdk
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
- name: Increment the pom version
run: |
mvn build-helper:parse-version help:effective-pom validate -D${type}
Expand Down
148 changes: 133 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<jacoco.version>0.8.8</jacoco.version>
<protobuf.version>3.19.6</protobuf.version>
<spring-cloud.version>2021.0.3</spring-cloud.version>
<fabric-chaincode-java.version>2.2.2</fabric-chaincode-java.version>
</properties>

<!-- Adding dependency management through the Spring BOM, so that we need
Expand All @@ -37,6 +38,46 @@
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.hyperledger.fabric-chaincode-java</groupId>
<artifactId>fabric-chaincode-shim</artifactId>
<version>${fabric-chaincode-java.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.github.everit-org.json-schema</groupId>
<artifactId>org.everit.json.schema</artifactId>
</exclusion>
<exclusion>
<groupId>org.hyperledger.fabric-chaincode-java</groupId>
<artifactId>fabric-chaincode-protos</artifactId>
</exclusion>
<exclusion>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hyperledger.fabric-chaincode-java</groupId>
<artifactId>fabric-chaincode-protos</artifactId>
<version>${fabric-chaincode-java.version}</version>
<exclusions>
<exclusion>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
<version>4.8.139</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
Expand Down Expand Up @@ -144,12 +185,20 @@
<dependency>
<groupId>org.hyperledger.fabric-sdk-java</groupId>
<artifactId>fabric-sdk-java</artifactId>
<version>2.2.12</version>
<version>2.2.25</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Logging -->
Expand All @@ -168,6 +217,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down Expand Up @@ -201,10 +256,53 @@
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.14.11</version>
</dependency>
</dependencies>

<distributionManagement>
<repository>
<id>github</id>
<name>GitHub Packages</name>
<url>https://maven.pkg.github.com/hyperledger-labs/hlf-connector</url>
</repository>
</distributionManagement>

<!-- this section is for auto version increments -->
<profiles>
<profile>
<id>hlf-connector-sdk</id>
<activation>
<property>
<name>sdk</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>sdk-only</classifier>
<includes>
<include>**/hlf/java/rest/client/sdk/*</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
<phase>prepare-package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>patch-release</id>
<activation>
Expand All @@ -229,6 +327,17 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
Expand Down Expand Up @@ -256,6 +365,17 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
Expand Down Expand Up @@ -283,26 +403,24 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<build>
<finalName>hlf-connector</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public static class Events {
// TODO: This will be removed or deprecated and the property 'chaincodeDetails' will be
// preferred for providing Chaincode details for Event subscription
private List<String> chaincode;
private boolean standardCCEventEnabled;
private List<String> block;
private List<ChaincodeDetails> chaincodeDetails;
}
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/hlf/java/rest/client/sdk/StandardCCEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package hlf.java.rest.client.sdk;

import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import org.hyperledger.fabric.contract.annotation.DataType;
import org.hyperledger.fabric.contract.annotation.Property;

/**
* StandardCCEvent can be used by smart contract developers to send a commonly wrapped event that
* the hlf-connector decodes. The decoded event can be used to publish to Kafka.
*/
@Data
@DataType
public class StandardCCEvent implements Serializable {
@Property()
@JsonProperty("key")
private String key;

@Property
@JsonProperty("event")
private String event;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package hlf.java.rest.client.service.impl;

import hlf.java.rest.client.config.FabricProperties;
import hlf.java.rest.client.config.KafkaProperties;
import hlf.java.rest.client.sdk.StandardCCEvent;
import hlf.java.rest.client.service.EventPublishService;
import hlf.java.rest.client.util.FabricClientConstants;
import hlf.java.rest.client.util.FabricEventParseUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
Expand All @@ -19,8 +22,9 @@
@ConditionalOnProperty("kafka.event-listener.topic")
public class EventPublishServiceImpl implements EventPublishService {

@Value("${kafka.event-listener.topic}")
private String topicName;
@Autowired private KafkaProperties kafkaProperties;

@Autowired private FabricProperties fabricProperties;

@Autowired private KafkaTemplate<String, String> kafkaTemplate;

Expand All @@ -34,7 +38,8 @@ public boolean sendMessage(String msg, String fabricTxId, String eventName, Stri
try {

ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>(topicName, String.valueOf(msg.hashCode()), msg);
new ProducerRecord<String, String>(
kafkaProperties.getEventListener().getTopic(), String.valueOf(msg.hashCode()), msg);

producerRecord
.headers()
Expand Down Expand Up @@ -86,9 +91,18 @@ public boolean publishChaincodeEvents(
boolean status = true;

try {

String key = String.valueOf(payload.hashCode());
if (fabricProperties.getEvents().isStandardCCEventEnabled()) {
// Fetch the key information for chaincode events,
// but only if the feature is enabled.

// Parse the payload and use the key.
StandardCCEvent standardCCEvent =
FabricEventParseUtil.parseString(payload, StandardCCEvent.class);
key = standardCCEvent.getKey();
}
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, String.valueOf(payload.hashCode()), payload);
new ProducerRecord<>(kafkaProperties.getEventListener().getTopic(), key, payload);

producerRecord
.headers()
Expand All @@ -113,7 +127,9 @@ public boolean publishChaincodeEvents(
FabricClientConstants.FABRIC_EVENT_TYPE,
FabricClientConstants.FABRIC_EVENT_TYPE_CHAINCODE.getBytes()));

log.info("Publishing Chaincode event to outbound topic {}", topicName);
log.info(
"Publishing Chaincode event to outbound topic {}",
kafkaProperties.getEventListener().getTopic());

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);

Expand Down Expand Up @@ -157,7 +173,10 @@ public boolean publishBlockEvents(
try {

ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, String.valueOf(payload.hashCode()), payload);
new ProducerRecord<>(
kafkaProperties.getEventListener().getTopic(),
String.valueOf(payload.hashCode()),
payload);

producerRecord
.headers()
Expand Down Expand Up @@ -192,7 +211,9 @@ public boolean publishBlockEvents(
FabricClientConstants.IS_PRIVATE_DATA_PRESENT,
isPrivateDataPresent.toString().getBytes()));

log.info("Publishing Block event to outbound topic {}", topicName);
log.info(
"Publishing Block event to outbound topic {}",
kafkaProperties.getEventListener().getTopic());

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,8 @@ public static String createEventStructure(
}
return message;
}

public static <T> T parseString(String string, Class<T> toClass) throws JsonProcessingException {
return mapper.readValue(string, toClass);
}
}
1 change: 1 addition & 0 deletions src/main/resources/application.template
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fabric:
apikey: ePVYHwAaQ0V1XOTX6U
events:
enable: true
standardCCEventEnabled: boolean (if set to true then the chaincode event is attempted at deserializing in the connector)
chaincode: <comma separated list of channels> (Note : Will soon be deprecated / removed)
block: <comma separated list of channels>
chaincodeDetails:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private static class TestConfiguration {
static String FABRIC_PROPERTIES_CLIENT =
"FabricProperties.Client(rest=FabricProperties.Client.Rest(apikey=expected-key))";
static String FABRIC_PROPERTIES_EVENTS =
"FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], block=[block111, block2], chaincodeDetails=null)";
"FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, block=[block111, block2], chaincodeDetails=null)";
static String KAFKA_PROPERTIES_PRODUCER =
"Producer{brokerHost='localhost:8087', topic='hlf-offchain-topic1', saslJaasConfig='null'}";
static String KAFKA_CONSUMER_PROPERTIES =
Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/integration/sample-application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ fabric:
apikey: expected-key
events:
enable: true
standardCCEventEnabled: false
chaincode: chaincode12, chaincode2
block: block111, block2
kafka:
Expand Down Expand Up @@ -69,6 +70,7 @@ fabric:
filename: connection.yaml
events:
enable: false
standardCCEventEnabled: false
chaincode: channel1
block: channel1
client:
Expand Down

0 comments on commit bf9c8a8

Please sign in to comment.