Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixes #9

Merged
merged 9 commits into from
Sep 21, 2023
9 changes: 7 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ version = "0.2.0"

[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[weakdeps]
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"

[extensions]
PrecompileMQTT = "PrecompileTools"

[compat]
julia = "1.6"
julia = "1.7"

[extras]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This package is developed and sponsored by [MapXact](https://mapxact.com/) and i

## Contributing

This package is using [MQTT protocol v3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html).

If you would like to contribute to the project, please submit a PR. All contributions are welcomed and appreciated.

This work is based on the MQTT.jl packages created by [femtomic](https://github.com/femtomc/MQTT.jl) and [rweilbacher](https://github.com/rweilbacher/MQTT.jl), and a lot of credit is due to their work along with the other contributors to those repositories.
Expand All @@ -42,4 +44,5 @@ This work is based on the MQTT.jl packages created by [femtomic](https://github.
* think about what we need to do and how
* the reconnect should still work
- [ ] implement clean session = false
- [ ] investigate adding global on_msg handler option back
- [ ] investigate adding global on_msg handler option back
- [ ] investigate using MQTT v5.0
14 changes: 11 additions & 3 deletions src/precompile.jl → ext/PrecompileMQTT.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# COV_EXCL_START
module PrecompileMQTT # Should be same name as the file (just like a normal package)

using MQTTClient
using PrecompileTools
using Random

precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64})
precompile(Tuple{typeof(Base.indexed_iterate), Tuple{Nothing, Int64}, Int64, Int64})

# Precompiling the package like this provides a slower initial load of the package but faster code execution.
# based on tests this precompile step reduces compilation at runtime by ~25% and decreases first execution time by ~10%.
@setup_workload begin
Expand All @@ -17,9 +24,7 @@ using Random
payload = Random.randstring(20)

function on_msg(t, p)
msg = p |> String
@assert MQTTClient.topic_eq("$topic#", t)
@assert msg == payload
nothing
end

connect(client, conn)
Expand Down Expand Up @@ -65,3 +70,6 @@ using Random
disconnect(tcpclient)
end
end

end # module
# COV_EXCL_STOP
6 changes: 3 additions & 3 deletions src/MQTTClient.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module MQTTClient

using Distributed: Future, RemoteChannel
using Distributed: Future, myid, remotecall, RemoteChannel
using Sockets: TCPSocket, IPAddr, PipeServer, getaddrinfo
import Sockets: connect
using Random: randstring
import Base: ReentrantLock, lock, unlock, convert
import Base: ReentrantLock, lock, unlock, convert, PipeEndpoint, isready, Ref, RefValue, fetch
using Base.Threads


Expand All @@ -15,7 +15,7 @@ include("connection.jl")
include("handlers.jl")
include("interface.jl")

VERSION > v"1.8" ? include("precompile.jl") : @debug "PrecompileTools is most useful in versions 1.9+. $VERSION is too old, explicit precompile is not being used."
# VERSION > v"1.8" ? include("precompile.jl") : println("PrecompileTools is most useful in versions 1.9+. $VERSION is too old, explicit precompile is not being used.")

export
MakeConnection,
Expand Down
2 changes: 2 additions & 0 deletions src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,5 @@ end
function write_packet(client::Client, cmd::UInt8, data...)
put!(client.write_packets, Packet(cmd, data))
end

Base.show(io::IO, client::Client) = print(io, "MQTTClient(Topic Subscriptions: $(collect(keys(client.on_msg))))")
9 changes: 2 additions & 7 deletions src/connection.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
"""
AbstractIOConnection

Base Connection Protocol type
"""
abstract type AbstractIOConnection end

"""
TCP <: AbstractIOConnection

Expand Down Expand Up @@ -148,3 +141,5 @@ struct MQTTConnection{T <: AbstractIOConnection}
will::Message,
clean_session::Bool) where T <: AbstractIOConnection = new{T}(protocol, keep_alive, client_id, user, will, clean_session)
end

Base.show(io::IO, connection::MQTTConnection) = print(io, "MQTTConnection(Protocol: $(connection.protocol), Client ID: $(connection.client_id)", (connection.user == User("","") ? "" : ", User Name: $(connection.user.name)"), ")")
14 changes: 13 additions & 1 deletion src/handlers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,19 @@
end

payload = take!(s)
@async client.on_msg[topic](topic,payload)
if haskey(client.on_msg, topic)
@async client.on_msg[topic](topic,payload)
else
try
options = Vector{String}(collect(keys(client.on_msg)))
matches = findall(t -> topic_eq(t, topic), options)
for topic_match in options[matches]
@async client.on_msg[topic_match](topic,payload)
end
catch e
@error e

Check warning on line 75 in src/handlers.jl

View check run for this annotation

Codecov / codecov/patch

src/handlers.jl#L75

Added line #L75 was not covered by tests
end
end
end

function handle_ack(client::Client, s::IO, cmd::UInt8, flags::UInt8)
Expand Down
33 changes: 26 additions & 7 deletions src/interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,34 @@ Establishes an MQTT connection using the given IO connection.
A tuple containing a `Client` object and an `MQTTConnection` object.
"""
function MakeConnection(io::T,
ping_timeout=UInt64(60),
keep_alive::Int64=32,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", UInt8[]),
clean_session::Bool=true)::Tuple where T <: AbstractIOConnection
ping_timeout=UInt64(60),
keep_alive::Int64=32,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", UInt8[]),
clean_session::Bool=true)::Tuple where T <: AbstractIOConnection
return (Client(ping_timeout), MQTTConnection(io, keep_alive, client_id, user, will, clean_session))

end
# NOTE: comment out for now
# TODO: add Type to Client struct at some point.
# function MakeConnection(io::TCP,
# ping_timeout=UInt64(60),
# keep_alive::Int64=32,
# client_id::String=randstring(8),
# user::User=User("", ""),
# will::Message=Message(false, 0x00, false, "", UInt8[]),
# clean_session::Bool=true)::Tuple{Client, MQTTConnection}
# return (Client{TCPSocket}(ping_timeout), MQTTConnection(io, keep_alive, client_id, user, will, clean_session))
# end
# function MakeConnection(io::UDS,
# ping_timeout=UInt64(60),
# keep_alive::Int64=32,
# client_id::String=randstring(8),
# user::User=User("", ""),
# will::Message=Message(false, 0x00, false, "", UInt8[]),
# clean_session::Bool=true)::Tuple{Client, MQTTConnection}
# return (Client{PipeEndpoint}(ping_timeout), MQTTConnection(io, keep_alive, client_id, user, will, clean_session))
# end



Expand Down
9 changes: 9 additions & 0 deletions src/internals.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ const CONNACK_ERRORS = Dict{UInt8, String}(
)
## Types

"""
AbstractIOConnection

Base Connection Protocol type
"""
abstract type AbstractIOConnection end

AbstractProtocol = Union{PipeEndpoint, TCPSocket}

## Enums
## -----
# QOS values
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ using Distributed, Random

import MQTTClient: topic_wildcard_len_check, filter_wildcard_len_check, MQTTException
import Sockets: TCPSocket, PipeServer, connect, localhost, getaddrinfo, IOError, DNSError
import Base.PipeEndpoint

@testset verbose=true "client tests" begin
include("unittest.client.jl")
Expand Down
30 changes: 25 additions & 5 deletions test/smoketest.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
function smoke_test(client, conn)
condition = Condition()
println(client)
println(conn)
condition = Base.Event()
topic = "foo"
payload = Random.randstring(20)
client_test_res = Channel{Bool}(100)
Expand All @@ -14,9 +16,15 @@ function smoke_test(client, conn)
notify(condition)
end
end

client = Client()
println(client)
function on_wildcard_msg(t, p)
@time "on_msg[$t]" begin
msg = p |> String
println("Received message topic: [", t, "] payload: [", msg, "]")
put!(client_test_res, msg == payload)
sleep(0.01)
notify(condition)
end
end

println("Testing reconnect")
connect(client, conn)
Expand All @@ -30,6 +38,7 @@ function smoke_test(client, conn)
@time "subscribe[QOS0]" subscribe(client, "$topic/qos0", on_msg, qos=QOS_0)
@time "subscribe[QOS1]" subscribe(client, "$topic/qos1", on_msg, qos=QOS_1)
@time "subscribe[QOS2]" subscribe(client, "$topic/qos2", on_msg, qos=QOS_2)
@time "subscribe #" subscribe(client, "wildcard/#", on_wildcard_msg, qos=QOS_2)

println("Testing publish qos 0")
publish(client, "$topic/qos0", payload, qos=QOS_0)
Expand All @@ -40,6 +49,7 @@ function smoke_test(client, conn)
wait(condition)

println("Testing publish qos 1")
reset(condition)
publish(client, "$topic/qos1", payload, qos=QOS_1)
sleep(0.02)
publish(client, "$topic/qos1", payload, qos=QOS_1)
Expand All @@ -48,13 +58,21 @@ function smoke_test(client, conn)
wait(condition)

println("Testing publish qos 2")
reset(condition)
publish(client, "$topic/qos2", payload, qos=QOS_2)
sleep(0.02)
publish(client, "$topic/qos2", payload, qos=QOS_2)
sleep(0.02)
publish_async(client, topic, payload)
wait(condition)

println("Testing wildcard topic")
reset(condition)
publish_async(client, "wildcard/foo", payload, qos=QOS_2)
sleep(0.02)
publish_async(client, "wildcard/bar", payload, qos=QOS_2)
sleep(0.02)
wait(condition)
# publish(client, topic, payload)
# sleep(0.1)
# publish(client, topic, payload)
Expand All @@ -76,7 +94,9 @@ function smoke_test(client, conn)
end

function stress_test(client, conn)
condition = Condition()
println(client)
println(conn)
condition = Base.Event()
topic1 = "foo"
topic2 = "bar"
topic3 = Random.randstring(4)
Expand Down
2 changes: 1 addition & 1 deletion test/smoketest.secure.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
print("\n"^3)

## TCP Protocol Smoke Test and Stress Test
tcp_test_client, tcp_test_conn = MakeConnection(localhost, 8883, user = MQTTClient.User("test","test"))
tcp_test_client, tcp_test_conn = MakeConnection(localhost, 8883, user = MQTTClient.User("test","test"), keep_alive = 1)
try
s = connect(tcp_test_conn.protocol)
close(s)
Expand Down
15 changes: 15 additions & 0 deletions test/unittest.client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ end
@test c isa MQTTClient.Client
@test conn isa MQTTClient.MQTTConnection
end

@testset "Test Client show function" begin
io = IOBuffer()
client, conn = MQTTClient.MakeConnection("localhost", 1883)
show(io, client)
str = take!(io) |> String
@test str == "MQTTClient(Topic Subscriptions: String[])"
end
@testset "Test Connection show function" begin
io = IOBuffer()
client, conn = MQTTClient.MakeConnection("localhost", 1883, client_id="foo")
show(io, conn)
str = take!(io) |> String
@test str == "MQTTConnection(Protocol: MQTTClient.TCP(ip\"::1\", 1883), Client ID: foo)"
end

@testset "MQTT subscribe async" begin
c = MQTTClient.Client()
Expand Down
Loading