Skip to content

Commit

Permalink
Microservice support (#640)
Browse files Browse the repository at this point in the history
* WIP: A skeleton microservice object

* Responds to a PING

* Clarified - this is NOT a JS thing!

* Renamed the example microservice

* First implementation of the default endpoint

* Sequence calculator service - a working example of multiple endpoints.

* Added support for info, stats, natsError

* refactored with the 'micro' prefix

* fixed includes

* wip custom error handler

* simplified examples

* Fixed leaks, debugged the tests

* Groups

* "Basic" test.

* more tests from go, fixes

* Docs (sans HTML) and some PR feedback (names)

* request handler returns err instead of calling Respond(Error)

* Reworked how endpoints are stores in servbice, refcounting

* fixed copyright messages

* PR feedback: fixes in examples

* PR feedback: removed redundant include mem.h

* PR Feedback: use `size_t` and `natsSubscription_GetSubject`

* PR feedback: initialize decoded_len

* PR feedback: removed err->message silliness

* PR feedback: cast size_t to int calling natsMsg_Create

* Ref counting fixes

* PR feedback: strdup-ed strings in Info and Stats

* Fixed NULL pointer crashes in the previous commit

* PR feedback: micro_new_endpoint: check if endpoint config is NULL

* PR feedback: missing unlocks

* PR feedback: use natsBuf_AppendByte where appropriate

* PR feedback: CamelCased remaining public fields

* Added support for multiple handlers for ConnectionClosed and AsyncError callback events.

Added a nats_CallbackList internal type, use it for closed and error cases.
Removed prior code for natsConn_get/setXXXCallback.
Cleaned up access to other callbacks in conn.c (lock opts).

* added is_internal to microError to avoid freeing internal values

* PR feedback: initialize  to avoid a warning

* Removed the never-implemented schema

* PR feedback: added a missing FREE(replaced)

* PR feedback: free the replaced value in SetErrorHandler and SetClosedCB

* reversed: previous (free the replaced value...), added a test

* PR feedback: whitespace

* PR feedback: nit - added natsOptions_(un)lock

* PR feedback: hopefully fixes all windows warnings

* PR feedback: hopefully fixes all windows warnings

* PR feedback: removed previous PR, wrong branch

* restored LOCK_OPTIONS macros

* `Done` handler

* printf

* Added Test_MicroAsyncErrorHandler

* PR feedback: warnings

* Fixed AsyncErr test

* Now with a Done handler to fix the test on gcc builds

* try-1

correct fix?

wip

minimized list.txt

minimized list.txt

minimized list.txt

wip

wip more

wip more+

wip more+oops

wip more+oops+printf

* wip

* wip

* wip2

* wip3

* wip4

* wip5

* reworked wip

* reworked wip2

* reworked wip3

* reworked wip4

* reworked wip5

* reworked wip5

* reworked wip6

* reworked wip7

* reworked wip8

* reworked wip9

* reworked wip10

* reworked wip11

* reworked wip12

* reworked wip13

* reworked wip14

* reworked wip15

* wip nits

* wip more nits

* Moving global variables inside natsLib for proper cleanup on close

Signed-off-by: Ivan Kozlovic <[email protected]>

* PR feedback sans global elimination

* Restored num_services=5 in tests

* PR feedback: microError.message

* PR feedback: nats_vsnprintf

* PR feedback: nats_vsnprintf, #2

* [CHANGED] More verbose endpoint INFO (#663)

* More verbose endpoint INFO

* better clone/free for metadata

* (valgrind) free test JSON array

* PR feedback: unsed var

* Fix to run on Windows

The change that I asked to make earlier regarding nats_vsnprint
in some places was wrong, but more importantly, you can't pass
a NULL buffer on Windows in order to calculate the length of
the formatted string. Instead, I added a new macro that uses
a different function for Windows.

Signed-off-by: Ivan Kozlovic <[email protected]>

---------

Signed-off-by: Ivan Kozlovic <[email protected]>
Co-authored-by: Lev Brouk <[email protected]>
Co-authored-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
3 people authored Jun 14, 2023
1 parent 3179f7d commit 71708ed
Show file tree
Hide file tree
Showing 28 changed files with 6,355 additions and 24 deletions.
3 changes: 2 additions & 1 deletion examples/examples.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2018 The NATS Authors
// Copyright 2015-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -15,6 +15,7 @@
#define EXAMPLES_H_

#include <nats.h>
#include <micro_args.h>
#include <stdio.h>
#include <string.h>

Expand Down
147 changes: 147 additions & 0 deletions examples/micro-arithmetics.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "examples.h"

// Sequence NATS microservice example.
//
// This example illustrates multiple NATS microservices communicating with each
// other. Please see the main microservice, micro-sequence.c for a more detailed
// explanation.
//
// This specific microservice implements add, multiply, and divide operations.

// arithmeticsOp is a type for a C function that implements am operation: add,
// multiply, divide.
typedef void (*arithmeticsOP)(long double *result, long double a1, long double a2);

// handle_arithmetics_op is a helper function that wraps an implementation of an
// operation into a request handler.
static microError *
handle_arithmetics_op(microRequest *req, arithmeticsOP op)
{
microError *err = NULL;
microArgs *args = NULL;
long double a1, a2, result;
char buf[1024];
int len = 0;

err = micro_ParseArgs(&args, microRequest_GetData(req), microRequest_GetDataLength(req));
if ((err == NULL) && (microArgs_Count(args) != 2))
{
err = micro_Errorf("invalid number of arguments, expected 2 got %d", microArgs_Count(args));
}
if (err == NULL)
err = microArgs_GetFloat(&a1, args, 0);
if (err == NULL)
err = microArgs_GetFloat(&a2, args, 1);
if (err == NULL)
op(&result, a1, a2);
if (err == NULL)
len = snprintf(buf, sizeof(buf), "%Lf", result);
if (err == NULL)
err = microRequest_Respond(req, buf, len);

microArgs_Destroy(args);
return microError_Wrapf(err, "failed to handle arithmetics operation");
}

static void add(long double *result, long double a1, long double a2)
{
*result = a1 + a2;
}

static void divide(long double *result, long double a1, long double a2)
{
*result = a1 / a2;
}

static void multiply(long double *result, long double a1, long double a2)
{
*result = a1 * a2;
}

// request handlers for each operation.
static microError *handle_add(microRequest *req) { return handle_arithmetics_op(req, add); }
static microError *handle_divide(microRequest *req) { return handle_arithmetics_op(req, divide); }
static microError *handle_multiply(microRequest *req) { return handle_arithmetics_op(req, multiply); }

// main is the main entry point for the microservice.
int main(int argc, char **argv)
{
natsStatus s = NATS_OK;
microError *err = NULL;
natsConnection *conn = NULL;
natsOptions *opts = NULL;
microService *m = NULL;
microGroup *g = NULL;
char errorbuf[1024];

microServiceConfig cfg = {
.Description = "Arithmetic operations - NATS microservice example in C",
.Name = "c-arithmetics",
.Version = "1.0.0",
};
microEndpointConfig add_cfg = {
.Name = "add",
.Handler = handle_add,
};
microEndpointConfig divide_cfg = {
.Name = "divide",
.Handler = handle_divide,
};
microEndpointConfig multiply_cfg = {
.Name = "multiply",
.Handler = handle_multiply,
};

// Connect to NATS server
opts = parseArgs(argc, argv, "");
s = natsConnection_Connect(&conn, opts);
if (s != NATS_OK)
{
printf("Error: %u - %s\n", s, natsStatus_GetText(s));
nats_PrintLastErrorStack(stderr);
natsOptions_Destroy(opts);
return 1;
}

// Create the Microservice that listens on nc.
err = micro_AddService(&m, conn, &cfg);

// Add the endpoints for the functions.
if (err == NULL)
microService_AddGroup(&g, m, "op");
if (err == NULL)
err = microGroup_AddEndpoint(g, &add_cfg);
if (err == NULL)
err = microGroup_AddEndpoint(g, &multiply_cfg);
if (err == NULL)
err = microGroup_AddEndpoint(g, &divide_cfg);

// Run the service, until stopped.
if (err == NULL)
err = microService_Run(m);

// Cleanup.
microService_Destroy(m);
natsOptions_Destroy(opts);
natsConnection_Destroy(conn);
if (err != NULL)
{
printf("Error: %s\n", microError_String(err, errorbuf, sizeof(errorbuf)));
microError_Destroy(err);
return 1;
}
return 0;
}
229 changes: 229 additions & 0 deletions examples/micro-func.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "examples.h"

// Sequence NATS microservice example.
//
// This example illustrates multiple NATS microservices communicating with each
// other. Please see the main microservice, micro-sequence.c for a more detailed
// explanation.
//
// This specific microservice implements factorial, fibonacci, and power2
// functions. Instead of performing arithmetic operations locally, we call the
// arithmetics microservice to perform the operations.

// functionHandler is a type for a C function that implements a "function", i.e.
// power2, factorial, etc.
typedef microError *(*functionHandler)(long double *result, natsConnection *conn, int n);

// callArithmetics is a helper function that calls the arithmetics microservice.
static microError *
call_arithmetics(long double *result, natsConnection *nc, const char *subject, long double a1, long double a2)
{
microError *err = NULL;
microClient *client = NULL;
natsMsg *response = NULL;
microArgs *args = NULL;
char buf[1024];
int len;

err = micro_NewClient(&client, nc, NULL);
if (err == NULL)
len = snprintf(buf, sizeof(buf), "%Lf %Lf", a1, a2);
if (err == NULL)
err = microClient_DoRequest(&response, client, subject, buf, len);
if (err == NULL)
err = micro_ParseArgs(&args, natsMsg_GetData(response), natsMsg_GetDataLength(response));
if (err == NULL)
err = microArgs_GetFloat(result, args, 0);

microClient_Destroy(client);
natsMsg_Destroy(response);
return err;
}

// factorial implements the factorial(N) function. Calls the arithmetics service
// for all multiplications.
static microError *
factorial(long double *result, natsConnection *nc, int n)
{
microError *err = NULL;
int i;

if (n < 1)
return micro_Errorf("n=%d. must be greater than 0", n);

*result = 1;
for (i = 1; i <= n; i++)
{
err = call_arithmetics(result, nc, "op.multiply", *result, i);
if (err != NULL)
return err;
}
return NULL;
}

// fibonacci implements the fibonacci(N) function. Calls the arithmetics service
// for all additions.
static microError *
fibonacci(long double *result, natsConnection *nc, int n)
{
microError *err = NULL;
int i;
long double n1, n2;

if (n < 0)
return micro_Errorf("n=%d. must be non-negative", n);

if (n < 2)
{
*result = n;
return NULL;
}

for (i = 1, n1 = 0, n2 = 1; i <= n; i++)
{
err = call_arithmetics(result, nc, "op.add", n1, n2);
if (err != NULL)
return err;
n1 = n2;
n2 = *result;
}
return NULL;
}

// power2 implements the 2**N function. Calls the arithmetics service
// for all multiplications.
static microError *power2(long double *result, natsConnection *nc, int n)
{
microError *err = NULL;
int i;

if (n < 1)
return micro_Errorf("n=%d. must be greater than 0", n);

*result = 1;
for (i = 1; i <= n; i++)
{
err = call_arithmetics(result, nc, "op.multiply", *result, 2);
if (err != NULL)
return err;
}
return NULL;
}

// handle_function_op is a helper function that wraps an implementation function
// like factorial, fibonacci, etc. into a request handler.
static microError *
handle_function_op(microRequest *req, functionHandler op)
{
microError *err = NULL;
microArgs *args = NULL;
int n;
long double result;
char buf[1024];
int len = 0;

err = micro_ParseArgs(&args, microRequest_GetData(req), microRequest_GetDataLength(req));
if ((err == NULL) && (microArgs_Count(args) != 1))
{
err = micro_Errorf("Invalid number of arguments, expected 1 got %d", microArgs_Count(args));
}
if (err == NULL)
err = microArgs_GetInt(&n, args, 0);
if (err == NULL)
err = op(&result, microRequest_GetConnection(req), n);
if (err == NULL)
len = snprintf(buf, sizeof(buf), "%Lf", result);
if (err == NULL)
err = microRequest_Respond(req, buf, len);

microArgs_Destroy(args);
return err;
}

// handle_... are the request handlers for each function.
static microError *handle_factorial(microRequest *req) { return handle_function_op(req, factorial); }
static microError *handle_fibonacci(microRequest *req) { return handle_function_op(req, fibonacci); }
static microError *handle_power2(microRequest *req) { return handle_function_op(req, power2); }

// main is the main entry point for the microservice.
int main(int argc, char **argv)
{
natsStatus s = NATS_OK;
microError *err = NULL;
natsOptions *opts = NULL;
natsConnection *conn = NULL;
microService *m = NULL;
microGroup *g = NULL;
char errorbuf[1024];

microServiceConfig cfg = {
.Description = "Functions - NATS microservice example in C",
.Name = "c-functions",
.Version = "1.0.0",
};
microEndpointConfig factorial_cfg = {
.Name = "factorial",
.Handler = handle_factorial,
};
microEndpointConfig fibonacci_cfg = {
.Name = "fibonacci",
.Handler = handle_fibonacci,
};
microEndpointConfig power2_cfg = {
.Name = "power2",
.Handler = handle_power2,
};

// Connect to NATS server
opts = parseArgs(argc, argv, "");
s = natsConnection_Connect(&conn, opts);
if (s != NATS_OK)
{
printf("Error: %u - %s\n", s, natsStatus_GetText(s));
nats_PrintLastErrorStack(stderr);
natsOptions_Destroy(opts);
return 1;
}

// Create the Microservice that listens on nc.
err = micro_AddService(&m, conn, &cfg);

// Add the endpoints for the functions.
if (err == NULL)
err = microService_AddGroup(&g, m, "f");
if (err == NULL)
err = microGroup_AddEndpoint(g, &factorial_cfg);
if (err == NULL)
err = microGroup_AddEndpoint(g, &fibonacci_cfg);
if (err == NULL)
err = microGroup_AddEndpoint(g, &power2_cfg);

// Run the service, until stopped.
if (err == NULL)
err = microService_Run(m);

// Cleanup.
microService_Destroy(m);
natsOptions_Destroy(opts);
natsConnection_Destroy(conn);
if (err != NULL)
{
printf("Error: %s\n", microError_String(err, errorbuf, sizeof(errorbuf)));
microError_Destroy(err);
return 1;
}
return 0;
}
Loading

0 comments on commit 71708ed

Please sign in to comment.