Skip to content

Commit

Permalink
Publish msg: headers passed by structure (#15)
Browse files Browse the repository at this point in the history
<!-- Thank you for submitting a pull request to our repository. -->

Message headers in "Publish" operation are send using structure istead
of string.

<!-- Please provide a brief description of the changes in this pull
request (max 80 chars). -->

## Changes Made

<!-- Please list the changes that you made in this pull request. -->

## Related Issues

<!-- Please list any related issues or pull requests.
Fixes #14   
-->

## Checklist

<!-- Please check off the following items by putting an "x" in the box:
-->

- [x] I have used a PR title that is descriptive enough for a release
note.
- [x] I have tested these changes locally.
- [ ] I have added appropriate tests or updated existing tests.
- [ ] I have tested these changes on a dedicated VM or a customer VM
[name of the VM]
- [ ] I have added appropriate documentation or updated existing
documentation.
  • Loading branch information
kwitekrac authored Jun 20, 2024
1 parent be9aac1 commit 986d502
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 130 deletions.
21 changes: 10 additions & 11 deletions labview/labview_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
LABVIEW_PUBLIC_FUNCTION
char *lv_rabbitmq_version(void)
{
char *VERSION = "0.0.1";
char *VERSION = "0.0.3";
return VERSION;
}

Expand Down Expand Up @@ -218,7 +218,7 @@ int lv_amqp_consume_message(int64_t conn_intptr, int timeout_sec, LStrHandle out
}

LABVIEW_PUBLIC_FUNCTION
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, uint8_t* msgHeaderBuf, uint64_t msgHeaderBufLen, LStrHandle messageBody, LStrHandle errorDescription)
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, KeyValuePairArrHdl headers, LStrHandle messageBody, LStrHandle errorDescription)
{
amqp_connection_state_t conn = (amqp_connection_state_t) conn_intptr;
amqp_basic_properties_t props;
Expand All @@ -230,22 +230,21 @@ int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange,
messageBodyBuffer.len = (*messageBody)->cnt;
messageBodyBuffer.bytes = (void *)((*messageBody)->str);


amqp_table_t *table;
if (msgHeaderBufLen > 0)
if ((*headers)->dimSize > 0)
{
// Update flags to use custom headers
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG;
table = &props.headers;
stringToHeaders(table, msgHeaderBuf, msgHeaderBufLen);
props._flags |= AMQP_BASIC_HEADERS_FLAG;

amqp_table_t *table = &props.headers;
buildHeaders(table, headers);
}

int error_code = amqp_basic_publish(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingKey), 0, 0, &props, messageBodyBuffer);

// Free allocated headers
if (msgHeaderBufLen > 0)
//Free allocated headers
if ((*headers)->dimSize > 0)
{
free(table->entries);
free(props.headers.entries);
}

return error_code;
Expand Down
4 changes: 3 additions & 1 deletion labview/labview_rabbitmq.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "labview_types.h"

#ifndef LABVIEW_RABBITMQ_H
#define LABVIEW_RABBITMQ_H

Expand Down Expand Up @@ -44,7 +46,7 @@ int lv_amqp_exchange_declare(int64_t conn_intptr, uint16_t channel, char *exchan

int lv_amqp_login(int64_t conn_intptr, char* host, int port, int timeout_sec, char* username, char* password, LStrHandle error_description);

int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingkey, uint8_t* headerBuffer, uint64_t headerBufferLen, LStrHandle messagebody, LStrHandle error_description);
int lv_amqp_basic_publish(int64_t conn_intptr, uint16_t channel, char *exchange, char *routingKey, KeyValuePairArrHdl headers, LStrHandle messageBody, LStrHandle errorDescription);

int lv_amqp_create_queue(int64_t conn_intptr, uint16_t channel, char* queue_name_in, LStrHandle queue_name_out, uint8_t passive, LStrHandle errorDescription);

Expand Down
20 changes: 20 additions & 0 deletions labview/labview_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,26 @@
#ifndef LABVIEW_TYPES_H
#define LABVIEW_TYPES_H

/**
* Structure that represents LabVIEW Cluster (items: string key, string value, int32 dataType)
*/
typedef struct {
LStrHandle key;
LStrHandle value;
int32_t dataType;
} KeyValuePairRec;
/**
* Structure that represents LabVIEW Cluster Array
*/
typedef struct {
int32_t dimSize;
KeyValuePairRec elt[1];
} KeyValuePairArr;
/**
* Handle to LabVIEW Cluster Array, used to pass it between LabVIEW and C
*/
typedef KeyValuePairArr **KeyValuePairArrHdl;

/**
* This function copies the contents of a C string into a LabVIEW LStrHandle,
* resizing the handle if necessary.
Expand Down
197 changes: 81 additions & 116 deletions labview/msg_headers.c
Original file line number Diff line number Diff line change
@@ -1,124 +1,9 @@
#include <rabbitmq-c/amqp.h>
#include <stdlib.h>

#include "extcode.h"
#include "labview_types.h"

/**
* This function parses a string representation of headers and populates an AMQP
* table with the parsed header entries. The headers in the string are expected
* to be in a specific format where key-value pairs are separated by '=' and
* multiple headers are separated by ';'.
*
* @note The `table` parameter should be a initialized `amqp_table_t` structure.
* The `headerBuffer` should point to a null-terminated C string.
* The function modifies the `table` structure in place with the parsed
* entries.
*/

const int MAX_HEADER_VALUE_LENGTH=64; // limits strings length sends as header value

void stringToHeaders(amqp_table_t* table, const uint8_t* headerBuffer,
uint64_t headerBufferLen) {
amqp_table_entry_t* entries = NULL;
int numEntries = 0;
int finished = FALSE;
int index = 0;

while (finished == 0) {
// Find the delimiter ';'
char* delimiter = findInBuffer(headerBuffer, headerBufferLen, index, 0x3B);
if (delimiter == NULL) {
delimiter =
headerBuffer +
headerBufferLen; // If no delimiter found, use the end of the string
}
if (delimiter == headerBuffer + headerBufferLen) {
finished = TRUE;
}
// Find the equal_char '='
char* equal_char = findInBuffer(headerBuffer, headerBufferLen, index, 0x3D);

if (equal_char != NULL && equal_char < delimiter) {
// Add the entry to the entries array
amqp_table_entry_t* newEntries =
realloc(entries, (numEntries + 1) * sizeof(amqp_table_entry_t));
if (newEntries == NULL) {
// Error handling for memory allocation failure
perror("Memory allocation failed");
free(entries);
return table;
}
entries = newEntries;
amqp_field_value_t value;

// Get the kind (first byte)
value.kind = headerBuffer[index];
index++; // Move to the next character

// Parse the key
entries[numEntries].key.bytes = headerBuffer + index;
entries[numEntries].key.len = equal_char - ((char*)headerBuffer + index);

// Init and parse the value
char valueTempBuffer[MAX_HEADER_VALUE_LENGTH];
int valueLength = delimiter - equal_char - 1;
memcpy(valueTempBuffer, equal_char + 1, valueLength);

// Parse the value based on the kind
switch (value.kind) {
case AMQP_FIELD_KIND_UTF8:
case AMQP_FIELD_KIND_BYTES:
value.value.bytes.bytes = equal_char + 1; // fist char after "="
value.value.bytes.len = valueLength;
break;
case AMQP_FIELD_KIND_I8:
value.value.i8 = (int8_t)valueTempBuffer[0];
break;
case AMQP_FIELD_KIND_U8:
value.value.u8 = (uint8_t)valueTempBuffer[0];
break;
case AMQP_FIELD_KIND_I16:
value.value.i16 = *((int16_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U16:
value.value.u16 = *((uint16_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_I32:
value.value.i32 = *((int32_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U32:
value.value.u32 = *((uint32_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_I64:
value.value.i64 = *((int64_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_U64:
case AMQP_FIELD_KIND_TIMESTAMP:
value.value.u64 = *((uint64_t*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_F32:
value.value.f32 = *((float*)valueTempBuffer);
break;
case AMQP_FIELD_KIND_F64:
value.value.f64 = *((double*)valueTempBuffer);
break;
default:
// Unsupported kind, ignore this header
break;
}

entries[numEntries].value = value;
numEntries++;
}

// Move to the next header (skip the delimiter ';')
index = delimiter - (char*)headerBuffer + 1;
}

// Assign the entries to the table
table->num_entries = numEntries;
table->entries = entries;
}

/**
* This function takes an AMQP table containing headers and converts them into
Expand Down Expand Up @@ -267,4 +152,84 @@ void headersToString(amqp_table_t* table, LStrHandle concatenatedHeaders) {

// Free the temporary buffer
free(headers);
}


amqp_field_value_t createFieldValue(int32_t dataType, LStrHandle value)
{
amqp_field_value_t fieldValue;
fieldValue.kind = dataType;

switch (fieldValue.kind) {
case AMQP_FIELD_KIND_UTF8:
case AMQP_FIELD_KIND_BYTES:
fieldValue.value.bytes.bytes = (void *)(*value)->str;
fieldValue.value.bytes.len = (*value)->cnt;
break;
case AMQP_FIELD_KIND_I8:
fieldValue.value.i8 = *((int8_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U8:
fieldValue.value.u8 = *((uint8_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_I16:
fieldValue.value.i16 = *((int16_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U16:
fieldValue.value.u16 = *((uint16_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_I32:
fieldValue.value.i32 = *((int32_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U32:
fieldValue.value.u32 = *((uint32_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_I64:
fieldValue.value.i64 = *((int64_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_U64:
case AMQP_FIELD_KIND_TIMESTAMP:
fieldValue.value.u64 = *((uint64_t*)(*value)->str);
break;
case AMQP_FIELD_KIND_F32:
fieldValue.value.f32 = *((float*)(*value)->str);
break;
case AMQP_FIELD_KIND_F64:
fieldValue.value.f64 = *((double*)(*value)->str);
break;
default:
// Unsupported kind, ignore this header
break;
}
return fieldValue;
}

/**
* This function parses a C representation of array of clusters and populates an AMQP
* table with the parsed header entries.
*
* @note The `table` parameter should be a initialized `amqp_table_t` structure.
* The `KeyValuePairArrHdl` should be a handler to C structure generated by LabVIEW.
* The function modifies the `table` structure in place with the parsed entries.
*/
void buildHeaders(amqp_table_t* table, KeyValuePairArrHdl headers)
{
amqp_table_entry_t* entries = malloc(sizeof(amqp_table_entry_t) * (*headers)->dimSize);

int32_t i = 0;
KeyValuePairRec *p = (*headers)->elt;
for (; i < (*headers)->dimSize; i++, p++)
{
amqp_field_value_t value;
LStrPtr keyPtr = *p->key;

entries[i].key.bytes = (void *)keyPtr->str;
entries[i].key.len = keyPtr->cnt;
entries[i].value = createFieldValue(p->dataType, p->value);
}

// Set the entries to the headers table
table -> entries = entries;
table -> num_entries = (*headers)->dimSize;

}
5 changes: 3 additions & 2 deletions labview/msg_headers.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <rabbitmq-c/amqp.h>
#include "extcode.h"
#include "labview_types.h"

#ifndef MSG_HEADERS_H
#define MSG_HEADERS_H
Expand All @@ -10,7 +11,7 @@
* Key-value pairs are separated by '=' and multiple headers are separated by
* ';'.
*/
void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t headerBufferLen);
void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders);

/**
* This function parses a string representation of headers and populates an AMQP
Expand All @@ -23,6 +24,6 @@ void parseHeaders(amqp_table_t* table, const uint8_t* headerBuffer, uint64_t hea
* The function modifies the `table` structure in place with the parsed
* entries.
*/
void headersToString(amqp_table_t *table, LStrHandle concatenatedHeaders);
void buildHeaders(amqp_table_t* table, KeyValuePairArrHdl headers);

#endif

0 comments on commit 986d502

Please sign in to comment.