diff --git a/camel/pom.xml b/camel/pom.xml
index dfb451a..05571af 100644
--- a/camel/pom.xml
+++ b/camel/pom.xml
@@ -76,6 +76,10 @@
camel-stream
3.14.7
+
+ org.apache.camel
+ camel-netty
+
org.apache.camel
camel-jdbc
diff --git a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java
index c1f142f..976fb8e 100644
--- a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java
+++ b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java
@@ -20,6 +20,10 @@ public static String format(final String _in, final Map _mapping
return formatter.format(_mappings);
}
+ public String getM_fmtStr() {
+ return m_fmtStr;
+ }
+
private final String m_fmtStr;
public ManzanMessageFormatter(final String _fmtStr) {
@@ -33,8 +37,9 @@ public String format(final Map _mappings) {
ret = ret.replace("\\n", "\n").replace("\\t", "\t");
for (final Entry repl : _mappings.entrySet()) {
- ret = ret.replace("$" + repl.getKey() + "$", "" + repl.getValue());
- String jsonIndicator ="$json:" + repl.getKey() + "$";
+ final String key = repl.getKey();
+ ret = ret.replace("$" + key + "$", "" + repl.getValue());
+ String jsonIndicator ="$json:" + key + "$";
if(ret.contains(jsonIndicator)) {
ret = ret.replace(jsonIndicator, jsonEncode("" + repl.getValue()));
}
diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java
index 9fa543a..b5aaca2 100644
--- a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java
+++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java
@@ -15,7 +15,8 @@
import com.github.theprez.manzan.WatchStarter;
import com.github.theprez.manzan.routes.ManzanRoute;
import com.github.theprez.manzan.routes.event.FileEvent;
-import com.github.theprez.manzan.routes.event.WatchMsgEvent;
+import com.github.theprez.manzan.routes.event.WatchMsgEventSockets;
+import com.github.theprez.manzan.routes.event.WatchMsgEventSql;
import com.ibm.as400.access.AS400SecurityException;
import com.ibm.as400.access.ErrorCompletingRequestException;
import com.ibm.as400.access.ObjectDoesNotExistException;
@@ -73,12 +74,16 @@ public synchronized Map getRoutes() throws IOException, AS4
switch (type) {
case "watch":
String id = getRequiredString(name, "id");
- ret.put(name, new WatchMsgEvent(name, id, format, destinations, schema, interval, numToProcess));
+ String sqlRouteName = name + "sql";
+ String socketRouteName = name + "socket";
+
+ ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, interval, numToProcess));
String strwch = getOptionalString(name, "strwch");
if (StringUtils.isNonEmpty(strwch)) {
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
}
+ ret.put(socketRouteName, new WatchMsgEventSockets(socketRouteName, format, destinations, schema, interval, numToProcess));
break;
case "file":
String file = getRequiredString(name, "file");
diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java
new file mode 100644
index 0000000..f0c30ed
--- /dev/null
+++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java
@@ -0,0 +1,47 @@
+package com.github.theprez.manzan.routes.event;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.model.dataformat.JsonLibrary;
+
+import com.github.theprez.jcmdutils.StringUtils;
+import com.github.theprez.manzan.ManzanEventType;
+import com.github.theprez.manzan.ManzanMessageFormatter;
+import com.github.theprez.manzan.routes.ManzanRoute;
+
+public class WatchMsgEventSockets extends ManzanRoute {
+
+ private final ManzanMessageFormatter m_formatter;
+ private final String m_socketIp = "0.0.0.0";
+ private final String m_socketPort = "8080";
+
+ public WatchMsgEventSockets(final String _name, final String _format,
+ final List _destinations, final String _schema, final int _interval, final int _numToProcess)
+ throws IOException {
+ super(_name);
+ m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);
+ super.setRecipientList(_destinations);
+ }
+
+//@formatter:off
+ @Override
+ public void configure() {
+ from(String.format("netty:tcp://%s:%s?sync=false", m_socketIp, m_socketPort))
+ .unmarshal().json(JsonLibrary.Jackson, Map.class)
+ .routeId("manzan_msg:"+m_name)
+ .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
+ .setHeader("session_id", simple("${body[sessionId]}"))
+ .setHeader("data_map", simple("${body}"))
+ .marshal().json(true) //TODO: skip this if we are applying a format
+ .setBody(simple("${body}\n"))
+ .process(exchange -> {
+ if (null != m_formatter) {
+ exchange.getIn().setBody(m_formatter.format(getDataMap(exchange)));
+ }
+ })
+ .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end();
+ }
+ //@formatter:on
+}
\ No newline at end of file
diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSql.java
similarity index 96%
rename from camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java
rename to camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSql.java
index 0470c9f..44d3337 100644
--- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java
+++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSql.java
@@ -10,7 +10,7 @@
import com.github.theprez.manzan.ManzanMessageFormatter;
import com.github.theprez.manzan.routes.ManzanRoute;
-public class WatchMsgEvent extends ManzanRoute {
+public class WatchMsgEventSql extends ManzanRoute {
private final int m_interval;
private final int m_numToProcess;
@@ -18,7 +18,7 @@ public class WatchMsgEvent extends ManzanRoute {
private final String m_sessionId;
private final ManzanMessageFormatter m_formatter;
- public WatchMsgEvent(final String _name, final String _session_id, final String _format,
+ public WatchMsgEventSql(final String _name, final String _session_id, final String _format,
final List _destinations, final String _schema, final int _interval, final int _numToProcess)
throws IOException {
super(_name);
@@ -93,4 +93,4 @@ public void configure() {
.to("jdbc:jt400")
.to("stream:err");
}
-}
+}
\ No newline at end of file
diff --git a/docs/_sidebar.md b/docs/_sidebar.md
index 369f3bb..41597a1 100644
--- a/docs/_sidebar.md
+++ b/docs/_sidebar.md
@@ -6,6 +6,7 @@
* [Data Sources](config/data.md)
* [Destinations](config/dests.md)
* [Logging](config/logging.md)
+ * [Messaging](config/messaging.md)
* Examples
* [File](config/examples/file.md)
* [Twilio](config/examples/twilio.md)
diff --git a/docs/config/messaging.md b/docs/config/messaging.md
new file mode 100644
index 0000000..c4fb90f
--- /dev/null
+++ b/docs/config/messaging.md
@@ -0,0 +1,25 @@
+## Messaging
+
+### Messaging Preference
+
+By default, Manzan will send messages between the Handler component and the Camel component via socket communication. If however socket communication happens to fail for any reason, it will use SQL based communication as a fallback option. If instead, you prefer Manzan to use SQL based communication as a first option, you can set the environment variable `MANZAN_MESSAGING_PREFERENCE=SQL`. Set the environment variable back to the value `SOCKETS` to set your preference to socket communication.
+
+### Messaging options
+Manzan supports two options for sending messages between the Handler and the Camel component (SQL and socket communication).\
+**SOCKET COMMUNICATION:** This option provides real time communication and is faster than the sql option.\
+**SQL COMMUNICATION:** Using this option works via the Handler component inserting data into a Db2 table, and then the Camel component subsequently reading from the table. This option isn't quite as fast as socket communication, however the data is more durable in the case that the Camel component is malfunctioning.
+
+Socket communication is recommended, because by providing a fallback option to SQL in the case of socket communication malfunctioning, we guarantee data will not be lost.
+
+* `MANZAN_MESSAGING_PREFERENCE = SQL`: Prefer SQL based communication
+* `MANZAN_MESSAGING_PREFERENCE != SQL`: Prefer socket based communication
+
+### Setting the MANZAN_MESSAGING_PREFERENCE
+
+The `MANZAN_MESSAGING_PREFERENCE` environment variable can be set with the `ADDENVVAR` command. For example, to set the messaging preference to prefer SQL based communication run the command:
+
+```cl
+ADDENVVAR ENVVAR(MANZAN_MESSAGING_PREFERENCE) VALUE(SQL) LEVEL(*SYS) REPLACE(*YES)
+```
+
+After which you will need to run `ENDTCPSVR *SSHD` and `STRTCPSVR *SSHD`. Then restart your SSH session for the new environment variable to take effect.
\ No newline at end of file
diff --git a/ile/Makefile b/ile/Makefile
index fcc16c5..546b942 100644
--- a/ile/Makefile
+++ b/ile/Makefile
@@ -19,16 +19,16 @@ src/mzversion.h:
/qsys.lib/${BUILDLIB}.lib:
system "RUNSQL SQL('create schema ${BUILDLIB}') COMMIT(*NONE) NAMING(*SQL) "
-/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module
+/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module /qsys.lib/${BUILDLIB}.lib/SockClient.module
/qsys.lib/${BUILDLIB}.lib/%.pgm:
system "CRTPGM PGM(${BUILDLIB}/$*) MODULE($(patsubst %.module,$(BUILDLIB)/%,$(notdir $^))) ACTGRP(*CALLER)"
/qsys.lib/${BUILDLIB}.lib/%.module: src/%.cpp src/mzversion.h
- system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED)"
+ system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT) TGTCCSID(*JOB)"
/qsys.lib/${BUILDLIB}.lib/%.module: src/%.sqlc
- system "CRTSQLCI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"
+ system "CRTSQLCPPI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"
/qsys.lib/${BUILDLIB.lib}:
-system "RUNSQL SQL('create schema ${BUILDLIB}') NAMING(*SYS)"
diff --git a/ile/src/SockClient.cpp b/ile/src/SockClient.cpp
new file mode 100644
index 0000000..d3dd799
--- /dev/null
+++ b/ile/src/SockClient.cpp
@@ -0,0 +1,69 @@
+#include
+#include // For memset and memcpy
+#include // For socket functions
+#include // For inet_addr
+#include // For close
+#include "SockClient.h"
+#include "manzan.h"
+
+// Constructor to initialize the socket descriptor to -1
+SockClient::SockClient(){
+ sock_fd = -1;
+}
+
+// Method to open a socket and connect to the server
+bool SockClient::openSocket(const std::string ip, int port) {
+ // Create socket
+ sock_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock_fd < 0) {
+ DEBUG_ERROR("Error creating socket\n");
+ return false;
+ }
+
+ // Define server address
+ struct sockaddr_in server_address;
+ server_address.sin_family = AF_INET;
+ server_address.sin_port = htons(port);
+ server_address.sin_addr.s_addr = inet_addr(const_cast(ip.c_str()));
+
+ // Connect to server
+ if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) {
+ DEBUG_ERROR("Error connecting to server\n");
+ closeSocket();
+ return false;
+ }
+
+ DEBUG_INFO("Connected to server at %s:%d\n", ip.c_str(), port);
+ return true;
+}
+
+// Method to send a message (string) over the socket
+bool SockClient::sendMessage(const std::string message) {
+ if (sock_fd < 0) {
+ DEBUG_ERROR("Socket is not open\n");
+ return false;
+ }
+
+ int bytes_sent = send(sock_fd, const_cast(message.c_str()), message.size(), 0);
+ if (bytes_sent < 0) {
+ DEBUG_ERROR("Error sending message\n");
+ return false;
+ }
+ DEBUG_INFO("Sent message: %s\n", message);
+
+ return true;
+}
+
+// Method to close the socket
+void SockClient::closeSocket() {
+ if (sock_fd >= 0) {
+ close(sock_fd);
+ sock_fd = -1;
+ DEBUG_INFO("Socket closed\n");
+ }
+}
+
+// Destructor to ensure socket is closed
+SockClient::~SockClient() {
+ closeSocket();
+}
diff --git a/ile/src/SockClient.h b/ile/src/SockClient.h
new file mode 100644
index 0000000..0e556cb
--- /dev/null
+++ b/ile/src/SockClient.h
@@ -0,0 +1,23 @@
+#include
+
+// extern "C" {
+class SockClient {
+
+int sock_fd;
+public:
+ // Constructor to initialize the socket descriptor to -1
+ SockClient();
+
+ // Method to open a socket and connect to the server
+ bool openSocket(const std::string ip, int port);
+
+ // Method to send a message (string) over the socket
+ bool sendMessage(const std::string message);
+
+ // Method to close the socket
+ void closeSocket();
+
+ // Destructor to ensure socket is closed
+ ~SockClient();
+};
+// }
diff --git a/ile/src/handler.cpp b/ile/src/handler.cpp
index 092fd2e..3444eac 100644
--- a/ile/src/handler.cpp
+++ b/ile/src/handler.cpp
@@ -12,6 +12,8 @@
#include "event_data.h"
#include "userconf.h"
#include "mzversion.h"
+#include "pub_json.h"
+#include "SockClient.h"
static FILE *fd = NULL;
diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc
index 288137d..e21c284 100644
--- a/ile/src/pub_db2.sqlc
+++ b/ile/src/pub_db2.sqlc
@@ -7,17 +7,28 @@
#include
#include
+#include
+#include "pub_json.h"
+#include "SockClient.h"
+
#define COPY_PARM(dest, parm) \
memset(dest, 0, sizeof(dest)); \
strncpy(dest, parm ? parm : "", -1 + sizeof(dest));
-void check_sql_error(int sqlcode, const char* sqlstate)
+const int PORT = 8080;
+const std::string SERVER = "127.0.0.1";
+char* MSG_PREF_SOCKETS = "SOCKETS";
+char* MSG_PREF_SQL = "SQL";
+
+void check_sql_error(int sqlcode, const char *sqlstate)
{
if (sqlcode != 0)
{
DEBUG_ERROR("SQL Code: %d\n", sqlcode);
DEBUG_ERROR("SQL State: %s\n", sqlstate);
+ throw std::runtime_error("SQL did not execute successfully");
+
}
else
{
@@ -25,15 +36,102 @@ void check_sql_error(int sqlcode, const char* sqlstate)
}
}
-// TODO: implement this
-// 1. define Db2 tables
-// // have autoincrement and autotimestamp columns
-// 2. Add Db2 table creation to Makefile
-// 3. Rename this method to .rpgle (if using RPG)
-// 4. implement!
-extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
-{
- exec sql include SQLCA;
+struct messageParams {
+ const char* session_id;
+ const char* msgid;
+ const char* msg_type;
+ const int msg_severity;
+ const char* msg_timestamp;
+ const char* job;
+ const char* sending_usrprf;
+ const char* message;
+ const char* sending_program_name;
+ const char* sending_module_name;
+ const char* sending_procedure_name;
+};
+
+bool send_message(std::string message){
+ // Create a SocketClient instance
+ SockClient client;
+
+ // Open a socket and connect to server
+ if (!client.openSocket(SERVER, PORT)) {
+ // TODO: How do we want to handle this error? Drop the message, insert into table?
+ DEBUG_ERROR("Failed to connect to socket: %s:%d\n", SERVER.c_str(), PORT);
+ return false;
+ }
+
+ // Send a message over the socket
+ if (!client.sendMessage(message)){
+ DEBUG_ERROR("Failed to send message: %s", message.c_str());
+ return false;
+ }
+
+ // Close the socket
+ client.closeSocket();
+ return true;
+}
+
+/**
+ * Return the messaging preference between the ile and camel components. If the messaging preference is
+ * sockets, then we will use socket communication, otherwise we will use sql communication.
+ */
+char* get_messaging_preference(){
+ char *msg_pref = getenv("MANZAN_MESSAGING_PREFERENCE");
+ if (msg_pref == NULL || strcmp(msg_pref, MSG_PREF_SQL) != 0){
+ msg_pref = MSG_PREF_SOCKETS;
+ }
+ DEBUG_INFO("MESSAGING_PREFERENCE: %s\n", msg_pref);
+ return msg_pref;
+}
+
+/**
+ * Try func1(params). If it throws an exception, then try func2(params).
+ */
+template
+int tryFuncWithFallback(Func1 func1, Func2 func2, Params params) {
+ try {
+ return func1(params);
+ } catch (const std::exception& e) {
+ DEBUG_ERROR("Error: %s. Trying fallback function.\n", e.what());
+ return func2(params);
+ }
+}
+
+int send_socket_message(const messageParams& params){
+ std::string json_message = construct_json_message(
+ params.session_id,
+ params.msgid,
+ params.msg_type,
+ params.msg_severity,
+ params.msg_timestamp,
+ params.job,
+ params.sending_usrprf,
+ params.message,
+ params.sending_program_name,
+ params.sending_module_name,
+ params.sending_procedure_name);
+
+ DEBUG_INFO("Sending message %s", const_cast(json_message.c_str()));
+
+ // Allocate memory for the utf-8 encoded json message
+ char *json_utf8 = get_utf8_output_buf(json_message);
+
+ to_utf8(json_utf8, get_utf8_output_buf_length(json_message), json_message.c_str());
+
+ // Send the message to the socket
+ bool res = send_message(json_utf8);
+ free(json_utf8);
+
+ if (!res){
+ throw std::runtime_error("Failed to send message");
+ }
+ return 0;
+}
+
+int send_sql_message(const messageParams& params){
+ exec sql include sqlca;
+ EXEC SQL BEGIN DECLARE SECTION;
char msg_session_id[11];
char msg_msg_id[8];
char msg_msg_type[11];
@@ -45,27 +143,61 @@ extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
char msg_sending_program_name[257];
char msg_sending_module_name[11];
char msg_sending_procedure_name[2048];
+ EXEC SQL END DECLARE SECTION;
- COPY_PARM(msg_session_id, _session_id);
- COPY_PARM(msg_msg_id, _msgid);
- COPY_PARM(msg_msg_type, _msg_type);
- sprintf(msg_severity, "%d", _msg_severity);
- COPY_PARM(msg_timestamp, _msg_timestamp);
- COPY_PARM(msg_job, _job);
- COPY_PARM(msg_sending_usrprf, _sending_usrprf);
- COPY_PARM(msg_message, _message);
- COPY_PARM(msg_sending_program_name, _sending_program_name);
- COPY_PARM(msg_sending_module_name, _sending_module_name);
- COPY_PARM(msg_sending_procedure_name, _sending_procedure_name);
+ COPY_PARM(msg_session_id, params.session_id);
+ COPY_PARM(msg_msg_id, params.msgid);
+ COPY_PARM(msg_msg_type, params.msg_type);
+ sprintf(msg_severity, "%d", params.msg_severity);
+ COPY_PARM(msg_timestamp, params.msg_timestamp);
+ COPY_PARM(msg_job, params.job);
+ COPY_PARM(msg_sending_usrprf, params.sending_usrprf);
+ COPY_PARM(msg_message, params.message);
+ COPY_PARM(msg_sending_program_name, params.sending_program_name);
+ COPY_PARM(msg_sending_module_name, params.sending_module_name);
+ COPY_PARM(msg_sending_procedure_name, params.sending_procedure_name);
EXEC SQL
INSERT INTO MANZANMSG(
SESSION_ID, MESSAGE_ID, MESSAGE_TYPE, SEVERITY, JOB, SENDING_USRPRF, SENDING_PROGRAM_NAME, SENDING_MODULE_NAME, SENDING_PROCEDURE_NAME, MESSAGE_TIMESTAMP, MESSAGE)
VALUES( : msg_session_id, : msg_msg_id, : msg_msg_type, : msg_severity, : msg_job, : msg_sending_usrprf, : msg_sending_program_name, : msg_sending_module_name, : msg_sending_procedure_name, : msg_timestamp, : msg_message);
- check_sql_error(sqlca.sqlcode, sqlca.sqlstate);
+ check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate);
return 0;
}
+// TODO: implement this
+// 1. define Db2 tables
+// // have autoincrement and autotimestamp columns
+// 2. Add Db2 table creation to Makefile
+// 3. Rename this method to .rpgle (if using RPG)
+// 4. implement!
+extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
+{
+ messageParams params = {
+ _session_id, // should be a const char*
+ _msgid, // should be a const char*
+ _msg_type, // should be a const char*
+ _msg_severity, // should be an int
+ _msg_timestamp, // should be a const char*
+ _job, // should be a const char*
+ _sending_usrprf, // should be a const char*
+ _message, // should be a const char*
+ _sending_program_name, // should be a const char*
+ _sending_module_name, // should be a const char*
+ _sending_procedure_name // should be a const char*
+ };
+
+
+ const char* msg_pref = get_messaging_preference();
+ int res = tryFuncWithFallback(
+ msg_pref == MSG_PREF_SOCKETS ? send_socket_message : send_sql_message,
+ msg_pref == MSG_PREF_SOCKETS ? send_sql_message : send_socket_message,
+ params
+ );
+
+ return res != NULL ? 0 : -1;
+}
+
extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE)
{
return 0;
@@ -74,6 +206,7 @@ extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE)
int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE)
{
exec sql include SQLCA;
+ EXEC SQL BEGIN DECLARE SECTION;
char pal_sessid[11];
char pal_system_reference_code[11];
char pal_device_name[11];
@@ -87,6 +220,7 @@ int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE)
char pal_secondary_code[11];
char pal_table_id[11];
char pal_sequence[32];
+ EXEC SQL END DECLARE SECTION;
COPY_PARM(pal_sessid, _session_id);
COPY_PARM(pal_system_reference_code, _system_reference_code);
@@ -105,23 +239,27 @@ int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE)
EXEC SQL
INSERT INTO MANZANPAL(SESSION_ID, SYSTEM_REFERENCE_CODE, DEVICE_NAME, MODEL, SERIAL_NUMBER, RESOURCE_NAME, LOG_ID, PAL_TIMESTAMP, REFERENCE_CODE, SECONDARY_CODE, TABLE_ID, SEQUENCE_NUM)
VALUES( : pal_sessid, : pal_system_reference_code, : pal_device_name, : pal_model, : pal_serial_number, : pal_resource_name, : pal_log_identifier, : pal_timestamp, : pal_reference_code, : pal_secondary_code, : pal_table_id, : pal_sequence);
- check_sql_error(sqlca.sqlcode, sqlca.sqlstate);
+ check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate);
return 0;
}
extern int db2_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE)
{
exec sql include SQLCA;
+
+ EXEC SQL BEGIN DECLARE SECTION;
char oth_sessid[11];
+ char oth_type[11];
+ EXEC SQL END DECLARE SECTION;
+
memset(oth_sessid, 0, sizeof(oth_sessid));
strncpy(oth_sessid, _session_id, 10);
- char oth_type[11];
memset(oth_type, 0, sizeof(oth_type));
strncpy(oth_type, _event_type, 10);
EXEC SQL
INSERT INTO MANZANOTH(SESSION_ID, EVENT) VALUES( : oth_sessid, : oth_type);
- check_sql_error(sqlca.sqlcode, sqlca.sqlstate);
+ check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate);
return 0;
}
\ No newline at end of file
diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp
index 5c035ec..afed020 100644
--- a/ile/src/pub_json.cpp
+++ b/ile/src/pub_json.cpp
@@ -17,7 +17,8 @@ int to_utf8(char *out, size_t out_len, const char *in)
memset(&tocode, 0, sizeof(tocode));
tocode.CCSID = 1208;
QtqCode_T fromcode;
- fromcode.CCSID = 37;
+
+ // Setting to 0 allows the system to automatically detect ccsid (hopefully)
memset(&fromcode, 0, sizeof(fromcode));
iconv_t cd = QtqIconvOpen(&tocode, &fromcode);
@@ -29,9 +30,10 @@ int to_utf8(char *out, size_t out_len, const char *in)
int rc = iconv(cd, &input, &inleft, &output, &outleft);
if (rc == -1)
{
- DEBUG_ERROR("Error in converting characters\n");
+ DEBUG_ERROR("Error in converting characters. %d: %s\n", errno, strerror(errno));
return 9;
}
+ DEBUG_INFO("Conversion to UTF-8 successful.\n");
return iconv_close(cd);
}
@@ -91,12 +93,34 @@ void append_json_element(std::string &_str, const char *_key, const int _value)
_str += value;
}
+size_t get_utf8_output_buf_length(std::string str){
+ // Each UTF-8 encoded byte can be up to 4 bytes, and add one for the null terminator.
+ return str.length() * 4 + 1;
+}
+
+/*
+* Return an output buffer containing enough space for the utf-8 encoded message.
+* Return NULL if there is no space remaining on the heap.
+* Remember to free the buffer after use.
+*/
+char* get_utf8_output_buf(std::string str){
+ char *buf = (char *)malloc(get_utf8_output_buf_length(str));
+ if (buf == NULL) {
+ DEBUG_ERROR("No heap space available to allocate buffer for %s\n", str.c_str());
+ return NULL;
+ }
+ return buf;
+}
+
int json_publish(const char *_session_id, std::string &_json)
{
- int json_len = 1 + _json.length();
- char *utf8 = (char *)malloc(56 + _json.length() * 2);
+ char *utf8 = get_utf8_output_buf(_json);
+ if (utf8 == NULL){
+ DEBUG_ERROR("No heap space available. Aborting publishing message %s\n", utf8);
+ return -1;
+ }
- to_utf8(utf8, json_len, _json.c_str());
+ to_utf8(utf8, get_utf8_output_buf_length(_json), _json.c_str());
DEBUG_INFO("Publishing JSON\n");
DEBUG_INFO("%s\n", _json.c_str());
@@ -121,35 +145,45 @@ int json_publish(const char *_session_id, std::string &_json)
return 0;
}
-extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
+std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
{
- std::string jsonStr;
+std::string jsonStr;
jsonStr += "{\n ";
- append_json_element(jsonStr, "event_type", "message");
+ append_json_element(jsonStr, "EVENT_TYPE", "message");
jsonStr += ",\n ";
- append_json_element(jsonStr, "session_id", _session_id);
+ append_json_element(jsonStr, "SESSION_ID", _session_id);
jsonStr += ",\n ";
- append_json_element(jsonStr, "job", _job);
+ append_json_element(jsonStr, "JOB", _job);
jsonStr += ",\n ";
- append_json_element(jsonStr, "msgid", _msgid);
+ append_json_element(jsonStr, "MESSAGE_ID", _msgid);
jsonStr += ",\n ";
- append_json_element(jsonStr, "msgtype", _msg_type);
+ append_json_element(jsonStr, "MESSAGE_TYPE", _msg_type);
jsonStr += ",\n ";
- append_json_element(jsonStr, "severity", _msg_severity);
+ append_json_element(jsonStr, "SEVERITY", _msg_severity);
jsonStr += ",\n ";
- append_json_element(jsonStr, "message_timestamp", _msg_timestamp);
+ append_json_element(jsonStr, "MESSAGE_TIMESTAMP", _msg_timestamp);
jsonStr += ",\n ";
- append_json_element(jsonStr, "sending_usrprf", _sending_usrprf);
+ append_json_element(jsonStr, "SENDING_USRPRF", _sending_usrprf);
jsonStr += ",\n ";
- append_json_element(jsonStr, "message", _message);
+ append_json_element(jsonStr, "MESSAGE", _message);
jsonStr += ",\n ";
- append_json_element(jsonStr, "sending_program_name", _sending_program_name);
+ append_json_element(jsonStr, "SENDING_PROGRAM_NAME", _sending_program_name);
jsonStr += ",\n ";
- append_json_element(jsonStr, "sending_module_name", _sending_module_name);
+ append_json_element(jsonStr, "SENDING_MODULE_NAME", _sending_module_name);
jsonStr += ",\n ";
- append_json_element(jsonStr, "sending_procedure_name", _sending_procedure_name);
-
+ append_json_element(jsonStr, "SENDING_PROCEDURE_NAME", _sending_procedure_name);
jsonStr += "\n}";
+ return jsonStr;
+}
+
+/**
+ * Publish json message to DTAQ
+ */
+extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
+{
+ std::string jsonStr = construct_json_message(_session_id, _msgid, _msg_type, _msg_severity, _msg_timestamp, _job, _sending_usrprf,
+ _message, _sending_program_name, _sending_module_name, _sending_procedure_name);
+
return json_publish(_session_id, jsonStr);
}
diff --git a/ile/src/pub_json.h b/ile/src/pub_json.h
index bbb6f97..572ab4a 100644
--- a/ile/src/pub_json.h
+++ b/ile/src/pub_json.h
@@ -1,3 +1,5 @@
+#include
+
#ifndef _MANZAN_JSON_PUB_H_
#define _MANZAN_JSON_PUB_H_
extern "C" {
@@ -5,5 +7,11 @@ int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE);
int json_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE);
int json_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE);
int json_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE);
+int to_utf8(char *out, size_t out_len, const char *in);
+char* get_utf8_output_buf(std::string str);
+size_t get_utf8_output_buf_length(std::string str);
}
+
+std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE);
+
#endif
\ No newline at end of file
diff --git a/test/e2e/msg/snd2q/Makefile b/test/e2e/msg/snd2q/Makefile
index 8999435..d1ef8ff 100644
--- a/test/e2e/msg/snd2q/Makefile
+++ b/test/e2e/msg/snd2q/Makefile
@@ -11,6 +11,7 @@ SEVERITY:=80
setup: /qsys.lib/${TESTLIB}.lib/msgs.msgq
echo "\nstrwch=WCHMSG((*ALL)) WCHMSGQ((${TESTLIB}/MSGS))" >> data.ini
+ rm -f ${OUTPUT_FILE}
cleanup:
system -kKv "ENDWCH SSNID(TESTING)"