Skip to content

Commit

Permalink
New redis transport for low latency applications.
Browse files Browse the repository at this point in the history
Signed-off-by: Rule Timothy (VM/EMT3) <[email protected]>
  • Loading branch information
timrulebosch committed Feb 15, 2024
1 parent 4fa3e20 commit 4809769
Show file tree
Hide file tree
Showing 9 changed files with 676 additions and 354 deletions.
2 changes: 2 additions & 0 deletions dse/modelc/adapter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ add_library(adapter OBJECT
simbus/states.c
transport/endpoint.c
transport/mq.c
transport/msgpack.c
$<$<BOOL:${UNIX}>:transport/mq_posix.c>
transport/redis.c
transport/redispubsub.c
${DSE_SCHEMAS_SOURCE_DIR}/dse_schemas/flatcc/src/builder.c
${DSE_SCHEMAS_SOURCE_DIR}/dse_schemas/flatcc/src/emitter.c
Expand Down
53 changes: 38 additions & 15 deletions dse/modelc/adapter/transport/endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
#include <errno.h>
#include <dse/logger.h>
#include <dse/modelc/adapter/transport/endpoint.h>
#include <dse/modelc/adapter/transport/redis.h>
#include <dse/modelc/adapter/transport/redispubsub.h>
#include <dse/modelc/adapter/transport/mq.h>


#define REDIS_PORT 6379
#define REDIS_URI_SCHEME "redis" URI_SCHEME_DELIM
#define UNIX_URI_SCHEME "unix" URI_SCHEME_DELIM
#define REDIS_PORT 6379
#define REDIS_URI_SCHEME "redis" URI_SCHEME_DELIM
#define REDISASYNC_URI_SCHEME "redisasync" URI_SCHEME_DELIM
#define UNIX_URI_SCHEME "unix" URI_SCHEME_DELIM


/**
Expand Down Expand Up @@ -60,8 +62,9 @@ Endpoint* endpoint_create(const char* transport, const char* uri, uint32_t uid,
Endpoint* endpoint = NULL;
static char _uri[MAX_URI_LEN]; /* Other API's may refer to this data. */

/* Redis Pub/Sub. */
if (strcmp(transport, TRANSPORT_REDISPUBSUB) == 0) {
/* Redis and Redis Pub/Sub. */
if ((strcmp(transport, TRANSPORT_REDISPUBSUB) == 0) ||
(strcmp(transport, TRANSPORT_REDIS) == 0)) {
/* Decode the URI. */
strncpy(_uri, uri, MAX_URI_LEN - 1);
if (strncmp(_uri, REDIS_URI_SCHEME, strlen(REDIS_URI_SCHEME)) == 0) {
Expand All @@ -73,29 +76,49 @@ Endpoint* endpoint_create(const char* transport, const char* uri, uint32_t uid,
p = strtok_r(NULL, ":", &saveptr);
if (p) port = atol(p);
/* Create this endpoint. */
endpoint = redispubsub_connect(
NULL, hostname, port, uid, bus_mode, timeout);
if (strcmp(transport, TRANSPORT_REDISPUBSUB) == 0) {
endpoint = redispubsub_connect(
NULL, hostname, port, uid, bus_mode, timeout);
} else {
endpoint = redis_connect(
NULL, hostname, port, uid, bus_mode, timeout, false);
}
} else if (strncmp(_uri, REDISASYNC_URI_SCHEME,
strlen(REDISASYNC_URI_SCHEME)) == 0) {
/* Parse according to: redisasync://host:[port] */
char* saveptr;
char* p = _uri + strlen(REDISASYNC_URI_SCHEME);
char* hostname = strtok_r(p, ":", &saveptr);
int32_t port = REDIS_PORT;
p = strtok_r(NULL, ":", &saveptr);
if (p) port = atol(p);
/* Create this endpoint. */
endpoint = redis_connect(
NULL, hostname, port, uid, bus_mode, timeout, true);
} else if (strncmp(_uri, UNIX_URI_SCHEME, strlen(UNIX_URI_SCHEME)) ==
0) {
/* Parse according to: unix:///tmp/redis/redis.sock */
char* path = _uri + strlen(UNIX_URI_SCHEME);
/* Create this endpoint. */
endpoint =
redispubsub_connect(path, NULL, 0, uid, bus_mode, timeout);
if (strcmp(transport, TRANSPORT_REDISPUBSUB) == 0) {
endpoint =
redispubsub_connect(path, NULL, 0, uid, bus_mode, timeout);
} else {
endpoint =
redis_connect(path, NULL, 0, uid, bus_mode, timeout, false);
}
} else {
if (errno == 0) errno = EINVAL;
log_error("ERROR: Incorrect Redis URI ($s)", uri);
log_error("ERROR: Incorrect Redis URI (%s)", uri);
return NULL;
}

/* Message Queue. */
/* Message Queue. */
} else if (strcmp(transport, TRANSPORT_MQ) == 0) {
endpoint = mq_connect(uri, uid, bus_mode, timeout);

/* Unknown transport. */
/* Unknown transport. */
} else {
if (errno == 0) errno = EINVAL;
log_error("ERROR: unknown transport! ($s)", transport);
log_error("ERROR: unknown transport! (%s)", transport);
return NULL;
}

Expand Down
12 changes: 11 additions & 1 deletion dse/modelc/adapter/transport/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
#define URI_SCHEME_DELIM "://"

#define TRANSPORT_REDISPUBSUB "redispubsub"
#define TRANSPORT_REDIS "redis"
#define TRANSPORT_MQ "mq"


typedef struct Endpoint Endpoint;
typedef struct Endpoint Endpoint;
typedef struct msgpack_sbuffer msgpack_sbuffer;


typedef void* (*EndpointCreateChannelFunc)(
Endpoint* endpoint, const char* channel_name);
Expand Down Expand Up @@ -58,4 +61,11 @@ DLL_PRIVATE Endpoint* endpoint_create(const char* transport, const char* uri,
uint32_t uid, bool bus_mode, double timeout);


/* msgpack.c */
DLL_PRIVATE msgpack_sbuffer mp_encode_fbs(
void* buffer, uint32_t buffer_length, const char* channel_name);
DLL_PRIVATE int32_t mp_decode_fbs(char* msg, int msg_len, uint8_t** buffer,
uint32_t* buffer_length, Endpoint* endpoint, const char** channel_name);


#endif // DSE_MODELC_ADAPTER_TRANSPORT_ENDPOINT_H_
165 changes: 11 additions & 154 deletions dse/modelc/adapter/transport/mq.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,7 @@ void mq_endpoint_destroy(Endpoint* endpoint)
free(endpoint->private);
}
if (endpoint) {
/* Release the endpoint_channels hashmap. */
char** keys = hashmap_keys(&endpoint->endpoint_channels);
uint32_t count = hashmap_number_keys(endpoint->endpoint_channels);
for (uint32_t i = 0; i < count; i++) {
MqChannel* ch = hashmap_get(&endpoint->endpoint_channels, keys[i]);
free(ch);
}
hashmap_destroy(&endpoint->endpoint_channels);
for (uint32_t _ = 0; _ < count; _++)
free(keys[_]);
free(keys);
}
free(endpoint);
}
Expand Down Expand Up @@ -211,29 +201,16 @@ void* mq_create_channel(Endpoint* endpoint, const char* channel_name)
assert(endpoint);
assert(endpoint->private);

/* Check if the endpoint channel already exists. */
MqChannel* endpoint_channel =
hashmap_get(&endpoint->endpoint_channels, channel_name);
if (endpoint_channel) {
assert(strcmp(endpoint_channel->channel_name, channel_name) == 0);
return (void*)endpoint_channel;
}

/* Create the MqChannel object. */
endpoint_channel = calloc(1, sizeof(MqChannel));
assert(endpoint_channel);
endpoint_channel->channel_name = channel_name;

/* Add to Endpoint->endpoint_channels. */
if (hashmap_set(&endpoint->endpoint_channels, channel_name,
endpoint_channel) == NULL) {
assert(0);
/* Maintain a list of channel_names. There is no associated metadata
so only store the channel name pointer. */
if (hashmap_get(&endpoint->endpoint_channels, channel_name) == NULL) {
hashmap_set(
&endpoint->endpoint_channels, channel_name, (void*)channel_name);
log_notice(" Endpoint Channel : %s\n", channel_name);
}

log_notice(" Endpoint Channel : %s\n", endpoint_channel->channel_name);

/* Return the created object, to the caller (which will be an Adapter). */
return (void*)endpoint_channel;
return (void*)channel_name;
}


Expand Down Expand Up @@ -338,33 +315,8 @@ int32_t mq_send_fbs(Endpoint* endpoint, void* endpoint_channel, void* buffer,

if (mq_ep->mq_send == NULL) log_fatal("MQ not configured");

/**
* Encode as a MsgPack datagram:
* MSG:Object[0]: message indicator (SBNO or SBCH) (string)
* MSG:Object[1]: channel name (string)
* MSG:Object[2]: buffer (FBS) (bin)
*/
msgpack_sbuffer sbuf;
msgpack_packer pk;
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
if (endpoint_channel) {
/* Sending a Channel Message. */
MqChannel* ch = (MqChannel*)endpoint_channel;
int _ch_name_len = strlen(ch->channel_name);
msgpack_pack_str(&pk, 4);
msgpack_pack_str_body(&pk, "SBCH", 4);
msgpack_pack_str(&pk, _ch_name_len);
msgpack_pack_str_body(&pk, ch->channel_name, _ch_name_len);
} else {
/* Sending a Notify Message. */
msgpack_pack_str(&pk, 4);
msgpack_pack_str_body(&pk, "SBNO", 4);
msgpack_pack_str(&pk, 0);
msgpack_pack_str_body(&pk, "", 0);
}
msgpack_pack_bin(&pk, buffer_length);
msgpack_pack_bin_body(&pk, (uint8_t*)buffer, buffer_length);
const char* channel_name = endpoint_channel;
msgpack_sbuffer sbuf = mp_encode_fbs(buffer, buffer_length, channel_name);
if (sbuf.size > MQ_MAX_MSGSIZE) {
log_fatal("Message size (%d) exceeds MQ_MAX_MSGSIZE (%d)", sbuf.size,
MQ_MAX_MSGSIZE);
Expand Down Expand Up @@ -472,101 +424,6 @@ int32_t mq_recv_fbs(Endpoint* endpoint, const char** channel_name,
return 0; /* Indicate that no message was received. */
}

/**
* Decode as a MsgPack datagram:
* MSG:Object[0]: message indicator (SBNO or SBCH) (string)
* MSG:Object[1]: channel name (string)
* MSG:Object[2]: buffer (FBS) (bin)
*/
bool result;
msgpack_unpacker unpacker;
// +4 is COUNTER_SIZE.
result = msgpack_unpacker_init(&unpacker, msg_len + 4);
if (!result) {
log_error("MsgPack unpacker init failed!");
goto error_clean_up;
}
if (msgpack_unpacker_buffer_capacity(&unpacker) < (size_t)msg_len) {
log_error("MsgPack unpacker buffer size too small!");
goto error_clean_up;
}
memcpy(msgpack_unpacker_buffer(&unpacker), (const char*)msg, msg_len);
msgpack_unpacker_buffer_consumed(&unpacker, msg_len);

msgpack_unpacked unpacked;
msgpack_unpack_return ret;
msgpack_unpacked_init(&unpacked);
msgpack_object obj;
/* Object[0]: message indicator (string) */
ret = msgpack_unpacker_next(&unpacker, &unpacked);
if (ret != MSGPACK_UNPACK_SUCCESS) {
log_simbus("WARNING: data vector unpacked with unexpected return code! "
"(ret=%d)",
ret);
log_error("MsgPack msgpack_unpacker_next failed!");
goto error_clean_up;
}
obj = unpacked.data;
assert(obj.type == MSGPACK_OBJECT_STR);
static char msg_ind[10];
assert(obj.via.str.size < 10);
strncpy(msg_ind, obj.via.str.ptr, obj.via.str.size);
msg_ind[obj.via.str.size + 1] = '\0';
/* Object[1]: channel name (string) */
ret = msgpack_unpacker_next(&unpacker, &unpacked);
if (ret != MSGPACK_UNPACK_SUCCESS) {
log_simbus("WARNING: data vector unpacked with unexpected return code! "
"(ret=%d)",
ret);
log_error("MsgPack msgpack_unpacker_next failed!");
goto error_clean_up;
}
if (strcmp(msg_ind, "SBCH") == 0) {
obj = unpacked.data;
assert(obj.type == MSGPACK_OBJECT_STR);
static char ch_name[100];
assert(obj.via.str.size < 100);
strncpy(ch_name, obj.via.str.ptr, obj.via.str.size);
ch_name[obj.via.str.size + 1] = '\0';
MqChannel* endpoint_channel =
hashmap_get(&endpoint->endpoint_channels, ch_name);
assert(endpoint_channel);
*channel_name = endpoint_channel->channel_name;
} else {
/* Potential Notify message. */
}
/* Object[2]: buffer (FBS) (bin) */
ret = msgpack_unpacker_next(&unpacker, &unpacked);
if (ret != MSGPACK_UNPACK_SUCCESS) {
log_simbus("WARNING: data vector unpacked with unexpected return code! "
"(ret=%d)",
ret);
log_error("MsgPack msgpack_unpacker_next failed!");
goto error_clean_up;
}
obj = unpacked.data;
assert(obj.type == MSGPACK_OBJECT_BIN);
const char* bin_ptr = obj.via.bin.ptr;
uint32_t bin_size = obj.via.bin.size;
/* Marshal data to caller. */
if (bin_size > *buffer_length) {
/* Prepare the buffer, resize if necessary. */
*buffer = realloc(*buffer, bin_size);
if (*buffer == NULL) {
log_error("Malloc failed!");
}
*buffer_length = (size_t)bin_size;
}
memset(*buffer, 0, *buffer_length);
memcpy(*buffer, bin_ptr, bin_size);
return_len = bin_size;

/* Release any allocated objects. */
error_clean_up:
/* Unpack: Cleanup. */
msgpack_unpacked_destroy(&unpacked);
msgpack_unpacker_destroy(&unpacker);

/* Return the buffer length (+ve) as indicator of success. */
return (uint32_t)return_len;
return mp_decode_fbs(
msg, msg_len, buffer, buffer_length, endpoint, channel_name);
}
5 changes: 0 additions & 5 deletions dse/modelc/adapter/transport/mq.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ typedef enum MqKind {
__MQ_KIND_COUNT__
} MqKind;

typedef struct MqChannel {
/* Reference to the Adapter Channel linked to this Endpoint. */
const char* channel_name;
} MqChannel;

typedef struct MqDesc {
char endpoint[MQ_MAX_EP_LEN];
void* data;
Expand Down
Loading

0 comments on commit 4809769

Please sign in to comment.