Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/rpc): support jsonrpc notification #13948

Merged
merged 44 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f898bae
json rpc notification
chronolaw Nov 29, 2024
01c971c
notify func
chronolaw Nov 29, 2024
d857a28
check callback
chronolaw Nov 29, 2024
0b93cd2
clean
chronolaw Nov 29, 2024
027de85
manager.notify
chronolaw Nov 29, 2024
cda23fa
concentrator _local_call
chronolaw Nov 29, 2024
71f439a
clean
chronolaw Nov 29, 2024
e8749e0
notify new version
chronolaw Nov 29, 2024
b13864d
test with multiple workers
chronolaw Nov 29, 2024
c1da016
future
chronolaw Nov 29, 2024
1254755
clean
chronolaw Dec 3, 2024
5ecc980
custom plugin for testing
chronolaw Dec 4, 2024
74622f9
clean
chronolaw Dec 4, 2024
d2fae35
wait_until for log
chronolaw Dec 4, 2024
57bc592
clean
chronolaw Dec 4, 2024
0a69976
clean
chronolaw Dec 4, 2024
af8adf6
helpers.pwait_until
chronolaw Dec 4, 2024
34515d3
debug logs
chronolaw Dec 4, 2024
a0c6c03
timeout 10
chronolaw Dec 4, 2024
6f52680
comments
chronolaw Dec 4, 2024
570efa8
timeout 15
chronolaw Dec 4, 2024
8ca7cbd
sleep 0.5
chronolaw Dec 4, 2024
439d13c
rewrite phase
chronolaw Dec 4, 2024
8f4851e
_log_prefix
chronolaw Dec 4, 2024
a9ee628
kong.configuration.role
chronolaw Dec 4, 2024
543e7c8
init phase
chronolaw Dec 5, 2024
c0e8a77
init_worker phase
chronolaw Dec 5, 2024
43f2ebd
check assert
chronolaw Dec 5, 2024
9d1bcd4
check wait for notification
chronolaw Dec 5, 2024
1bcb368
helpers.get_db_utils
chronolaw Dec 5, 2024
0bace1a
remove test cases
chronolaw Dec 6, 2024
c372a93
test with incremental sync
chronolaw Dec 16, 2024
fbd1ccc
typo fix
chronolaw Dec 16, 2024
9307bbc
simplify code of call()
chronolaw Dec 16, 2024
34a8406
refactor tests
chronolaw Dec 16, 2024
fda4f27
lint fix
chronolaw Dec 16, 2024
38e67c8
comments
chronolaw Dec 16, 2024
d688798
Revert "lint fix"
chronolaw Dec 17, 2024
a5b16a4
Revert "refactor tests"
chronolaw Dec 17, 2024
893e8f9
Revert "typo fix"
chronolaw Dec 17, 2024
047fcf9
Revert "test with incremental sync"
chronolaw Dec 17, 2024
efbe190
Revert "test with multiple workers"
chronolaw Dec 17, 2024
de7ca1e
revert notify call
chronolaw Dec 17, 2024
b01dc09
Revert "comments"
chronolaw Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions kong/clustering/rpc/concentrator.lua
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,14 @@ function _M:_event_loop(lconn)
"unknown requester for RPC")

local res, err = self.manager:_local_call(target_id, payload.method,
payload.params)
payload.params, not payload.id)

-- notification has no callback or id
if not payload.id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
goto continue
end

if res then
-- call success
res, err = self:_enqueue_rpc_response(reply_to, {
Expand All @@ -180,6 +187,8 @@ function _M:_event_loop(lconn)
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
end
end

::continue::
end
end
end
Expand Down Expand Up @@ -287,9 +296,13 @@ end
-- This way the manager code wouldn't tell the difference
-- between calls made over WebSocket or concentrator
function _M:call(node_id, method, params, callback)
local id = self:_get_next_id()
local id

self.interest[id] = callback
-- notification has no callback or id
if callback then
id = self:_get_next_id()
self.interest[id] = callback
end

return self:_enqueue_rpc_request(node_id, {
jsonrpc = jsonrpc.VERSION,
Expand Down
27 changes: 21 additions & 6 deletions kong/clustering/rpc/future.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,36 @@ local STATE_SUCCEED = 3
local STATE_ERRORED = 4


function _M.new(node_id, socket, method, params)
function _M.new(node_id, socket, method, params, is_notification)
local self = {
method = method,
params = params,
sema = semaphore.new(),
socket = socket,
node_id = node_id,
id = nil,
result = nil,
error = nil,
state = STATE_NEW, -- STATE_*
is_notification = is_notification,
}

if not is_notification then
self.id = nil
self.result = nil
self.error = nil
self.state = STATE_NEW -- STATE_*
self.sema = semaphore.new()
end

return setmetatable(self, _MT)
end


-- start executing the future
function _M:start()
-- notification has no callback
if self.is_notification then
return self.socket:call(self.node_id,
self.method,
self.params)
end

assert(self.state == STATE_NEW)
self.state = STATE_IN_PROGRESS

Expand Down Expand Up @@ -60,6 +71,10 @@ end


function _M:wait(timeout)
if self.is_notification then
return nil, "the notification cannot be waited"
end

assert(self.state == STATE_IN_PROGRESS)

local res, err = self.sema:wait(timeout)
Expand Down
68 changes: 45 additions & 23 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL
local parse_proxy_url = require("kong.clustering.utils").parse_proxy_url


local _log_prefix = "[rpc] "
local RPC_MATA_V1 = "kong.meta.v1"
local RPC_SNAPPY_FRAMED = "x-snappy-framed"

Expand Down Expand Up @@ -276,7 +277,7 @@ end
-- low level helper used internally by :call() and concentrator
-- this one does not consider forwarding using concentrator
-- when node does not exist
function _M:_local_call(node_id, method, params)
function _M:_local_call(node_id, method, params, is_notification)
if not self.client_capabilities[node_id] then
return nil, "node is not connected, node_id: " .. node_id
end
Expand All @@ -289,9 +290,14 @@ function _M:_local_call(node_id, method, params)

local s = next(self.clients[node_id]) -- TODO: better LB?

local fut = future.new(node_id, s, method, params)
local fut = future.new(node_id, s, method, params, is_notification)
assert(fut:start())

-- notification need not to wait
if is_notification then
return true
end

local ok, err = fut:wait(5)
if err then
return nil, err
Expand All @@ -305,9 +311,7 @@ function _M:_local_call(node_id, method, params)
end


-- public interface, try call on node_id locally first,
-- if node is not connected, try concentrator next
function _M:call(node_id, method, ...)
function _M:_call_or_notify(is_notification, node_id, method, ...)
local cap = utils.parse_method_name(method)

local res, err = self:_find_node_and_check_capability(node_id, cap)
Expand All @@ -318,50 +322,68 @@ function _M:call(node_id, method, ...)
local params = {...}

ngx_log(ngx_DEBUG,
"[rpc] calling ", method,
_log_prefix,
is_notification and "notifying " or "calling ",
method,
"(node_id: ", node_id, ")",
" via ", res == "local" and "local" or "concentrator"
)

if res == "local" then
res, err = self:_local_call(node_id, method, params)
res, err = self:_local_call(node_id, method, params, is_notification)

if not res then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)
ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", err)
return nil, err
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")
ngx_log(ngx_DEBUG, _log_prefix, method, " succeeded")

return res
end

assert(res == "concentrator")

-- try concentrator
local fut = future.new(node_id, self.concentrator, method, params)
local fut = future.new(node_id, self.concentrator, method, params, is_notification)
assert(fut:start())

if is_notification then
return true
end

local ok, err = fut:wait(5)

if err then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)
ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", err)

return nil, err
end

if ok then
ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")
ngx_log(ngx_DEBUG, _log_prefix, method, " succeeded")

return fut.result
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", fut.error.message)
ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", fut.error.message)

return nil, fut.error.message
end


-- public interface, try call on node_id locally first,
-- if node is not connected, try concentrator next
function _M:call(node_id, method, ...)
return self:_call_or_notify(false, node_id, method, ...)
end


function _M:notify(node_id, method, ...)
return self:_call_or_notify(true, node_id, method, ...)
end


-- handle incoming client connections
function _M:handle_websocket()
local rpc_protocol = ngx_var.http_sec_websocket_protocol
Expand All @@ -379,15 +401,15 @@ function _M:handle_websocket()
end

if not meta_v1_supported then
ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " ..
ngx_log(ngx_ERR, _log_prefix, "unknown RPC protocol: " ..
tostring(rpc_protocol) ..
", doesn't know how to communicate with client")
return ngx_exit(ngx.HTTP_CLOSE)
end

local cert, err = validate_client_cert(self.conf, self.cluster_cert, ngx_var.ssl_client_raw_cert)
if not cert then
ngx_log(ngx_ERR, "[rpc] client's certificate failed validation: ", err)
ngx_log(ngx_ERR, _log_prefix, "client's certificate failed validation: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

Expand All @@ -396,14 +418,14 @@ function _M:handle_websocket()

local wb, err = server:new(WS_OPTS)
if not wb then
ngx_log(ngx_ERR, "[rpc] unable to establish WebSocket connection with client: ", err)
ngx_log(ngx_ERR, _log_prefix, "unable to establish WebSocket connection with client: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

-- if timeout (default is 5s) we will close the connection
local node_id, err = self:_handle_meta_call(wb)
if not node_id then
ngx_log(ngx_ERR, "[rpc] unable to handshake with client: ", err)
ngx_log(ngx_ERR, _log_prefix, "unable to handshake with client: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

Expand All @@ -415,7 +437,7 @@ function _M:handle_websocket()
self:_remove_socket(s)

if not res then
ngx_log(ngx_ERR, "[rpc] RPC connection broken: ", err, " node_id: ", node_id)
ngx_log(ngx_ERR, _log_prefix, "RPC connection broken: ", err, " node_id: ", node_id)
return ngx_exit(ngx.ERROR)
end

Expand Down Expand Up @@ -488,7 +510,7 @@ function _M:connect(premature, node_id, host, path, cert, key)

local ok, err = c:connect(uri, opts)
if not ok then
ngx_log(ngx_ERR, "[rpc] unable to connect to peer: ", err)
ngx_log(ngx_ERR, _log_prefix, "unable to connect to peer: ", err)
goto err
end

Expand All @@ -497,7 +519,7 @@ function _M:connect(premature, node_id, host, path, cert, key)
-- FIXME: resp_headers should not be case sensitive

if not resp_headers or not resp_headers["sec_websocket_protocol"] then
ngx_log(ngx_ERR, "[rpc] peer did not provide sec_websocket_protocol, node_id: ", node_id)
ngx_log(ngx_ERR, _log_prefix, "peer did not provide sec_websocket_protocol, node_id: ", node_id)
c:send_close() -- can't do much if this fails
goto err
end
Expand All @@ -506,15 +528,15 @@ function _M:connect(premature, node_id, host, path, cert, key)
local meta_cap = resp_headers["sec_websocket_protocol"]

if meta_cap ~= RPC_MATA_V1 then
ngx_log(ngx_ERR, "[rpc] did not support protocol : ", meta_cap)
ngx_log(ngx_ERR, _log_prefix, "did not support protocol : ", meta_cap)
c:send_close() -- can't do much if this fails
goto err
end

-- if timeout (default is 5s) we will close the connection
local ok, err = self:_meta_call(c, meta_cap, node_id)
if not ok then
ngx_log(ngx_ERR, "[rpc] unable to handshake with server, node_id: ", node_id,
ngx_log(ngx_ERR, _log_prefix, "unable to handshake with server, node_id: ", node_id,
" err: ", err)
c:send_close() -- can't do much if this fails
goto err
Expand All @@ -529,7 +551,7 @@ function _M:connect(premature, node_id, host, path, cert, key)
self:_remove_socket(s)

if not ok then
ngx_log(ngx_ERR, "[rpc] connection to node_id: ", node_id, " broken, err: ",
ngx_log(ngx_ERR, _log_prefix, "connection to node_id: ", node_id, " broken, err: ",
err, ", reconnecting in ", reconnection_delay, " seconds")
end
end
Expand Down
25 changes: 20 additions & 5 deletions kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ function _M._dispatch(premature, self, cb, payload)
if not res then
ngx_log(ngx_WARN, "[rpc] RPC callback failed: ", err)

-- notification has no response
if not payload.id then
return
end

res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR,
err))
if not res then
Expand All @@ -77,6 +82,12 @@ function _M._dispatch(premature, self, cb, payload)
return
end

-- notification has no response
if not payload.id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
return
end

-- success
res, err = self.outgoing:push({
jsonrpc = jsonrpc.VERSION,
Expand Down Expand Up @@ -151,7 +162,7 @@ function _M:start()
ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
if not dispatch_cb then
if not dispatch_cb and payload.id then
Copy link
Contributor

@chobits chobits Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the Json-RPC 2.0 spec, I think the reply MUST contain id field. So could we check whether there is a payload.id right after the line local payload = decompress_payload(data)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like

local payload = decompress_payload(data)

-- check id firstly
if not payload.id then
   return error_log("not found id")
end

if type(payload.id) ~= number then
   return error_log("invalid id: not a number")
end

-- From now on, we assure that payload.id exists and it is a number
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the examles in json rpc spec:

--> {"jsonrpc": "2.0", "method": "update", "params": [1,2,3,4,5]}
--> {"jsonrpc": "2.0", "method": "foobar"}

local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
if not res then
return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err
Expand All @@ -162,9 +173,9 @@ function _M:start()

-- call dispatch
local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s",
self.node_id, payload.id, payload.method),
self.node_id, payload.id or 0, payload.method),
0, _M._dispatch, self, dispatch_cb, payload)
if not res then
if not res and payload.id then
local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR))
if not reso then
return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro
Expand Down Expand Up @@ -271,9 +282,13 @@ end
function _M:call(node_id, method, params, callback)
assert(node_id == self.node_id)

local id = self:_get_next_id()
local id

self.interest[id] = callback
-- notification has no callback or id
if callback then
id = self:_get_next_id()
self.interest[id] = callback
end

return self.outgoing:push({
jsonrpc = jsonrpc.VERSION,
Expand Down
Loading