Skip to content

Commit

Permalink
Only create one socket listener (#175)
Browse files Browse the repository at this point in the history
Signed-off-by: Sanjula Ganepola <[email protected]>
Co-authored-by: Sanjula Ganepola <[email protected]>
Co-authored-by: Sanjula Ganepola <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2024
1 parent 1f72320 commit af6bfa7
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -13,6 +15,7 @@

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.WatchStarter;
import com.github.theprez.manzan.routes.ManzanRoute;
import com.github.theprez.manzan.routes.event.FileEvent;
Expand All @@ -22,6 +25,8 @@
import com.ibm.as400.access.ErrorCompletingRequestException;
import com.ibm.as400.access.ObjectDoesNotExistException;

import static com.github.theprez.manzan.routes.ManzanRoute.createRecipientList;

public class DataConfig extends Config {

private final static int DEFAULT_INTERVAL = 5;
Expand All @@ -46,21 +51,25 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
return m_routes;
}
final Map<String, ManzanRoute> ret = new LinkedHashMap<String, ManzanRoute>();
final List<String> watchEvents = new ArrayList<>();
final String schema = ApplicationConfig.get().getLibrary();

for (final String section : getIni().keySet()) {
final String type = getIni().get(section, "type");
if (StringUtils.isEmpty(type)) {
throw new RuntimeException("Type not specified for data source [" + section + "]");
}
if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) {
continue;
} else if ( type.equals("watch")){
// We will handle the watch events separately as the logic is a bit more complicated
watchEvents.add(section);
continue;
}
final String name = section;
final String schema = ApplicationConfig.get().getLibrary();
final String format = getOptionalString(name, "format");
int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
final List<String> destinations = new LinkedList<String>();
for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) {
d = d.trim();
Expand All @@ -73,28 +82,6 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
}
}
switch (type) {
case "watch":
String id = getRequiredString(name, "id");
String strwch = getRequiredString(name, "strwch");
String sqlRouteName = name + "sql";
String socketRouteName = name + "socket";

ManzanEventType eventType;
if(strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if(strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if(strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}

ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess));
ret.put(socketRouteName, new WatchMsgEventSockets(socketRouteName, format, destinations, schema, interval, numToProcess));
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
break;
case "file":
String file = getRequiredString(name, "file");
String filter = getOptionalString(name, "filter");
Expand All @@ -104,7 +91,62 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
throw new RuntimeException("Unknown destination type: " + type);
}
}

// We will create a formatMap to store the format for each watch session, as well
// as a destMap to store the destinations for each watch session
final Map<String, String> formatMap = new HashMap<>();
final Map<String, String> destMap = new HashMap<>();

for (int i = 0; i < watchEvents.size(); i++) {
final String section = watchEvents.get(i);
final String name = section;
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
final String format = getOptionalString(name, "format");
String strwch = getRequiredString(name, "strwch");
String id = getRequiredString(name, "id");

ManzanEventType eventType;
if(strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if(strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if(strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}

int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
final List<String> destinations = new LinkedList<String>();
for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) {
d = d.trim();
if (!m_destinations.contains(d)) {
throw new RuntimeException(
"No destination configured named '" + d + "' for data source '" + name + "'");
}
if (StringUtils.isNonEmpty(d)) {
destinations.add(d);
}
}

// Build the maps
String destString = createRecipientList(destinations);
formatMap.put(id.toUpperCase(), format);
destMap.put(id.toUpperCase(), destString);

String sqlRouteName = name + "sql";
ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess));
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
}

if (watchEvents.size() > 0){
// After iterating over the loop, the formatMap and destMap are complete. Now create the route.
final String routeName = "socketWatcher";
ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap));
}
return m_routes = ret;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,20 @@ protected String getString(final Exchange _exchange, final String _attr) {
}

protected void setRecipientList(final List<String> _destinations) throws IOException {
m_recipientList = createRecipientList(_destinations);
if (StringUtils.isEmpty(m_recipientList)) {
throw new IOException("Message watch for '" + m_name + "' has no valid destinations");
}
}

public static String createRecipientList(final List<String> _destinations) throws IOException {
String destinationsStr = "";
for (final String dest : _destinations) {
if (StringUtils.isEmpty(dest)) {
continue;
}
destinationsStr += "direct:" + dest.toLowerCase().trim() + ",";
}
m_recipientList = destinationsStr.replaceFirst(",$", "").trim();

if (StringUtils.isEmpty(m_recipientList)) {
throw new IOException("Message watch for '" + m_name + "' has no valid destinations");
}
return destinationsStr.replaceFirst(",$", "").trim();
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
package com.github.theprez.manzan.routes.event;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.camel.model.dataformat.JsonLibrary;

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.ManzanMessageFormatter;
import com.github.theprez.manzan.routes.ManzanRoute;

public class WatchMsgEventSockets extends ManzanRoute {

private final ManzanMessageFormatter m_formatter;
private final Map<String, String> m_formatMap;
private final Map<String, String> m_destMap;
private final String m_socketIp = "0.0.0.0";
private final String m_socketPort = "8080";

public WatchMsgEventSockets(final String _name, final String _format,
final List<String> _destinations, final String _schema, final int _interval, final int _numToProcess)
throws IOException {
public WatchMsgEventSockets(final String _name, final Map<String, String> _formatMap,
final Map<String, String> _destMap) {
super(_name);
m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);
super.setRecipientList(_destinations);
m_formatMap = _formatMap;
m_destMap = _destMap;
}

//@formatter:off
Expand All @@ -32,16 +29,22 @@ public void configure() {
.unmarshal().json(JsonLibrary.Jackson, Map.class)
.routeId("manzan_msg:"+m_name)
.setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
.setHeader("session_id", simple("${body[sessionId]}"))
.setHeader("session_id", simple("${body[SESSION_ID]}"))
.setHeader("data_map", simple("${body}"))
.marshal().json(true) //TODO: skip this if we are applying a format
.setBody(simple("${body}\n"))
.process(exchange -> {
if (null != m_formatter) {
String sessionId = exchange.getIn().getHeader("session_id", String.class);
String format = m_formatMap.get(sessionId);
if (format != null) {
ManzanMessageFormatter m_formatter = new ManzanMessageFormatter(format);
exchange.getIn().setBody(m_formatter.format(getDataMap(exchange)));
}
String destinations = m_destMap.get(sessionId); // Get destinations from m_destMap
exchange.getIn().setHeader("destinations", destinations);
})
.recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end();
.recipientList(header("destinations"))
.parallelProcessing().stopOnException().end();
}
//@formatter:on
}

0 comments on commit af6bfa7

Please sign in to comment.