Skip to content

Commit

Permalink
Better buffering and ack cleanup (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
mworrell authored May 29, 2024
1 parent a92c8e0 commit a8563f3
Showing 1 changed file with 135 additions and 44 deletions.
179 changes: 135 additions & 44 deletions src/mqtt_sessions_process.erl
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
%% @doc Process handling one single MQTT session.
%% Transports attaches and detaches from this session.
%% @author Marc Worrell <[email protected]>
%% @copyright 2018-2022 Marc Worrell
%% @copyright 2018-2024 Marc Worrell

%% Copyright 2018-2022 Marc Worrell
%% Copyright 2018-2024 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,8 +60,8 @@
-define(SESSION_EXPIRY_DEFAULT, 3600). % Maximum allowed session expiration
-define(MESSAGE_EXPIRY_DEFAULT, 3600).

-define(MAX_INFLIGHT, 500). % Max in-flight messages for any QoS
-define(MAX_INFLIGHT_ACK, 100). % Max in-flight messages waiting with QoS 1 or 2
-define(MAX_QUEUED, 500). % Max pending messages for any QoS
-define(MAX_INFLIGHT_ACK, 250). % Max in-flight or pending messages waiting with QoS 1 or 2


-define(KILL_TIMEOUT, 5000).
Expand Down Expand Up @@ -105,7 +105,7 @@
-record(queued, {
type :: atom(),
msg_nr :: pos_integer(),
packet_id = undefined :: undefined | non_neg_integer(),
packet_id = undefined :: undefined | packet_id(),
queued :: non_neg_integer(),
expiry :: non_neg_integer(),
qos = 0 :: 0..2,
Expand Down Expand Up @@ -240,11 +240,21 @@ handle_call({incoming_data, NewData, ConnectionPid}, _From, #state{ incoming_dat
{reply, ok, StateRest#state{ keep_alive_counter = 3, incoming_data = Rest }};
{error, Reason} when is_atom(Reason) ->
% illegal packet, disconnect and wait for new connection
?LOG_INFO("Error decoding incoming data: ~p", [ Reason ]),
?LOG_WARNING(#{
in => mqtt_sessions,
text => <<"Error decoding incoming data - disconnecting">>,
result => error,
reason => Reason
}),
{reply, {error, Reason}, force_disconnect(State)}
end;
handle_call({incoming_data, _NewData, ConnectionPid}, _From, State) ->
?LOG_DEBUG("MQTT session incoming data from ~p, expected from ~p", [ConnectionPid, State#state.connection_pid]),
?LOG_DEBUG(#{
in => mqtt_sessions,
text => <<"MQTT session incoming data from unexpected Pid">>,
from_pid => ConnectionPid,
expected_pid => State#state.connection_pid
}),
{reply, {error, wrong_connection}, State};
handle_call(Cmd, _From, State) ->
{stop, {unknown_cmd, Cmd}, State}.
Expand Down Expand Up @@ -303,7 +313,11 @@ handle_info({'DOWN', _Mref, process, Pid, _Reason}, State) ->
{noreply, State1};

handle_info(Info, State) ->
?LOG_INFO("Unknown info message ~p", [Info]),
?LOG_INFO(#{
in => mqtt_sessions,
text => <<"Ignored unknown info message">>,
info_msg => Info
}),
{noreply, State}.

code_change(_Vsn, State, _Extra) ->
Expand Down Expand Up @@ -354,13 +368,29 @@ handle_incoming(#{ type := connect } = Msg, Options, State) ->
handle_incoming(#{ type := auth } = Msg, _Options, State) ->
packet_connect_auth(Msg, State);
handle_incoming(#{ type := Type }, _Options, #state{ connection_pid = undefined } = State) ->
?LOG_INFO("Dropping packet for MQTT session ~p ~s (~p) for receiving ~p when not connected.",
[State#state.pool, State#state.client_id, self(), Type]),
?LOG_INFO(#{
in => mqtt_sessions,
text => <<"Dropping packet for MQTT session when not connected.">>,
result => error,
reason => not_connected,
pool => State#state.pool,
client_id => State#state.client_id,
session_pid => self(),
message_type => Type
}),
{error, not_connected};
handle_incoming(#{ type := Type }, _Options, #state{ is_session_present = false } = State) ->
% Only AUTH and CONNECT before the CONNACK
?LOG_INFO("Killing MQTT session ~p ~s (~p) for receiving ~p when no session started.",
[State#state.pool, State#state.client_id, self(), Type]),
?LOG_INFO(#{
in => mqtt_sessions,
text => <<"MQTT received non AUTH or CONNECT before CONNACK - killed session">>,
result => error,
reason => no_connack,
pool => State#state.pool,
client_id => State#state.client_id,
session_pid => self(),
message_type => Type
}),
{stop, State};
handle_incoming(#{ type := publish } = Msg, _Options, State) ->
packet_publish(Msg, State);
Expand Down Expand Up @@ -392,7 +422,11 @@ handle_incoming(#{ type := disconnect } = Msg, _Options, State) ->
packet_disconnect(Msg, State);

handle_incoming(#{ type := Type }, _Options, State) ->
?LOG_INFO("MQTT dropping unhandled packet with type ~p", [Type]),
?LOG_INFO(#{
in => mqtt_sessions,
text => <<"MQTT dropping unhandled packet with type">>,
message_type => Type
}),
{ok, State}.

% ---------------------------------------------------------------------------------------
Expand Down Expand Up @@ -458,8 +492,8 @@ handle_connect_auth(Msg, Options, StateIfAccept, #state{ runtime = Runtime, is_s
handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS } = ConnAck, UserContext1},
#{ clean_start := CleanStart }, StateIfAccept, #state{ is_session_present = IsSessionPresent }) ->
StateCleaned = maybe_clean_start(CleanStart, StateIfAccept),
%% Set the session_present flag to true, when the runtime omitted it, and when there is a

%% Set the session_present flag to true, when the runtime omitted it, and when there is a
%% session present.
ConnAck1 = case maps:find(session_present, ConnAck) of
{ok, _} -> ConnAck;
Expand All @@ -480,7 +514,14 @@ handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS }
{ok, State3};
handle_connect_auth_1({ok, #{ type := connack, reason_code := ReasonCode } = ConnAck, _UserContext1}, _Msg, StateIfAccept, _State) ->
_ = reply_connack(ConnAck, StateIfAccept),
?LOG_DEBUG("MQTT connect/auth refused (~p): ~p", [ReasonCode, ConnAck]),
?LOG_INFO(#{
in => mqtt_sessions,
text => <<"MQTT connect/auth refused">>,
result => error,
reason => connection_refused,
reason_code => ReasonCode,
connack => ConnAck
}),
{error, connection_refused};
handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateIfAccept, _State) ->
State1 = StateIfAccept#state{
Expand All @@ -491,7 +532,14 @@ handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateI
State2#state.session_expiry_interval, State2#state.user_context),
{ok, State2};
handle_connect_auth_1({error, Reason}, Msg, _StateIfAccept, _State) ->
?LOG_INFO("MQTT connect/auth refused (~p): ~p", [Reason, Msg]),
?LOG_INFO(#{
in => mqtt_sessions,
text => <<"MQTT connect/auth refused">>,
result => error,
reason => connection_refused,
msg_reason => Reason,
message => Msg
}),
{error, connection_refused}.


Expand Down Expand Up @@ -626,8 +674,12 @@ packet_pubrel(#{ packet_id := PacketId, reason_code := ?MQTT_RC_SUCCESS }, #stat
end;
packet_pubrel(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_rel = WaitRel } = State) ->
% Error server/client out of sync - remove the wait-rel for this packet_id
?LOG_INFO("PUBREL with reason ~p for packet ~p",
[ RC, PacketId ]),
?LOG_INFO(#{
in => mqtt_sessions,
text => <<"PUBREL with non success reason for packet">>,
reason_code => RC,
packet_id => PacketId
}),
WaitRel1 = maps:remove(PacketId, WaitRel),
{ok, State#state{ awaiting_rel = WaitRel1 }}.

Expand All @@ -638,8 +690,14 @@ packet_puback(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta
{ok, {_MsgNr, puback, _Msg}} ->
maps:remove(PacketId, WaitAck);
{ok, {_MsgNr, Wait, Msg}} ->
?LOG_WARNING("PUBACK for message ~p waiting for ~p. Message: ~p",
[ PacketId, Wait, Msg ]),
?LOG_WARNING(#{
in => mqtt_sessions,
text => <<"PUBACK for message wating for something else - dropping pending ack">>,
result => error,
packet_id => PacketId,
wait => Wait,
message => Msg
}),
maps:remove(PacketId, WaitAck);
error ->
WaitAck
Expand All @@ -654,8 +712,14 @@ packet_pubrec(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_ac
{ok, {_MsgNr, pubcomp, _Msg}} ->
maps:remove(PacketId, WaitAck);
{ok, {_MsgNr, Wait, Msg}} ->
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p",
[ PacketId, Wait, Msg ]),
?LOG_WARNING(#{
in => mqtt_sessions,
text => <<"PUBREC for message wating for something else - dropping pending ack">>,
result => error,
packet_id => PacketId,
wait => Wait,
message => Msg
}),
maps:remove(PacketId, WaitAck);
error ->
WaitAck
Expand All @@ -668,8 +732,14 @@ packet_pubrec(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta
{ok, {_MsgNr, pubcomp, _Msg}} ->
{WaitAck, ?MQTT_RC_SUCCESS};
{ok, {_MsgNr, Wait, Msg}} ->
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p",
[ PacketId, Wait, Msg ]),
?LOG_WARNING(#{
in => mqtt_sessions,
text => <<"PUBREC for message wating for something else - dropping pending ack">>,
result => error,
packet_id => PacketId,
wait => Wait,
message => Msg
}),
{maps:remove(PacketId, WaitAck), ?MQTT_RC_PACKET_ID_NOT_FOUND};
error ->
{WaitAck, ?MQTT_RC_PACKET_ID_NOT_FOUND}
Expand All @@ -688,8 +758,14 @@ packet_pubcomp(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = St
{ok, {_MsgNr, pubcomp, _Msg}} ->
maps:remove(PacketId, WaitAck);
{ok, {_MsgNr, Wait, Msg}} ->
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p",
[ PacketId, Wait, Msg ]),
?LOG_WARNING(#{
in => mqtt_sessions,
text => <<"PUBCOMP for message wating for something else - dropping pending ack">>,
result => error,
packet_id => PacketId,
wait => Wait,
message => Msg
}),
maps:remove(PacketId, WaitAck);
error ->
WaitAck
Expand Down Expand Up @@ -786,11 +862,11 @@ relay_publish(#{ type := publish, message := Msg } = MqttMsg, State) ->
0 ->
reply(Msg2#{ packet_id => 0 }, StatePurged);
_ ->
case maps:size(StatePurged#state.awaiting_ack) > ?MAX_INFLIGHT_ACK of
case maps:size(StatePurged#state.awaiting_ack) >= ?MAX_INFLIGHT_ACK of
true ->
?LOG_DEBUG(#{
?LOG_INFO(#{
in => mqtt_session,
text => <<"Dropping QoS 1/2 message, too many inflight acks">>,
text => <<"Not accepting QoS 1/2 message, too many inflight or queued acks">>,
result => error,
reason => buffer_full
}),
Expand Down Expand Up @@ -932,15 +1008,13 @@ disconnect_transport(#state{ transport = Transport } = State) when is_function(T
reply(undefined, State) ->
State;
reply(Msg, #state{ transport = undefined } = State) ->
State1 = maybe_purge(State),
queue(Msg, State1);
maybe_purge( queue(Msg, State) );
reply(Msg, State) ->
State1 = maybe_purge(State),
case send_transport(Msg, State1) of
case send_transport(Msg, State) of
ok ->
State1;
State;
{error, _} ->
queue(Msg, State1#state{ transport = undefined })
maybe_purge( queue(Msg, State#state{ transport = undefined }) )
end.

send_transport(_Msg, #state{ transport = undefined }) ->
Expand Down Expand Up @@ -974,26 +1048,45 @@ queue_1(#{ type := Type } = Msg, #state{ msg_nr = MsgNr, pending = Pending } = S
msg_nr = MsgNr,
type = Type,
queued = Now,
packet_id = maps:get(packet_id, Msg, 0),
expiry = Now + maps:get(message_expiry_interval, Props, ?MESSAGE_EXPIRY_DEFAULT),
qos = maps:get(qos, Msg, 1),
message = Msg
},
State#state{ pending = queue:in(Item, Pending)}.
State#state{ pending = queue:in(Item, Pending) }.

maybe_purge(#state{ pending = Queue, awaiting_ack = WaitAcks } = State) ->
case queue:len(Queue) > ?MAX_INFLIGHT orelse maps:size(WaitAcks) > ?MAX_INFLIGHT_ACK of
case queue:len(Queue) > ?MAX_QUEUED orelse maps:size(WaitAcks) > ?MAX_INFLIGHT_ACK of
true ->
PacketIdsBefore = queue:fold(
fun
(#queued{ qos = 0 }, Acc) -> Acc;
(#queued{ packet_id = PacketId }, Acc) -> [ PacketId | Acc ]
end,
[],
Queue),
PurgedQueue = purge(Queue),
PacketIds = queue:fold(
PacketIdsAfter = queue:fold(
fun
(#queued{ qos = 0 }, Acc) -> Acc;
(#queued{ packet_id = PacketId }, Acc) -> [ PacketId | Acc ]
end,
[],
PurgedQueue),
PurgedPacketIds = PacketIdsBefore -- PacketIdsAfter,
PurgedWaitAcks = maps:without(PurgedPacketIds, WaitAcks),
?LOG_DEBUG(#{
in => mqtt_sessions,
text => <<"Purged pending messages">>,
result => ok,
pending_before => queue:len(Queue),
pending_after => queue:len(PurgedQueue),
dropped_acks => length(PurgedPacketIds),
pending_acks => maps:size(PurgedWaitAcks)
}),
State#state{
pending = PurgedQueue,
awaiting_ack = maps:with(PacketIds, WaitAcks)
awaiting_ack = PurgedWaitAcks
};
false ->
State
Expand All @@ -1002,10 +1095,8 @@ maybe_purge(#state{ pending = Queue, awaiting_ack = WaitAcks } = State) ->
purge(Queue) ->
{value, #queued{ queued = Oldest }} = queue:peek(Queue),
{value, #queued{ queued = Newest }} = queue:peek_r(Queue),

PurgeTime = mqtt_sessions_timestamp:timestamp(),
QoS0PurgeAge = (Newest - Oldest) / 2,

QoS0PurgeAge = (Newest - Oldest) div 2,
Queue1 = queue:filter(
fun
(#queued{ qos = 0, queued = Queued, expiry = Expiry }) ->
Expand All @@ -1014,7 +1105,7 @@ purge(Queue) ->
PurgeTime < Expiry
end,
Queue),
case queue:len(Queue1) > ?MAX_INFLIGHT of
case queue:len(Queue1) > ?MAX_QUEUED of
true ->
% Drop all QoS 0 messages
queue:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Queue1);
Expand Down

0 comments on commit a8563f3

Please sign in to comment.