feat(sync): full sync pagination
StarlightIbuki committed Dec 3, 2024
1 parent 2df6a82 commit 48c5373
Showing 8 changed files with 751 additions and 38 deletions.
5 changes: 5 additions & 0 deletions kong-3.10.0-0.rockspec
@@ -304,6 +304,11 @@ build = {
["kong.db.strategies.off.connector"] = "kong/db/strategies/off/connector.lua",
["kong.db.strategies.off.tags"] = "kong/db/strategies/off/tags.lua",

["kong.db.resumable_chunker] = "kong/db/resumable_chunker/init.lua",
["kong.db.resumable_chunker.chain] = "kong/db/resumable_chunker/chain.lua",
["kong.db.resumable_chunker.strategy] = "kong/db/resumable_chunker/strategy.lua",
["kong.db.resumable_chunker.utils] = "kong/db/resumable_chunker/utils.lua",

["kong.db.migrations.state"] = "kong/db/migrations/state.lua",
["kong.db.migrations.subsystems"] = "kong/db/migrations/subsystems.lua",
["kong.db.migrations.core"] = "kong/db/migrations/core/init.lua",
209 changes: 171 additions & 38 deletions kong/clustering/services/sync/rpc.lua
@@ -8,16 +8,24 @@ local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
local isempty = require("table.isempty")
local events = require("kong.runloop.events")
local lrucache = require("resty.lrucache")


local EMPTY = require("kong.tools.table").EMPTY
local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local SYNC_MAX_RETRY = 5
local encode_base64 = ngx.encode_base64
local decode_base64 = ngx.decode_base64
local cjson_encode = require("cjson.safe").encode
local cjson_decode = require("cjson.safe").decode

local MAX_RETRY = 5


local assert = assert
@@ -27,16 +35,57 @@ local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_NOTICE = ngx.NOTICE
local ngx_DEBUG = ngx.DEBUG


-- number of versions behind before a full sync is forced
local DEFAULT_FULL_SYNC_THRESHOLD = 512


function _M.new(strategy)
local function decode_pagination_status(pagination_status)
if not pagination_status then
return nil, nil
end

-- base64 encoded json
local decoded = decode_base64(pagination_status)
if not decoded then
return nil, "failed to base64 decode pagination status"
end

local err
decoded, err = cjson_decode(decoded)
if not decoded then
return nil, "failed to cjson decode pagination status: " .. err
end

return decoded.version, decoded.page_size, decoded.next_page
end



local function encode_pagination_status(version, page_size, next_page)
local data = {
version = version,
page_size = page_size,
next_page = next_page,
}

local json, err = cjson_encode(data)
if not json then
return nil, "failed to encode pagination:" .. err
end

return encode_base64(json)
end
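
-- Illustrative sketch (not part of the commit): the pagination status is an
-- opaque token the CP hands to the DP, which echoes it back on the next call.
-- The values below are hypothetical; only the field names mirror the code above.
--
--   local token = encode_pagination_status("0042", 512, 513)
--   -- token is the base64 of a cjson object such as
--   -- '{"version":"0042","page_size":512,"next_page":513}' (key order may vary)
--
--   local version, page_size, next_page = decode_pagination_status(token)
--   -- version == "0042", page_size == 512, next_page == 513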


function _M.new(strategy, opts)
opts = opts or EMPTY

local self = {
strategy = strategy,
page_size = opts.page_size,
}

return setmetatable(self, _MT)
@@ -48,20 +97,78 @@ local function inc_sync_result(res)
end


local function full_sync_result()
local deltas, err = declarative.export_config_sync()
if not deltas then
return nil, err
function _M:full_sync_result(full_sync_status)
local target_version, page_size, next_page = decode_pagination_status(full_sync_status)

-- when target_version is nil, page_size carries the error message from decode_pagination_status
if (not target_version) and page_size then
return nil, "communication error: " .. page_size
end

-- try fetch from cache
local config_deltas
if target_version then
config_deltas = self.full_sync_cache:get(target_version)
-- the DP asked for an unknown version, or the cache entry expired/was evicted;
-- treat this as the first call of a new full sync session
if not config_deltas then
ngx_log(ngx_NOTICE, "full sync cache miss for version: ", target_version)
end
end

-- first call of a full sync session: wipe the DP and begin the session
local first_time = not config_deltas
if first_time then
-- set the target_version for the first time
config_deltas, target_version = declarative.export_config_sync()
if not config_deltas then
return nil, target_version
end

-- cache the exported deltas so later pages are served from the same snapshot
-- (resty.lrucache's set() does not return ok/err, so there is nothing to check)
self.full_sync_cache:set(target_version, config_deltas)
end

local begin = next_page or 1
page_size = page_size or self.page_size
next_page = begin + page_size

-- at this point,
-- config_deltas, target_version, page_size, next_page are all guaranteed to be non-nil

-- no more deltas. end the session for DP
local last_time = next_page > #config_deltas

-- get the deltas for the current page
local deltas, n = {}, 1
for i = begin, next_page - 1 do
local delta = config_deltas[i]
if not delta then
break
end

deltas[n] = delta
n = n + 1
end

-- TODO: handle new deltas that happen during the full sync

local full_sync_status
if not last_time then
full_sync_status = encode_pagination_status(target_version, page_size, next_page)
end

-- wipe dp lmdb, full sync
return { default = { deltas = deltas, wipe = true, }, }
return { default = { deltas = deltas, wipe = first_time, full_sync_done = last_time, full_sync_status = full_sync_status }, }
end
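
-- Worked example (illustrative only, assuming 1,200 exported deltas and a
-- page_size of 512): a DP walks the full sync in three rounds.
--   round 1: begin = 1,    next_page = 513  -> deltas 1..512,     wipe = true,  full_sync_done = false
--   round 2: begin = 513,  next_page = 1025 -> deltas 513..1024,  wipe = false, full_sync_done = false
--   round 3: begin = 1025, next_page = 1537 -> deltas 1025..1200, wipe = false, full_sync_done = true
-- Only the last response omits full_sync_status, which is what ends the session.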


function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

self.full_sync_cache = lrucache.new(10)

-- number of versions behind before a full sync is forced
local FULL_SYNC_THRESHOLD = manager.conf.cluster_full_sync_threshold or
DEFAULT_FULL_SYNC_THRESHOLD
@@ -109,7 +216,8 @@ function _M:init_cp(manager)

-- is the node empty? If so, just do a full sync to bring it up to date faster
if default_namespace_version == 0 or
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD or
default_namespace.full_sync_status
then
-- we need a full sync because holes were found

@@ -118,7 +226,7 @@ function _M:init_cp(manager)
", current_version: ", default_namespace_version,
", forcing a full sync")

return full_sync_result()
return self:full_sync_result(default_namespace.full_sync_status)
end

-- do we need an incremental sync?
@@ -210,14 +318,19 @@ local function is_rpc_ready()
end


local function do_sync()
local function do_sync(dp_status)
if not is_rpc_ready() then
return nil, "rpc is not ready"
end

-- when a full sync is only partially done, even if an update notification triggers
-- another sync, it will be blocked by the mutex and then continue the rest of the full sync
local in_full_sync = dp_status.full_sync_status

local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
full_sync_status = in_full_sync,
},
}

@@ -228,7 +341,7 @@ local function do_sync()
end

-- ns_deltas should look like:
-- { default = { deltas = { ... }, wipe = true, }, }
-- { default = { deltas = { ... }, wipe = true, full_sync_done = false, full_sync_status = ...}, }

local ns_delta = ns_deltas.default
if not ns_delta then
@@ -242,6 +355,11 @@ local function do_sync()
return true
end

if ns_delta.full_sync_status then
-- full sync is in progress
in_full_sync = true
end

-- we should find the correct default workspace
-- and replace the old one with it
local default_ws_changed
@@ -258,9 +376,15 @@ local function do_sync()

local t = txn.begin(512)

-- beginning of the full sync session: wipe the lmdb and purge the cache
local wipe = ns_delta.wipe
if wipe then
t:db_drop(false)
kong.core_cache:purge()
kong.cache:purge()
-- we are in an unready state,
-- so consider the config empty
t:set(DECLARATIVE_HASH_KEY, DECLARATIVE_EMPTY_CONFIG_HASH)
end

local db = kong.db
@@ -309,8 +433,8 @@ local function do_sync()
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end

@@ -321,8 +445,8 @@ local function do_sync()
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
-- during the full sync, should not emit events
if old_entity and not in_full_sync then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
@@ -334,14 +458,14 @@ local function do_sync()
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
-- delete the entity, opts for getting correct lmdb key
if not in_full_sync then
ev = { delta_type, "delete", old_entity, }
end
end -- if delta_entity ~= nil and delta_entity ~= ngx_null

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev
end
@@ -354,9 +478,12 @@ local function do_sync()
end
end -- for _, delta

-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))

-- only update the sync version when not in a full sync, or when the full sync is done
if (not in_full_sync) or ns_delta.full_sync_done then
-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
end

-- store the correct default workspace uuid
if default_ws_changed then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
@@ -367,42 +494,48 @@ local function do_sync()
return nil, err
end

if wipe then
kong.core_cache:purge()
kong.cache:purge()
dp_status.full_sync_status = ns_delta.full_sync_status

-- the full sync is done
if ns_delta.full_sync_done then
-- Trigger other workers' callbacks like reconfigure_handler.
--
-- Full sync could rebuild route, plugins and balancer route, so their
-- hashes are nil.
-- Until this point, the dataplane is not ready to serve requests or to
-- do delta syncs.
local reconfigure_data = { kong.default_workspace, nil, nil, nil, }
local ok, err = events.declarative_reconfigure_notify(reconfigure_data)
if not ok then
return nil, err
end
return events.declarative_reconfigure_notify(reconfigure_data)
end

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
-- emit the CRUD events
-- when in_full_sync, no events were added to the queue
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end

return true
end
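
-- Round-trip sketch (illustrative, simplified) of a paginated full sync as the
-- code above drives it; payloads are abridged, not literal wire formats:
--   DP -> CP  { default = { version = 0 } }
--   CP -> DP  { default = { deltas = <page 1>, wipe = true,
--                           full_sync_done = false, full_sync_status = <token> } }
--   DP -> CP  { default = { version = 0, full_sync_status = <token> } }
--   CP -> DP  { default = { deltas = <last page>, wipe = false,
--                           full_sync_done = true } }  -- no token: session over
-- The DP keeps DECLARATIVE_HASH_KEY at the empty-config hash (version 0),
-- suppresses CRUD events, and only fires reconfigure once full_sync_done is seen.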


local sync_handler
sync_handler = function(premature, try_counter)
sync_handler = function(premature, try_counter, dp_status)
if premature then
return
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync)
local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() return do_sync(dp_status) end)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end

if dp_status.full_sync_status then
-- full sync is in progress
-- continue the sync immediately to fetch the next page
return dp_status:sync_once()
end

-- try_counter is not set, only run once
if not try_counter then
return
Expand Down Expand Up @@ -432,12 +565,12 @@ end


function _M:sync_once(delay)
return ngx.timer.at(delay or 0, sync_handler, SYNC_MAX_RETRY)
return ngx.timer.at(delay or 0, sync_handler, SYNC_MAX_RETRY, self)
end


function _M:sync_every(delay)
return ngx.timer.every(delay, sync_handler)
return ngx.timer.every(delay, sync_handler, nil, self)
end
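
-- Minimal usage sketch (illustrative; `strategy` and the page size are
-- assumptions, and the real wiring lives elsewhere in the sync service):
--
--   local rpc = require("kong.clustering.services.sync.rpc")
--   local sync = rpc.new(strategy, { page_size = 512 })
--   assert(sync:sync_every(30))  -- periodic sync; sync_handler receives `self` as dp_status
--   assert(sync:sync_once())     -- immediate sync, with SYNC_MAX_RETRY as the try counter
--
-- While self.full_sync_status is set, sync_handler keeps rescheduling itself
-- via sync_once() so a paginated full sync runs to completion back to back.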

