Skip to content

Commit

Permalink
Merge pull request #11 from JuliaMessaging/feature/upgrade-datastruct…
Browse files Browse the repository at this point in the history
…ures

Upgrade datastructures and concurrency

+ atomics: most of the atomic implementations used a deprecated set of methods. the client should be updated to use the @atomic macros
+ subscription: the dictionary storage of topics was a bit simple causing increasingly complex validations and string matching. topics are moved to a Trie data structure where callback functions are stored in a trie graph with topic pieces as keys.
+ asynchronous process management: particularly when disconnecting the read and write loops would throw errors, better process management is added to prevent loops hanging waiting for data.
  • Loading branch information
NickMcSweeney authored May 28, 2024
2 parents 4ebb3af + 17bdb7a commit 7407565
Show file tree
Hide file tree
Showing 31 changed files with 750 additions and 513 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ jobs:
fail-fast: false
matrix:
version:
- '1.7'
- '1.8'
- '1.9'
- '1.10'
Expand All @@ -42,7 +41,7 @@ jobs:
- name: Run mosquitto broker
run: |
sudo systemctl stop mosquitto
sudo cp ${{ github.workspace }}/test/mosquitto.test.config /etc/mosquitto/conf.d/test.conf
sudo cp ${{ github.workspace }}/test/testclient/mosquitto.test.config /etc/mosquitto/conf.d/test.conf
sudo sed -i -e "s/password_file mosquitto.test.passwordfile/password_file \/etc\/mosquitto\/passwordfile/g" /etc/mosquitto/conf.d/test.conf
sudo mosquitto_passwd -c -b /etc/mosquitto/passwordfile test test
sudo systemctl restart mosquitto
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "MQTTClient"
uuid = "985f35cc-2c3d-4943-b8c1-f0931d5f0959"
authors = ["Nick Shindler <[email protected]>"]
version = "0.2.1"
version = "0.3.0"

[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
[![Code Style: Blue](https://img.shields.io/badge/code%20style-blue-4495d1.svg)](https://github.com/invenia/BlueStyle)

MQTT Client Library for Julia

#### Important! Version `0.3.0` or higher requires at least Julia `v1.8`

This library provides a MQTT Client and functions for interfacing with a standard MQTT broker. This includes publishing messages, and subscribing to topics to receive published messages. See the [documentation](https://JuliaMessaging.github.io/MQTTClient.jl) for more information.

Expand Down Expand Up @@ -40,7 +42,7 @@ This work is based on the MQTT.jl packages created by [femtomic](https://github.
* separate handle_pubrecrel into two different methods and fix them
- [ ] review connect method
* make it not hardcoded
- [ ] disconnect_async/disconnect
- [x] disconnect_async/disconnect
* think about what we need to do and how
* the reconnect should still work
- [ ] implement clean session = false
Expand Down
1 change: 0 additions & 1 deletion docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ makedocs(;
"MQTT Client" => "client.md",
"MQTT API" => [
"Client" => "api/client.md",
"Internal Functions" => "api/handlers.md",
"Interfacing Functions" => "api/interface.md",
],
"Utils" => "utils.md",
Expand Down
12 changes: 1 addition & 11 deletions docs/src/api/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,6 @@
Client
MQTTConnection
IOConnection
TCP
UDS
MQTTException
AbstractIOConnection
Packet
Message
MQTTClient.Message
User
write_loop
read_loop
keep_alive_loop
packet_id
write_packet
```
8 changes: 0 additions & 8 deletions docs/src/api/handlers.md

This file was deleted.

26 changes: 12 additions & 14 deletions docs/src/client.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
## Client struct
The client struct is used to store state for an MQTT connection. All callbacks, apart from `on_message`, can't be set through the constructor and have to be set manually after instantiating the `Client` struct.

**Fields in the Client that are relevant for the library user:**
* **ping_timeout**::UInt64: Time, in seconds, the Client waits for the PINGRESP after sending a PINGREQ before he disconnects ; *default = 60 seconds*
* **on_message**::Function: This function gets called upon receiving a publish message from the broker.
* **on_disconnect**::Function:
* **on_connect**::Function:
* **on_subscribe**::Function:
* **on_unsubscribe**::Function:

##### Constructors

```julia
Client()
```

Specify a custom ping_timeout
Specify a custom ping_timeout of 600 seconds
```julia
Client(ping_timeout::UInt64 = 700)
Client(600)
```

Use the wrapping function to get the client and the connection metadata struct. this is equivalent to using the Client constructor; but with more specific syntax.
passing a single unix path or a ip and a port here will determine which protocol is used to communication.
Use the wrapping function to get the client and the connection metadata struct. This generates a client and a connection object that can be used for making connections. The connection object stores information about how to connect the client to the broker.

Passing a ip and a port will be infered as a TCP connection.
```julia
client, connection = MakeConnection()
client, connection = MakeConnection("localhost", 1883)
```
Passing a single path string will be infered as a UDS connection.
```julia
client, connection = MakeConnection("/tmp/mqtt.sock")
```

additional information can be specified for when the connection is made.
Additional information can be specified when the client and connection objects are constructed.
```julia
client, connection = MakeConnection("/tmp/mqtt.sock", keep_alive=60, client_id="TestClient", user=User("name", "pw"), will=Message(QOS_2, "TestClient/will", "payload", more_payload_data))
```

## Message struct
The `Message` struct is the data structure for generic MQTT messages. This is mostly used internally but is exposed to the user in some cases for easier to read arguments (Passing a "will" to the connect method uses the `Message` struct for example).
Expand Down
12 changes: 5 additions & 7 deletions docs/src/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ Samples are available in the `examples` directory.
using MQTTClient
```

Advanced Usage
--------------
The _read_loop_, _write_loop_ _keep_alive_loop_, and _on_msg_ callback are all called as async processes via `@async`.

## Getting started
To use this library you need to follow at least these steps:
1. Create an `MQTTConnection` struct for a given broker and protocol.
Expand All @@ -44,7 +40,7 @@ broker = "test.mosquitto.org"

#Define the callback for receiving messages.
function on_msg(topic, payload)
info("Received message topic: [", topic, "] payload: [", String(payload), "]")
@info("Received message topic: [", topic, "] payload: [", String(payload), "]")
end

#Instantiate a client and connection.
Expand All @@ -54,7 +50,7 @@ connect(client, connection)
#to this topic.
publish(client, "jlExample", "Hello World!", retain=true)
#Subscribe to the topic we sent a retained message to.
subscribe(client, "jlExample", on_msg, qos=QOS_1))
subscribe(client, "jlExample", on_msg, qos=QOS_1)
#Unsubscribe from the topic
unsubscribe(client, "jlExample")
#Disconnect from the broker. Not strictly needed as the broker will also
Expand All @@ -74,4 +70,6 @@ For storing messages that are awaiting acknowledgment, `Client` has a `Dict`, ma

Once the connect method is called on a `Client`, relevant fields are initialized and the julia `connect` method is called to get a connected socket. Then two background tasks are started that perpetually check for messages to send and receive. If `keep_alive` is non-zero another tasks get started that handles sending the keep alive and verifying the pingresp arrived in time.

TODO explain read and write loop a bit
TODO explain read and write loop a bit

TODO explain topic trie structure
59 changes: 33 additions & 26 deletions docs/src/interfaces.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,22 @@ Connects the `Client` instance to the specified broker. There is a synchronous a
#### Arguments
**Required arguments:**
* **client**::Client: The client to connect to the broker.
* **host**::AbstractString: The hostname or ip address of the broker.
* **connection**::MQTTConnection: The information for how the client connects to the broker.

**Optional arguments:**
* **port**::Integer: The port to use ; *default = 1883*
* **keep_alive**::Int64: If the client hasn't sent or received a message within this time limit, it will ping the broker to verify the connection is still active. A value of 0 means no pings will be sent. ; *default = 0*
* **client_id**::String: The id of the client. This should be unique per broker. Some brokers allow an empty client_id for a stateless connection (this means clean_session needs to be true). ; *default = random 8 char string*
* **user**::User: The user, password pair for authentication with the broker. Password can be empty even if user isn't. The password should probably be encrypted. ; *default = empty pair*
* **will**::Message: The will of this client. This message gets published on the specified topic once the client disconnects from the broker. The type of this argument is `Message`, consult with it's documentation above for more info. ; *default = empty will*
* **clean_session**::Bool: Specifies whether or not a connection should be resumed. This implies this `Client` instance was previously connected to this broker. ; *default = true*
use `MakeConnection` to get the client and the connection objects.

#### Call example
The dup and retain flag of a will have to be false so it's safest to use the minimal `Message` constructor (Refer to `Message` documentation above).

```julia
connect(client, connection)
```

#### Synchronous connect
This method waits until the client is connected to the broker. TODO add return documentation
This method waits until the client is connected to the broker.


#### Asynchronous connect
This method doesn't wait and returns a `Future` object. You may wait on this object with the fetch method. This future completes once the client is fully connected. TODO add future data documentation
This method doesn't wait and returns a `Future` object. You may wait on this object with the fetch method. This future completes once the client is fully connected.

## Publish
[MQTT v3.1.1 Doc](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037)
Expand All @@ -49,27 +42,27 @@ Publishes a message to the broker connected to the `Client` instance provided as
#### Call example
These are valid `payload...` examples.
```julia
publish(c, "hello/world")
publish(c, "hello/world", "Test", 6, 4.2)
publish(client, "hello" "world")
publish(client, "foo/bar", "hello world")
```

This is a valid use of the optional arguments.
```julia
publish(c, "hello/world", "Test", 6, 4.2, qos=QOS_1, retain=true)
publish(client, "foo/bar", "hello world", qos=QOS_1, retain=true)
```

#### Synchronous publish
This method waits until the publish message has been processed completely and successfully. So in case of QOS 2 it waits until the PUBCOMP has been received. TODO add return documentation
This method waits until the publish message has been processed completely and successfully. So in case of QOS 2 it waits until the PUBCOMP has been received.


#### Asynchronous publish
This method doesn't wait and returns a `Future` object. You may choose to wait on this object. This future completes once the publish message has been processed completely and successfully. So in case of QOS 2 it waits until the PUBCOMP has been received. TODO change future data documentation
This method doesn't wait and returns a `Future` object. You may choose to wait on this object. This future completes once the publish message has been processed completely and successfully. So in case of QOS 2 it waits until the PUBCOMP has been received.


## Subscribe
[MQTT v3.1.1 Doc](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063)

Subscribes the `Client` instance, provided as a parameter, to the specified topics. There is a synchronous and an asynchronous version available. Both versions take the same arguments.
Subscribes the `Client` instance, provided as a parameter, to the specified topics. There is a synchronous and an asynchronous version available. Both versions take the same arguments.

#### Arguments
**Required arguments:**
Expand All @@ -79,26 +72,40 @@ Subscribes the `Client` instance, provided as a parameter, to the specified topi
* **qos**::QOS: the named argument to set the QOS, defaults to QOS_0.

#### Call example
This example subscribes to the topic "test" with QOS_2 and "test2" with QOS_0.
This example subscribes to the topic "test" with QOS_2.
```julia
cb(topic, payload) = println("[$topic] $(String(payload))")
subscribe(client, "test", cb, qos=QOS_2))
```

While a lambda function can be used, it can help to define the callback function.
```julia
subscribe(c, "test", ((t,p)->do_a_thing(p)), qos=QOS_2))
cb(topic, payload) = println("[$topic] $(String(payload))")
subscribe(client, "foo/bar", cb, qos=QOS_2)
subscribe(client, "foo/baz", ((args...) -> nothing), qos=QOS_2)
```

```bash
julia> client.on_msg
foo/bar: cb
foo/baz: #15
```

#### Synchronous subscribe
This method waits until the subscribe message has been successfully sent and acknowledged. TODO add return documentation
This method waits until the subscribe message has been successfully sent and acknowledged.

```julia
subscribe(c, "test", on_msg, qos=QOS_2))
```

#### Asynchronous subscribe
This method doesn't wait and returns a `Future` object. You may choose to wait on this object. This future completes once the subscribe message has been successfully sent and acknowledged. TODO change future data documentation
This method doesn't wait and returns a `Future` object. You may choose to wait on this object. This future completes once the subscribe message has been successfully sent and acknowledged.


## Unsubscribe
[MQTT v3.1.1 Doc](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072)

This method unsubscribes the `Client` instance from the specified topics. There is a synchronous and an asynchronous version available. Both versions take the same arguments.
This method unsubscribes the Client from the specified topics. There is a synchronous and an asynchronous version available. Both versions take the same arguments.

#### Arguments
**Required arguments:**
Expand All @@ -107,15 +114,15 @@ This method unsubscribes the `Client` instance from the specified topics. There

#### Example call
```julia
unsubscribe(c, "test1", "test2", "test3")
unsubscribe(client, "test")
```

#### Synchronous unsubscribe
This method waits until the unsubscribe method has been sent and acknowledged. TODO add return documentation
This method waits until the unsubscribe method has been sent and acknowledged.


#### Asynchronous unsubscribe
This method doesn't wait and returns a `Future` object. You may wait on this object with the fetch method. This future completes once the unsubscribe message has been sent and acknowledged. TODO add future data documentation
This method doesn't wait and returns a `Future` object. You may wait on this object with the fetch method. This future completes once the unsubscribe message has been sent and acknowledged.

## Disconnect
[MQTT v3.1.1 Doc](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090)
Expand All @@ -128,5 +135,5 @@ Disconnects the `Client` instance gracefully, shuts down the background tasks an

#### Example call
```julia
disconnect(c)
disconnect(client)
```
11 changes: 2 additions & 9 deletions docs/src/utils.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
```@docs
mqtt_channel
topic_eq
mqtt_read
mqtt_write
write_len
read_len
resolve
filter_wildcard_len_check
topic_wildcard_len_check
MQTTClient.resolve
MQTTClient.topic_eq
```
Loading

2 comments on commit 7407565

@NickMcSweeney
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register

Release notes:

Improved the stability of the client particularly for disconnect.

Changes

  • Upgraded atomics in Client to use the @atomic macro
  • Added checks in the read and write loop to verify available data so they will no longer block task finishing on disconnect
  • Implemented Trie/Prefix tree for topics to provide a more robust interface for subscription/unsubscription and managing multiple callback functions
  • update tests
  • update docs

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/107784

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.3.0 -m "<description of version>" 74075658c087fc9b875ee422bb70cdb3ea5958b9
git push origin v0.3.0

Please sign in to comment.