From f75dda8510b2852e3b369a06214e2c34234d8d33 Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Tue, 28 May 2024 23:53:58 +0200 Subject: [PATCH 1/9] create a mock broker and update tests and precomp with it --- Project.toml | 2 +- docs/src/api/client.md | 2 +- docs/src/getting-started.md | 4 +- docs/src/interfaces.md | 2 +- ext/PrecompileMQTT.jl | 90 ++++++++++++++++++++----------- src/MQTTClient.jl | 7 ++- src/client.jl | 10 ++-- src/connection.jl | 78 +++++++++++++++++++++------ src/handlers.jl | 3 +- src/interface.jl | 67 +++++++++-------------- src/internals.jl | 2 + src/utils.jl | 105 ++++++++++++++++++++++++++++++++++++ test/integration.test.jl | 56 +++++++++++++++++++ test/runtests.jl | 5 +- test/unittest.client.jl | 14 ++--- 15 files changed, 333 insertions(+), 114 deletions(-) create mode 100644 test/integration.test.jl diff --git a/Project.toml b/Project.toml index 189c221..a53bff1 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "MQTTClient" uuid = "985f35cc-2c3d-4943-b8c1-f0931d5f0959" authors = ["Nick Shindler "] -version = "0.3.0" +version = "0.3.1" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" diff --git a/docs/src/api/client.md b/docs/src/api/client.md index 9d3ffad..9c549c2 100644 --- a/docs/src/api/client.md +++ b/docs/src/api/client.md @@ -1,6 +1,6 @@ ```@docs Client -MQTTConnection +Connection IOConnection MQTTClient.Message User diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md index 60105df..8757135 100644 --- a/docs/src/getting-started.md +++ b/docs/src/getting-started.md @@ -25,9 +25,9 @@ using MQTTClient ## Getting started To use this library you need to follow at least these steps: -1. Create an `MQTTConnection` struct for a given broker and protocol. +1. Create an `Connection` struct for a given broker and protocol. 2. Create an instance of the `Client` struct. -3. Call the connect method with your `Client` and `MQTTConnection` instance. +3. Call the connect method with your `Client` and `Connection` instance. 4. Exchange data with the broker through publish, subscribe and unsubscribe. When subscribing, pass your `on_msg` function for that topic. 5. Disconnect from the broker. (Not strictly necessary, if you don't want to resume the session but considered good form and less likely to crash). diff --git a/docs/src/interfaces.md b/docs/src/interfaces.md index 87834c1..6cf10ba 100644 --- a/docs/src/interfaces.md +++ b/docs/src/interfaces.md @@ -6,7 +6,7 @@ Connects the `Client` instance to the specified broker. There is a synchronous a #### Arguments **Required arguments:** * **client**::Client: The client to connect to the broker. -* **connection**::MQTTConnection: The information for how the client connects to the broker. +* **connection**::Connection: The information for how the client connects to the broker. use `MakeConnection` to get the client and the connection objects. diff --git a/ext/PrecompileMQTT.jl b/ext/PrecompileMQTT.jl index 630f73b..8c0404e 100644 --- a/ext/PrecompileMQTT.jl +++ b/ext/PrecompileMQTT.jl @@ -8,38 +8,38 @@ using Sockets using MQTTClient -precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.subscribe), MQTTClient.Client, String, Function}) -precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.publish), MQTTClient.Client, String, String}) - -precompile(Tuple{typeof(Base.convert), Type{MQTTClient.Packet}, MQTTClient.Packet}) -precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, Array{UInt8, 1}}) -precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Type{UInt8}}) -precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64}) -precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Int64}) -precompile(Tuple{typeof(Base.haskey), Base.Dict{UInt8, Function}, UInt8}) -precompile(Tuple{typeof(Base.getindex), Base.Dict{UInt8, Function}, UInt8}) -precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64, Int64}) -precompile(Tuple{typeof(Base.read), Sockets.TCPSocket, Int64}) -precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, UInt8}) -precompile(Tuple{typeof(Base.fetch), Base.Channel{Any}}) -precompile(Tuple{typeof(Base.iterate), UInt16}) -precompile(Tuple{typeof(Base.something), MQTTClient.TrieNode{MQTTClient.FunctionCallback}, Nothing, Vararg{Any}}) - -precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.MQTTConnection{MQTTClient.TCP}}) -precompile(Tuple{typeof(Sockets.connect), Sockets.IPv6, Int64}) -precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.MQTTConnection{MQTTClient.UDS}}) -precompile(Tuple{typeof(Sockets.connect), String}) - -precompile(Tuple{typeof(MQTTClient.write_len), Sockets.TCPSocket, Int64}) -precompile(Tuple{typeof(MQTTClient.read_len), Sockets.TCPSocket}) -precompile(Tuple{typeof(MQTTClient.write_packet), MQTTClient.Client, UInt8, String, Vararg{Any}}) -precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, UInt8}) -precompile(Tuple{typeof(MQTTClient.write_len), Base.PipeEndpoint, Int64}) -precompile(Tuple{typeof(MQTTClient.read_len), Base.PipeEndpoint}) -precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, MQTTClient.QOS}) - -precompile(Tuple{Type{MQTTClient.Packet}, UInt8, Tuple{}}) -precompile(Tuple{Type{NamedTuple{(:qos,), T} where T<:Tuple}, Tuple{MQTTClient.QOS}}) +# precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.subscribe), MQTTClient.Client, String, Function}) +# precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.publish), MQTTClient.Client, String, String}) + +# precompile(Tuple{typeof(Base.convert), Type{MQTTClient.Packet}, MQTTClient.Packet}) +# precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, Array{UInt8, 1}}) +# precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Type{UInt8}}) +# precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64}) +# precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Int64}) +# precompile(Tuple{typeof(Base.haskey), Base.Dict{UInt8, Function}, UInt8}) +# precompile(Tuple{typeof(Base.getindex), Base.Dict{UInt8, Function}, UInt8}) +# precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64, Int64}) +# precompile(Tuple{typeof(Base.read), Sockets.TCPSocket, Int64}) +# precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, UInt8}) +# precompile(Tuple{typeof(Base.fetch), Base.Channel{Any}}) +# precompile(Tuple{typeof(Base.iterate), UInt16}) +# precompile(Tuple{typeof(Base.something), MQTTClient.TrieNode{MQTTClient.FunctionCallback}, Nothing, Vararg{Any}}) + +# precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.Connection{MQTTClient.TCP}}) +# precompile(Tuple{typeof(Sockets.connect), Sockets.IPv6, Int64}) +# precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.Connection{MQTTClient.UDS}}) +# precompile(Tuple{typeof(Sockets.connect), String}) + +# precompile(Tuple{typeof(MQTTClient.write_len), Sockets.TCPSocket, Int64}) +# precompile(Tuple{typeof(MQTTClient.read_len), Sockets.TCPSocket}) +# precompile(Tuple{typeof(MQTTClient.write_packet), MQTTClient.Client, UInt8, String, Vararg{Any}}) +# precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, UInt8}) +# precompile(Tuple{typeof(MQTTClient.write_len), Base.PipeEndpoint, Int64}) +# precompile(Tuple{typeof(MQTTClient.read_len), Base.PipeEndpoint}) +# precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, MQTTClient.QOS}) + +# precompile(Tuple{Type{MQTTClient.Packet}, UInt8, Tuple{}}) +# precompile(Tuple{Type{NamedTuple{(:qos,), T} where T<:Tuple}, Tuple{MQTTClient.QOS}}) # Precompiling the package like this provides a slower initial load of the package but faster code execution. @@ -138,6 +138,32 @@ precompile(Tuple{Type{NamedTuple{(:qos,), T} where T<:Tuple}, Tuple{MQTTClient.Q @atomicswap c.last_id = 0x0 future = unsubscribe_async(c, topic) + + ## TCP Basic Run + server = MQTTClient.MockMQTTBroker(ip"127.0.0.1", 1889) + client, conn = MakeConnection(ip"127.0.0.1", 1889) + + connect(client, conn) + + subscribe(client, "foo/bar", cb) + publish(client, "bar/foo", qos=QOS_2) + unsubscribe(client, "foo/bar") + + disconnect(client) + close(server) + + ## UDS Basic Run + server = MQTTClient.MockMQTTBroker("/tmp/testmqtt.sock") + client, conn = MakeConnection("/tmp/testmqtt.sock") + + connect(client, conn) + + subscribe(client, "foo/bar", cb) + publish(client, "bar/foo") + unsubscribe(client, "foo/bar") + + disconnect(client) + close(server) end end diff --git a/src/MQTTClient.jl b/src/MQTTClient.jl index 6af90a0..48d1fc4 100644 --- a/src/MQTTClient.jl +++ b/src/MQTTClient.jl @@ -2,7 +2,7 @@ module MQTTClient using Distributed: Future, myid, remotecall, RemoteChannel using Sockets: TCPSocket, IPAddr, PipeServer, getaddrinfo -import Sockets: connect +import Sockets: connect, listen, accept using Random: randstring import Base: ReentrantLock, lock, unlock, convert, PipeEndpoint, fetch, show import Base: @atomic, @atomicreplace, @atomicswap, Ref, RefValue, isready @@ -20,7 +20,7 @@ include("interface.jl") export MakeConnection, Client, - MQTTConnection, + Connection, IOConnection, MQTTException, User, @@ -35,6 +35,5 @@ export unsubscribe, publish_async, publish, - disconnect, - MQTT_ERR_INVAL + disconnect end diff --git a/src/client.jl b/src/client.jl index 1dcb59f..bd7887c 100644 --- a/src/client.jl +++ b/src/client.jl @@ -25,7 +25,7 @@ This client uses atomic operations to ensure thread safety for shared variables # Constructor `Client(ping_timeout::UInt64=UInt64(60))` constructs a new `Client` object with the specified ping timeout (default: 60 seconds). """ -mutable struct Client +mutable struct Client <: AbstractConfigElement @atomic state::UInt8 on_msg::TrieNode @@ -161,11 +161,9 @@ end function keep_alive_loop(client::Client)::UInt8 ping_sent = time() - if client.keep_alive > 10 - check_interval = 5 - else - check_interval = client.keep_alive / 2 - end + # TODO: improve, this causes reconnect to take ~1 second. is there a way to interupt? + check_interval = 1 + timer = Timer(0, interval = check_interval) while !isclosed(client) diff --git a/src/connection.jl b/src/connection.jl index 330883d..1b40917 100644 --- a/src/connection.jl +++ b/src/connection.jl @@ -54,20 +54,20 @@ IOConnection(path::AbstractString) = UDS(path) IOConnection() = MockIOConnection() """ - connect(protocol::UDS) -> PipeEndpoint + connect(protocol::UDS)::PipeEndpoint Establishes a connection to a Unix domain socket at the given path specified in the `UDS` struct. """ connect(protocol::UDS) = connect(protocol.path) """ - connect(protocol::TCP) -> TCPSocket + connect(protocol::TCP)::TCPSocket Establishes a TCP connection to the given IP address and port specified in the `TCP` struct. """ connect(protocol::TCP) = connect(protocol.ip, protocol.port) """ - connect(protocol::MockIOConnection) -> IOBuffer + connect(protocol::MockIOConnection)::IOBuffer Mocks a connection to an MQTT Broker with a local IOBuffer. Should only be used for testing. """ @@ -75,13 +75,13 @@ connect(protocol::MockIOConnection) = IOBuffer() """ - MQTTConnection{T <: AbstractIOConnection} + Connection{T <: AbstractIOConnection} -The `MQTTConnection` struct in Julia encapsulates the configuration and connection details required for an MQTT client to connect to an MQTT broker. +The `Connection` struct in Julia encapsulates the configuration and connection details required for an MQTT client to connect to an MQTT broker. This struct supports two types of connection protocols: TCP and Unix Domain Sockets (UDS), both of which are subtypes of `AbstractIOConnection`. The struct includes fields for protocol type, keep-alive interval, client ID, user credentials, a will message (a message that is sent by the broker if the client disconnects unexpectedly), and a flag indicating whether the session is clean (i.e., no persistent session state). -The `MQTTConnection` constructor allows for flexible instantiation with default or specified values for each field, +The `Connection` constructor allows for flexible instantiation with default or specified values for each field, enabling easy setup of connection parameters tailored to the specific requirements of the MQTT client and broker interaction. ## Fields @@ -93,22 +93,23 @@ enabling easy setup of connection parameters tailored to the specific requiremen - `clean_session::Bool`: Whether to start a clean session. ## Constructors -`MQTTConnection(protocol::T; +`Connection(protocol::T; keep_alive::Int64=32, client_id::String=randstring(8), user::User=User("", ""), will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true) where T <: AbstractIOConnection` constructs a new `MQTTConnection` object with the specified protocol and optional keyword arguments. + clean_session::Bool=true) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified protocol and optional keyword arguments. -`MQTTConnection(protocol::T, +`Connection(protocol::T, keep_alive::Int64, client_id::String, user::User, will::Message, - clean_session::Bool) where T <: AbstractIOConnection` constructs a new `MQTTConnection` object with the specified arguments. + clean_session::Bool) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified arguments. ### Example using TCP protocol with default and custom values -tcp_connection = MQTTConnection( +```julia +tcp_connection = Connection( TCP(Sockets.localhost, 1883); # Using TCP with localhost and port 1883 keep_alive=60, # Custom keep-alive interval of 60 seconds client_id="my_mqtt_client", # Custom client ID @@ -116,9 +117,11 @@ tcp_connection = MQTTConnection( will=Message(false, 0x01, false, "last/will/topic", UInt8[]), # Custom will message clean_session=true # Default clean session flag ) +``` ### Example using UDS protocol with all custom values -uds_connection_full = MQTTConnection( +```julia +uds_connection_full = Connection( UDS("/var/run/mqtt.sock"), # Using UDS with specified socket path 45, # Custom keep-alive interval of 45 seconds "another_client", # Custom client ID @@ -126,8 +129,9 @@ uds_connection_full = MQTTConnection( Message(true, 0x00, true, "will/topic", UInt8[1, 2, 3]), # Custom will message false # Custom clean session flag ) +``` """ -struct MQTTConnection{T <: AbstractIOConnection} +struct Connection{T <: AbstractIOConnection} <: AbstractConfigElement protocol::T keep_alive::Int64 client_id::String @@ -135,14 +139,14 @@ struct MQTTConnection{T <: AbstractIOConnection} will::Message clean_session::Bool - MQTTConnection(protocol::T; + Connection(protocol::T; keep_alive::Int64=32, client_id::String=randstring(8), user::User=User("", ""), will::Message=Message(false, 0x00, false, "", UInt8[]), clean_session::Bool=true) where T <: AbstractIOConnection = new{T}(protocol, keep_alive, client_id, user, will, clean_session) - MQTTConnection(protocol::T, + Connection(protocol::T, keep_alive::Int64, client_id::String, user::User, @@ -150,4 +154,46 @@ struct MQTTConnection{T <: AbstractIOConnection} clean_session::Bool) where T <: AbstractIOConnection = new{T}(protocol, keep_alive, client_id, user, will, clean_session) end -Base.show(io::IO, connection::MQTTConnection) = print(io, "MQTTConnection(Protocol: $(connection.protocol), Client ID: $(connection.client_id)", (connection.user == User("","") ? "" : ", User Name: $(connection.user.name)"), ")") +Base.show(io::IO, connection::Connection) = print(io, "Connection(Protocol: $(connection.protocol), Client ID: $(connection.client_id)", (connection.user == User("","") ? "" : ", User Name: $(connection.user.name)"), ")") + +""" + Configuration + +Container for the mqtt client and mqtt connection data. This is partially iterable, and can be spread to 2 variables with the `...` splat operator or `client, conn = conf` variable assignment. + +## Example + +```julia +# using the MakeConnection interface function +config = MakeConnection("/temp/mqtt.sock") + +# using a defined IO +io = IOConnection("localhost",1883) +config = Configuration(io) + +# spreading the variables +client, connection = Configuration(...) +``` +""" +struct Configuration + client::Client + connection::Connection + + function Configuration(io::T, + ping_timeout=UInt64(60), + keep_alive::Int64=32, + client_id::String=randstring(8), + user::User=User("", ""), + will::Message=Message(false, 0x00, false, "", UInt8[]), + clean_session::Bool=true) where {T <: AbstractIOConnection} + new(Client(ping_timeout), Connection(io, keep_alive, client_id, user, will, clean_session)) + end + function Configuration(client::Client, connection::Connection) + new(client, connection) + end +end + +Base.iterate(conf::Configuration, state=1) = state == 1 ? (conf.client, 2) : (conf.connection, nothing) +Base.length(::Configuration) = 2 +Base.IteratorSize(::Type{Configuration}) = Base.HasLength() +Base.IteratorEltype(::Type{Configuration}) = AbstractConfigElement \ No newline at end of file diff --git a/src/handlers.jl b/src/handlers.jl index be6936e..5f38f6e 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -82,8 +82,7 @@ function handle_pingresp(client::Client, s::IO, cmd::UInt8, flags::UInt8) if @atomic(client.ping_outstanding) == 0x1 @atomicswap client.ping_outstanding = 0x0 else - # We received a subresp packet we didn't ask for - # disconnect(client) + # We received a ping resp packet we didn't ask for @atomicswap client.state = 0x03 throw(ArgumentError("No outstanding ping. client.ping_outstanding = $(client.ping_outstanding) and should be 0x1")) end diff --git a/src/interface.jl b/src/interface.jl index 3337ba9..c79970e 100644 --- a/src/interface.jl +++ b/src/interface.jl @@ -15,16 +15,8 @@ will::Message=Message(false, 0x00, false, "", UInt8[]), clean_session::Bool=true)::Tuple - MakeConnection(io::T; - ping_timeout=UInt64(60), - keep_alive::Int64=32, - client_id::String=randstring(8), - user::User=User("", ""), - will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true)::Tuple where {T <: AbstractIOConnection} - Creates an MQTT client connection to an MQTT broker, handling the construction -of both the `Client` and `MQTTConnection` objects. This function provides +of both the `Client` and `Connection` objects inside the `Configuration` struct. This function provides flexible ways to specify the connection details either through a TCP connection with host and port, a Unix Domain Socket path. @@ -41,15 +33,14 @@ with host and port, a Unix Domain Socket path. - `clean_session::Bool`: Indicates whether to start a clean session (default: true). # Returns -- A tuple `(Client, MQTTConnection)` where `Client` is the MQTT client instance - and `MQTTConnection` is the connection information used to connect to the broker. +- A `Configuration` struct where `client::Client` is the MQTT client instance + and `connection::Connection` is the connection information used to connect to the broker. This function simplifies the process of setting up an MQTT client connection. Depending on the type of connection, you can specify the broker's IP address -and port, a Unix Domain Socket path, or directly provide any struct that is a subtype of `AbstractIOConnection`. -It then constructs the necessary `Client` and `MQTTConnection` objects with the +and port or a Unix Domain Socket path, it infers the Protocol and then constructs the necessary provided or default parameters. Refer to the documentation for [`Client`](@ref) and -[`MQTTConnection`](@ref) for more details on their fields and usage. +[`Connection`](@ref) object. ## Examples @@ -59,10 +50,6 @@ client, connection = MakeConnection("127.0.0.1", 1883, client_id="mqtt_client_1" # Example with Unix Domain Socket path client, connection = MakeConnection("/var/run/mqtt.sock", user=User("user", "pass")) - -# Example with provided connection -tcp_conn = TCP(Sockets.localhost, 1883) -client, connection = MakeConnection(tcp_conn; keep_alive=60, clean_session=false) ``` """ function MakeConnection(host::Union{IPAddr, String}, port::Int64; @@ -71,8 +58,8 @@ function MakeConnection(host::Union{IPAddr, String}, port::Int64; client_id::String=randstring(8), user::User=User("", ""), will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true)::Tuple - MakeConnection(IOConnection(host,port),ping_timeout,keep_alive,client_id,user,will,clean_session) + clean_session::Bool=true)::Configuration + Configuration(IOConnection(host,port),ping_timeout,keep_alive,client_id,user,will,clean_session) end function MakeConnection(path::String; ping_timeout=UInt64(60), @@ -80,28 +67,19 @@ function MakeConnection(path::String; client_id::String=randstring(8), user::User=User("", ""), will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true)::Tuple - MakeConnection(IOConnection(path),ping_timeout,keep_alive,client_id,user,will,clean_session) -end -function MakeConnection(io::T, - ping_timeout=UInt64(60), - keep_alive::Int64=32, - client_id::String=randstring(8), - user::User=User("", ""), - will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true)::Tuple where {T <: AbstractIOConnection} - (Client(ping_timeout), MQTTConnection(io, keep_alive, client_id, user, will, clean_session)) + clean_session::Bool=true)::Configuration + Configuration(IOConnection(path),ping_timeout,keep_alive,client_id,user,will,clean_session) end """ - connect_async(client::Client, connection::MQTTConnection) + connect_async(client::Client, connection::Connection) -Establishes an asynchronous connection to the MQTT broker using the provided `Client` and `MQTTConnection` objects. This function initializes the client, establishes the connection, and starts the necessary loops for communication. +Establishes an asynchronous connection to the MQTT broker using the provided `Client` and `Connection` objects. This function initializes the client, establishes the connection, and starts the necessary loops for communication. # Arguments - `client::Client`: The MQTT client instance. -- `connection::MQTTConnection`: The connection information used to connect to the broker. +- `connection::Connection`: The connection information used to connect to the broker. # Returns - A `Future` object that can be used to await the completion of the connection process. @@ -117,7 +95,7 @@ wait(future) # See Also - [`connect`](@ref): The synchronous version of this function. """ -function connect_async(client::Client, connection::MQTTConnection) +function connect_async(client::Client, connection::Connection) if !isready(client) @atomicswap client.state = 0x00 client.on_msg = TrieNode() @@ -188,13 +166,13 @@ end """ - connect(client::Client, connection::MQTTConnection) + connect(client::Client, connection::Connection) -Establishes a synchronous connection to the MQTT broker using the provided [`Client`](@ref) and [`MQTTConnection`](@ref) objects. This function wraps [`connect_async`](@ref) and waits for the connection process to complete. +Establishes a synchronous connection to the MQTT broker using the provided [`Client`](@ref) and [`Connection`](@ref) objects. This function wraps [`connect_async`](@ref) and waits for the connection process to complete. # Arguments - `client::Client`: The MQTT client instance. -- `connection::MQTTConnection`: The connection information used to connect to the broker. +- `connection::Connection`: The connection information used to connect to the broker. # Returns - The result of the connection process after it completes. @@ -213,9 +191,9 @@ result = connect(client, connection) # See Also - [`connect_async`](@ref): The asynchronous version of this function. - [`Client`](@ref) -- [`MQTTConnection`](@ref) +- [`Connection`](@ref) """ -connect(client::Client, connection::MQTTConnection) = fetch(connect_async(client, connection)) +connect(client::Client, connection::Connection) = fetch(connect_async(client, connection)) """ @@ -305,11 +283,18 @@ Unsubscribes the `Client` instance from the supplied topic names. Deletes the callback from the client Returns a `Future` object that contains `nothing` on success and an exception on failure. """ +function unsubscribe_async(client::Client, topic::String) + future = Future() + id = packet_id(client) + client.in_flight[id] = future + write_packet(client, UNSUBSCRIBE | 0x02, id, topic) + remove!(client.on_msg, topic) + return future +end function unsubscribe_async(client::Client, topics::String...) future = Future() id = packet_id(client) client.in_flight[id] = future - topic_data = [] write_packet(client, UNSUBSCRIBE | 0x02, id, topics...) ((t) -> remove!(client.on_msg, t)).(topics) return future diff --git a/src/internals.jl b/src/internals.jl index d2729fe..31a64f0 100644 --- a/src/internals.jl +++ b/src/internals.jl @@ -42,6 +42,8 @@ const CLIENT_STATE = Dict{UInt8, Symbol}( abstract type AbstractIOConnection end +abstract type AbstractConfigElement end + AbstractProtocol = Union{PipeEndpoint, TCPSocket} ## Enums diff --git a/src/utils.jl b/src/utils.jl index 109de51..6d1903d 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -118,4 +118,109 @@ function taskstatus(t::Task) else :ready end +end + +""" + MockMQTTBroker(args...) + MockMQTTBroker(path::String) + MockMQTTBroker(ip::Sockets.IPAddr, port::Int) + +creates a mock mqtt broker over UDS or TCP that handles CONNECT, SUBSCRIBE, UNSUBSCRIBE, PUBLISH, PING, and DISCONNECT messages. + +## Example + +```julia +using Sockets # need sockets to us ip"" + +server = MQTTClient.MockMQTTBroker(ip"127.0.0.1", 1883) +# Sockets.TCPServer(RawFD(20) active) +client, conn = MakeConnection(ip"127.0.0.1", 1883) +# MQTTClient.Configuration(MQTTClient[state: ready, read_loop: ready, write_loop: ready, keep_alive: ready] +# , Connection(Protocol: MQTTClient.TCP(ip"127.0.0.1", 1883), Client ID: OL5hUGmT)) + +connect(client, conn) +# 0x00 + +subscribe(client, "foo/bar", (args...) -> nothing) +# 2-element Vector{UInt8}: +# 0x01 +# 0x00 +publish(client, "bar/foo", qos=QOS_2) +unsubscribe(client, "foo/bar") + +disconnect(client) +# (0x00, 0x00, 0x00) +close(server) +``` + +""" +function MockMQTTBroker(args...) + server = listen(args...) + + function mock_response(cmd::UInt8, flags::UInt8, buffer::IO) + if cmd == 0x10 # CONNECT + # println("Received CONNECT packet") + return [Packet(0x20, 0x0000)] # CONNACK + elseif cmd == 0x80 # SUBSCRIBE + # println("Received SUBSCRIBE packet") + id = mqtt_read(buffer, UInt16) + return [Packet(0x90, UInt8[id >> 8, id & 0xFF, 0x01, 0x00])] # SUBACK + elseif cmd == 0x30 # PUBLISH + # println("Received PUBLISH packet") + dup = (flags & 0x08) >> 3 + qos = (flags & 0x06) >> 1 + retain = (flags & 0x01) + topic = mqtt_read(buffer, String) + if qos !== 0x00 # not QOS0 + id = mqtt_read(buffer, UInt16) + return [Packet(0x40, id)] # PUBACK + end + elseif cmd == 0xA0 # UNSUBSCRIBE + # println("Received UNSUBSCRIBE packet") + id = mqtt_read(buffer, UInt16) + return [Packet(0xB0, id)] # UNSUBACK + elseif cmd == 0xC0 # PINGREQ + # println("Recieved PING REQ packet") + return [Packet(0xD0, 0x00)] + end + [] + end + + @async while isopen(server) + conn = accept(server) + while isopen(conn) + # read data + cmd_flags = read(conn, UInt8) + len = read_len(conn) + data = read(conn, len) + buffer = PipeBuffer(data) + cmd = cmd_flags & 0xF0 + flags = cmd_flags & 0x0F + + # pause and transition. + sleep(0.001) + + if cmd == 0xE0 # DISCONNECT + # println("Received DISCONNECT packet") + # DISCONNECT does not require acknowledgment + close(conn) + break + end + + # respond + packets = mock_response(cmd, flags, buffer) + for packet in packets + buffer = PipeBuffer() + for i in packet.data + mqtt_write(buffer, i) + end + data = take!(buffer) + write(conn, packet.cmd) + write_len(conn, length(data)) + write(conn, data) + sleep(0.001) + end + end + end + server end \ No newline at end of file diff --git a/test/integration.test.jl b/test/integration.test.jl new file mode 100644 index 0000000..fc3d435 --- /dev/null +++ b/test/integration.test.jl @@ -0,0 +1,56 @@ +cb(args...) = nothing + +@testset "TCP Client" begin + server = MQTTClient.MockMQTTBroker(ip"127.0.0.1", 1889) + client, conn = MakeConnection(ip"127.0.0.1", 1889) + + connect(client, conn) + @test MQTTClient.isconnected(client) + + res = subscribe(client, "foo/bar", cb) + @test res == [0x01, 0x00] + res = publish(client, "bar/foo", qos=QOS_2) + @test isnothing(res) + res = unsubscribe(client, "foo/bar") + @test isnothing(res) + + res = disconnect(client) + @test res == (0x00, 0x00, 0x00) + + # test reconnect + connect(client, conn) + @test MQTTClient.isconnected(client) + res = disconnect(client) + @test res == (0x00, 0x00, 0x00) + + @test MQTTClient.isclosed(client) + close(server) +end + +@testset "UDS Client" begin + ## UDS Basic Run + server = MQTTClient.MockMQTTBroker("/tmp/testmqtt.sock") + client, conn = MakeConnection("/tmp/testmqtt.sock") + + connect(client, conn) + @test MQTTClient.isconnected(client) + + res = subscribe(client, "foo/bar", cb) + @test res == [0x01, 0x00] + res = publish(client, "bar/foo") + @test res == 0 + res = unsubscribe(client, "foo/bar") + @test isnothing(res) + + res = disconnect(client) + @test res == (0x00, 0x00, 0x00) + + # test reconnect + connect(client, conn) + @test MQTTClient.isconnected(client) + res = disconnect(client) + @test res == (0x00, 0x00, 0x00) + + @test MQTTClient.isclosed(client) + close(server) +end \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 9d7860d..8dcaeea 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -4,7 +4,7 @@ using MQTTClient using Distributed, Random import MQTTClient: topic_wildcard_len_check, filter_wildcard_len_check, MQTTException, Packet -import Sockets: TCPSocket, PipeServer, connect, localhost, getaddrinfo, IOError, DNSError +import Sockets: TCPSocket, PipeServer, connect, localhost, getaddrinfo, IOError, DNSError, @ip_str import Base.PipeEndpoint @testset verbose=true "client tests" begin @@ -16,6 +16,9 @@ end @testset verbose=true "topic trie tests" begin include("unittest.topic.jl") end +@testset verbose=true "integration tests" begin + include("integration.test.jl") +end # These tests need a mqtt broker running. # A mosquitto configuration file is provided that will allow these tests to be run. diff --git a/test/unittest.client.jl b/test/unittest.client.jl index a906f2f..446c5f8 100644 --- a/test/unittest.client.jl +++ b/test/unittest.client.jl @@ -76,7 +76,7 @@ end @testset verbose = true "MQTT Connection functionality" begin @testset "MQTT TCP Connection from ip" begin - conn = MQTTClient.MQTTConnection(MQTTClient.IOConnection(localhost, 1883)) + conn = MQTTClient.Connection(MQTTClient.IOConnection(localhost, 1883)) @test conn.protocol isa MQTTClient.TCP @test conn.protocol.ip == localhost @test conn.keep_alive == 32 @@ -87,7 +87,7 @@ end end @testset "MQTT TCP Connection from string" begin - conn = MQTTClient.MQTTConnection(MQTTClient.IOConnection("localhost", 1883)) + conn = MQTTClient.Connection(MQTTClient.IOConnection("localhost", 1883)) @test conn.protocol isa MQTTClient.TCP @test conn.protocol.ip == getaddrinfo("localhost") @test conn.keep_alive == 32 @@ -99,7 +99,7 @@ end @testset "MQTT UDS Connection" begin path = "/tmp/mqtt.sock" - conn = MQTTClient.MQTTConnection(MQTTClient.IOConnection(path)) + conn = MQTTClient.Connection(MQTTClient.IOConnection(path)) @test conn.protocol isa MQTTClient.UDS @test conn.protocol.path == path @test conn.keep_alive == 32 @@ -115,16 +115,16 @@ end @testset "Make MQTT tcp connection" begin c, conn = MQTTClient.MakeConnection("localhost", 1883) @test c isa MQTTClient.Client - @test conn isa MQTTClient.MQTTConnection + @test conn isa MQTTClient.Connection c, conn = MQTTClient.MakeConnection(localhost, 1883) @test c isa MQTTClient.Client - @test conn isa MQTTClient.MQTTConnection + @test conn isa MQTTClient.Connection end @testset "Make MQTT uds connection" begin c, conn = MQTTClient.MakeConnection("/tmp/mqtt.sock") @test c isa MQTTClient.Client - @test conn isa MQTTClient.MQTTConnection + @test conn isa MQTTClient.Connection end @testset "Test Client show function" begin @@ -139,7 +139,7 @@ end client, conn = MQTTClient.MakeConnection("localhost", 1883, client_id="foo") show(io, conn) str = take!(io) |> String - @test contains(str, "MQTTConnection(Protocol: MQTTClient.TCP") + @test contains(str, "Connection(Protocol: MQTTClient.TCP") end @testset "MQTT subscribe async" begin From e67773869b55d1782c0b3059d66f51b2e9bfc9af Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 00:00:21 +0200 Subject: [PATCH 2/9] update docs --- docs/src/api/client.md | 1 + docs/src/utils.md | 1 + ext/PrecompileMQTT.jl | 34 ---------------------------------- src/MQTTClient.jl | 1 + 4 files changed, 3 insertions(+), 34 deletions(-) diff --git a/docs/src/api/client.md b/docs/src/api/client.md index 9c549c2..cbf8ec9 100644 --- a/docs/src/api/client.md +++ b/docs/src/api/client.md @@ -4,4 +4,5 @@ Connection IOConnection MQTTClient.Message User +Configuration ``` \ No newline at end of file diff --git a/docs/src/utils.md b/docs/src/utils.md index fe39ef5..be97eef 100644 --- a/docs/src/utils.md +++ b/docs/src/utils.md @@ -1,4 +1,5 @@ ```@docs MQTTClient.resolve MQTTClient.topic_eq +MQTTClient.MockMQTTBroker ``` \ No newline at end of file diff --git a/ext/PrecompileMQTT.jl b/ext/PrecompileMQTT.jl index 8c0404e..35c12aa 100644 --- a/ext/PrecompileMQTT.jl +++ b/ext/PrecompileMQTT.jl @@ -8,40 +8,6 @@ using Sockets using MQTTClient -# precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.subscribe), MQTTClient.Client, String, Function}) -# precompile(Tuple{typeof(Core.kwcall), NamedTuple{(:qos,), Tuple{MQTTClient.QOS}}, typeof(MQTTClient.publish), MQTTClient.Client, String, String}) - -# precompile(Tuple{typeof(Base.convert), Type{MQTTClient.Packet}, MQTTClient.Packet}) -# precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, Array{UInt8, 1}}) -# precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Type{UInt8}}) -# precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64}) -# precompile(Tuple{typeof(Base.read), Base.PipeEndpoint, Int64}) -# precompile(Tuple{typeof(Base.haskey), Base.Dict{UInt8, Function}, UInt8}) -# precompile(Tuple{typeof(Base.getindex), Base.Dict{UInt8, Function}, UInt8}) -# precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64, Int64}) -# precompile(Tuple{typeof(Base.read), Sockets.TCPSocket, Int64}) -# precompile(Tuple{typeof(Base.write), Base.PipeEndpoint, UInt8}) -# precompile(Tuple{typeof(Base.fetch), Base.Channel{Any}}) -# precompile(Tuple{typeof(Base.iterate), UInt16}) -# precompile(Tuple{typeof(Base.something), MQTTClient.TrieNode{MQTTClient.FunctionCallback}, Nothing, Vararg{Any}}) - -# precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.Connection{MQTTClient.TCP}}) -# precompile(Tuple{typeof(Sockets.connect), Sockets.IPv6, Int64}) -# precompile(Tuple{typeof(Sockets.connect), MQTTClient.Client, MQTTClient.Connection{MQTTClient.UDS}}) -# precompile(Tuple{typeof(Sockets.connect), String}) - -# precompile(Tuple{typeof(MQTTClient.write_len), Sockets.TCPSocket, Int64}) -# precompile(Tuple{typeof(MQTTClient.read_len), Sockets.TCPSocket}) -# precompile(Tuple{typeof(MQTTClient.write_packet), MQTTClient.Client, UInt8, String, Vararg{Any}}) -# precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, UInt8}) -# precompile(Tuple{typeof(MQTTClient.write_len), Base.PipeEndpoint, Int64}) -# precompile(Tuple{typeof(MQTTClient.read_len), Base.PipeEndpoint}) -# precompile(Tuple{typeof(MQTTClient.mqtt_write), Base.GenericIOBuffer{Array{UInt8, 1}}, MQTTClient.QOS}) - -# precompile(Tuple{Type{MQTTClient.Packet}, UInt8, Tuple{}}) -# precompile(Tuple{Type{NamedTuple{(:qos,), T} where T<:Tuple}, Tuple{MQTTClient.QOS}}) - - # Precompiling the package like this provides a slower initial load of the package but faster code execution. # based on tests this precompile step reduces compilation at runtime by ~25% and decreases first execution time by ~10%. @setup_workload begin diff --git a/src/MQTTClient.jl b/src/MQTTClient.jl index 48d1fc4..ab1b382 100644 --- a/src/MQTTClient.jl +++ b/src/MQTTClient.jl @@ -19,6 +19,7 @@ include("interface.jl") export MakeConnection, + Configuration, Client, Connection, IOConnection, From f9c631de7d0a682dce66f118dc26c6b8b44ebef3 Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 11:20:55 +0200 Subject: [PATCH 3/9] fixes and formatting --- Project.toml | 2 +- README.md | 1 - docs/make.jl | 13 +- examples/basic.jl | 8 +- ext/PrecompileMQTT.jl | 18 +-- src/MQTTClient.jl | 4 +- src/client.jl | 95 +++++++------- src/connection.jl | 161 +++++++++++++----------- src/handlers.jl | 33 ++--- src/interface.jl | 260 +++++++++++++++++++++++---------------- src/internals.jl | 60 +++++---- src/topic.jl | 128 +++++++++++-------- src/utils.jl | 72 +++++------ test/dev/mocksocket.jl | 86 +++++++------ test/integration.test.jl | 18 ++- test/runtests.jl | 16 +-- test/unittest.client.jl | 107 +++++++++++----- test/unittest.topic.jl | 44 +++++-- 18 files changed, 643 insertions(+), 483 deletions(-) diff --git a/Project.toml b/Project.toml index a53bff1..2e9de75 100644 --- a/Project.toml +++ b/Project.toml @@ -15,7 +15,7 @@ PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a" PrecompileMQTT = "PrecompileTools" [compat] -julia = "1.7" +julia = "1.8" [extras] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" diff --git a/README.md b/README.md index 8820365..897e5b8 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,6 @@ [![Dev](https://img.shields.io/badge/docs-dev-blue.svg)](https://JuliaMessaging.github.io/MQTTClient.jl/dev/) [![Build Status](https://github.com/JuliaMessaging/MQTTClient.jl/actions/workflows/CI.yml/badge.svg?branch=main)](https://github.com/JuliaMessaging/MQTTClient.jl/actions/workflows/CI.yml?query=branch%3Amain) [![Coverage](https://codecov.io/gh/JuliaMessaging/MQTTClient.jl/branch/main/graph/badge.svg)](https://codecov.io/gh/JuliaMessaging/MQTTClient.jl) -[![Coverage](https://coveralls.io/repos/github/JuliaMessaging/MQTTClient.jl/badge.svg?branch=main)](https://coveralls.io/github/JuliaMessaging/MQTTClient.jl?branch=main) [![Code Style: Blue](https://img.shields.io/badge/code%20style-blue-4495d1.svg)](https://github.com/invenia/BlueStyle) MQTT Client Library for Julia diff --git a/docs/make.jl b/docs/make.jl index bcaf9d6..ef709a3 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -1,4 +1,4 @@ -push!(LOAD_PATH,"../src/") +push!(LOAD_PATH, "../src/") using MQTTClient using Documenter @@ -21,15 +21,10 @@ makedocs(; "Getting Started" => "getting-started.md", "MQTT Interface Functions" => "interfaces.md", "MQTT Client" => "client.md", - "MQTT API" => [ - "Client" => "api/client.md", - "Interfacing Functions" => "api/interface.md", - ], + "MQTT API" => + ["Client" => "api/client.md", "Interfacing Functions" => "api/interface.md"], "Utils" => "utils.md", ], ) -deploydocs(; - repo="github.com/JuliaMessaging/MQTTClient.jl", - devbranch="main", -) +deploydocs(; repo="github.com/JuliaMessaging/MQTTClient.jl", devbranch="main") diff --git a/examples/basic.jl b/examples/basic.jl index 86ec7f6..5959d3f 100644 --- a/examples/basic.jl +++ b/examples/basic.jl @@ -6,22 +6,22 @@ payload = "Hello World!" # Define the callback for receiving messages. function on_msg(topic, payload) - println("Received message topic: [", topic, "] payload: [", String(payload), "]") + return println("Received message topic: [", topic, "] payload: [", String(payload), "]") end # Instantiate a client. -client, connection = MakeConnection(broker,1883) +client, connection = MakeConnection(broker, 1883) connect(client, connection) println("connected to $client at $(connection.protocol)") # Subscribe to the topic. -subscribe(client, topic, on_msg, qos=QOS_2) +subscribe(client, topic, on_msg; qos=QOS_2) println("subscribed to $topic") sleep(0.5) -publish(client, topic, payload, qos=QOS_2) +publish(client, topic, payload; qos=QOS_2) println("published $payload to $topic") # Unsubscribe from the topic diff --git a/ext/PrecompileMQTT.jl b/ext/PrecompileMQTT.jl index 35c12aa..0bd8c64 100644 --- a/ext/PrecompileMQTT.jl +++ b/ext/PrecompileMQTT.jl @@ -14,7 +14,7 @@ using MQTTClient # Putting some things in `@setup_workload` instead of `@compile_workload` can reduce the size of the # precompile file and potentially make loading faster. - cb(t,p) = nothing + cb(t, p) = nothing topic = "foo" payload = "bar" @@ -36,7 +36,11 @@ using MQTTClient message = MQTTClient.Message(false, UInt8(MQTTClient.QOS_0), false, topic, payload) optional = message.qos == 0x00 ? () : (0) - cmd = MQTTClient.PUBLISH | ((message.dup & 0x1) << 3) | (message.qos << 1) | message.retain + cmd = + MQTTClient.PUBLISH | + ((message.dup & 0x1) << 3) | + (message.qos << 1) | + message.retain packet = MQTTClient.Packet(cmd, [message.topic, optional..., message.payload]) buffer = PipeBuffer() for i in packet.data @@ -70,7 +74,6 @@ using MQTTClient MQTTClient.handle_pubrel(c, s, cmd, flags) p = take!(c.write_packets) - # handle_suback c = MQTTClient.Client() s = IOBuffer() @@ -96,15 +99,14 @@ using MQTTClient ## Interfaces: # subscribe c = MQTTClient.Client() - future = MQTTClient.subscribe_async(c, topic, cb, qos=MQTTClient.QOS_2) + future = MQTTClient.subscribe_async(c, topic, cb; qos=MQTTClient.QOS_2) # unsubscribe - c = MQTTClient.Client() + c = MQTTClient.Client() insert!(c.on_msg, topic, cb) @atomicswap c.last_id = 0x0 future = unsubscribe_async(c, topic) - ## TCP Basic Run server = MQTTClient.MockMQTTBroker(ip"127.0.0.1", 1889) client, conn = MakeConnection(ip"127.0.0.1", 1889) @@ -112,7 +114,7 @@ using MQTTClient connect(client, conn) subscribe(client, "foo/bar", cb) - publish(client, "bar/foo", qos=QOS_2) + publish(client, "bar/foo", "baz"; qos=QOS_2) unsubscribe(client, "foo/bar") disconnect(client) @@ -125,7 +127,7 @@ using MQTTClient connect(client, conn) subscribe(client, "foo/bar", cb) - publish(client, "bar/foo") + publish(client, "bar/foo", "baz") unsubscribe(client, "foo/bar") disconnect(client) diff --git a/src/MQTTClient.jl b/src/MQTTClient.jl index ab1b382..3baf47f 100644 --- a/src/MQTTClient.jl +++ b/src/MQTTClient.jl @@ -8,7 +8,6 @@ import Base: ReentrantLock, lock, unlock, convert, PipeEndpoint, fetch, show import Base: @atomic, @atomicreplace, @atomicswap, Ref, RefValue, isready using Base.Threads - include("utils.jl") include("internals.jl") include("topic.jl") @@ -17,8 +16,7 @@ include("connection.jl") include("handlers.jl") include("interface.jl") -export - MakeConnection, +export MakeConnection, Configuration, Client, Connection, diff --git a/src/client.jl b/src/client.jl index bd7887c..abaddb5 100644 --- a/src/client.jl +++ b/src/client.jl @@ -1,28 +1,30 @@ """ Client -The MQTT client in Julia facilitates communication between a device and an MQTT broker over a network. -It manages connections, message handling, and maintains the state of communication. -The client operates through three main loops: the read loop listens for incoming messages from the broker and processes them using designated handlers; -the write loop sends packets to the broker from a queue, ensuring thread safety with a socket lock; -and the keep-alive loop periodically sends ping requests to the broker to maintain the connection and detect disconnections. +The MQTT client in Julia facilitates communication between a device and an MQTT broker over a network. +It manages connections, message handling, and maintains the state of communication. +The client operates through three main loops: the read loop listens for incoming messages from the broker and processes them using designated handlers; +the write loop sends packets to the broker from a queue, ensuring thread safety with a socket lock; +and the keep-alive loop periodically sends ping requests to the broker to maintain the connection and detect disconnections. This client uses atomic operations to ensure thread safety for shared variables and supports asynchronous task management for efficient, non-blocking operations. # Fields -- `state::UInt8`: client state. -- `on_msg::TrieNode`: A trie mapping topics to callback functions. -- `keep_alive::UInt16`: The keep-alive time in seconds. -- `last_id::UInt16`: The last packet identifier used. -- `in_flight::Dict{UInt16, Future}`: A dictionary mapping packet identifiers to futures. -- `write_packets::AbstractChannel`: A channel for writing packets. -- `socket`: The socket used for communication with the broker. -- `socket_lock`: A lock for synchronizing access to the socket. -- `ping_timeout::UInt64`: The ping timeout in seconds. -- `ping_outstanding::Atomic{UInt8}`: An atomic counter for the number of outstanding ping requests. -- `last_sent::Atomic{Float64}`: An atomic float representing the timestamp of the last sent packet. -- `last_received::Atomic{Float64}`: An atomic float representing the timestamp of the last received packet. + + - `state::UInt8`: client state. + - `on_msg::TrieNode`: A trie mapping topics to callback functions. + - `keep_alive::UInt16`: The keep-alive time in seconds. + - `last_id::UInt16`: The last packet identifier used. + - `in_flight::Dict{UInt16, Future}`: A dictionary mapping packet identifiers to futures. + - `write_packets::AbstractChannel`: A channel for writing packets. + - `socket`: The socket used for communication with the broker. + - `socket_lock`: A lock for synchronizing access to the socket. + - `ping_timeout::UInt64`: The ping timeout in seconds. + - `ping_outstanding::Atomic{UInt8}`: An atomic counter for the number of outstanding ping requests. + - `last_sent::Atomic{Float64}`: An atomic float representing the timestamp of the last sent packet. + - `last_received::Atomic{Float64}`: An atomic float representing the timestamp of the last received packet. # Constructor + `Client(ping_timeout::UInt64=UInt64(60))` constructs a new `Client` object with the specified ping timeout (default: 60 seconds). """ mutable struct Client <: AbstractConfigElement @@ -50,26 +52,27 @@ mutable struct Client <: AbstractConfigElement read_task::Task keep_alive_task::Task - Client(ping_timeout::UInt64 = UInt64(60)) = new( - 0x00, - TrieNode(), - 0x0000, - 0x0000, - Dict{UInt16,Future}(), - Channel{Packet}(typemax(Int64)), - IOBuffer(), - ReentrantLock(), - ping_timeout, - 0x00, - 0.0, - 0.0, - Task(nothing), - Task(nothing), - Task(nothing), - ) + function Client(ping_timeout::UInt64=UInt64(60)) + return new( + 0x00, + TrieNode(), + 0x0000, + 0x0000, + Dict{UInt16,Future}(), + Channel{Packet}(typemax(Int64)), + IOBuffer(), + ReentrantLock(), + ping_timeout, + 0x00, + 0.0, + 0.0, + Task(nothing), + Task(nothing), + Task(nothing), + ) + end end - function write_loop(client::Client)::UInt8 try while !isclosed(client) @@ -115,7 +118,6 @@ function write_loop(client::Client)::UInt8 end end - function read_loop(client::Client)::UInt8 try while !isclosed(client) @@ -157,18 +159,17 @@ function read_loop(client::Client)::UInt8 end end - function keep_alive_loop(client::Client)::UInt8 ping_sent = time() # TODO: improve, this causes reconnect to take ~1 second. is there a way to interupt? check_interval = 1 - - timer = Timer(0, interval = check_interval) + + timer = Timer(0; interval=check_interval) while !isclosed(client) if time() - @atomic(client.last_sent) >= client.keep_alive || - time() - @atomic(client.last_received) >= client.keep_alive + time() - @atomic(client.last_received) >= client.keep_alive if @atomic(client.ping_outstanding) == 0x0 @atomicswap client.ping_outstanding = 0x1 try @@ -191,7 +192,7 @@ function keep_alive_loop(client::Client)::UInt8 end if @atomic(client.ping_outstanding) == 1 && - time() - ping_sent >= client.ping_timeout + time() - ping_sent >= client.ping_timeout try # No pingresp received disconnect(client) break @@ -222,7 +223,7 @@ end # write packet to mqtt broker function write_packet(client::Client, cmd::UInt8, data...) - put!(client.write_packets, Packet(cmd, data)) + return put!(client.write_packets, Packet(cmd, data)) end isready(client::Client)::Bool = client.state == 0x00 @@ -230,10 +231,12 @@ isconnected(client::Client)::Bool = client.state == 0x01 isclosed(client::Client)::Bool = client.state >= 0x02 iserror(client::Client)::Bool = client.state == 0x03 -show(io::IO, client::Client) = print( - io, - "MQTTClient[state: $(get(CLIENT_STATE, client.state, :unknown)), read_loop: $(taskstatus(client.read_task)), write_loop: $(taskstatus(client.write_task)), keep_alive: $(taskstatus(client.keep_alive_task))]\n$(show_tree(client.on_msg))", -) +function show(io::IO, client::Client) + return print( + io, + "MQTTClient[state: $(get(CLIENT_STATE, client.state, :unknown)), read_loop: $(taskstatus(client.read_task)), write_loop: $(taskstatus(client.write_task)), keep_alive: $(taskstatus(client.keep_alive_task))]\n$(show_tree(client.on_msg))", + ) +end fetch(client::Client)::Tuple{UInt8,UInt8,UInt8} = begin try diff --git a/src/connection.jl b/src/connection.jl index 1b40917..5632e3a 100644 --- a/src/connection.jl +++ b/src/connection.jl @@ -1,57 +1,60 @@ struct TCP <: AbstractIOConnection ip::IPAddr port::Int64 - TCP(ip::IPAddr = Sockets.localhost, port::Int64 = 1883) = new(ip, port) + TCP(ip::IPAddr=Sockets.localhost, port::Int64=1883) = new(ip, port) end struct UDS <: AbstractIOConnection path::AbstractString - UDS(path::AbstractString = pwd()) = new(path) + UDS(path::AbstractString=pwd()) = new(path) end -struct MockIOConnection <: AbstractIOConnection end - """ IOConnection(ip::IPAddr, port::Int64) Constructs a new `TCP` object with the specified IP address and port number. # Arguments -- `ip::IPAddr`: The IP address of the remote host. -- `port::Int64`: The port number on the remote host. + + - `ip::IPAddr`: The IP address of the remote host. + - `port::Int64`: The port number on the remote host. # Returns -- `TCP`: A new `TCP` object. ---- + - `TCP`: A new `TCP` object. + +* * * IOConnection(ip::String, port::Int64) Constructs a new `TCP` object with the specified IP address and port number. # Arguments -- `ip::String`: The IP address of the remote host as a string. -- `port::Int64`: The port number on the remote host. + + - `ip::String`: The IP address of the remote host as a string. + - `port::Int64`: The port number on the remote host. # Returns -- `TCP`: A new `TCP` object. ---- + - `TCP`: A new `TCP` object. + +* * * IOConnection(path::AbstractString) Constructs a new `UDS` object with the specified file system path. # Arguments -- `path::AbstractString`: The file system path of the socket. + + - `path::AbstractString`: The file system path of the socket. # Returns -- `UDS`: A new `UDS` object. + + - `UDS`: A new `UDS` object. """ IOConnection(ip::IPAddr, port::Int64) = TCP(ip, port) IOConnection(ip::String, port::Int64) = TCP(getaddrinfo(ip), port) IOConnection(path::AbstractString) = UDS(path) -IOConnection() = MockIOConnection() """ connect(protocol::UDS)::PipeEndpoint @@ -66,48 +69,33 @@ Establishes a TCP connection to the given IP address and port specified in the ` """ connect(protocol::TCP) = connect(protocol.ip, protocol.port) -""" - connect(protocol::MockIOConnection)::IOBuffer - -Mocks a connection to an MQTT Broker with a local IOBuffer. Should only be used for testing. -""" -connect(protocol::MockIOConnection) = IOBuffer() - - """ Connection{T <: AbstractIOConnection} -The `Connection` struct in Julia encapsulates the configuration and connection details required for an MQTT client to connect to an MQTT broker. -This struct supports two types of connection protocols: TCP and Unix Domain Sockets (UDS), both of which are subtypes of `AbstractIOConnection`. -The struct includes fields for protocol type, keep-alive interval, client ID, user credentials, a will message (a message that is sent by the broker if the client disconnects unexpectedly), -and a flag indicating whether the session is clean (i.e., no persistent session state). -The `Connection` constructor allows for flexible instantiation with default or specified values for each field, +The `Connection` struct in Julia encapsulates the configuration and connection details required for an MQTT client to connect to an MQTT broker. +This struct supports two types of connection protocols: TCP and Unix Domain Sockets (UDS), both of which are subtypes of `AbstractIOConnection`. +The struct includes fields for protocol type, keep-alive interval, client ID, user credentials, a will message (a message that is sent by the broker if the client disconnects unexpectedly), +and a flag indicating whether the session is clean (i.e., no persistent session state). +The `Connection` constructor allows for flexible instantiation with default or specified values for each field, enabling easy setup of connection parameters tailored to the specific requirements of the MQTT client and broker interaction. ## Fields -- `protocol::T`: The underlying IO connection. -- `keep_alive::Int64`: The keep-alive time in seconds. -- `client_id::String`: The client identifier. -- `user::User`: The user credentials. -- `will::Message`: The last will and testament message. -- `clean_session::Bool`: Whether to start a clean session. + + - `protocol::T`: The underlying IO connection. + - `keep_alive::Int64`: The keep-alive time in seconds. + - `client_id::String`: The client identifier. + - `user::User`: The user credentials. + - `will::Message`: The last will and testament message. + - `clean_session::Bool`: Whether to start a clean session. ## Constructors -`Connection(protocol::T; - keep_alive::Int64=32, - client_id::String=randstring(8), - user::User=User("", ""), - will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified protocol and optional keyword arguments. -`Connection(protocol::T, - keep_alive::Int64, - client_id::String, - user::User, - will::Message, - clean_session::Bool) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified arguments. +`Connection(protocol::T; keep_alive::Int64=32, client_id::String=randstring(8), user::User=User("", ""), will::Message=Message(false, 0x00, false, "", UInt8[]), clean_session::Bool=true) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified protocol and optional keyword arguments. + +`Connection(protocol::T, keep_alive::Int64, client_id::String, user::User, will::Message, clean_session::Bool) where T <: AbstractIOConnection` constructs a new `Connection` object with the specified arguments. ### Example using TCP protocol with default and custom values + ```julia tcp_connection = Connection( TCP(Sockets.localhost, 1883); # Using TCP with localhost and port 1883 @@ -115,11 +103,12 @@ tcp_connection = Connection( client_id="my_mqtt_client", # Custom client ID user=User("username", "password"), # Custom user credentials will=Message(false, 0x01, false, "last/will/topic", UInt8[]), # Custom will message - clean_session=true # Default clean session flag + clean_session=true, # Default clean session flag ) ``` ### Example using UDS protocol with all custom values + ```julia uds_connection_full = Connection( UDS("/var/run/mqtt.sock"), # Using UDS with specified socket path @@ -127,11 +116,11 @@ uds_connection_full = Connection( "another_client", # Custom client ID User("user", "pass"), # Custom user credentials Message(true, 0x00, true, "will/topic", UInt8[1, 2, 3]), # Custom will message - false # Custom clean session flag + false, # Custom clean session flag ) ``` """ -struct Connection{T <: AbstractIOConnection} <: AbstractConfigElement +struct Connection{T<:AbstractIOConnection} <: AbstractConfigElement protocol::T keep_alive::Int64 client_id::String @@ -139,22 +128,37 @@ struct Connection{T <: AbstractIOConnection} <: AbstractConfigElement will::Message clean_session::Bool - Connection(protocol::T; - keep_alive::Int64=32, - client_id::String=randstring(8), - user::User=User("", ""), - will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true) where T <: AbstractIOConnection = new{T}(protocol, keep_alive, client_id, user, will, clean_session) - - Connection(protocol::T, - keep_alive::Int64, - client_id::String, - user::User, - will::Message, - clean_session::Bool) where T <: AbstractIOConnection = new{T}(protocol, keep_alive, client_id, user, will, clean_session) + function Connection( + protocol::T; + keep_alive::Int64=32, + client_id::String=randstring(8), + user::User=User("", ""), + will::Message=Message(false, 0x00, false, "", UInt8[]), + clean_session::Bool=true, + ) where {T<:AbstractIOConnection} + return new{T}(protocol, keep_alive, client_id, user, will, clean_session) + end + + function Connection( + protocol::T, + keep_alive::Int64, + client_id::String, + user::User, + will::Message, + clean_session::Bool, + ) where {T<:AbstractIOConnection} + return new{T}(protocol, keep_alive, client_id, user, will, clean_session) + end end -Base.show(io::IO, connection::Connection) = print(io, "Connection(Protocol: $(connection.protocol), Client ID: $(connection.client_id)", (connection.user == User("","") ? "" : ", User Name: $(connection.user.name)"), ")") +function Base.show(io::IO, connection::Connection) + return print( + io, + "Connection(Protocol: $(connection.protocol), Client ID: $(connection.client_id)", + (connection.user == User("", "") ? "" : ", User Name: $(connection.user.name)"), + ")", + ) +end """ Configuration @@ -168,7 +172,7 @@ Container for the mqtt client and mqtt connection data. This is partially iterab config = MakeConnection("/temp/mqtt.sock") # using a defined IO -io = IOConnection("localhost",1883) +io = IOConnection("localhost", 1883) config = Configuration(io) # spreading the variables @@ -179,21 +183,28 @@ struct Configuration client::Client connection::Connection - function Configuration(io::T, - ping_timeout=UInt64(60), - keep_alive::Int64=32, - client_id::String=randstring(8), - user::User=User("", ""), - will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true) where {T <: AbstractIOConnection} - new(Client(ping_timeout), Connection(io, keep_alive, client_id, user, will, clean_session)) + function Configuration( + io::T, + ping_timeout=UInt64(60), + keep_alive::Int64=32, + client_id::String=randstring(8), + user::User=User("", ""), + will::Message=Message(false, 0x00, false, "", UInt8[]), + clean_session::Bool=true, + ) where {T<:AbstractIOConnection} + return new( + Client(ping_timeout), + Connection(io, keep_alive, client_id, user, will, clean_session), + ) end function Configuration(client::Client, connection::Connection) - new(client, connection) + return new(client, connection) end end -Base.iterate(conf::Configuration, state=1) = state == 1 ? (conf.client, 2) : (conf.connection, nothing) +function Base.iterate(conf::Configuration, state=1) + return state == 1 ? (conf.client, 2) : (conf.connection, nothing) +end Base.length(::Configuration) = 2 Base.IteratorSize(::Type{Configuration}) = Base.HasLength() -Base.IteratorEltype(::Type{Configuration}) = AbstractConfigElement \ No newline at end of file +Base.IteratorEltype(::Type{Configuration}) = AbstractConfigElement diff --git a/src/handlers.jl b/src/handlers.jl index 5f38f6e..ba3aec5 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -15,7 +15,6 @@ function handle_connack(client::Client, s::IO, cmd::UInt8, flags::UInt8) end end - function handle_publish(client::Client, s::IO, cmd::UInt8, flags::UInt8) dup = (flags & 0x08) >> 3 qos = (flags & 0x06) >> 1 @@ -59,12 +58,12 @@ end function handle_pubrec(client::Client, s::IO, cmd::UInt8, flags::UInt8) id = mqtt_read(s, UInt16) - write_packet(client, PUBREL | 0x02, id) + return write_packet(client, PUBREL | 0x02, id) end function handle_pubrel(client::Client, s::IO, cmd::UInt8, flags::UInt8) id = mqtt_read(s, UInt16) - write_packet(client, PUBCOMP, id) + return write_packet(client, PUBCOMP, id) end function handle_suback(client::Client, s::IO, cmd::UInt8, flags::UInt8) @@ -84,18 +83,22 @@ function handle_pingresp(client::Client, s::IO, cmd::UInt8, flags::UInt8) else # We received a ping resp packet we didn't ask for @atomicswap client.state = 0x03 - throw(ArgumentError("No outstanding ping. client.ping_outstanding = $(client.ping_outstanding) and should be 0x1")) + throw( + ArgumentError( + "No outstanding ping. client.ping_outstanding = $(client.ping_outstanding) and should be 0x1", + ), + ) end end -const HANDLERS = Dict{UInt8, Function}( - CONNACK => handle_connack, - PUBLISH => handle_publish, - PUBACK => handle_ack, - PUBREC => handle_pubrec, - PUBREL => handle_pubrel, - PUBCOMP => handle_ack, - SUBACK => handle_suback, - UNSUBACK => handle_ack, - PINGRESP => handle_pingresp - ) +const HANDLERS = Dict{UInt8,Function}( + CONNACK => handle_connack, + PUBLISH => handle_publish, + PUBACK => handle_ack, + PUBREC => handle_pubrec, + PUBREL => handle_pubrel, + PUBCOMP => handle_ack, + SUBACK => handle_suback, + UNSUBACK => handle_ack, + PINGRESP => handle_pingresp, +) diff --git a/src/interface.jl b/src/interface.jl index c79970e..c0c94ac 100644 --- a/src/interface.jl +++ b/src/interface.jl @@ -15,101 +15,120 @@ will::Message=Message(false, 0x00, false, "", UInt8[]), clean_session::Bool=true)::Tuple -Creates an MQTT client connection to an MQTT broker, handling the construction -of both the `Client` and `Connection` objects inside the `Configuration` struct. This function provides -flexible ways to specify the connection details either through a TCP connection +Creates an MQTT client connection to an MQTT broker, handling the construction +of both the `Client` and `Connection` objects inside the `Configuration` struct. This function provides +flexible ways to specify the connection details either through a TCP connection with host and port, a Unix Domain Socket path. # Arguments -- `host::Union{IPAddr, String}`: The IP address or hostname of the MQTT broker. -- `port::Int64`: The port number to connect to. -- `path::String`: The file system path for Unix Domain Socket connection. -- `io::T`: An object of subtype `AbstractIOConnection`. -- `ping_timeout::UInt64`: The ping timeout in seconds (default: 60). -- `keep_alive::Int64`: The keep-alive time in seconds (default: 32). -- `client_id::String`: The client identifier (default: a random string of length 8). -- `user::User`: The user credentials for the MQTT broker (default: anonymous user). -- `will::Message`: The last will message to be sent in case of unexpected disconnection (default: an empty will message). -- `clean_session::Bool`: Indicates whether to start a clean session (default: true). + + - `host::Union{IPAddr, String}`: The IP address or hostname of the MQTT broker. + - `port::Int64`: The port number to connect to. + - `path::String`: The file system path for Unix Domain Socket connection. + - `io::T`: An object of subtype `AbstractIOConnection`. + - `ping_timeout::UInt64`: The ping timeout in seconds (default: 60). + - `keep_alive::Int64`: The keep-alive time in seconds (default: 32). + - `client_id::String`: The client identifier (default: a random string of length 8). + - `user::User`: The user credentials for the MQTT broker (default: anonymous user). + - `will::Message`: The last will message to be sent in case of unexpected disconnection (default: an empty will message). + - `clean_session::Bool`: Indicates whether to start a clean session (default: true). # Returns -- A `Configuration` struct where `client::Client` is the MQTT client instance - and `connection::Connection` is the connection information used to connect to the broker. -This function simplifies the process of setting up an MQTT client connection. -Depending on the type of connection, you can specify the broker's IP address -and port or a Unix Domain Socket path, it infers the Protocol and then constructs the necessary -provided or default parameters. Refer to the documentation for [`Client`](@ref) and + - A `Configuration` struct where `client::Client` is the MQTT client instance + and `connection::Connection` is the connection information used to connect to the broker. + +This function simplifies the process of setting up an MQTT client connection. +Depending on the type of connection, you can specify the broker's IP address +and port or a Unix Domain Socket path, it infers the Protocol and then constructs the necessary +provided or default parameters. Refer to the documentation for [`Client`](@ref) and [`Connection`](@ref) object. ## Examples ```julia # Example with IP address and port -client, connection = MakeConnection("127.0.0.1", 1883, client_id="mqtt_client_1") +client, connection = MakeConnection("127.0.0.1", 1883; client_id="mqtt_client_1") # Example with Unix Domain Socket path -client, connection = MakeConnection("/var/run/mqtt.sock", user=User("user", "pass")) +client, connection = MakeConnection("/var/run/mqtt.sock"; user=User("user", "pass")) ``` """ -function MakeConnection(host::Union{IPAddr, String}, port::Int64; - ping_timeout=UInt64(60), - keep_alive::Int64=32, - client_id::String=randstring(8), - user::User=User("", ""), - will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true)::Configuration - Configuration(IOConnection(host,port),ping_timeout,keep_alive,client_id,user,will,clean_session) +function MakeConnection( + host::Union{IPAddr,String}, + port::Int64; + ping_timeout=UInt64(60), + keep_alive::Int64=32, + client_id::String=randstring(8), + user::User=User("", ""), + will::Message=Message(false, 0x00, false, "", UInt8[]), + clean_session::Bool=true, +)::Configuration + return Configuration( + IOConnection(host, port), + ping_timeout, + keep_alive, + client_id, + user, + will, + clean_session, + ) end -function MakeConnection(path::String; - ping_timeout=UInt64(60), - keep_alive::Int64=32, - client_id::String=randstring(8), - user::User=User("", ""), - will::Message=Message(false, 0x00, false, "", UInt8[]), - clean_session::Bool=true)::Configuration - Configuration(IOConnection(path),ping_timeout,keep_alive,client_id,user,will,clean_session) +function MakeConnection( + path::String; + ping_timeout=UInt64(60), + keep_alive::Int64=32, + client_id::String=randstring(8), + user::User=User("", ""), + will::Message=Message(false, 0x00, false, "", UInt8[]), + clean_session::Bool=true, +)::Configuration + return Configuration( + IOConnection(path), ping_timeout, keep_alive, client_id, user, will, clean_session + ) end - """ connect_async(client::Client, connection::Connection) Establishes an asynchronous connection to the MQTT broker using the provided `Client` and `Connection` objects. This function initializes the client, establishes the connection, and starts the necessary loops for communication. # Arguments -- `client::Client`: The MQTT client instance. -- `connection::Connection`: The connection information used to connect to the broker. + + - `client::Client`: The MQTT client instance. + - `connection::Connection`: The connection information used to connect to the broker. # Returns -- A `Future` object that can be used to await the completion of the connection process. + + - A `Future` object that can be used to await the completion of the connection process. ## Example ```julia -client, connection = MakeConnection("127.0.0.1", 1883, client_id="mqtt_client_1") +client, connection = MakeConnection("127.0.0.1", 1883; client_id="mqtt_client_1") future = connect_async(client, connection) wait(future) ``` # See Also -- [`connect`](@ref): The synchronous version of this function. + + - [`connect`](@ref): The synchronous version of this function. """ function connect_async(client::Client, connection::Connection) if !isready(client) - @atomicswap client.state = 0x00 - client.on_msg = TrieNode() - @atomicswap client.last_id = 0x0000 - client.in_flight = Dict{UInt16, Future}() - client.write_packets = Channel{Packet}(typemax(Int64)) - client.socket = IOBuffer() - client.socket_lock = ReentrantLock() - @atomicswap client.ping_outstanding = 0 - @atomicswap client.last_sent = 0.0 - @atomicswap client.last_received = 0.0 - client.write_task = Task(nothing) - client.read_task = Task(nothing) - client.keep_alive_task = Task(nothing) + @atomicswap client.state = 0x00 + client.on_msg = TrieNode() + @atomicswap client.last_id = 0x0000 + client.in_flight = Dict{UInt16,Future}() + client.write_packets = Channel{Packet}(typemax(Int64)) + client.socket = IOBuffer() + client.socket_lock = ReentrantLock() + @atomicswap client.ping_outstanding = 0 + @atomicswap client.last_sent = 0.0 + @atomicswap client.last_received = 0.0 + client.write_task = Task(nothing) + client.read_task = Task(nothing) + client.keep_alive_task = Task(nothing) end try @@ -145,37 +164,48 @@ function connect_async(client::Client, connection::Connection) end if length(connection.will.topic) > 0 - optional_will = (connection.will.topic, convert(UInt16, length(connection.will.payload)), connection.will.payload) - connect_flags |= 0x04 | ((connection.will.qos & 0x03) << 3) | ((connection.will.retain & 0x01) << 5) + optional_will = ( + connection.will.topic, + convert(UInt16, length(connection.will.payload)), + connection.will.payload, + ) + connect_flags |= + 0x04 | + ((connection.will.qos & 0x03) << 3) | + ((connection.will.retain & 0x01) << 5) end future = Future() client.in_flight[0x0000] = future - write_packet(client, CONNECT, - protocol_name, - protocol_level, - connect_flags, - client.keep_alive, - connection.client_id, - optional_user..., - optional_will...) + write_packet( + client, + CONNECT, + protocol_name, + protocol_level, + connect_flags, + client.keep_alive, + connection.client_id, + optional_user..., + optional_will..., + ) return future end - """ connect(client::Client, connection::Connection) Establishes a synchronous connection to the MQTT broker using the provided [`Client`](@ref) and [`Connection`](@ref) objects. This function wraps [`connect_async`](@ref) and waits for the connection process to complete. # Arguments -- `client::Client`: The MQTT client instance. -- `connection::Connection`: The connection information used to connect to the broker. + + - `client::Client`: The MQTT client instance. + - `connection::Connection`: The connection information used to connect to the broker. # Returns -- The result of the connection process after it completes. + + - The result of the connection process after it completes. The connect function is responsible for establishing a connection between an MQTT client and an MQTT broker. It initializes the client's state, sets up the necessary communication channels, and handles the connection handshake according to the MQTT protocol. When called, connect first ensures that the client's state and resources are properly initialized. This includes resetting the client's state, setting up the socket connection, and creating the channels and locks required for communication. The function then starts the asynchronous tasks needed to manage the read, write, and keep-alive loops, which are crucial for maintaining the connection and ensuring that messages are sent and received properly. @@ -184,18 +214,18 @@ Additionally, the connect function handles the specifics of the MQTT protocol ha ## Example ```julia -client, connection = MakeConnection("127.0.0.1", 1883, client_id="mqtt_client_1") +client, connection = MakeConnection("127.0.0.1", 1883; client_id="mqtt_client_1") result = connect(client, connection) ``` # See Also -- [`connect_async`](@ref): The asynchronous version of this function. -- [`Client`](@ref) -- [`Connection`](@ref) + + - [`connect_async`](@ref): The asynchronous version of this function. + - [`Client`](@ref) + - [`Connection`](@ref) """ connect(client::Client, connection::Connection) = fetch(connect_async(client, connection)) - """ disconnect(client::Client) @@ -225,7 +255,7 @@ function disconnect(client::Client) res = fetch(client) @debug "MQTT client disconnected with async task states: $res" - res + return res end """ @@ -234,17 +264,20 @@ end Subscribe to a topic asynchronously. # Arguments -- `client::Client`: The MQTT client. -- `topic::String`: The topic to subscribe to. -- `on_msg::Function`: The function to call when a message is received on the topic. -- `qos::UInt8`: The quality of service level to use for the subscription. Default is 0. + + - `client::Client`: The MQTT client. + - `topic::String`: The topic to subscribe to. + - `on_msg::Function`: The function to call when a message is received on the topic. + - `qos::UInt8`: The quality of service level to use for the subscription. Default is 0. # Returns -- `Future`: A future that can be used to wait for the subscription to complete. + + - `Future`: A future that can be used to wait for the subscription to complete. # Examples + ```julia -future = subscribe_async(client, "my/topic", on_msg, qos=QOS_2) +future = subscribe_async(client, "my/topic", on_msg; qos=QOS_2) ``` """ function subscribe_async(client, topic, on_msg; qos=QOS_0) @@ -263,25 +296,27 @@ end Subscribe to a topic. # Arguments -- `client::Client`: The MQTT client. -- `topic::String`: The topic to subscribe to. -- `on_msg::Function`: The function to call when a message is received on the topic. -- `qos::UInt8`: The quality of service level to use for the subscription. Default is 0. + + - `client::Client`: The MQTT client. + - `topic::String`: The topic to subscribe to. + - `on_msg::Function`: The function to call when a message is received on the topic. + - `qos::UInt8`: The quality of service level to use for the subscription. Default is 0. # Examples + ```julia subscribe(client, "my/topic", on_msg) ``` """ -subscribe(client, topic, on_msg; qos=QOS_0) = resolve(subscribe_async(client, topic, on_msg, qos=qos)) - +subscribe(client, topic, on_msg; qos=QOS_0) = + resolve(subscribe_async(client, topic, on_msg; qos=qos)) """ unsubscribe_async(client::Client, topics::String...) Unsubscribes the `Client` instance from the supplied topic names. Deletes the callback from the client -Returns a `Future` object that contains `nothing` on success and an exception on failure. +Returns a `Future` object that contains `nothing` on success and an exception on failure. """ function unsubscribe_async(client::Client, topic::String) future = Future() @@ -306,12 +341,13 @@ end Unsubscribes the `Client` instance from the supplied topic names. Waits until the unsubscribe is fully acknowledged. Returns `nothing` on success and an exception on failure. """ -unsubscribe(client::Client, topics::String...) = resolve(unsubscribe_async(client, topics...)) +unsubscribe(client::Client, topics::String...) = + resolve(unsubscribe_async(client, topics...)) """ - publish_async(client::Client, message::Message) +publish_async(client::Client, message::Message) -Publishes the message. Returns a `Future` object that contains `nothing` on success and an exception on failure. +Publishes the message. Returns a `Future` object that contains `nothing` on success and an exception on failure. """ function publish_async(client::Client, message::Message) future = Future() @@ -338,22 +374,30 @@ end qos::QOS=QOS_0, retain::Bool=false) -Pulishes a message with the specified parameters. Returns a `Future` object that contains `nothing` on success and an exception on failure. +Pulishes a message with the specified parameters. Returns a `Future` object that contains `nothing` on success and an exception on failure. +""" +publish_async( + client::Client, + topic::String, + payload...; + dup::Bool=false, + qos::QOS=QOS_0, + retain::Bool=false, +) = publish_async(client, Message(dup, UInt8(qos), retain, topic, payload...)) + """ -publish_async(client::Client, topic::String, payload...; - dup::Bool=false, - qos::QOS=QOS_0, - retain::Bool=false) = publish_async(client, Message(dup, UInt8(qos), retain, topic, payload...)) +publish(client::Client, topic::String, payload...; +dup::Bool=false, +qos::QOS=QOS_0, +retain::Bool=false) +Waits until the publish is completely acknowledged. Publishes a message with the specified parameters. Returns `nothign` on success and throws an exception on failure. """ - publish(client::Client, topic::String, payload...; - dup::Bool=false, - qos::QOS=QOS_0, - retain::Bool=false) - - Waits until the publish is completely acknowledged. Publishes a message with the specified parameters. Returns `nothign` on success and throws an exception on failure. - """ - publish(client::Client, topic::String, payload...; - dup::Bool=false, - qos::QOS=QOS_0, - retain::Bool=false) = resolve(publish_async(client, topic, payload..., dup=dup, qos=qos, retain=retain)) +publish( + client::Client, + topic::String, + payload...; + dup::Bool=false, + qos::QOS=QOS_0, + retain::Bool=false, +) = resolve(publish_async(client, topic, payload...; dup=dup, qos=qos, retain=retain)) diff --git a/src/internals.jl b/src/internals.jl index 31a64f0..1553674 100644 --- a/src/internals.jl +++ b/src/internals.jl @@ -25,44 +25,35 @@ const CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD = 0x04 const CONNECTION_REFUSED_NOT_AUTHORIZED = 0x05 # error consts -const CONNACK_ERRORS = Dict{UInt8, String}( - 0x01 => "connection refused unacceptable protocol version", - 0x02 => "connection refused identifier rejected", - 0x03 => "connection refused server unavailable", - 0x04 => "connection refused bad user name or password", - 0x05 => "connection refused not authorized", - ) - -const CLIENT_STATE = Dict{UInt8, Symbol}( - 0x00 => :ready, - 0x01 => :connected, - 0x02 => :closed, - ) +const CONNACK_ERRORS = Dict{UInt8,String}( + 0x01 => "connection refused unacceptable protocol version", + 0x02 => "connection refused identifier rejected", + 0x03 => "connection refused server unavailable", + 0x04 => "connection refused bad user name or password", + 0x05 => "connection refused not authorized", +) + +const CLIENT_STATE = Dict{UInt8,Symbol}(0x00 => :ready, 0x01 => :connected, 0x02 => :closed) ## Types abstract type AbstractIOConnection end abstract type AbstractConfigElement end -AbstractProtocol = Union{PipeEndpoint, TCPSocket} +AbstractProtocol = Union{PipeEndpoint,TCPSocket} ## Enums ## ----- # QOS values -@enum(QOS::UInt8, - QOS_0 = 0x00, - QOS_1 = 0x01, - QOS_2 = 0x02) +@enum(QOS::UInt8, QOS_0 = 0x00, QOS_1 = 0x01, QOS_2 = 0x02) ## Structs ## ------- - struct MQTTException <: Exception msg::AbstractString end - struct Packet cmd::UInt8 data::Any @@ -70,7 +61,6 @@ end Base.:(==)(p1::Packet, p2::Packet) = p1.cmd == p2.cmd && p1.data == p2.data - """ Message @@ -78,18 +68,17 @@ A composite type representing a message. # Fields -- `dup::Bool`: a boolean indicating whether the message is a duplicate. -- `qos::UInt8`: an 8-bit unsigned integer representing the quality of service. -- `retain::Bool`: a boolean indicating whether the message should be retained. -- `topic::String`: a string representing the topic. -- `payload::Array{UInt8}`: an array of 8-bit unsigned integers representing the payload. + - `dup::Bool`: a boolean indicating whether the message is a duplicate. + - `qos::UInt8`: an 8-bit unsigned integer representing the quality of service. + - `retain::Bool`: a boolean indicating whether the message should be retained. + - `topic::String`: a string representing the topic. + - `payload::Array{UInt8}`: an array of 8-bit unsigned integers representing the payload. # Constructors -- `Message(qos::QOS, topic::String, payload...)`: constructs a new message with default values for `dup` and `retain`. -- `Message(dup::Bool, qos::QOS, retain::Bool, topic::String, payload...)`: constructs a new message with all fields specified. -- `Message(dup::Bool, qos::UInt8, retain::Bool, topic::String, payload...)`: constructs a new message with all fields specified. - + - `Message(qos::QOS, topic::String, payload...)`: constructs a new message with default values for `dup` and `retain`. + - `Message(dup::Bool, qos::QOS, retain::Bool, topic::String, payload...)`: constructs a new message with all fields specified. + - `Message(dup::Bool, qos::UInt8, retain::Bool, topic::String, payload...)`: constructs a new message with all fields specified. """ struct Message dup::Bool @@ -117,7 +106,13 @@ struct Message end end -Base.:(==)(m1::Message, m2::Message) = m1.dup == m2.dup && m1.qos == m2.qos && m1.retain == m2.retain && m1.topic == m2.topic && m1.payload == m2.payload +function Base.:(==)(m1::Message, m2::Message) + return m1.dup == m2.dup && + m1.qos == m2.qos && + m1.retain == m2.retain && + m1.topic == m2.topic && + m1.payload == m2.payload +end """ User(name::String, password::String) @@ -125,6 +120,7 @@ Base.:(==)(m1::Message, m2::Message) = m1.dup == m2.dup && m1.qos == m2.qos && m A struct that represents a user with a name and password. # Examples + ```julia user = User("John", "password") @@ -136,4 +132,4 @@ struct User password::String end -Base.:(==)(u1::User, u2::User) = u1.name == u2.name && u1.password == u2.password \ No newline at end of file +Base.:(==)(u1::User, u2::User) = u1.name == u2.name && u1.password == u2.password diff --git a/src/topic.jl b/src/topic.jl index cc7e5b6..a0bff9f 100644 --- a/src/topic.jl +++ b/src/topic.jl @@ -2,7 +2,7 @@ abstract type AbstractNodeCallback end function DefaultCB(topic, payload) @debug "Recieved data on $topic without callback specified" payload - nothing + return nothing end struct EmptyCallback <: AbstractNodeCallback end @@ -17,10 +17,12 @@ struct TrieNode{F<:AbstractNodeCallback} TrieNode() = new{EmptyCallback}(Dict{String,TrieNode}(), EmptyCallback()) TrieNode(child::Dict{String,TrieNode}) = new{EmptyCallback}(child, EmptyCallback()) - TrieNode(cb::Function) = - new{FunctionCallback}(Dict{String,TrieNode}(), FunctionCallback(cb)) - TrieNode(child::Dict{String,TrieNode}, cb::Function) = - new{FunctionCallback}(child, FunctionCallback(cb)) + function TrieNode(cb::Function) + return new{FunctionCallback}(Dict{String,TrieNode}(), FunctionCallback(cb)) + end + function TrieNode(child::Dict{String,TrieNode}, cb::Function) + return new{FunctionCallback}(child, FunctionCallback(cb)) + end TrieNode(::Nothing, cb::Function) = TrieNode(cb) TrieNode(childnode::TrieNode, cb::Function) = TrieNode(childnode.children, cb) @@ -29,8 +31,9 @@ struct TrieNode{F<:AbstractNodeCallback} TrieNode(cb::FunctionCallback) = TrieNode(cb.eval) end -(node::TrieNode{FunctionCallback})(topic, payload) = - Base.invokelatest(node.cb.eval, topic, payload) +function (node::TrieNode{FunctionCallback})(topic, payload) + return Base.invokelatest(node.cb.eval, topic, payload) +end (node::TrieNode{EmptyCallback})(topic, payload) = DefaultCB(topic, payload) function Base.get(root::TrieNode, topic::String, default::Function) @@ -44,23 +47,19 @@ trie_lookup(node::Nothing, part::Any, topic_parts::Any) = nothing function trie_lookup(node::TrieNode, part::AbstractString, topic_parts::Vector) topic = trie_lookup( - get(node.children, part, nothing), - get(topic_parts, 1, nothing), - topic_parts[2:end], + get(node.children, part, nothing), get(topic_parts, 1, nothing), topic_parts[2:end] ) wildcard = trie_lookup( - get(node.children, "+", nothing), - get(topic_parts, 1, nothing), - topic_parts[2:end], + get(node.children, "+", nothing), get(topic_parts, 1, nothing), topic_parts[2:end] ) multi = get(node.children, "#", nothing) - something(topic, wildcard, multi, Some(nothing)) + return something(topic, wildcard, multi, Some(nothing)) end function Base.insert!(root::TrieNode, key::String, fn::Function) depth = count(==('/'), key) + 1 - key_parts = eachsplit(key, '/', keepempty = true) + key_parts = eachsplit(key, '/'; keepempty=true) node = root for (idx, part) in enumerate(key_parts) if idx === depth @@ -80,7 +79,7 @@ end function remove!(root::TrieNode, topic::String) parts = split(topic, '/') remove_node!(root, get(parts, 1, nothing), parts[2:end]) - nothing + return nothing end function remove_node!(node::TrieNode, part, parts::Vector) @@ -94,13 +93,15 @@ function remove_node!(node::TrieNode, part, parts::Vector) elseif part == "+" return all([ ( - ( + if ( remove_node!( - get(node.children, k, nothing), - get(parts, 1, nothing), - parts[2:end], + get(node.children, k, nothing), get(parts, 1, nothing), parts[2:end] ) && isempty(node.children[k].children) - ) ? (delete!(node.children, k); true) : false + ) + (delete!(node.children, k); true) + else + false + end ) for k in keys(node.children) ]) elseif isnothing(child_node) @@ -116,9 +117,7 @@ function remove_node!(node::TrieNode, part, parts::Vector) else # branch, go to next if remove_node!( - get(node.children, part, nothing), - get(parts, 1, nothing), - parts[2:end], + get(node.children, part, nothing), get(parts, 1, nothing), parts[2:end] ) && isempty(child_node.children) delete!(node.children, part) return true @@ -128,14 +127,18 @@ function remove_node!(node::TrieNode, part, parts::Vector) end remove_node!(node::Nothing, part, parts::Vector) = false - -prune!(node::TrieNode) = +function prune!(node::TrieNode) while !isempty(node.children) && - any([prune!(node, key, childnode) for (key, childnode) in node.children]) + any([prune!(node, key, childnode) for (key, childnode) in node.children]) + end +end +function prune!(node::TrieNode, key, childnode::TrieNode{MQTTClient.FunctionCallback}) + return if !isempty(childnode.children) + any([prune!(childnode, k, n) for (k, n) in childnode.children]) + else + false end -prune!(node::TrieNode, key, childnode::TrieNode{MQTTClient.FunctionCallback}) = - !isempty(childnode.children) ? - any([prune!(childnode, k, n) for (k, n) in childnode.children]) : false +end function prune!(node::TrieNode, key, childnode::TrieNode{MQTTClient.EmptyCallback}) if isempty(childnode.children) @@ -145,30 +148,45 @@ function prune!(node::TrieNode, key, childnode::TrieNode{MQTTClient.EmptyCallbac return any([prune!(childnode, k, n) for (k, n) in childnode.children]) end -explode(node::TrieNode{EmptyCallback}; path = []) = - isempty(node.children) ? (path, nothing) : - [explode(node.children[k], path = [path..., k]) for k in keys(node.children)] -explode(node::TrieNode{FunctionCallback}; path = []) = - isempty(node.children) ? (path, node.cb) : - ( - path, - node.cb, - [explode(node.children[k], path = [path..., k]) for k in keys(node.children)], - ) +function explode(node::TrieNode{EmptyCallback}; path=[]) + return if isempty(node.children) + (path, nothing) + else + [explode(node.children[k]; path=[path..., k]) for k in keys(node.children)] + end +end +function explode(node::TrieNode{FunctionCallback}; path=[]) + return if isempty(node.children) + (path, node.cb) + else + ( + path, + node.cb, + [explode(node.children[k]; path=[path..., k]) for k in keys(node.children)], + ) + end +end show_exploded(vec::Vector) = join(show_exploded.(vec), "\n") -show_exploded((vec, cb)::Tuple{Vector{String},FunctionCallback}) = - "$(join(vec, "/")): $(cb.eval)" -show_exploded((vec, cb, child)::Tuple{Vector{String},FunctionCallback,Vector}) = - "$(join(vec, "/")): $(cb.eval)\n$(show_exploded(child))" - +function show_exploded((vec, cb)::Tuple{Vector{String},FunctionCallback}) + return "$(join(vec, "/")): $(cb.eval)" +end +function show_exploded((vec, cb, child)::Tuple{Vector{String},FunctionCallback,Vector}) + return "$(join(vec, "/")): $(cb.eval)\n$(show_exploded(child))" +end -Base.show(io::IO, ::MIME"text/plain", node::TrieNode) = - isempty(node.children) ? print(io, node) : print(io, show_exploded(explode(node))) -Base.show(node::TrieNode) = - isempty(node.children) ? print(node) : print(show_exploded(explode(node))) +function Base.show(io::IO, ::MIME"text/plain", node::TrieNode) + return if isempty(node.children) + print(io, node) + else + print(io, show_exploded(explode(node))) + end +end +function Base.show(node::TrieNode) + return isempty(node.children) ? print(node) : print(show_exploded(explode(node))) +end -function show_tree(node::TrieNode, prefix::String = "", is_last::Bool = true) +function show_tree(node::TrieNode, prefix::String="", is_last::Bool=true) tree_string = "" # Get the children keys @@ -183,12 +201,14 @@ function show_tree(node::TrieNode, prefix::String = "", is_last::Bool = true) # Print the key with the appropriate prefix tree_string *= (prefix * current_prefix * key) - tree_string *= - isa(node.children[key].cb, FunctionCallback) ? - "{$(node.children[key].cb.eval)}\n" : "\n" + tree_string *= if isa(node.children[key].cb, FunctionCallback) + "{$(node.children[key].cb.eval)}\n" + else + "\n" + end # Recursively print the child nodes tree_string *= show_tree(node.children[key], prefix * next_prefix, is_last_child) end - tree_string + return tree_string end diff --git a/src/utils.jl b/src/utils.jl index 6d1903d..edb2db0 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -4,6 +4,7 @@ A function that compares two MQTT topics and returns a boolean value based on their equality. If the `baseT` topic contains a wildcard character `#`, the macro checks if the `compareT` topic contains the string before the wildcard character. Otherwise, it checks if the two topics are equal. # Examples + ```julia julia> topic_eq("sport/#", "sport/tennis") true @@ -13,12 +14,12 @@ false ``` """ function topic_eq(baseT, compareT) - if contains(baseT, "#") - T = split(baseT, "#")[1] - return contains(compareT, T) - else - return baseT == compareT - end + if contains(baseT, "#") + T = split(baseT, "#")[1] + return contains(compareT, T) + else + return baseT == compareT + end end mqtt_read(s::IO, ::Type{UInt16}) = ntoh(read(s, UInt16)) @@ -29,16 +30,16 @@ function mqtt_read(s::IO, ::Type{String}) end function mqtt_write(stream::IO, x::Any) - write(stream, x) + return write(stream, x) end function mqtt_write(stream::IO, x::UInt16) - write(stream, hton(x)) + return write(stream, hton(x)) end function mqtt_write(stream::IO, x::String) mqtt_write(stream, convert(UInt16, length(x))) - write(stream, x) + return write(stream, x) end function write_len(s::IO, len::Int64) @@ -49,7 +50,7 @@ function write_len(s::IO, len::Int64) b = b | 0x80 end write(s, b) - if(len == 0) + if (len == 0) break end end @@ -78,33 +79,35 @@ end Fetch the result of a `Future` object and return it. If the result is an exception, throw the exception, otherwise return the result. # Arguments -- `future`: The `Future` object to fetch the result from. + + - `future`: The `Future` object to fetch the result from. # Returns -- The result of the `Future`, or throws an exception if the result is an exception. + + - The result of the `Future`, or throws an exception if the result is an exception. """ function resolve(future) r = fetch(future) return (typeof(r) <: Exception) ? throw(r) : r end - # Helper method to check if it is possible to subscribe to a topic - function filter_wildcard_len_check(sub) - #Regex: matches any valid topic, + and # are not in allowed in strings, + is only allowed as a single symbol between two /, # is only allowed at the end - if !(occursin(r"(^[^#+]+|[+])(/([^#+]+|[+]))*(/#)?$", sub)) || length(sub) > 65535 - throw(MQTTException("Invalid topic")) - end - end - - # Helper method to check if it is possible to publish a topic - function topic_wildcard_len_check(topic) - # Search for + or # in a topic. Return MQTT_ERR_INVAL if found. - # Also returns MQTT_ERR_INVAL if the topic string is too long. - # Returns MQTT_ERR_SUCCESS if everything is fine. - if !(occursin(r"^[^#+]+$", topic)) || length(topic) > 65535 - throw(MQTTException("Invalid topic")) - end - end +# Helper method to check if it is possible to subscribe to a topic +function filter_wildcard_len_check(sub) + #Regex: matches any valid topic, + and # are not in allowed in strings, + is only allowed as a single symbol between two /, # is only allowed at the end + if !(occursin(r"(^[^#+]+|[+])(/([^#+]+|[+]))*(/#)?$", sub)) || length(sub) > 65535 + throw(MQTTException("Invalid topic")) + end +end + +# Helper method to check if it is possible to publish a topic +function topic_wildcard_len_check(topic) + # Search for + or # in a topic. Return MQTT_ERR_INVAL if found. + # Also returns MQTT_ERR_INVAL if the topic string is too long. + # Returns MQTT_ERR_SUCCESS if everything is fine. + if !(occursin(r"^[^#+]+$", topic)) || length(topic) > 65535 + throw(MQTTException("Invalid topic")) + end +end Base.wait(::Nothing) = sleep(0.001) @@ -145,21 +148,20 @@ subscribe(client, "foo/bar", (args...) -> nothing) # 2-element Vector{UInt8}: # 0x01 # 0x00 -publish(client, "bar/foo", qos=QOS_2) +publish(client, "bar/foo"; qos=QOS_2) unsubscribe(client, "foo/bar") disconnect(client) # (0x00, 0x00, 0x00) close(server) ``` - """ function MockMQTTBroker(args...) server = listen(args...) function mock_response(cmd::UInt8, flags::UInt8, buffer::IO) if cmd == 0x10 # CONNECT - # println("Received CONNECT packet") + # println("Received CONNECT packet") return [Packet(0x20, 0x0000)] # CONNACK elseif cmd == 0x80 # SUBSCRIBE # println("Received SUBSCRIBE packet") @@ -183,7 +185,7 @@ function MockMQTTBroker(args...) # println("Recieved PING REQ packet") return [Packet(0xD0, 0x00)] end - [] + return [] end @async while isopen(server) @@ -222,5 +224,5 @@ function MockMQTTBroker(args...) end end end - server -end \ No newline at end of file + return server +end diff --git a/test/dev/mocksocket.jl b/test/dev/mocksocket.jl index 777a554..823c58e 100644 --- a/test/dev/mocksocket.jl +++ b/test/dev/mocksocket.jl @@ -1,4 +1,4 @@ -import Sockets +using Sockets: Sockets import Base: read, close # commands @@ -37,7 +37,7 @@ end function read(fh::TestFileHandler, length::Integer) data = Vector{UInt8}() - for i = 1:length + for i in 1:length append!(data, take!(fh.in_channel)) end return data @@ -45,71 +45,75 @@ end function close(fh::TestFileHandler) fh.closed = true - notify(fh.closenotify) + return notify(fh.closenotify) end function write(fh::TestFileHandler, data::UInt8) - put!(fh.out_channel, data) + return put!(fh.out_channel, data) end function put(fh::TestFileHandler, data::Array{UInt8}) - for i in data - put!(fh.in_channel, i) - end + for i in data + put!(fh.in_channel, i) + end end function put(fh::TestFileHandler, data::SubArray) - for i in data - put!(fh.in_channel, i) - end + for i in data + put!(fh.in_channel, i) + end end function put(fh::TestFileHandler, data::UInt16) buffer = PipeBuffer() write(buffer, data) - put(fh, reverse(read(buffer), 1)) #TODO use take! instead? + return put(fh, reverse(read(buffer), 1)) #TODO use take! instead? end function put(fh::TestFileHandler, data::UInt8) - put!(fh.in_channel, data) + return put!(fh.in_channel, data) end function put_from_file(fh::TestFileHandler, filename) - put(fh, read_all_to_arr(filename)) + return put(fh, read_all_to_arr(filename)) end function get_mid_index(data::Array{UInt8}) - cmd = data[1] & 0xF0 - if cmd == CONNACK || cmd == PINGRESP || cmd == CONNECT || cmd == DISCONNECT || cmd == PINGREQ - return -1 - elseif cmd == PUBLISH - qos = data[1] & 0x06 - if qos == QOS_1_BYTE || qos == QOS_2_BYTE - buffer = PipeBuffer() - write(buffer, data[4]) - write(buffer, data[3]) - topic_len = read_len(buffer) - return 5 + topic_len + cmd = data[1] & 0xF0 + if cmd == CONNACK || + cmd == PINGRESP || + cmd == CONNECT || + cmd == DISCONNECT || + cmd == PINGREQ + return -1 + elseif cmd == PUBLISH + qos = data[1] & 0x06 + if qos == QOS_1_BYTE || qos == QOS_2_BYTE + buffer = PipeBuffer() + write(buffer, data[4]) + write(buffer, data[3]) + topic_len = read_len(buffer) + return 5 + topic_len + else + # QOS_0 has no message id + return -1 + end else - # QOS_0 has no message id - return -1 + # all other packets have m_id as their 3rd and 4th byte + return 3 end - else - # all other packets have m_id as their 3rd and 4th byte - return 3 - end end function put_from_file(fh::TestFileHandler, filename, messageId::UInt16) - data = read_all_to_arr(filename) - mid_index = get_mid_index(data) - if mid_index > 0 - put(fh, view(data, 1:mid_index - 1)) - put(fh, messageId) - put(fh, view(data, mid_index + 2:length(data))) - else - put(fh, data) - end + data = read_all_to_arr(filename) + mid_index = get_mid_index(data) + if mid_index > 0 + put(fh, view(data, 1:(mid_index - 1))) + put(fh, messageId) + put(fh, view(data, (mid_index + 2):length(data))) + else + put(fh, data) + end end function read_all_to_arr(filename) @@ -123,7 +127,9 @@ function read_all_to_arr(filename) end function Sockets.connect(host::AbstractString, port::Integer) - th = TestFileHandler(Channel{UInt8}(256), Channel{UInt8}(256), Condition(), Condition(), false) + th = TestFileHandler( + Channel{UInt8}(256), Channel{UInt8}(256), Condition(), Condition(), false + ) put_from_file(th, "data/input/connack.dat") return th end diff --git a/test/integration.test.jl b/test/integration.test.jl index fc3d435..e788047 100644 --- a/test/integration.test.jl +++ b/test/integration.test.jl @@ -9,7 +9,7 @@ cb(args...) = nothing res = subscribe(client, "foo/bar", cb) @test res == [0x01, 0x00] - res = publish(client, "bar/foo", qos=QOS_2) + res = publish(client, "bar/foo", "baz" qos=QOS_2) @test isnothing(res) res = unsubscribe(client, "foo/bar") @test isnothing(res) @@ -24,7 +24,11 @@ cb(args...) = nothing @test res == (0x00, 0x00, 0x00) @test MQTTClient.isclosed(client) - close(server) + @test isopen(server) + + close(server) # stop the mock server + + @test !isopen(server) end @testset "UDS Client" begin @@ -37,7 +41,7 @@ end res = subscribe(client, "foo/bar", cb) @test res == [0x01, 0x00] - res = publish(client, "bar/foo") + res = publish(client, "bar/foo", "baz") @test res == 0 res = unsubscribe(client, "foo/bar") @test isnothing(res) @@ -52,5 +56,9 @@ end @test res == (0x00, 0x00, 0x00) @test MQTTClient.isclosed(client) - close(server) -end \ No newline at end of file + @test isopen(server) + + close(server) # stop the mock server + + @test !isopen(server) +end diff --git a/test/runtests.jl b/test/runtests.jl index 8dcaeea..19c1b1b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -3,21 +3,23 @@ using Test using MQTTClient using Distributed, Random -import MQTTClient: topic_wildcard_len_check, filter_wildcard_len_check, MQTTException, Packet -import Sockets: TCPSocket, PipeServer, connect, localhost, getaddrinfo, IOError, DNSError, @ip_str +import MQTTClient: + topic_wildcard_len_check, filter_wildcard_len_check, MQTTException, Packet +import Sockets: + TCPSocket, PipeServer, connect, localhost, getaddrinfo, IOError, DNSError, @ip_str import Base.PipeEndpoint -@testset verbose=true "client tests" begin +@testset verbose = true "client tests" begin include("unittest.client.jl") end -@testset verbose=true "utils tests" begin +@testset verbose = true "utils tests" begin include("unittest.utils.jl") end -@testset verbose=true "topic trie tests" begin +@testset verbose = true "topic trie tests" begin include("unittest.topic.jl") end -@testset verbose=true "integration tests" begin - include("integration.test.jl") +@testset verbose = true "integration tests" begin + include("integration.test.jl") end # These tests need a mqtt broker running. diff --git a/test/unittest.client.jl b/test/unittest.client.jl index 446c5f8..3aff497 100644 --- a/test/unittest.client.jl +++ b/test/unittest.client.jl @@ -1,5 +1,5 @@ -function on_msg(t,p) - (t,p) +function on_msg(t, p) + return (t, p) end @testset verbose = true "MQTT Client functionality" begin @@ -24,7 +24,9 @@ end end @testset "MQTT Message" begin - msg = MQTTClient.Message(true, QOS_0, true, "test/mqtt_jl", "testing the MQTTClient.jl package") + msg = MQTTClient.Message( + true, QOS_0, true, "test/mqtt_jl", "testing the MQTTClient.jl package" + ) @test msg isa MQTTClient.Message msg = MQTTClient.Message(false, 0x01, false, "test", "payload") @@ -32,23 +34,45 @@ end @test msg.qos == 0x01 @test msg.retain == false @test msg.topic == "test" - @test msg.payload == [UInt8('p'), UInt8('a'), UInt8('y'), UInt8('l'), UInt8('o'), UInt8('a'), UInt8('d')] + @test msg.payload == [ + UInt8('p'), + UInt8('a'), + UInt8('y'), + UInt8('l'), + UInt8('o'), + UInt8('a'), + UInt8('d'), + ] msg = MQTTClient.Message(MQTTClient.QOS_2, "test", "payload") @test msg.dup == false @test msg.qos == 0x02 @test msg.retain == false @test msg.topic == "test" - @test msg.payload == [UInt8('p'), UInt8('a'), UInt8('y'), UInt8('l'), UInt8('o'), UInt8('a'), UInt8('d')] + @test msg.payload == [ + UInt8('p'), + UInt8('a'), + UInt8('y'), + UInt8('l'), + UInt8('o'), + UInt8('a'), + UInt8('d'), + ] end @testset "write_loop" begin c = MQTTClient.Client() c.socket = TCPSocket() close(c.socket) - message = MQTTClient.Message(false, UInt8(MQTTClient.QOS_2), false, "test/foo", "payload") + message = MQTTClient.Message( + false, UInt8(MQTTClient.QOS_2), false, "test/foo", "payload" + ) optional = message.qos == 0x00 ? () : (2) - cmd = MQTTClient.PUBLISH | ((message.dup & 0x1) << 3) | (message.qos << 1) | message.retain + cmd = + MQTTClient.PUBLISH | + ((message.dup & 0x1) << 3) | + (message.qos << 1) | + message.retain packet = MQTTClient.Packet(cmd, [message.topic, optional..., message.payload]) put!(c.write_packets, packet) @@ -110,7 +134,6 @@ end end end - @testset verbose = true "MQTT interface functionality" begin @testset "Make MQTT tcp connection" begin c, conn = MQTTClient.MakeConnection("localhost", 1883) @@ -126,26 +149,27 @@ end @test c isa MQTTClient.Client @test conn isa MQTTClient.Connection end - + @testset "Test Client show function" begin io = IOBuffer() client, conn = MQTTClient.MakeConnection("localhost", 1883) show(io, client) - str = take!(io) |> String - @test str == "MQTTClient[state: ready, read_loop: ready, write_loop: ready, keep_alive: ready]\n" + str = String(take!(io)) + @test str == + "MQTTClient[state: ready, read_loop: ready, write_loop: ready, keep_alive: ready]\n" end @testset "Test Connection show function" begin io = IOBuffer() - client, conn = MQTTClient.MakeConnection("localhost", 1883, client_id="foo") + client, conn = MQTTClient.MakeConnection("localhost", 1883; client_id="foo") show(io, conn) - str = take!(io) |> String + str = String(take!(io)) @test contains(str, "Connection(Protocol: MQTTClient.TCP") end @testset "MQTT subscribe async" begin c = MQTTClient.Client() cb(p...) = print(p) - fut = MQTTClient.subscribe_async(c, "test-topic/#", cb, qos=MQTTClient.QOS_2) + fut = MQTTClient.subscribe_async(c, "test-topic/#", cb; qos=MQTTClient.QOS_2) @test fut isa Distributed.Future end @@ -174,7 +198,7 @@ end # Check that the write_packet function was called with the correct arguments p = take!(client.write_packets) - @test p == MQTTClient.Packet(MQTTClient.UNSUBSCRIBE | 0x02, (0x0002, "topic1")) + @test p == MQTTClient.Packet(MQTTClient.UNSUBSCRIBE | 0x02, (0x0002, "topic1")) cb1(x...) = print("[1] ", x) cb2(x...) = print("[2] ", x) @@ -190,11 +214,13 @@ end # Check that the write_packet function was called with the correct arguments p = take!(client.write_packets) - @test p == MQTTClient.Packet(MQTTClient.UNSUBSCRIBE | 0x02, (0x0003, "topic1", "topic2", "topic3")) + @test p == MQTTClient.Packet( + MQTTClient.UNSUBSCRIBE | 0x02, (0x0003, "topic1", "topic2", "topic3") + ) end end -@testset verbose=true "handlers" begin +@testset verbose = true "handlers" begin @testset "handle_connack" begin c = MQTTClient.Client() c.in_flight[0x0000] = Future() @@ -221,14 +247,20 @@ end #! TODO: fix this test. c = MQTTClient.Client() ch = Channel{String}(5) - cb1(t,p) = put!(ch, strip(String(p),'\0')) - cb2(t,p) = put!(ch, strip(String(p),'\0')) + cb1(t, p) = put!(ch, strip(String(p), '\0')) + cb2(t, p) = put!(ch, strip(String(p), '\0')) insert!(c.on_msg, "test", cb1) insert!(c.on_msg, "test/#", cb2) - message = MQTTClient.Message(false, UInt8(MQTTClient.QOS_0), false, "test", "payload") + message = MQTTClient.Message( + false, UInt8(MQTTClient.QOS_0), false, "test", "payload" + ) optional = message.qos == 0x00 ? () : (0) - cmd = MQTTClient.PUBLISH | ((message.dup & 0x1) << 3) | (message.qos << 1) | message.retain + cmd = + MQTTClient.PUBLISH | + ((message.dup & 0x1) << 3) | + (message.qos << 1) | + message.retain packet = MQTTClient.Packet(cmd, [message.topic, optional..., message.payload]) buffer = PipeBuffer() for i in packet.data @@ -238,9 +270,15 @@ end payload = take!(ch) @test payload == "payload" - message = MQTTClient.Message(false, UInt8(MQTTClient.QOS_1), false, "test", "payload") + message = MQTTClient.Message( + false, UInt8(MQTTClient.QOS_1), false, "test", "payload" + ) optional = message.qos == 0x00 ? () : (1) - cmd = MQTTClient.PUBLISH | ((message.dup & 0x1) << 3) | (message.qos << 1) | message.retain + cmd = + MQTTClient.PUBLISH | + ((message.dup & 0x1) << 3) | + (message.qos << 1) | + message.retain packet = MQTTClient.Packet(cmd, [message.topic, optional..., message.payload]) buffer = PipeBuffer() for i in packet.data @@ -250,9 +288,15 @@ end payload = take!(ch) @test payload == "payload" - message = MQTTClient.Message(false, UInt8(MQTTClient.QOS_2), false, "test", "payload") + message = MQTTClient.Message( + false, UInt8(MQTTClient.QOS_2), false, "test", "payload" + ) optional = message.qos == 0x00 ? () : (2) - cmd = MQTTClient.PUBLISH | ((message.dup & 0x1) << 3) | (message.qos << 1) | message.retain + cmd = + MQTTClient.PUBLISH | + ((message.dup & 0x1) << 3) | + (message.qos << 1) | + message.retain packet = MQTTClient.Packet(cmd, [message.topic, optional..., message.payload]) buffer = PipeBuffer() for i in packet.data @@ -262,9 +306,15 @@ end payload = take!(ch) @test payload == "payload" - message = MQTTClient.Message(false, UInt8(MQTTClient.QOS_2), false, "test/foo", "payload") + message = MQTTClient.Message( + false, UInt8(MQTTClient.QOS_2), false, "test/foo", "payload" + ) optional = message.qos == 0x00 ? () : (2) - cmd = MQTTClient.PUBLISH | ((message.dup & 0x1) << 3) | (message.qos << 1) | message.retain + cmd = + MQTTClient.PUBLISH | + ((message.dup & 0x1) << 3) | + (message.qos << 1) | + message.retain packet = MQTTClient.Packet(cmd, [message.topic, optional..., message.payload]) buffer = PipeBuffer() for i in packet.data @@ -313,7 +363,7 @@ end MQTTClient.handle_pubrec(c, s, cmd, flags) p = take!(c.write_packets) #!TODO: Figure out why the id changes - @test p == MQTTClient.Packet(MQTTClient.PUBREL | 0x02, (0x0300,)) + @test p == MQTTClient.Packet(MQTTClient.PUBREL | 0x02, (0x0300,)) end @testset "handle_pubrel" begin @@ -385,5 +435,4 @@ end # p = take!(c.write_packets) # @test p == MQTTClient.Packet(MQTTClient.DISCONNECT, ()) end - end diff --git a/test/unittest.topic.jl b/test/unittest.topic.jl index 688a3a4..51e3523 100644 --- a/test/unittest.topic.jl +++ b/test/unittest.topic.jl @@ -1,23 +1,45 @@ @testset "Test TrieNode creation and insertion" begin root = MQTTClient.TrieNode() - insert!(root, "topic1/device1", (topic, payload) -> println("Callback for $topic: $payload")) - insert!(root, "topic1/device2", (topic, payload) -> println("Callback for $topic: $payload")) - insert!(root, "topic2/+/foo", (topic, payload) -> println("Wildcard data for $topic: $payload")) - insert!(root, "topic2/bar/foo", (topic, payload) -> println("Callback for $topic: $payload")) + insert!( + root, "topic1/device1", (topic, payload) -> println("Callback for $topic: $payload") + ) + insert!( + root, "topic1/device2", (topic, payload) -> println("Callback for $topic: $payload") + ) + insert!( + root, + "topic2/+/foo", + (topic, payload) -> println("Wildcard data for $topic: $payload"), + ) + insert!( + root, "topic2/bar/foo", (topic, payload) -> println("Callback for $topic: $payload") + ) # Ensure that the TrieNode structure is created correctly @test root.children["topic1"].children["device1"].cb isa MQTTClient.FunctionCallback @test root.children["topic1"].children["device2"].cb isa MQTTClient.FunctionCallback - @test root.children["topic2"].children["+"].children["foo"].cb isa MQTTClient.FunctionCallback - @test root.children["topic2"].children["bar"].children["foo"].cb isa MQTTClient.FunctionCallback + @test root.children["topic2"].children["+"].children["foo"].cb isa + MQTTClient.FunctionCallback + @test root.children["topic2"].children["bar"].children["foo"].cb isa + MQTTClient.FunctionCallback end @testset "Test TrieNode removal" begin root = MQTTClient.TrieNode() - insert!(root, "topic1/device1", (topic, payload) -> println("Callback for $topic: $payload")) - insert!(root, "topic1/device2", (topic, payload) -> println("Callback for $topic: $payload")) - insert!(root, "topic2/+/foo", (topic, payload) -> println("Wildcard data for $topic: $payload")) - insert!(root, "topic2/bar/foo", (topic, payload) -> println("Callback for $topic: $payload")) + insert!( + root, "topic1/device1", (topic, payload) -> println("Callback for $topic: $payload") + ) + insert!( + root, "topic1/device2", (topic, payload) -> println("Callback for $topic: $payload") + ) + insert!( + root, + "topic2/+/foo", + (topic, payload) -> println("Wildcard data for $topic: $payload"), + ) + insert!( + root, "topic2/bar/foo", (topic, payload) -> println("Callback for $topic: $payload") + ) # Remove a node from the Trie MQTTClient.remove!(root, "topic2/bar/foo") @@ -27,4 +49,4 @@ end @test !haskey(root.children["topic2"].children, "bar") @test !haskey(root.children["topic1"].children, "device1") @test haskey(root.children["topic1"].children, "device2") -end \ No newline at end of file +end From 7ed345c0ed91194b7e20c940bf29d366aa4144c8 Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 11:31:39 +0200 Subject: [PATCH 4/9] fix type-o in test --- test/integration.test.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration.test.jl b/test/integration.test.jl index e788047..330f4b5 100644 --- a/test/integration.test.jl +++ b/test/integration.test.jl @@ -9,7 +9,7 @@ cb(args...) = nothing res = subscribe(client, "foo/bar", cb) @test res == [0x01, 0x00] - res = publish(client, "bar/foo", "baz" qos=QOS_2) + res = publish(client, "bar/foo", "baz"; qos=QOS_2) @test isnothing(res) res = unsubscribe(client, "foo/bar") @test isnothing(res) From 12e6689f1bf1a9e8b9847a0fecf765b06ea6bf32 Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 11:32:01 +0200 Subject: [PATCH 5/9] add format conf with blue style --- .JuliaFormatter.toml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .JuliaFormatter.toml diff --git a/.JuliaFormatter.toml b/.JuliaFormatter.toml new file mode 100644 index 0000000..4755eaa --- /dev/null +++ b/.JuliaFormatter.toml @@ -0,0 +1,5 @@ +# See https://domluna.github.io/JuliaFormatter.jl/stable/ for a list of options + +style = "blue" + +format_docstrings = true From 098bd184ba0aa83d499a0b4dbf498211bc2cf315 Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 13:04:00 +0200 Subject: [PATCH 6/9] clean up file system + add formatter to ci --- .github/workflows/JuliaFormatter.yml | 47 +++++++++ test/dev/mocksocket.jl | 135 ------------------------- test/dev/packet.jl | 146 --------------------------- 3 files changed, 47 insertions(+), 281 deletions(-) create mode 100644 .github/workflows/JuliaFormatter.yml delete mode 100644 test/dev/mocksocket.jl delete mode 100644 test/dev/packet.jl diff --git a/.github/workflows/JuliaFormatter.yml b/.github/workflows/JuliaFormatter.yml new file mode 100644 index 0000000..916c15c --- /dev/null +++ b/.github/workflows/JuliaFormatter.yml @@ -0,0 +1,47 @@ +name: Code Formatting + +on: + pull_request: + branches: + - main + +jobs: + format: + runs-on: ubuntu-latest + + permissions: + contents: write + pull-requests: write + actions: write + + steps: + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@0.12.1 + + - uses: actions/checkout@v4 + with: + ref: ${{ github.head_ref }} + + - uses: julia-actions/setup-julia@v2 + with: + version: 1.9 + + - name: Install JuliaFormatter and format + shell: bash + run: julia -e 'import Pkg; Pkg.add("JuliaFormatter"); using JuliaFormatter; format(".")' + + - name: Create Pull Request + id: pr + uses: peter-evans/create-pull-request@v6 + with: + commit-message: Format files using JuliaFormatter + title: ${{ format('[AUTO] Format {0} using JuliaFormatter', github.event.pull_request.number) }} + body: ${{ format('[JuliaFormatter.jl](https://github.com/domluna/JuliaFormatter.jl) would suggest these formatting changes against \#{0}.', github.event.pull_request.number) }} + labels: no changelog + branch: ${{ format('code-format/{0}', github.event.pull_request.number) }} + delete-branch: true + + - name: Fail if a PR was needed + if: ${{ steps.pr.outputs.pull-request-operation == 'created' || steps.pr.outputs.pull-request-operation == 'updated' }} + shell: bash + run: exit 1 diff --git a/test/dev/mocksocket.jl b/test/dev/mocksocket.jl deleted file mode 100644 index 823c58e..0000000 --- a/test/dev/mocksocket.jl +++ /dev/null @@ -1,135 +0,0 @@ -using Sockets: Sockets -import Base: read, close - -# commands -const CONNECT = 0x10 -const CONNACK = 0x20 -const PUBLISH = 0x30 -const PUBACK = 0x40 -const PUBREC = 0x50 -const PUBREL = 0x60 -const PUBCOMP = 0x70 -const SUBSCRIBE = 0x80 -const SUBACK = 0x90 -const UNSUBSCRIBE = 0xA0 -const UNSUBACK = 0xB0 -const PINGREQ = 0xC0 -const PINGRESP = 0xD0 -const DISCONNECT = 0xE0 -const QOS_1_BYTE = 0x02 -const QOS_2_BYTE = 0x04 - -mutable struct TestFileHandler <: IO - out_channel::Channel{UInt8} - in_channel::Channel{UInt8} - closenotify::Condition - new_input::Condition - closed::Bool -end - -function read(fh::TestFileHandler, t) - return take!(fh.in_channel) -end - -function read(fh::TestFileHandler, t::Type{UInt8}) - return take!(fh.in_channel) -end - -function read(fh::TestFileHandler, length::Integer) - data = Vector{UInt8}() - for i in 1:length - append!(data, take!(fh.in_channel)) - end - return data -end - -function close(fh::TestFileHandler) - fh.closed = true - return notify(fh.closenotify) -end - -function write(fh::TestFileHandler, data::UInt8) - return put!(fh.out_channel, data) -end - -function put(fh::TestFileHandler, data::Array{UInt8}) - for i in data - put!(fh.in_channel, i) - end -end - -function put(fh::TestFileHandler, data::SubArray) - for i in data - put!(fh.in_channel, i) - end -end - -function put(fh::TestFileHandler, data::UInt16) - buffer = PipeBuffer() - write(buffer, data) - return put(fh, reverse(read(buffer), 1)) #TODO use take! instead? -end - -function put(fh::TestFileHandler, data::UInt8) - return put!(fh.in_channel, data) -end - -function put_from_file(fh::TestFileHandler, filename) - return put(fh, read_all_to_arr(filename)) -end - -function get_mid_index(data::Array{UInt8}) - cmd = data[1] & 0xF0 - if cmd == CONNACK || - cmd == PINGRESP || - cmd == CONNECT || - cmd == DISCONNECT || - cmd == PINGREQ - return -1 - elseif cmd == PUBLISH - qos = data[1] & 0x06 - if qos == QOS_1_BYTE || qos == QOS_2_BYTE - buffer = PipeBuffer() - write(buffer, data[4]) - write(buffer, data[3]) - topic_len = read_len(buffer) - return 5 + topic_len - else - # QOS_0 has no message id - return -1 - end - else - # all other packets have m_id as their 3rd and 4th byte - return 3 - end -end - -function put_from_file(fh::TestFileHandler, filename, messageId::UInt16) - data = read_all_to_arr(filename) - mid_index = get_mid_index(data) - if mid_index > 0 - put(fh, view(data, 1:(mid_index - 1))) - put(fh, messageId) - put(fh, view(data, (mid_index + 2):length(data))) - else - put(fh, data) - end -end - -function read_all_to_arr(filename) - file = open(filename, "r") - data = Vector{UInt8}() - while !eof(file) - append!(data, read(file, UInt8)) - end - close(file) - return data -end - -function Sockets.connect(host::AbstractString, port::Integer) - th = TestFileHandler( - Channel{UInt8}(256), Channel{UInt8}(256), Condition(), Condition(), false - ) - put_from_file(th, "data/input/connack.dat") - return th -end diff --git a/test/dev/packet.jl b/test/dev/packet.jl deleted file mode 100644 index 13046b9..0000000 --- a/test/dev/packet.jl +++ /dev/null @@ -1,146 +0,0 @@ -const TOPIC_STRING = "foo" -const MESSAGE_STRING = "bar" - -function on_msg(topic, payload) - msg = p |> String - @info "Received message topic: [", topic, "] payload: [", msg, "]" - @test topic == TOPIC_STRING - @test msg == MESSAGE_STRING -end - -function is_out_correct(filename_expected::AbstractString, actual::Channel{UInt8}, mid::UInt16) - file_data = read_all_to_arr(filename_expected) - actual_data = Vector{UInt8}() - - for i in file_data - append!(actual_data, take!(actual)) - end - - mid_index = get_mid_index(file_data) - if mid_index > 0 - buffer = PipeBuffer() - write(buffer, mid) - converted_mid = take!(buffer) - file_data[mid_index] = converted_mid[2] - file_data[mid_index+1] = converted_mid[1] - end - - correct = true - i = 1 - while i <= length(file_data) - if file_data[i] != actual_data[i] - correct = false - break - end - i += 1 - end - return correct -end - -function is_out_correct(filename_expected::AbstractString, actual::Channel{UInt8}) - file = open(filename_expected, "r") - correct = true - while !eof(file) - if read(file, UInt8) != take!(actual) - correct = false - break - end - end - return correct -end - -@testset verbose=true "Running packet tests" begin - -client = Client(on_msg) -last_id::UInt16 = 0x0001 - -@testset "Testing connect" begin - connect(client, "test.mosquitto.org", 1883) - tfh::TestFileHandler = client.socket - @test is_out_correct("data/output/connect.dat", tfh.out_channel) - # CONNACK is automatically being sent in connect call -end - -@testset "Testing subscribe" begin - subscribe_async(client, (TOPIC_STRING, QOS_1), ("cba", QOS_0)) - put_from_file(tfh, "data/input/suback.dat", client.last_id) - @test is_out_correct("data/output/subreq.dat", tfh.out_channel, client.last_id) -end - -@testset "Testing unsubscribe" begin - unsubscribe_async(client, TOPIC_STRING, "cba") - put_from_file(tfh, "data/input/unsuback.dat", client.last_id) - @test is_out_correct("data/output/unsubreq.dat", tfh.out_channel, client.last_id) -end - -@testset "Testing receive publish QOS 0" begin - put_from_file(tfh, "data/input/qos0pub.dat") - @test is_out_correct("data/output/puback.dat", tfh.out_channel, last_id) -end - -@testset "Testing receive publish QOS 1" begin - put_from_file(tfh, "data/input/qos1pub.dat", last_id) - @test is_out_correct("data/output/puback.dat", tfh.out_channel, last_id) - #last_id += 1 -end - -@testset "Testing receive publish QOS 2" begin - put_from_file(tfh, "data/input/qos2pub.dat", last_id) - @test is_out_correct("data/output/pubrec.dat", tfh.out_channel, last_id) - put_from_file(tfh, "data/input/pubrel.dat", last_id) - @test is_out_correct("data/output/pubcomp.dat", tfh.out_channel, last_id) - #last_id += 1 -end - -@testset "Testing send publish QOS 0" begin - publish_async(client, "test1", "QOS_0", qos=QOS_0) - @test is_out_correct("data/output/qos0pub.dat", tfh.out_channel) -end - -@testset "Testing send publish QOS 1" begin - publish_async(client, "test2", "QOS_1", qos=QOS_1) - put_from_file(tfh, "data/input/puback.dat", client.last_id) - @test is_out_correct("data/output/qos1pub.dat", tfh.out_channel, client.last_id) -end - -@testset "Testing send publish QOS 2" begin - publish_async(client, "test3", "test", qos=QOS_2) - @test is_out_correct("data/output/qos2pub.dat", tfh.out_channel, client.last_id) - put_from_file(tfh, "data/input/pubrec.dat", client.last_id) - @test is_out_correct("data/output/pubrel.dat", tfh.out_channel, client.last_id) - put_from_file(tfh, "data/input/pubcomp.dat", client.last_id) -end - -@testset "Testing disconnect" begin - disconnect(client) - @test is_out_correct("data/output/disco.dat", tfh.out_channel) -end - - #This has to be in it's own connect flow to not interfere with other messages -@testset "Testing keep alive with response" begin - client = Client(on_msg) - - client.ping_timeout = 1 - connect(client, "test.mosquitto.org", 1883, client_id="TestID", keep_alive=1) - tfh = client.socket - @test is_out_correct("data/output/connect_keep_alive1s.dat", tfh.out_channel) # Consume output - @test is_out_correct("data/output/pingreq.dat", tfh.out_channel) - put_from_file(tfh, "data/input/pingresp.dat") -end - -@testset "Testing keep alive without response" begin - sleep(1.1) - @test is_out_correct("data/output/pingreq.dat", tfh.out_channel) - @test is_out_correct("data/output/disco.dat", tfh.out_channel) - - info("Testing unwanted pingresp") - client = Client(on_msg) - connect(client, "test.mosquitto.org", 1883, client_id="TestID", keep_alive=15) - tfh = client.socket - put_from_file(tfh, "data/input/pingresp.dat") - sleep(0.1) - @test tfh.closed -end -end - -end \ No newline at end of file From 3414cd56f417d5071310eae60fe2a228a739a731 Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 13:13:39 +0200 Subject: [PATCH 7/9] update test matrix --- .github/workflows/CI.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index d44028a..e551053 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -18,11 +18,13 @@ jobs: fail-fast: false matrix: version: - - '1.8' + - '1.8' # minimum version - '1.9' - - '1.10' + - '1' os: - ubuntu-latest + - macOS-latest + - windows-latest arch: - x64 steps: From 85e0cbbae390ef6726e83589848dd6894e3faf9a Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 13:18:02 +0200 Subject: [PATCH 8/9] update ci --- .github/workflows/CI.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index e551053..fc306df 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -29,6 +29,7 @@ jobs: - x64 steps: - name: Install mosquitto + if: runner.os == 'Linux' run: | sudo apt-get update sudo apt-get install mosquitto @@ -41,6 +42,7 @@ jobs: - uses: julia-actions/cache@v1 - uses: julia-actions/julia-buildpkg@v1 - name: Run mosquitto broker + if: runner.os == 'Linux' run: | sudo systemctl stop mosquitto sudo cp ${{ github.workspace }}/test/testclient/mosquitto.test.config /etc/mosquitto/conf.d/test.conf From d71bdd263570389be585e7eb28b01744b17749dc Mon Sep 17 00:00:00 2001 From: Nicholas Shindler Date: Wed, 29 May 2024 13:23:18 +0200 Subject: [PATCH 9/9] check is sys is windows when testing uds --- test/integration.test.jl | 64 +++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/test/integration.test.jl b/test/integration.test.jl index 330f4b5..47703aa 100644 --- a/test/integration.test.jl +++ b/test/integration.test.jl @@ -31,34 +31,36 @@ cb(args...) = nothing @test !isopen(server) end -@testset "UDS Client" begin - ## UDS Basic Run - server = MQTTClient.MockMQTTBroker("/tmp/testmqtt.sock") - client, conn = MakeConnection("/tmp/testmqtt.sock") - - connect(client, conn) - @test MQTTClient.isconnected(client) - - res = subscribe(client, "foo/bar", cb) - @test res == [0x01, 0x00] - res = publish(client, "bar/foo", "baz") - @test res == 0 - res = unsubscribe(client, "foo/bar") - @test isnothing(res) - - res = disconnect(client) - @test res == (0x00, 0x00, 0x00) - - # test reconnect - connect(client, conn) - @test MQTTClient.isconnected(client) - res = disconnect(client) - @test res == (0x00, 0x00, 0x00) - - @test MQTTClient.isclosed(client) - @test isopen(server) - - close(server) # stop the mock server - - @test !isopen(server) -end +if !Sys.iswindows() # since windows is not UNIX it doesn't support UDS + @testset "UDS Client" begin + ## UDS Basic Run + server = MQTTClient.MockMQTTBroker("/tmp/testmqtt.sock") + client, conn = MakeConnection("/tmp/testmqtt.sock") + + connect(client, conn) + @test MQTTClient.isconnected(client) + + res = subscribe(client, "foo/bar", cb) + @test res == [0x01, 0x00] + res = publish(client, "bar/foo", "baz") + @test res == 0 + res = unsubscribe(client, "foo/bar") + @test isnothing(res) + + res = disconnect(client) + @test res == (0x00, 0x00, 0x00) + + # test reconnect + connect(client, conn) + @test MQTTClient.isconnected(client) + res = disconnect(client) + @test res == (0x00, 0x00, 0x00) + + @test MQTTClient.isclosed(client) + @test isopen(server) + + close(server) # stop the mock server + + @test !isopen(server) + end +end \ No newline at end of file