Skip to content

Commit

Permalink
clear the buffer of mqtt messages instead of only consuming one per f…
Browse files Browse the repository at this point in the history
…rame
  • Loading branch information
goatchurchprime committed Apr 22, 2024
1 parent 74f91fe commit d9ff4ce
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
19 changes: 9 additions & 10 deletions addons/mqtt/mqtt.gd
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ func _process(delta):

elif brokerconnectmode == BCM_WAITING_CONNACK or brokerconnectmode == BCM_CONNECTED:
receiveintobuffer()
wait_msg()
while wait_msg():
pass
if brokerconnectmode == BCM_CONNECTED and pingticksnext0 < Time.get_ticks_msec():
pingreq()
pingticksnext0 = Time.get_ticks_msec() + pinginterval*1000
Expand Down Expand Up @@ -381,20 +382,19 @@ func unsubscribe(stopic):
func wait_msg():
var n = receivedbuffer.size()
if n < 2:
return OK
return false
var op = receivedbuffer[0]
var i = 1
var sz = receivedbuffer[i] & 0x7f
while (receivedbuffer[i] & 0x80):
i += 1
if i == n:
return 0
return false
sz += (receivedbuffer[i] & 0x7f) << ((i-1)*7)
i += 1
if n < i + sz:
return OK
return false

var E = OK
if op == CP_PINGRESP:
assert (sz == 0)
if verbose_level >= 2:
Expand Down Expand Up @@ -433,22 +433,21 @@ func wait_msg():
if verbose_level:
print("Bad connection retcode=", retcode) # see https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
emit_signal("broker_connection_failed")
E = FAILED

elif op == CP_PUBREC:
assert (sz == 2)
var apid = (receivedbuffer[i]<<8) + receivedbuffer[i+1]
if verbose_level >= 2:
print("PUBACK[%d]" % apid)
emit_signal("publish_acknowledge", apid)
emit_signal("publish_acknowledgewait_msg", apid)

elif op == CP_SUBACK:
assert (sz == 3)
var apid = (receivedbuffer[i]<<8) + receivedbuffer[i+1]
if verbose_level:
print("SUBACK[%d] ret=%02x" % [apid, receivedbuffer[i+2]])
if receivedbuffer[i+2] == 0x80:
E = FAILED
#if receivedbuffer[i+2] == 0x80:
# E = FAILED

elif op == CP_UNSUBACK:
assert (sz == 2)
Expand All @@ -461,7 +460,7 @@ func wait_msg():
print("Unknown MQTT opcode op=%x" % op)

trimreceivedbuffer(i + sz)
return E
return true

func trimreceivedbuffer(n):
if n == receivedbuffer.size():
Expand Down
2 changes: 1 addition & 1 deletion mqttexample.tscn
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ text = "URL: "
[node name="brokeraddress" type="LineEdit" parent="VBox/HBoxBroker"]
layout_mode = 2
size_flags_horizontal = 3
text = "test.mosquitto.org"
text = "broker.hivemq.com"

[node name="Label2" type="Label" parent="VBox/HBoxBroker"]
layout_mode = 2
Expand Down
2 changes: 1 addition & 1 deletion project.godot
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ config_version=5

config/name="Godot-mqtt"
run/main_scene="res://mqttexample.tscn"
config/features=PackedStringArray("4.1", "Forward Plus")
config/features=PackedStringArray("4.2", "Forward Plus")
config/icon="res://icon.png"

[audio]
Expand Down

0 comments on commit d9ff4ce

Please sign in to comment.