Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
olsky committed Jun 8, 2016
1 parent 7656802 commit 8febc79
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions c/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ static PL_engine_t current_engine;

static atom_t ATOM_is_async;
static atom_t ATOM_client_id;
static atom_t ATOM_alias;
static atom_t ATOM_keepalive;
//static atom_t ATOM_use_callbacks;

Expand Down Expand Up @@ -215,6 +216,22 @@ destroy_mqtt(swi_mqtt *m)
return TRUE;
}

static PL_thread_attr_t* get_callback_thread_attrs(char* th_alias)
{
return NULL;
/*
PL_thread_attr_t *th_attrs = malloc(sizeof(PL_thread_attr_t));// customized prolog stack properties. Default is bit large at 100M each thread.
if (th_attrs==0) { return NULL; }
th_attrs->local_size=100;// K;
th_attrs->global_size=200;// K;
th_attrs->trail_size=200;// K;
th_attrs->argument_size=20;// K;
th_attrs->alias=th_alias; // "callback_thread";
th_attrs->cancel=0;
return th_attrs;
*/
}


static int set_engine_for_callbacks(swi_mqtt *m)
{
Expand All @@ -232,7 +249,8 @@ static int set_engine_for_callbacks(swi_mqtt *m)
} else {
if (!current_engine)
{
current_engine = PL_create_engine(NULL);

current_engine = PL_create_engine(get_callback_thread_attrs("th_callback_tmp"));
}
engine_to_use = current_engine;
}
Expand Down Expand Up @@ -805,6 +823,7 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options)
int mqtt_port = 1883;

char* client_id;
char* alias;
int keepalive = 60;
int is_async = FALSE;
int use_hooks = FALSE;
Expand Down Expand Up @@ -857,6 +876,7 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options)
_PL_get_arg(1, head, arg);

if ( name == ATOM_client_id ) { if (!PL_get_chars( arg, &client_id, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;}
} else if ( name == ATOM_alias ) { if (!PL_get_chars( arg, &alias, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;}

} else if ( name == ATOM_module ) { if (!PL_get_chars( arg, &hook_module, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;}
} else if ( name == ATOM_on_log ) { if (!PL_get_chars( arg, &hook_on_log, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;}
Expand Down Expand Up @@ -986,7 +1006,7 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options)

if (is_async) {
_LOG("--- (f-c) c_mqtt_connect > create prolog engine for async\n");
m->pl_engine = PL_create_engine(NULL);
m->pl_engine = PL_create_engine(get_callback_thread_attrs("th_callback_async"));

_LOG("--- (f-c) c_mqtt_connect > connect async...\n");
// with bind: rc = mosquitto_connect_bind_async(m->mosq, mqtt_host, mqtt_port, keepalive, NULL);
Expand Down Expand Up @@ -1231,7 +1251,7 @@ c_create_engine(void)
{
if (!current_engine)
{
current_engine = PL_create_engine(NULL); // PL_current_engine();
current_engine = PL_create_engine(get_callback_thread_attrs("th_callback_gen")); // PL_current_engine();
return TRUE;
}
return FALSE;
Expand Down Expand Up @@ -1261,6 +1281,7 @@ install_mqtt(void)

ATOM_is_async = PL_new_atom("is_async");
ATOM_client_id = PL_new_atom("client_id");
ATOM_alias = PL_new_atom("alias");
ATOM_keepalive = PL_new_atom("keepalive");
//ATOM_use_callbacks = PL_new_atom("use_callbacks");

Expand Down
Binary file modified lib/x86_64-linux/mqtt.so
Binary file not shown.

0 comments on commit 8febc79

Please sign in to comment.