Skip to content
This repository has been archived by the owner on Nov 6, 2022. It is now read-only.

Commit

Permalink
Implement zthrottle inspired throttling
Browse files Browse the repository at this point in the history
This will implement a zthrottle inspired throttling that will space out
requests of a burst with N seconds in between, ignoring all but the
first and the last request of the burst.
This will make sure the last push of a burst is never swallowed and
always reaches the device while still throttling the burst.
  • Loading branch information
tmolitor-stud-tu committed Jul 17, 2021
1 parent 9cb0062 commit 1ff96fd
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 45 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ should be used for chat apps.
This setting will also make http forms available at all `POST` HTTP endpoints
for easier manual testing of your setup by simply using your browser of choice.
- **push\_appserver\_rate\_limit** *(number)*
Allow this much requests per second. Default: `5`.
Allow only one request everey N seconds. Default: `5`.
The throttle will always space out incoming requests by this timeframe and make sure
that the last request of a burst will always be handled at the beginning of the next
timeframe (the first request of a burst will be handled immediately, all other requests
in between the first and the last one will be ignored completely).
This should mitigate some DOS attacks.
- **push\_appserver\_local\_cache** *(boolean)*
Set this to `false` to deactivate local caching of device tokens and node settings.
Expand Down
72 changes: 35 additions & 37 deletions mod_push_appserver/mod_push_appserver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ if have_id then
end
local t = require "util.throttle"
local string = string;
local zthrottle = module:require "zthrottle";

-- configuration
local body_size_limit = 4096; -- 4 KB
local use_local_cache = module:get_option_boolean("push_appserver_local_cache", true); -- use cache (should be false on HA setups with multiple servers using a replicated sql database)
local debugging = module:get_option_boolean("push_appserver_debugging", false); -- debugging (should be false on production servers)
local rate_limit = module:get_option_number("push_appserver_rate_limit", 5); -- allow only five pushes per second (try to mitigate DOS attacks)

-- global state
local throttles = {}
-- space out pushes with an interval of 5 seconds ignoring all but the first and last push in this interval (moving the last push to the end of the interval)
-- (try to prevent denial of service attacks and save battery on mobile devices)
zthrottle:set_distance(module:get_option_number("push_appserver_rate_limit", 5));

--- sanity
local parser_body_limit = module:context("*"):get_option_number("http_max_content_size", 10*1024*1024);
Expand Down Expand Up @@ -145,14 +145,6 @@ else
end)();
end

-- throttling (try to prevent denial of service attacks)
local function create_throttle(node)
if not throttles[node] then
throttles[node] = t.create(rate_limit, 1);
end
return throttles[node];
end

-- html helper
local function html_skeleton()
local header, footer;
Expand Down Expand Up @@ -302,19 +294,10 @@ module:hook("iq/host", function(event)
local settings = push_store:get(node);
if not settings or secret ~= settings["secret"] then return sendError(origin, stanza, "Unknown node or secret"); end

-- throttling
local push_priority = (summary and summary["last-message-body"] ~= nil) and "high" or "silent"
local throttle = create_throttle(push_priority.."@"..settings["node"]);
if not throttle:poll(1) then
module:log("warn", "Rate limit for node '%s' reached, ignoring push request (and returning 'wait' error)", settings["node"]);
origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Ratelimit reached"));
return true;
end

-- callback to handle synchronous and asynchronous iq responses
local async_callback = function(success)
if success or success == nil then
module:log("warn", "Push handler for type '%s' not executed successfully%s", settings["type"], type(success) == "string" and ": "..success or ": handler not found");
module:log("error", "Push handler for type '%s' not executed successfully%s", settings["type"], type(success) == "string" and ": "..success or ": handler not found");
origin.send(st.error_reply(stanza, "wait", "internal-server-error", type(success) == "string" and success or "Internal error in push handler"));
settings["last_push_error"] = datetime.datetime();
else
Expand All @@ -324,10 +307,21 @@ module:hook("iq/host", function(event)
push_store:set(node, settings);
end

module:log("info", "Firing event '%s' (node = '%s', secret = '%s')", "incoming-push-to-"..settings["type"], settings["node"], settings["secret"]);
local success = module:fire_event("incoming-push-to-"..settings["type"], {async_callback = async_callback, origin = origin, settings = settings, summary = summary, stanza = stanza});
-- true indicates handling via async_callback, everything else is synchronous and must be handled directly
if not (type(success) == "boolean" and success) then async_callback(success); end
-- throttling
local event = {async_callback = async_callback, origin = origin, settings = settings, summary = summary, stanza = stanza};
local handler_push_priority = tostring(module:fire_event("determine-"..settings["type"].."-priority", event));
local zthrottle_id = handler_push_priority.."@"..settings["node"];
local ztrottle_retval = zthrottle:incoming(zthrottle_id, function()
module:log("info", "Firing event '%s' (node = '%s', secret = '%s')", "incoming-push-to-"..settings["type"], settings["node"], settings["secret"]);
local success = module:fire_event("incoming-push-to-"..settings["type"], event);
-- true indicates handling via async_callback, everything else is synchronous and must be handled directly
if not (type(success) == "boolean" and success) then async_callback(success); end
end);
if ztrottle_retval == "ignored" then
module:log("info", "Rate limit for node '%s' reached, ignoring push request (and returning 'wait' error)", settings["node"]);
origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Ratelimit reached"));
return true;
end
return true;
end);

Expand Down Expand Up @@ -436,13 +430,6 @@ local function serve_push_v1(event, path)
return "ERROR\nNode or secret not found!";
end

-- throttling
local throttle = create_throttle(node);
if not throttle:poll(1) then
module:log("warn", "Rate limit for node '%s' reached, ignoring push request (and returning error 'Ratelimit reached')", node);
return "ERROR\nRatelimit reached!";
end

local async_callback = function(success)
if success or success == nil then
module:log("warn", "Push handler for type '%s' not executed successfully%s", settings["type"], type(success) == "string" and ": "..success or ": handler not found");
Expand All @@ -454,10 +441,21 @@ local function serve_push_v1(event, path)
end
push_store:set(node, settings);
end
module:log("info", "Firing event '%s' (node = '%s', secret = '%s')", "incoming-push-to-"..settings["type"], settings["node"], settings["secret"]);
local success = module:fire_event("incoming-push-to-"..settings["type"], {async_callback = async_callback, settings = settings});
-- true indicates handling via async_callback, everything else is synchronous and must be handled directly
if not (type(success) == "boolean" and success) then async_callback(success); end

-- throttling
local event = {async_callback = async_callback, settings = settings};
local handler_push_priority = tostring(module:fire_event("determine-"..settings["type"].."-priority", event));
local zthrottle_id = handler_push_priority.."@"..node;
local ztrottle_retval = zthrottle:incoming(zthrottle_id, function()
module:log("info", "Firing event '%s' (node = '%s', secret = '%s')", "incoming-push-to-"..settings["type"], node, settings["secret"]);
local success = module:fire_event("incoming-push-to-"..settings["type"], event);
-- true indicates handling via async_callback, everything else is synchronous and must be handled directly
if not (type(success) == "boolean" and success) then async_callback(success); end
end);
if ztrottle_retval == "ignored" then
module:log("warn", "Rate limit for node '%s' reached, ignoring push request (and returning error 'Ratelimit reached')", node);
return "ERROR\nRatelimit reached!";
end

return true; -- keep connection open until async_callback is called
end
Expand Down
36 changes: 36 additions & 0 deletions mod_push_appserver/zthrottle.lib.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
local time = require "util.time";

local distance = 0;
local api = {};
local data = {}

function api:set_distance(d)
distance = d;
end

function api:incoming(id, callback)
if not data[id] then data[id] = {}; end
-- directly call callback() if the last call for this id was more than `distance` seconds away
if not data[id]["last_call"] or time.now() > data[id]["last_call"] + distance then
data[id]["last_call"] = time.now();
if data[id]["timer"] then data[id]["timer"]:stop(); data[id]["timer"] = nil; end
module:log("info", "Calling callback directly");
callback();
return "allowed";
-- use timer to delay second invocation
elseif not data[id]["timer"] then
data[id]["timer"] = module:add_timer(distance - (time.now() - data[id]["last_call"]), function()
data[id]["timer"] = nil;
data[id]["last_call"] = time.now();
module:log("info", "Calling delayed callback");
callback();
end);
return "delayed";
-- ignore all other invocations until the delayed one fired
else
module:log("debug", "Ignoring incoming call");
return "ignored";
end
end

return api;
18 changes: 13 additions & 5 deletions mod_push_appserver_apns/mod_push_appserver_apns.lua
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,20 @@ local function close_connection()
end

-- handlers
local function apns_handler(event)
local settings, summary, async_callback = event.settings, event.summary, event.async_callback;
-- prepare data to send (using latest binary format, not the legacy binary format or the new http/2 format)
local payload;
local function apns_push_priority_handler(event)
-- determine real push priorit to use when sending to apns
local priority = push_priority;
if push_priority == "auto" then
priority = (summary and summary["last-message-body"] ~= nil) and "high" or "silent";
priority = (event.summary and event.summary["last-message-body"] ~= nil) and "high" or "silent";
end
return priority;
end

local function apns_handler(event)
local settings, async_callback = event.settings, event.async_callback;
-- prepare data to send (using latest binary format, not the legacy binary format or the new http/2 format)
local payload;
local priority = apns_push_priority_handler(event);
if priority == "high" then
payload = '{"aps":{'..(mutable_content and '"mutable-content":"1",' or '')..'"alert":{"title":"New Message", "body":"New Message"}, "sound":"default"}}';
else
Expand Down Expand Up @@ -294,10 +300,12 @@ end
certstring = readAll(apns_cert);
keystring = readAll(apns_key);
module:hook("incoming-push-to-apns", apns_handler);
module:hook("determine-apns-priority", apns_push_priority_handler);
module:log("info", "Appserver APNS submodule loaded");
function module.unload()
if module.unhook then
module:unhook("incoming-push-to-apns", apns_handler);
module:unhook("determine-apns-priority", apns_push_priority_handler);
end
module:log("info", "Appserver APNS submodule unloaded");
end
10 changes: 8 additions & 2 deletions mod_push_appserver_fcm/mod_push_appserver_fcm.lua
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,16 @@ local function send_request(data, callback)
end

-- handlers
local function fcm_push_priority_handler(event)
return (push_priority=="high" and "high" or "normal");
end

local function fcm_handler(event)
local settings, summary, async_callback = event.settings, event.summary, event.async_callback;
local settings, async_callback = event.settings, event.async_callback;
local data = {
["to"] = tostring(settings["token"]),
["collapse_key"] = "mod_push_appserver_fcm.collapse",
["priority"] = (push_priority=="high" and "high" or "normal"),
["priority"] = fcm_push_priority_handler(event),
["data"] = {},
};
if push_ttl and push_ttl > 0 then data["time_to_live"] = push_ttl; end -- ttl is optional (google's default: 4 weeks)
Expand Down Expand Up @@ -133,10 +137,12 @@ end

-- setup
module:hook("incoming-push-to-fcm", fcm_handler);
module:hook("determine-fcm-priority", fcm_push_priority_handler);
module:log("info", "Appserver FCM submodule loaded");
function module.unload()
if module.unhook then
module:unhook("incoming-push-to-fcm", fcm_handler);
module:unhook("determine-fcm-priority", fcm_push_priority_handler);
end
module:log("info", "Appserver FCM submodule unloaded");
end

0 comments on commit 1ff96fd

Please sign in to comment.