Skip to content

Commit

Permalink
Use sql as backup (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonnyz32 authored Oct 23, 2024
1 parent fe875ab commit ffa181b
Show file tree
Hide file tree
Showing 15 changed files with 417 additions and 55 deletions.
4 changes: 4 additions & 0 deletions camel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
<artifactId>camel-stream</artifactId>
<version>3.14.7</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jdbc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public static String format(final String _in, final Map<String, Object> _mapping
return formatter.format(_mappings);
}

public String getM_fmtStr() {
return m_fmtStr;
}

private final String m_fmtStr;

public ManzanMessageFormatter(final String _fmtStr) {
Expand All @@ -33,8 +37,9 @@ public String format(final Map<String, Object> _mappings) {
ret = ret.replace("\\n", "\n").replace("\\t", "\t");

for (final Entry<String, Object> repl : _mappings.entrySet()) {
ret = ret.replace("$" + repl.getKey() + "$", "" + repl.getValue());
String jsonIndicator ="$json:" + repl.getKey() + "$";
final String key = repl.getKey();
ret = ret.replace("$" + key + "$", "" + repl.getValue());
String jsonIndicator ="$json:" + key + "$";
if(ret.contains(jsonIndicator)) {
ret = ret.replace(jsonIndicator, jsonEncode("" + repl.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
import com.github.theprez.manzan.WatchStarter;
import com.github.theprez.manzan.routes.ManzanRoute;
import com.github.theprez.manzan.routes.event.FileEvent;
import com.github.theprez.manzan.routes.event.WatchMsgEvent;
import com.github.theprez.manzan.routes.event.WatchMsgEventSockets;
import com.github.theprez.manzan.routes.event.WatchMsgEventSql;
import com.ibm.as400.access.AS400SecurityException;
import com.ibm.as400.access.ErrorCompletingRequestException;
import com.ibm.as400.access.ObjectDoesNotExistException;
Expand Down Expand Up @@ -73,12 +74,16 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
switch (type) {
case "watch":
String id = getRequiredString(name, "id");
ret.put(name, new WatchMsgEvent(name, id, format, destinations, schema, interval, numToProcess));
String sqlRouteName = name + "sql";
String socketRouteName = name + "socket";

ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, interval, numToProcess));
String strwch = getOptionalString(name, "strwch");
if (StringUtils.isNonEmpty(strwch)) {
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
}
ret.put(socketRouteName, new WatchMsgEventSockets(socketRouteName, format, destinations, schema, interval, numToProcess));
break;
case "file":
String file = getRequiredString(name, "file");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.github.theprez.manzan.routes.event;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.camel.model.dataformat.JsonLibrary;

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.ManzanMessageFormatter;
import com.github.theprez.manzan.routes.ManzanRoute;

public class WatchMsgEventSockets extends ManzanRoute {

private final ManzanMessageFormatter m_formatter;
private final String m_socketIp = "0.0.0.0";
private final String m_socketPort = "8080";

public WatchMsgEventSockets(final String _name, final String _format,
final List<String> _destinations, final String _schema, final int _interval, final int _numToProcess)
throws IOException {
super(_name);
m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);
super.setRecipientList(_destinations);
}

//@formatter:off
@Override
public void configure() {
from(String.format("netty:tcp://%s:%s?sync=false", m_socketIp, m_socketPort))
.unmarshal().json(JsonLibrary.Jackson, Map.class)
.routeId("manzan_msg:"+m_name)
.setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
.setHeader("session_id", simple("${body[sessionId]}"))
.setHeader("data_map", simple("${body}"))
.marshal().json(true) //TODO: skip this if we are applying a format
.setBody(simple("${body}\n"))
.process(exchange -> {
if (null != m_formatter) {
exchange.getIn().setBody(m_formatter.format(getDataMap(exchange)));
}
})
.recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end();
}
//@formatter:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import com.github.theprez.manzan.ManzanMessageFormatter;
import com.github.theprez.manzan.routes.ManzanRoute;

public class WatchMsgEvent extends ManzanRoute {
public class WatchMsgEventSql extends ManzanRoute {

private final int m_interval;
private final int m_numToProcess;
private final String m_schema;
private final String m_sessionId;
private final ManzanMessageFormatter m_formatter;

public WatchMsgEvent(final String _name, final String _session_id, final String _format,
public WatchMsgEventSql(final String _name, final String _session_id, final String _format,
final List<String> _destinations, final String _schema, final int _interval, final int _numToProcess)
throws IOException {
super(_name);
Expand Down Expand Up @@ -93,4 +93,4 @@ public void configure() {
.to("jdbc:jt400")
.to("stream:err");
}
}
}
1 change: 1 addition & 0 deletions docs/_sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [Data Sources](config/data.md)
* [Destinations](config/dests.md)
* [Logging](config/logging.md)
* [Messaging](config/messaging.md)
* Examples
* [File](config/examples/file.md)
* [Twilio](config/examples/twilio.md)
Expand Down
25 changes: 25 additions & 0 deletions docs/config/messaging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
## Messaging

### Messaging Preference

By default, Manzan will send messages between the Handler component and the Camel component via socket communication. If however socket communication happens to fail for any reason, it will use SQL based communication as a fallback option. If instead, you prefer Manzan to use SQL based communication as a first option, you can set the environment variable `MANZAN_MESSAGING_PREFERENCE=SQL`. Set the environment variable back to the value `SOCKETS` to set your preference to socket communication.

### Messaging options
Manzan supports two options for sending messages between the Handler and the Camel component (SQL and socket communication).\
**SOCKET COMMUNICATION:** This option provides real time communication and is faster than the sql option.\
**SQL COMMUNICATION:** Using this option works via the Handler component inserting data into a Db2 table, and then the Camel component subsequently reading from the table. This option isn't quite as fast as socket communication, however the data is more durable in the case that the Camel component is malfunctioning.

Socket communication is recommended, because by providing a fallback option to SQL in the case of socket communication malfunctioning, we guarantee data will not be lost.

* `MANZAN_MESSAGING_PREFERENCE = SQL`: Prefer SQL based communication
* `MANZAN_MESSAGING_PREFERENCE != SQL`: Prefer socket based communication

### Setting the MANZAN_MESSAGING_PREFERENCE

The `MANZAN_MESSAGING_PREFERENCE` environment variable can be set with the `ADDENVVAR` command. For example, to set the messaging preference to prefer SQL based communication run the command:

```cl
ADDENVVAR ENVVAR(MANZAN_MESSAGING_PREFERENCE) VALUE(SQL) LEVEL(*SYS) REPLACE(*YES)
```

After which you will need to run `ENDTCPSVR *SSHD` and `STRTCPSVR *SSHD`. Then restart your SSH session for the new environment variable to take effect.
6 changes: 3 additions & 3 deletions ile/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ src/mzversion.h:
/qsys.lib/${BUILDLIB}.lib:
system "RUNSQL SQL('create schema ${BUILDLIB}') COMMIT(*NONE) NAMING(*SQL) "

/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module
/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module /qsys.lib/${BUILDLIB}.lib/SockClient.module

/qsys.lib/${BUILDLIB}.lib/%.pgm:
system "CRTPGM PGM(${BUILDLIB}/$*) MODULE($(patsubst %.module,$(BUILDLIB)/%,$(notdir $^))) ACTGRP(*CALLER)"

/qsys.lib/${BUILDLIB}.lib/%.module: src/%.cpp src/mzversion.h
system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED)"
system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT) TGTCCSID(*JOB)"

/qsys.lib/${BUILDLIB}.lib/%.module: src/%.sqlc
system "CRTSQLCI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"
system "CRTSQLCPPI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"

/qsys.lib/${BUILDLIB.lib}:
-system "RUNSQL SQL('create schema ${BUILDLIB}') NAMING(*SYS)"
Expand Down
69 changes: 69 additions & 0 deletions ile/src/SockClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include <iostream>
#include <cstring> // For memset and memcpy
#include <sys/socket.h> // For socket functions
#include <arpa/inet.h> // For inet_addr
#include <unistd.h> // For close
#include "SockClient.h"
#include "manzan.h"

// Constructor to initialize the socket descriptor to -1
SockClient::SockClient(){
sock_fd = -1;
}

// Method to open a socket and connect to the server
bool SockClient::openSocket(const std::string ip, int port) {
// Create socket
sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd < 0) {
DEBUG_ERROR("Error creating socket\n");
return false;
}

// Define server address
struct sockaddr_in server_address;
server_address.sin_family = AF_INET;
server_address.sin_port = htons(port);
server_address.sin_addr.s_addr = inet_addr(const_cast<char*>(ip.c_str()));

// Connect to server
if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) {
DEBUG_ERROR("Error connecting to server\n");
closeSocket();
return false;
}

DEBUG_INFO("Connected to server at %s:%d\n", ip.c_str(), port);
return true;
}

// Method to send a message (string) over the socket
bool SockClient::sendMessage(const std::string message) {
if (sock_fd < 0) {
DEBUG_ERROR("Socket is not open\n");
return false;
}

int bytes_sent = send(sock_fd, const_cast<char*>(message.c_str()), message.size(), 0);
if (bytes_sent < 0) {
DEBUG_ERROR("Error sending message\n");
return false;
}
DEBUG_INFO("Sent message: %s\n", message);

return true;
}

// Method to close the socket
void SockClient::closeSocket() {
if (sock_fd >= 0) {
close(sock_fd);
sock_fd = -1;
DEBUG_INFO("Socket closed\n");
}
}

// Destructor to ensure socket is closed
SockClient::~SockClient() {
closeSocket();
}
23 changes: 23 additions & 0 deletions ile/src/SockClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <string>

// extern "C" {
class SockClient {

int sock_fd;
public:
// Constructor to initialize the socket descriptor to -1
SockClient();

// Method to open a socket and connect to the server
bool openSocket(const std::string ip, int port);

// Method to send a message (string) over the socket
bool sendMessage(const std::string message);

// Method to close the socket
void closeSocket();

// Destructor to ensure socket is closed
~SockClient();
};
// }
2 changes: 2 additions & 0 deletions ile/src/handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "event_data.h"
#include "userconf.h"
#include "mzversion.h"
#include "pub_json.h"
#include "SockClient.h"

static FILE *fd = NULL;

Expand Down
Loading

0 comments on commit ffa181b

Please sign in to comment.