Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
olsky committed Jun 7, 2016
1 parent 1dda138 commit 7656802
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 34 deletions.
43 changes: 11 additions & 32 deletions c/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@
#define PACK_REVISION 5
#define PACK_VERSION_NUMBER (PACK_MAJOR*10000+PACK_MINOR*100+PACK_REVISION)


#ifdef O_PLMT_MF
#define LOCK(mf) pthread_mutex_lock(&(mf)->mutex)
#define UNLOCK(mf) pthread_mutex_unlock(&(mf)->mutex)
#else
#define LOCK(mf)
#define UNLOCK(mf)
#endif

#define MKATOM(n) ATOM_ ## n = PL_new_atom(#n);

static PL_engine_t current_engine;
Expand All @@ -63,10 +54,6 @@ static atom_t ATOM_protocol_version; // V31 or V311
static atom_t ATOM_v31;
static atom_t ATOM_v311;

//static atom_t ATOM_flow;
//static atom_t ATOM_foreign;
//static atom_t ATOM_prolog;

static atom_t ATOM_level;
static atom_t ATOM_log;

Expand Down Expand Up @@ -96,8 +83,6 @@ static functor_t FUNCTOR_qos1;
static functor_t FUNCTOR_retain1;
static functor_t FUNCTOR_message_id1;

//static functor_t FUNCTOR_flow1;

static functor_t FUNCTOR_level1;
static functor_t FUNCTOR_log1;

Expand Down Expand Up @@ -549,8 +534,6 @@ get_swi_mqtt(term_t handle, swi_mqtt **mp)
void *data;
if ( PL_get_blob(handle, &data, NULL, &type) && type == &mqtt_blob)
{ swi_mqtt *m = data;
// assert(mf->magic == MEMFILE_MAGIC);
LOCK(m);
if ( m->symbol )
{ *mp = m;
return TRUE;
Expand All @@ -566,7 +549,6 @@ static void
release_swi_mqtt(swi_mqtt *m)
{
_LOG("--- (f-b) release_swi_mqtt\n");
UNLOCK(m);
}


Expand Down Expand Up @@ -704,8 +686,8 @@ c_mqtt_pub(term_t conn, term_t topic, term_t payload, term_t options)

swi_mqtt *m;
int mid;
int buf_len = 2048;
char buf[buf_len];
//int buf_len = 2048;
//char buf[buf_len];
char* mqtt_topic = NULL;
char* mqtt_payload = NULL;
char* payload_type = NULL;
Expand Down Expand Up @@ -786,11 +768,13 @@ c_mqtt_pub(term_t conn, term_t topic, term_t payload, term_t options)

_LOG("--- (f-c) c_mqtt_pub > qos: %d retain: %d payload: %s\n", qos, retain, mqtt_payload);

memset(buf, 0, (buf_len+1)*sizeof(char));
snprintf(buf, buf_len, "%s", mqtt_payload);

//memset(buf, 0, (buf_len+1)*sizeof(char));
//snprintf(buf, buf_len, "%s", mqtt_payload);

_LOG("--- (f-c) c_mqtt_pub > publish...\n");
mosq_rc = mosquitto_publish(m->mosq, &mid, mqtt_topic, strlen(buf), buf, qos, retain);
// mosq_rc = mosquitto_publish(m->mosq, &mid, mqtt_topic, strlen(buf), buf, qos, retain);
mosq_rc = mosquitto_publish(m->mosq, &mid, mqtt_topic, strlen(mqtt_payload), mqtt_payload, qos, retain);
if (mosq_rc == MOSQ_ERR_SUCCESS)
{
_LOG("--- (f-c) c_mqtt_pub > publish done\n");
Expand Down Expand Up @@ -1273,6 +1257,8 @@ install_mqtt(void)
{
_LOG("--- (f-c) install_mqtt\n");

mosquitto_lib_init();

ATOM_is_async = PL_new_atom("is_async");
ATOM_client_id = PL_new_atom("client_id");
ATOM_keepalive = PL_new_atom("keepalive");
Expand All @@ -1291,10 +1277,6 @@ install_mqtt(void)
ATOM_v31 = PL_new_atom("v31");
ATOM_v311 = PL_new_atom("v311");

// ATOM_flow = PL_new_atom("flow");
// ATOM_foreign = PL_new_atom("foreign");
// ATOM_prolog = PL_new_atom("prolog");

ATOM_level = PL_new_atom("level");
ATOM_log = PL_new_atom("log");

Expand Down Expand Up @@ -1360,11 +1342,8 @@ install_mqtt(void)
PL_register_foreign("c_create_engine", 0, c_create_engine, 0);
PL_register_foreign("c_destroy_engine", 0, c_destroy_engine, 0);

signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);


mosquitto_lib_init();
// signal(SIGINT, handle_signal);
// signal(SIGTERM, handle_signal);
}


Expand Down
4 changes: 2 additions & 2 deletions examples/async.pl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
async_pub(Topic, Value) :-
mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt1), client_id(swi_mqtt_client1), keepalive(10), is_async(true)]),
mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt1), client_id(swi_mqtt_client1), keepalive(10), is_async(true), debug_hooks(true)]),
mqtt_pub(A, Topic, Value),
sleep(5),
mqtt_disconnect(A).
Expand All @@ -26,7 +26,7 @@
*/
async_sub(Topic) :-
mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt2), client_id(swi_mqtt_client2), keepalive(10), is_async(true)]),
mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt2), client_id(swi_mqtt_client2), keepalive(10), is_async(true), debug_hooks(true)]),
mqtt_sub(A, Topic, []),
% mqtt_sub(A, '/#', []),
true.
Expand Down
Binary file modified lib/x86_64-linux/mqtt.so
Binary file not shown.

0 comments on commit 7656802

Please sign in to comment.