From 599b97089030c0fab0db4c3e325c6af26c3ebb8e Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Fri, 6 Dec 2024 10:20:58 +0800 Subject: [PATCH 01/10] test(hybrid): enable incremental sync test in 11-status_spec.lua --- .../09-hybrid_mode/11-status_spec.lua | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/spec/02-integration/09-hybrid_mode/11-status_spec.lua b/spec/02-integration/09-hybrid_mode/11-status_spec.lua index c6ada743ee1d..774fda44a2e6 100644 --- a/spec/02-integration/09-hybrid_mode/11-status_spec.lua +++ b/spec/02-integration/09-hybrid_mode/11-status_spec.lua @@ -74,9 +74,6 @@ for _, strategy in helpers.each_strategy() do end) describe("dp status ready endpoint for no config", function() - -- XXX FIXME - local skip_inc_sync = inc_sync == "on" and pending or it - lazy_setup(function() assert(start_kong_cp()) assert(start_kong_dp()) @@ -108,7 +105,7 @@ for _, strategy in helpers.each_strategy() do -- now dp receive config from cp, so dp should be ready - skip_inc_sync("should return 200 on data plane after configuring", function() + it("should return 200 on data plane after configuring", function() helpers.wait_until(function() local http_client = helpers.http_client('127.0.0.1', dp_status_port) @@ -119,7 +116,10 @@ for _, strategy in helpers.each_strategy() do local status = res and res.status http_client:close() - if status == 200 then + + if (inc_sync == "on" and status == 503) or + (inc_sync == "off" and status == 200) + then return true end end, 10) @@ -138,7 +138,9 @@ for _, strategy in helpers.each_strategy() do local status = res and res.status http_client:close() - if status == 200 then + if (inc_sync == "on" and status == 503) or + (inc_sync == "off" and status == 200) + then return true end end, 10) @@ -156,11 +158,13 @@ for _, strategy in helpers.each_strategy() do local status = res and res.status http_client:close() - if status == 200 then + + if (inc_sync == "on" and status == 503) or + (inc_sync == "off" and status == 200) + then return true end end, 10) - end) end) From 55453dee73b99b3fd865eb4d83d55d54cbf9c2b4 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:03:13 -0600 Subject: [PATCH 02/10] 1 --- .../09-hybrid_mode/11-status_spec.lua | 40 +++++++++++++++++-- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/spec/02-integration/09-hybrid_mode/11-status_spec.lua b/spec/02-integration/09-hybrid_mode/11-status_spec.lua index 774fda44a2e6..54314e7fc658 100644 --- a/spec/02-integration/09-hybrid_mode/11-status_spec.lua +++ b/spec/02-integration/09-hybrid_mode/11-status_spec.lua @@ -4,7 +4,8 @@ local helpers = require "spec.helpers" local cp_status_port = helpers.get_available_port() local dp_status_port = 8100 -for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do +--for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do +for _, v in ipairs({ {"on", "on"}, }) do local rpc, inc_sync = v[1], v[2] for _, strategy in helpers.each_strategy() do @@ -105,7 +106,7 @@ for _, strategy in helpers.each_strategy() do -- now dp receive config from cp, so dp should be ready - it("should return 200 on data plane after configuring", function() + it("should return 200 on data plane after configuring #ttt", function() helpers.wait_until(function() local http_client = helpers.http_client('127.0.0.1', dp_status_port) @@ -165,9 +166,40 @@ for _, strategy in helpers.each_strategy() do return true end end, 10) + + -- insert one entity to make dp ready for incremental sync + if inc_sync == "on" then + ngx.sleep(1) + print("+++++ post /services") + local admin_client = helpers.admin_client(10000) + local res = assert(admin_client:post("/services", { + body = { name = "service-001", url = "https://127.0.0.1:15556/request", }, + headers = {["Content-Type"] = "application/json"} + })) + assert.res_status(201, res) + admin_client:close() + + print("++++++= sleep") + --ngx.sleep(1000000) + helpers.wait_until(function() + local http_client = helpers.http_client('127.0.0.1', dp_status_port) + + local res = http_client:send({ + method = "GET", + path = "/status/ready", + }) + + local status = res and res.status + http_client:close() +-- print("++++++++++" .. status) + + if status == 200 then + return true + end + end, 10) + end + end) end) end) - - end) end -- for _, strategy end -- for inc_sync From e4f399ffa590b8d9dbeeb0c868be8fe565fbaf58 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:05:22 -0600 Subject: [PATCH 03/10] 1 --- spec/02-integration/09-hybrid_mode/11-status_spec.lua | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/spec/02-integration/09-hybrid_mode/11-status_spec.lua b/spec/02-integration/09-hybrid_mode/11-status_spec.lua index 54314e7fc658..71a4a0a66c54 100644 --- a/spec/02-integration/09-hybrid_mode/11-status_spec.lua +++ b/spec/02-integration/09-hybrid_mode/11-status_spec.lua @@ -27,6 +27,7 @@ for _, strategy in helpers.each_strategy() do status_listen = "127.0.0.1:" .. dp_status_port, cluster_rpc = rpc, cluster_incremental_sync = inc_sync, + log_level = "info", }) end @@ -42,6 +43,7 @@ for _, strategy in helpers.each_strategy() do status_listen = "127.0.0.1:" .. cp_status_port, cluster_rpc = rpc, cluster_incremental_sync = inc_sync, + log_level = "info", }) end @@ -169,7 +171,12 @@ for _, strategy in helpers.each_strategy() do -- insert one entity to make dp ready for incremental sync if inc_sync == "on" then - ngx.sleep(1) + + + -- sleep > 10s , it will succeed + ngx.sleep(11) + + print("+++++ post /services") local admin_client = helpers.admin_client(10000) local res = assert(admin_client:post("/services", { From d176dd35f3b43804b2b21afe40c32cd5885b2d71 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:07:44 -0600 Subject: [PATCH 04/10] 1 --- kong/api/endpoints.lua | 1 + kong/clustering/rpc/concentrator.lua | 11 ++++++- kong/clustering/rpc/future.lua | 2 ++ kong/clustering/rpc/manager.lua | 2 +- kong/clustering/rpc/socket.lua | 11 ++++++- kong/clustering/services/sync/hooks.lua | 17 ++++++++--- kong/clustering/services/sync/rpc.lua | 40 ++++++++++++++++++++----- kong/db/declarative/import.lua | 2 ++ kong/db/strategies/off/init.lua | 1 + kong/hooks.lua | 1 + kong/init.lua | 5 ++++ kong/router/atc.lua | 6 ++++ kong/router/init.lua | 1 + kong/router/traditional.lua | 3 ++ 14 files changed, 89 insertions(+), 14 deletions(-) diff --git a/kong/api/endpoints.lua b/kong/api/endpoints.lua index 1a0b41bbd3ca..b6d422eb2e82 100644 --- a/kong/api/endpoints.lua +++ b/kong/api/endpoints.lua @@ -406,6 +406,7 @@ local function post_collection_endpoint(schema, foreign_schema, foreign_field_na self.args.post[foreign_field_name] = foreign_schema:extract_pk_values(foreign_entity) end + ngx.log(ngx.INFO, "xxx insert entity :", schema.name) local entity, _, err_t = insert_entity(self, db, schema, method) if err_t then return handle_error(err_t) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 68bb0bc33880..1b86b9155f68 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -21,7 +21,7 @@ local is_timeout = rpc_utils.is_timeout local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_WARN = ngx.WARN -local ngx_DEBUG = ngx.DEBUG +local ngx_DEBUG = ngx.INFO -- ngx.DEBUG local RESP_CHANNEL_PREFIX = "rpc:resp:" -- format: rpc:resp: @@ -83,6 +83,7 @@ local function enqueue_notifications(notifications, notifications_queue) assert(notifications_queue) if notifications then + ngx.log(ngx.INFO, "xxxx enqueue_notifications: ", require("inspect")(notifications)) for _, n in ipairs(notifications) do assert(notifications_queue:push(n)) end @@ -118,6 +119,8 @@ function _M:_event_loop(lconn) local payload = cjson_decode(n.payload) assert(payload.jsonrpc == jsonrpc.VERSION) + ngx.log(ngx.INFO, "xxx concentrator event_loop resp:") + -- response local cb = self.interest[payload.id] self.interest[payload.id] = nil -- edge trigger only once @@ -134,6 +137,7 @@ function _M:_event_loop(lconn) end else + ngx.log(ngx.INFO, "xxx concentrator event_loop req:") -- other CP inside the cluster asked us to forward a call assert(n.channel:sub(1, #REQ_CHANNEL_PREFIX) == REQ_CHANNEL_PREFIX, "unexpected concentrator request channel name: " .. n.channel) @@ -198,6 +202,8 @@ function _M:_event_loop(lconn) local _, notifications res, err, _, notifications = lconn:query(sql or "SELECT 1;") -- keepalive + ngx.log(ngx.INFO, "xxx concetrator lconn:query ", sql or "SELECT 1;", " -> ", + require("inspect")(res), " err:" , err, " notifications:", require("inspect")(notifications)) if not res then return nil, "query to Postgres failed: " .. err end @@ -256,6 +262,7 @@ function _M:_enqueue_rpc_request(node_id, payload) 5, self.db.connector:escape_literal(cjson_encode(payload)), self.db.connector:escape_literal(REQ_CHANNEL_PREFIX .. node_id)) + ngx.log(ngx.INFO, "xxx concentrator _enqueue_rpc_request: ", sql) return self.db.connector:query(sql) end @@ -291,6 +298,8 @@ function _M:call(node_id, method, params, callback) self.interest[id] = callback + ngx.log(ngx.INFO, "xxx concentrator:call _enqueue_rpc_request()", node_id, " ", method) + return self:_enqueue_rpc_request(node_id, { jsonrpc = jsonrpc.VERSION, method = method, diff --git a/kong/clustering/rpc/future.lua b/kong/clustering/rpc/future.lua index 68ed82720f09..6afbc187bf00 100644 --- a/kong/clustering/rpc/future.lua +++ b/kong/clustering/rpc/future.lua @@ -53,6 +53,8 @@ function _M:start() return true end + ngx.log(ngx.INFO, "xxx fut start: socket:call", self.node_id, self.method) + return self.socket:call(self.node_id, self.method, self.params, callback) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index c3925c5073cb..4e360f2d61cd 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -20,7 +20,7 @@ local string_tools = require("kong.tools.string") local ipairs = ipairs local ngx_var = ngx.var local ngx_ERR = ngx.ERR -local ngx_DEBUG = ngx.DEBUG +local ngx_DEBUG = ngx.INFO -- ngx.DEBUG local ngx_log = ngx.log local ngx_exit = ngx.exit local ngx_time = ngx.time diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 045ca8c75577..5cebf7edbc75 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -31,7 +31,7 @@ local PING_WAIT = CLUSTERING_PING_INTERVAL * 1.5 local PING_TYPE = "PING" local PONG_TYPE = "PONG" local ngx_WARN = ngx.WARN -local ngx_DEBUG = ngx.DEBUG +local ngx_DEBUG = ngx.INFO -- ngx.DEBUG -- create a new socket wrapper, wb is the WebSocket object to use @@ -91,12 +91,15 @@ end -- start reader and writer thread and event loop function _M:start() + ngx.log(ngx.INFO, "xxx socket:start ", debug.traceback()) self.read_thread = ngx.thread.spawn(function() local last_seen = ngx_time() while not exiting() do local data, typ, err = self.wb:recv_frame() + ngx.log(ngx.INFO, "xxx reading from websocket: ", data, " typ:", typ, " err:", err) + if err then if not is_timeout(err) then return nil, err @@ -198,7 +201,11 @@ function _M:start() self.write_thread = ngx.thread.spawn(function() while not exiting() do local payload, err = self.outgoing:pop(5) + + ngx.log(ngx.INFO, "xxx socket write_thread: outgoing:pop: ", payload and payload.method, " err:", err) + if err then + ngx.log(ngx.ERR, "xxx socket write thread quit!!!!!!!!!!!!!!!!!! ", err) return nil, err end @@ -275,6 +282,8 @@ function _M:call(node_id, method, params, callback) self.interest[id] = callback + ngx.log(ngx.INFO, "xxx socket call, self.outgoing:push:", method) + return self.outgoing:push({ jsonrpc = jsonrpc.VERSION, method = method, diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index a9368f755061..876cdbea82e1 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -10,7 +10,7 @@ local ipairs = ipairs local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR -local ngx_DEBUG = ngx.DEBUG +local ngx_DEBUG = ngx.INFO -- ngx.DEBUG local DEFAULT_PAGE_SIZE = 512 @@ -27,6 +27,7 @@ end local function get_all_nodes_with_sync_cap() local res, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE) + ngx.log(ngx.INFO, "xxx kong.db.clustering_data_planes:page ->", #res, " err:", err) if err then return nil, "unable to query DB " .. err end @@ -52,6 +53,8 @@ local function get_all_nodes_with_sync_cap() end +local notify_hash = 100001 + function _M:notify_all_nodes() local latest_version, err = self.strategy:get_latest_version() if not latest_version then @@ -59,12 +62,16 @@ function _M:notify_all_nodes() return end - ngx_log(ngx_DEBUG, "[kong.sync.v2] notifying all nodes of new version: ", latest_version) + ngx_log(ngx_DEBUG, "xxx [kong.sync.v2] notifying all nodes of new version: ", latest_version, " notify hash:", notify_hash) - local msg = { default = { new_version = latest_version, }, } + local msg = { default = { new_version = latest_version, }, hash = notify_hash, } + notify_hash = notify_hash + 1 - for _, node in ipairs(get_all_nodes_with_sync_cap()) do + local nodes, err = get_all_nodes_with_sync_cap() + ngx.log(ngx.INFO, "xxx nodes=", require("inspect")(nodes), " err=", err) + for _, node in ipairs(nodes) do local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg) + ngx.log(ngx.INFO, "xxx notify ", node, " new_version = ", latest_version, " err:", err) if not res then if not err:find("requested capability does not exist", nil, true) then ngx_log(ngx_ERR, "unable to notify ", node, " new version: ", err) @@ -96,6 +103,8 @@ function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) local d = gen_delta(entity, name, options, ws_id, is_delete) local deltas = { d, } + ngx.log(ngx.INFO, "xxx generate entity :", require("inspect")(entity)) + local res, err = self.strategy:insert_delta(deltas) if not res then self.strategy:cancel_txn() diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 4100afbb9675..c6963cc8f083 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -27,7 +27,7 @@ local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_INFO = ngx.INFO -local ngx_DEBUG = ngx.DEBUG +local ngx_DEBUG = ngx.INFO -- ngx.DEBUG -- number of versions behind before a full sync is forced @@ -44,7 +44,9 @@ end local function inc_sync_result(res) - return { default = { deltas = res, wipe = false, }, } + local r = { default = { deltas = res, wipe = false, }, } + ngx.log(ngx.INFO, "xxx generated res = ", require("inspect")(r), ": ", debug.traceback("Backtrace:", level)) + return r end @@ -53,6 +55,7 @@ local function full_sync_result() if not deltas then return nil, err end + ngx.log(ngx.INFO, "xxx generated full sync res = ", require("inspect")(deltas)) -- wipe dp lmdb, full sync return { default = { deltas = deltas, wipe = true, }, } @@ -71,7 +74,7 @@ function _M:init_cp(manager) -- Params: versions: list of current versions of the database -- example: { default = { version = 1000, }, } manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions) - ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)") + ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client), current_version=", require("inspect")(current_versions), debug.traceback()) local rpc_peers if kong.rpc then @@ -98,6 +101,9 @@ function _M:init_cp(manager) config_hash = fmt("%032d", default_namespace_version), rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, }, { ttl = purge_delay, no_broadcast_crud_event = true, }) + + ngx.log(ngx.INFO, "xxx kong.db.clustering_data_planes:upsert ", tostring(node_id), " ttl:", tostring(purge_delay), " -> ", tostring(ok), " err:", tostring(err)) + if not ok then ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err) end @@ -107,6 +113,8 @@ function _M:init_cp(manager) return nil, err end + ngx_log(ngx_DEBUG, "xxx dp v:", default_namespace_version, " cp v:", latest_version) + -- is the node empty? If so, just do a full sync to bring it up to date faster if default_namespace_version == 0 or latest_version - default_namespace_version > FULL_SYNC_THRESHOLD @@ -160,6 +168,9 @@ function _M:init_dp(manager) -- Params: new_versions: list of namespaces and their new versions, like: -- { default = { new_version = 1000, }, } manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions) + ngx.log(ngx.INFO, "xxx DP: kong.sync.v2.notify_new_version notify received:", + node_id, " new_versions:", require("inspect")(new_versions), " :", debug.traceback()) + -- TODO: currently only default is supported, and anything else is ignored local default_new_version = new_versions.default if not default_new_version then @@ -171,8 +182,11 @@ function _M:init_dp(manager) return nil, "'new_version' key does not exist" end + local lmdb_ver = tonumber(declarative.get_current_hash()) or 0 + ngx.log(ngx.ERR, "xxx DP notify received version:", version, " lmdb_ver:", lmdb_ver) if lmdb_ver < version then + ngx.log(ngx.ERR, "xxx DP notify return self:sync_once()") -- set lastest version to shm kong_shm:set(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY, version) return self:sync_once() @@ -221,7 +235,12 @@ local function do_sync() }, } +ngx.log(ngx.INFO, "xxx : call get_delta:", require("inspect")(msg)) + local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) + +ngx.log(ngx.INFO, "xxx : ns_deltas:", require("inspect")(ns_deltas)) + if not ns_deltas then ngx_log(ngx_ERR, "sync get_delta error: ", err) return true @@ -273,6 +292,8 @@ local function do_sync() -- delta should look like: -- { type = ..., entity = { ... }, version = 1, ws_id = ..., } for _, delta in ipairs(deltas) do +ngx.log(ngx.INFO, "xxx : ", delta.type) + local delta_version = delta.version local delta_type = delta.type local delta_entity = delta.entity @@ -392,12 +413,16 @@ local function do_sync() end -local function sync_handler(premature) +local function sync_handler(premature, t) + ngx.log(ngx.INFO, "xxx sync_handler:", t) if premature then return end + ngx.log(ngx.INFO, "xxx try mutex: do_sync") local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync) + ngx.log(ngx.INFO, "xxx try mutex: do_sync: ", res, " err:", err) + if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end @@ -418,12 +443,13 @@ end function sync_once_impl(premature, retry_count) + ngx.log(ngx.INFO, "xxx sync once") if premature then return end - sync_handler() - + sync_handler() + local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) local current_version = tonumber(declarative.get_current_hash()) or 0 @@ -451,7 +477,7 @@ end function _M:sync_every(delay) - return ngx.timer.every(delay, sync_handler) + return ngx.timer.every(delay, sync_handler, " timer.every ") end diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index 2030da85359a..d347c19e57c7 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -286,6 +286,8 @@ local function _set_entity_for_txn(t, entity_name, item, options, is_delete) local itm_key = item_key(entity_name, ws_id, pk) + ngx.log(ngx.INFO, "xxx pk:", tostring(pk), " itm_key:", tostring(itm_key)) + -- if we are deleting, item_value and idx_value should be nil local itm_value, idx_value diff --git a/kong/db/strategies/off/init.lua b/kong/db/strategies/off/init.lua index 1fab71dac502..dc75c25ad29c 100644 --- a/kong/db/strategies/off/init.lua +++ b/kong/db/strategies/off/init.lua @@ -366,6 +366,7 @@ local function select_by_field(self, field, value, options) workspace_id(schema, options) local key = unique_field_key(schema.name, ws_id, field, value) + ngx.log(ngx.INFO, "xxx select from lmdb:", key) return select_by_key(schema, key, true) end diff --git a/kong/hooks.lua b/kong/hooks.lua index 2cbb3a7b65ba..b45246102d23 100644 --- a/kong/hooks.lua +++ b/kong/hooks.lua @@ -78,6 +78,7 @@ end function _M.run_hook(name, a0, a1, a2, a3, a4, a5, ...) + -- ngx.log(ngx.INFO, "xxx run hooks:", name) if not hooks[name] then return a0 -- return only the first value end diff --git a/kong/init.lua b/kong/init.lua index a4f66a1450ab..3c96ef8fef63 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -1178,6 +1178,7 @@ end function Kong.access() + ngx.log(ngx.INFO, "xxx access in") local ctx = ngx.ctx local has_timing = ctx.has_timing @@ -1196,7 +1197,9 @@ function Kong.access() ctx.KONG_PHASE = PHASES.access + ngx.log(ngx.INFO, "xxx access before") runloop.access.before(ctx) + ngx.log(ngx.INFO, "xxx access before end") local plugins_iterator = runloop.get_plugins_iterator() @@ -1231,6 +1234,7 @@ function Kong.access() end runloop.wasm_attach(ctx) + ngx.log(ngx.INFO, "xxx access after") runloop.access.after(ctx) ctx.KONG_ACCESS_ENDED_AT = get_updated_now_ms() @@ -1239,6 +1243,7 @@ function Kong.access() -- we intent to proxy, though balancer may fail on that ctx.KONG_PROXIED = true + ngx.log(ngx.INFO, "xxx access proxied") if ctx.buffered_proxying then local upgrade = var.upstream_upgrade or "" diff --git a/kong/router/atc.lua b/kong/router/atc.lua index 31aaf6c5a4ed..beacd8515e72 100644 --- a/kong/router/atc.lua +++ b/kong/router/atc.lua @@ -441,6 +441,7 @@ end function _M:exec(ctx) + ngx.log(ngx.INFO, "xxx exec") local fields = self.fields local req_uri = ctx and ctx.request_uri or var.request_uri @@ -459,9 +460,13 @@ function _M:exec(ctx) CACHE_PARAMS.uri = req_uri CACHE_PARAMS.host = req_host + ngx.log(ngx.INFO, "xxx CACHE_PARAMS:", require("inspect")(CACHE_PARAMS)) local cache_key = fields:get_cache_key(CACHE_PARAMS) -- cache lookup + -- + + ngx.log(ngx.INFO, "xxx get cache_key:[", cache_key, "]") local match_t = self.cache:get(cache_key) if not match_t then @@ -573,6 +578,7 @@ end function _M:exec(ctx) + ngx.log(ngx.INFO, "xxx exec ") local fields = self.fields -- cache key calculation diff --git a/kong/router/init.lua b/kong/router/init.lua index abec995a5091..3bed88c05793 100644 --- a/kong/router/init.lua +++ b/kong/router/init.lua @@ -22,6 +22,7 @@ local FLAVOR_TO_MODULE = { function _M:exec(ctx) + ngx.log(ngx.INFO, "xxx exec") return self.trad.exec(ctx) end diff --git a/kong/router/traditional.lua b/kong/router/traditional.lua index 7f9bad76cdf9..150934ccf954 100644 --- a/kong/router/traditional.lua +++ b/kong/router/traditional.lua @@ -1542,6 +1542,7 @@ function _M.new(routes, cache, cache_neg) src_ip, src_port, dst_ip, dst_port, sni, req_headers) + ngx.log(ngx.INFO, "xxx find_route: ") check_select_params(req_method, req_uri, req_host, req_scheme, src_ip, src_port, @@ -1753,6 +1754,7 @@ function _M.new(routes, cache, cache_neg) local exec if is_http then exec = function(ctx) + ngx.log(ngx.INFO, "xxx exec") local req_method = get_method() local req_uri = ctx and ctx.request_uri or var.request_uri local req_host = get_header("host", ctx) @@ -1780,6 +1782,7 @@ function _M.new(routes, cache, cache_neg) nil, nil, -- src_ip, src_port nil, nil, -- dst_ip, dst_port sni, headers) +ngx.log(ngx.INFO, "xxx find route: ", tostring(match_t)) if match_t then -- debug HTTP request header logic add_debug_headers(ctx, header, match_t) From fb4a39b364690d1db347f371ceb3d478a342f757 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:10:56 -0600 Subject: [PATCH 05/10] 1 --- kong/clustering/rpc/concentrator.lua | 6 ++++-- spec/02-integration/09-hybrid_mode/11-status_spec.lua | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 1b86b9155f68..06a33ab7fd7b 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -202,8 +202,10 @@ function _M:_event_loop(lconn) local _, notifications res, err, _, notifications = lconn:query(sql or "SELECT 1;") -- keepalive - ngx.log(ngx.INFO, "xxx concetrator lconn:query ", sql or "SELECT 1;", " -> ", - require("inspect")(res), " err:" , err, " notifications:", require("inspect")(notifications)) +if sql then + ngx.log(ngx.INFO, "xxx concetrator lconn:query ", sql or "SELECT 1;", " -> ", + require("inspect")(res), " err:" , err, " notifications:", require("inspect")(notifications)) +end if not res then return nil, "query to Postgres failed: " .. err end diff --git a/spec/02-integration/09-hybrid_mode/11-status_spec.lua b/spec/02-integration/09-hybrid_mode/11-status_spec.lua index 71a4a0a66c54..457b5be73409 100644 --- a/spec/02-integration/09-hybrid_mode/11-status_spec.lua +++ b/spec/02-integration/09-hybrid_mode/11-status_spec.lua @@ -174,7 +174,7 @@ for _, strategy in helpers.each_strategy() do -- sleep > 10s , it will succeed - ngx.sleep(11) + ngx.sleep(1) print("+++++ post /services") From b618ea636ca7b54fb8e640829eddaf51408ed741 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:26:07 -0600 Subject: [PATCH 06/10] 1 --- kong/clustering/rpc/socket.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 5cebf7edbc75..e68ee45d3541 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -91,7 +91,7 @@ end -- start reader and writer thread and event loop function _M:start() - ngx.log(ngx.INFO, "xxx socket:start ", debug.traceback()) + -- ngx.log(ngx.INFO, "xxx socket:start ", debug.traceback()) self.read_thread = ngx.thread.spawn(function() local last_seen = ngx_time() From c8d763759f9b584f95aec8e6f06d76fea5313a89 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:29:17 -0600 Subject: [PATCH 07/10] 1 --- kong/clustering/rpc/socket.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index e68ee45d3541..5ed0cf536df5 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -98,7 +98,7 @@ function _M:start() while not exiting() do local data, typ, err = self.wb:recv_frame() - ngx.log(ngx.INFO, "xxx reading from websocket: ", data, " typ:", typ, " err:", err) + --ngx.log(ngx.INFO, "xxx reading from websocket: ", data, " typ:", typ, " err:", err) if err then if not is_timeout(err) then @@ -202,10 +202,10 @@ function _M:start() while not exiting() do local payload, err = self.outgoing:pop(5) - ngx.log(ngx.INFO, "xxx socket write_thread: outgoing:pop: ", payload and payload.method, " err:", err) + --ngx.log(ngx.INFO, "xxx socket write_thread: outgoing:pop: ", payload and payload.method, " err:", err) if err then - ngx.log(ngx.ERR, "xxx socket write thread quit!!!!!!!!!!!!!!!!!! ", err) + --ngx.log(ngx.ERR, "xxx socket write thread quit!!!!!!!!!!!!!!!!!! ", err) return nil, err end From 6077dd21f8f7726839e78246f3daa400a4cd6af9 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:35:02 -0600 Subject: [PATCH 08/10] 1 --- kong/clustering/rpc/manager.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 4e360f2d61cd..34ffac7b4c03 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -199,6 +199,8 @@ function _M:_handle_meta_call(c) local capabilities_list = info.rpc_capabilities local node_id = info.kong_node_id +ngx.log(ngx.INFO, "xxx meta call processing succeeds! assign self.client_capabilities[", node_id,"]") + self.client_capabilities[node_id] = { set = pl_tablex_makeset(capabilities_list), list = capabilities_list, From 25fdbd70527343dfcf257e183aae3da11136a6c9 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:45:49 -0600 Subject: [PATCH 09/10] 1 --- kong/clustering/control_plane.lua | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index 9426d38d8660..569ff17975cc 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -262,6 +262,9 @@ function _M:handle_cp_websocket(cert) -- only update rpc_capabilities if dp_id is connected rpc_capabilities = rpc_peers and rpc_peers[dp_id] or {}, }, { ttl = purge_delay, no_broadcast_crud_event = true, }) + +ngx.log(ngx.INFO, "xxx kong.db.clustering_data_planes:upsert ", tostring(dp_id), " ttl:", tostring(purge_delay), " -> ", tostring(ok), " err:", tostring(err)) + if not ok then ngx_log(ngx_ERR, _log_prefix, "unable to update clustering data plane status: ", err, log_suffix) end From ad420e8448c306c3973af8b6cb60e2f38504062a Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 11 Dec 2024 02:51:07 -0600 Subject: [PATCH 10/10] 1 --- spec/02-integration/09-hybrid_mode/11-status_spec.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/02-integration/09-hybrid_mode/11-status_spec.lua b/spec/02-integration/09-hybrid_mode/11-status_spec.lua index 457b5be73409..7db6212fdb09 100644 --- a/spec/02-integration/09-hybrid_mode/11-status_spec.lua +++ b/spec/02-integration/09-hybrid_mode/11-status_spec.lua @@ -127,6 +127,8 @@ for _, strategy in helpers.each_strategy() do end end, 10) + ngx.sleep(1) + assert(helpers.stop_kong("serve_cp", nil, nil, "QUIT", false)) -- DP should keep return 200 after CP is shut down