Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: assimbly/runtime
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 5.0.0
Choose a base ref
...
head repository: assimbly/runtime
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: main
Choose a head ref
Loading
Showing with 1,268 additions and 343 deletions.
  1. +1 −1 .github/workflows/release.yml
  2. +2 −2 .github/workflows/update-version.yml
  3. +1 −1 broker/pom.xml
  4. +1 −1 brokerRest/pom.xml
  5. +1 −1 dil/pom.xml
  6. +1 −0 dil/src/main/java/org/assimbly/dil/blocks/beans/AggregateStrategy.java
  7. +3 −0 dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/EnrichStrategy.java
  8. +33 −7 dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/json/JsonEnrichStrategy.java
  9. +2 −2 dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/override/OverrideEnrichStrategy.java
  10. +10 −2 dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/zipfile/ZipFileEnrichStrategy.java
  11. +5 −1 dil/src/main/java/org/assimbly/dil/blocks/beans/xml/XmlAggregateStrategy.java
  12. +15 −12 dil/src/main/java/org/assimbly/dil/blocks/connections/Connection.java
  13. +2 −2 dil/src/main/java/org/assimbly/dil/blocks/connections/broker/ActiveMQConnection.java
  14. +40 −14 dil/src/main/java/org/assimbly/dil/blocks/connections/broker/RabbitMQConnection.java
  15. +4 −0 dil/src/main/java/org/assimbly/dil/blocks/processors/JsonExchangeFormatter.java
  16. +56 −0 dil/src/main/java/org/assimbly/dil/blocks/processors/OpenTelemetryLogProcessor.java
  17. +1 −0 dil/src/main/java/org/assimbly/dil/blocks/processors/SetHeadersProcessor.java
  18. +1 −24 dil/src/main/java/org/assimbly/dil/blocks/processors/SetLogProcessor.java
  19. +3 −6 dil/src/main/java/org/assimbly/dil/event/EventConfigurer.java
  20. +8 −3 dil/src/main/java/org/assimbly/dil/event/collect/RouteCollector.java
  21. +64 −30 dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java
  22. +10 −0 dil/src/main/java/org/assimbly/dil/event/domain/Collection.java
  23. +14 −3 dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java
  24. +29 −13 dil/src/main/java/org/assimbly/dil/loader/FlowLoader.java
  25. +12 −8 dil/src/main/java/org/assimbly/dil/loader/FlowLoaderReport.java
  26. +3 −8 dil/src/main/java/org/assimbly/dil/loader/RouteLoader.java
  27. +21 −0 dil/src/main/java/org/assimbly/dil/transpiler/XMLFileConfiguration.java
  28. +22 −26 dil/src/main/java/org/assimbly/dil/transpiler/marshalling/Unmarshall.java
  29. +11 −0 dil/src/main/java/org/assimbly/dil/transpiler/marshalling/catalog/CustomKameletCatalog.java
  30. +8 −7 dil/src/main/java/org/assimbly/dil/transpiler/marshalling/core/RouteTemplate.java
  31. +1 −1 dil/src/main/java/org/assimbly/dil/transpiler/transform/Transform.java
  32. +4 −4 dil/src/main/java/org/assimbly/dil/validation/FtpValidator.java
  33. +9 −0 dil/src/main/java/org/assimbly/dil/validation/beans/FtpSettings.java
  34. +23 −0 dil/src/main/resources/kamelets/setbodybyheaders-action.kamelet.yaml
  35. +22 −0 dil/src/main/resources/kamelets/setbodybyheaders-sink.kamelet.yaml
  36. +22 −24 dil/src/main/resources/transform-to-dil.xsl
  37. +1 −1 integration/pom.xml
  38. +80 −2 integration/src/main/java/org/assimbly/integration/Integration.java
  39. +17 −1 integration/src/main/java/org/assimbly/integration/impl/BaseIntegration.java
  40. +493 −124 integration/src/main/java/org/assimbly/integration/impl/CamelIntegration.java
  41. +1 −1 integrationRest/pom.xml
  42. +1 −0 integrationRest/src/main/java/org/assimbly/integrationrest/FlowManagerRuntime.java
  43. +135 −0 integrationRest/src/main/java/org/assimbly/integrationrest/HealthRuntime.java
  44. +25 −0 integrationRest/src/main/java/org/assimbly/integrationrest/IntegrationRuntime.java
  45. +41 −2 integrationRest/src/main/java/org/assimbly/integrationrest/StatisticsRuntime.java
  46. +9 −9 pom.xml
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@ jobs:
- name: Checkout the code
uses: actions/checkout@v4
with:
repository: ${{ github.repository }}
repository: "assimbly/runtime"
ref: ${{ inputs.branch }}
fetch-depth: 0
token: ${{ secrets.RELEASE_TOKEN }}
4 changes: 2 additions & 2 deletions .github/workflows/update-version.yml
Original file line number Diff line number Diff line change
@@ -13,11 +13,11 @@ on:
required: true

workflow_call:
inputs:
inputs:
branch:
description: 'Branch to use'
type: string
default: 'develop'
default: 'develop'
milestone:
description: 'Milestone to use as version'
type: string
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.3</version>
</parent>

<name>broker</name>
2 changes: 1 addition & 1 deletion brokerRest/pom.xml
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.3</version>
</parent>

<name>broker-rest</name>
2 changes: 1 addition & 1 deletion dil/pom.xml
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.3</version>
</parent>

<name>dil</name>
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
switch(aggregateType) {
case "xml":
case "text/xml":
case "application/xml":
aggregateStrategy = new XmlAggregateStrategy();
break;
case "json":
Original file line number Diff line number Diff line change
@@ -24,9 +24,12 @@ public Exchange aggregate(Exchange originalExchange, Exchange resourceExchange)
AggregationStrategy enrichStrategy;

switch(enrichType) {
case "xml":
case "text/xml":
case "application/xml":
enrichStrategy = (AggregationStrategy) new XmlEnrichStrategy();
break;
case "json":
case "application/json":
enrichStrategy = (AggregationStrategy) new JsonEnrichStrategy();
break;
Original file line number Diff line number Diff line change
@@ -2,33 +2,41 @@

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.TypeConversionException;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.assimbly.aggregate.json.JsonAggregateStrategy;


public class JsonEnrichStrategy implements AggregationStrategy {

final static Logger logger = Logger.getLogger(JsonAggregateStrategy.class);

@Override
public Exchange aggregate(Exchange original, Exchange resource) {

JSONArray array = new JSONArray();

if(resource == null) {
return original;
}else if (original == null || !(original.getIn().getBody(String.class) instanceof String)) {
}

array = wrapArray(array,resource.getIn().getBody(String.class));
JSONArray array = new JSONArray();

if (original == null) {

String resourceBody = convertBodyToString(resource);

array = wrapArray(array,resourceBody);
resource.getIn().setBody(array.toString(2));

return resource;

}else{

array = wrapArray(array,original.getIn().getBody(String.class));
array = wrapArray(array, resource.getIn().getBody(String.class));
String originalBody = convertBodyToString(original);
String resourceBody = convertBodyToString(resource);

array = wrapArray(array, originalBody);
array = wrapArray(array, resourceBody);

original.getIn().setBody(array.toString(2));

@@ -46,4 +54,22 @@ private JSONArray wrapArray(JSONArray array, String json){
}
}

private String convertBodyToString(Exchange exchange){

Object body = exchange.getIn().getBody();

if (body instanceof String) {
return exchange.getIn().getBody(String.class);
} else {
try {
// Convert Object to String using Camel's typeconverter
return exchange.getContext().getTypeConverter().convertTo(String.class, body);
} catch (TypeConversionException e) {
logger.error("Failed to enrich message body of type: " + body.getClass().getName() + " | Error:" + e.getMessage());
throw e;
}
}

}

}
Original file line number Diff line number Diff line change
@@ -22,10 +22,10 @@ public Exchange aggregate(Exchange original, Exchange resource) {
boolean ignoreNullResource = original.getProperty("AssimblyAggregateNoExceptionOnNull", boolean.class);

if (errorRoute && !ignoreNullResource) {
throw new EnrichException("Can't override body - nothing in the bottom route");
throw new EnrichException("Can't override body");
}

original.getIn().setBody(null);
original.getIn().setBody(null,String.class);
result = original;
} else {
original.getIn().setBody(resource.getIn().getBody());
Original file line number Diff line number Diff line change
@@ -27,13 +27,21 @@ public class ZipFileEnrichStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
elementNames = new ArrayList<>();

if (newExchange == null) {
// there’s no remote file to consume
return oldExchange;
}

Message in = oldExchange.getIn();
Message resource = newExchange.getIn();

byte[] sourceZip = in.getBody(byte[].class);
byte[] resourceData = resource.getBody(byte[].class);
byte[] resourceData = newExchange.getContext().getTypeConverter().convertTo(byte[].class, resource.getBody());

String fileName = resource.getHeader(Exchange.FILE_NAME, String.class);
String fileName = resource.getHeader(Exchange.FILE_NAME_CONSUMED, String.class);
if(fileName == null) {
fileName = resource.getHeader(Exchange.FILE_NAME, String.class);
}

ByteArrayOutputStream baos = new ByteArrayOutputStream();

Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ public Exchange aggregate(Exchange newExchange, Exchange splitExchange) {
private String buildAggregateBody(String newXml, String splitXml) {
String result = "";

if(splitXml.indexOf("?>") != -1) {
if(containsXmlDeclaration(splitXml)) {
// removes xml declaration from splitXml
int declarationEndPos = splitXml.indexOf("?>");
splitXml = splitXml.substring(declarationEndPos + 2);
@@ -62,6 +62,10 @@ private String buildAggregateBody(String newXml, String splitXml) {
return result;
}

private boolean containsXmlDeclaration(String xml) {
return xml.trim().startsWith("<?xml");
}

private String getBody(Exchange exchange) {

try {
Original file line number Diff line number Diff line change
@@ -57,36 +57,39 @@ private void startConnection() throws Exception{

EncryptableProperties decryptedProperties = decryptProperties(properties);

switch (connectionType) {
case "ActiveMQ":
if(connectionType==null){
connectionType = "unconfigured";
}

switch (connectionType.toLowerCase()) {
case "activemq":
new ActiveMQConnection(context, decryptedProperties, connectionId, "activeMQ").start();
break;
case "AmazonMQ":
case "amazonmq":
new ActiveMQConnection(context, decryptedProperties, connectionId, "amazonmq").start();
break;
case "SonicMQ":
case "sonicmq":
String connectId = stepType + connectionIdValue + RANDOM.nextInt(1000000);
new SonicMQConnection(context, decryptedProperties, connectionId, "sonicmq").start(flowId, connectId, connectionIdValue);
uri = uri.replace("sonicmq:", "sonicmq." + flowId + connectId + ":");
properties.put(stepType + "." + stepId + ".uri", uri);
break;
case "MQ":
System.out.println("New MQ Connection");
case "mq":
new MQConnection(context, decryptedProperties, connectionId, "sjms").start(stepType, stepId);
break;
case "AMQPS":
case "amqps":
new AMQPConnection(context, decryptedProperties, connectionId, "amqps").start(true);
break;
case "AMQP":
case "amqp":
new AMQPConnection(context, decryptedProperties, connectionId, "amqp").start(false);
break;
case "IBMMQ":
case "ibmq":
new IBMMQConnection(context, decryptedProperties, connectionId, "ibmmq").start(stepType, stepId);
break;
case "RABBITMQ":
new RabbitMQConnection(context, decryptedProperties, connectionId, "rabbitmq").start();
case "rabbitmq", "spring-rabbitmq":
new RabbitMQConnection(context, decryptedProperties, connectionId, "spring-rabbitmq").start();
break;
case "JDBC":
case "jdbc":
new JDBCConnection(context, decryptedProperties, connectionId, "sql").start(stepType, stepId);
break;
default:
Original file line number Diff line number Diff line change
@@ -73,10 +73,10 @@ private void setFields(){
} else if (conType.equals("pooled")) {

if (maxConnections == null) {
maxConnections = "10";
maxConnections = "2";
}
if (concurentConsumers == null) {
concurentConsumers = "10";
concurentConsumers = "2";
}

}
Original file line number Diff line number Diff line change
@@ -17,7 +17,9 @@ public class RabbitMQConnection {
private String componentName;
private String connectionId;
private CachingConnectionFactory rabbitMQConnectionFactory;
private String url;
private String uri;
private String host;
private String port;
private String username;
private String password;
private String virtualHost;
@@ -36,10 +38,12 @@ public void start() throws Exception {
setFields();

if (context.hasComponent(componentName) == null) {
if (url != null) {
if (uri != null) {
setConnection();
}else if(host!=null && port!=null){
setConnection();
}else{
throw new Exception("Unknown url. Broker url is required");
throw new Exception("RabbitMQ connection parameters are invalid. Broker uri or host/port are required");
}
} else {
log.error("RabbitMQ connection parameters are invalid.");
@@ -49,28 +53,50 @@ public void start() throws Exception {

private void setFields(){

url = properties.getProperty("connection." + connectionId + ".url");
uri = properties.getProperty("connection." + connectionId + ".uri");
host = properties.getProperty("connection." + connectionId + ".host");
port = properties.getProperty("connection." + connectionId + ".port");
virtualHost = properties.getProperty("connection." + connectionId + ".vhost");
username = properties.getProperty("connection." + connectionId + ".username");
password = properties.getProperty("connection." + connectionId + ".password");

}

private void setConnection() throws JMSException {

if(context.getRegistry().lookupByName(connectionId) == null) {

private void setConnection() throws JMSException {
log.info("Create new rabbitMQ Connection with connection-id: " + connectionId);

rabbitMQConnectionFactory.setUri(url);
rabbitMQConnectionFactory.setVirtualHost(virtualHost);
if(username!=null && password != null) {
rabbitMQConnectionFactory.setUsername(username);
rabbitMQConnectionFactory.setPassword(password);
}
rabbitMQConnectionFactory = new CachingConnectionFactory();

//Connection connection = rabbitMQConnectionFactory.createConnection();
rabbitMQConnectionFactory.start();
if (uri != null) {
rabbitMQConnectionFactory.setUri(uri);
} else if (host != null && port != null) {
rabbitMQConnectionFactory.setHost(host);
rabbitMQConnectionFactory.setPort(Integer.parseInt(port));
} else {
rabbitMQConnectionFactory.setHost("localhost");
rabbitMQConnectionFactory.setPort(5672);
}

context.getRegistry().bind(connectionId,rabbitMQConnectionFactory);
if (virtualHost != null) {
rabbitMQConnectionFactory.setVirtualHost(virtualHost);
} else {
rabbitMQConnectionFactory.setVirtualHost("/");
}

if (username != null && password != null) {
rabbitMQConnectionFactory.setUsername(username);
rabbitMQConnectionFactory.setPassword(password);
}

rabbitMQConnectionFactory.start();

context.getRegistry().bind(connectionId, rabbitMQConnectionFactory);
}else{
log.info("Reuse RabbitMQ Connection with connection-id: " + connectionId);
}

}

Original file line number Diff line number Diff line change
@@ -144,6 +144,10 @@ private void addVariables(Exchange exchange) {
}

private void addHeaders(Exchange exchange) {

JSONObject headers2 = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders()));
json.put("Headers", headers2);

if (showAll || showHeaders) {
JSONObject headers = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders()));
json.put("Headers", headers);
Loading