Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugins/aws-lambda): support Lambda response streaming #13176

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/unreleased/kong/feat-aws-lambda-streaming.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
message: |
**AWS-Lambda**: A new configuration field `invoke_mode` is now added to control whether to invoke AWS Lambda in buffered mode or streaming mode.
type: feature
scope: Plugin
302 changes: 214 additions & 88 deletions kong/plugins/aws-lambda/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ local VIA_HEADER = constants.HEADERS.VIA
local VIA_HEADER_VALUE = meta._NAME .. "/" .. meta._VERSION

local request_util = require "kong.plugins.aws-lambda.request-util"
local AWS_Stream = require("resty.aws.stream")
local build_request_payload = request_util.build_request_payload
local extract_proxy_response = request_util.extract_proxy_response
local remove_array_mt_for_empty_table = request_util.remove_array_mt_for_empty_table
local process_http_integration_response = request_util.process_http_integration_response

local aws = require("resty.aws")
local AWS_GLOBAL_CONFIG
Expand Down Expand Up @@ -71,10 +73,215 @@ local AWSLambdaHandler = {
}


function AWSLambdaHandler:access(conf)
local function invoke_buffered(conf, lambda_service)
-- TRACING: set KONG_WAITING_TIME start
local kong_wait_time_start = get_now()

local res, err = lambda_service:invoke({
FunctionName = conf.function_name,
InvocationType = conf.invocation_type,
LogType = conf.log_type,
Payload = build_request_payload(conf),
Qualifier = conf.qualifier,
})

-- TRACING: set KONG_WAITING_TIME stop
local ctx = ngx.ctx
local lambda_wait_time_total = get_now() - kong_wait_time_start
-- setting the latency here is a bit tricky, but because we are not
-- actually proxying, it will not be overwritten
ctx.KONG_WAITING_TIME = lambda_wait_time_total
kong.ctx.plugin.waiting_time = lambda_wait_time_total

if err then
return error(err)
end

local content = res.body
if res.status >= 400 then
return error(content.Message)
end

local headers = res.headers

-- Remove Content-Length header returned by Lambda service,
-- to make sure returned response length will be correctly calculated
-- afterwards.
headers["Content-Length"] = nil
-- We're responding with the header returned from Lambda service
-- Remove hop-by-hop headers to prevent it from being sent to client
if ngx_var.http2 then
headers["Connection"] = nil
headers["Keep-Alive"] = nil
headers["Proxy-Connection"] = nil
headers["Upgrade"] = nil
headers["Transfer-Encoding"] = nil
end

local status
if conf.is_proxy_integration then
local proxy_response, err = extract_proxy_response(content)
if not proxy_response then
kong.log.err(err)
return kong.response.exit(502, { message = "Bad Gateway",
error = "could not JSON decode Lambda " ..
"function response: " .. err })
end

status = proxy_response.status_code
headers = kong.table.merge(headers, proxy_response.headers)
content = proxy_response.body
end

if not status then
if conf.unhandled_status
and headers["X-Amz-Function-Error"] == "Unhandled"
then
status = conf.unhandled_status

else
status = res.status
end
end

headers = kong.table.merge(headers) -- create a copy of headers

if kong.configuration.enabled_headers[VIA_HEADER] then
headers[VIA_HEADER] = VIA_HEADER_VALUE
end

-- TODO: remove this in the next major release
-- function to remove array_mt metatables from empty tables
-- This is just a backward compatibility code to keep a
-- long-lived behavior that Kong responsed JSON objects
-- instead of JSON arrays for empty arrays.
if conf.empty_arrays_mode == "legacy" then
local ct = headers["Content-Type"]
if ct and ct:lower():match("application/.*json") then
content = remove_array_mt_for_empty_table(content)
end
end

return kong.response.exit(status, content, headers)
end

local function invoke_streaming(conf, lambda_service)
-- TRACING: set KONG_WAITING_TIME start
local kong_wait_time_start = get_now()

local res, err = lambda_service:invokeWithResponseStream({
FunctionName = conf.function_name,
InvocationType = conf.invocation_type,
LogType = conf.log_type,
Payload = build_request_payload(conf),
Qualifier = conf.qualifier,
})
if err or res == nil then
-- print("error" .. err)
return error(err)
end

local headers = res.headers
-- We're responding with the header returned from Lambda service
-- Remove hop-by-hop headers to prevent it from being sent to client
if ngx_var.http2 then
headers["Connection"] = nil
headers["Keep-Alive"] = nil
headers["Proxy-Connection"] = nil
headers["Upgrade"] = nil
headers["Transfer-Encoding"] = nil
end
headers = kong.table.merge(headers) -- create a copy of headers
if kong.configuration.enabled_headers[VIA_HEADER] then
headers[VIA_HEADER] = VIA_HEADER_VALUE
end

-- if error, set status and header, return error
if res.status > 400 then
ngx.status = res.status
for k, v in pairs(headers) do
ngx.header[k] = v
-- print("Header [" .. k .. "] = " .. v)
end
return error(res.body.Message)
end

-- not error, set default status and content type
ngx.status = 200
headers["Content-Type"] = headers["x-amzn-Remapped-Content-Type"]

-- record x-amzn-Remapped-Content-Type and clear it
local is_http_integration_response = headers["x-amzn-Remapped-Content-Type"] == "application/vnd.awslambda.http-integration-response"
headers["x-amzn-Remapped-Content-Type"] = nil

-- read from the body stream
local reader = res.body_reader
local buffer_size = 8192

-- for http integration response, parse the prelude just like Lambda Function Url
local first_chunk_body = ""
if is_http_integration_response then
first_chunk_body, err = process_http_integration_response(reader, buffer_size, headers)
if err then
return error(err)
end
end

-- set headers to ngx
for k, v in pairs(headers) do
ngx.header[k] = v
-- print("Header [" .. k .. "] = " .. v)
end

-- send the first chunk if any
if first_chunk_body ~= "" then
ngx.print(first_chunk_body)
ngx.flush(true)
end

repeat
local chunk, err = reader(buffer_size)
if err then
return error(err)
end

if chunk then
-- the chunk is in `application/vnd.amazon.eventstream` formatted
-- which is a binary format, we need to parse it
local parser, err = AWS_Stream:new(chunk, false)
if err or parser == nil then
-- print("ERROR: ", err)
return error(err)
end

while true do
local msg = parser:next_message()

if not msg then
break
end

-- print(require("pl.pretty").write(msg))
if msg.headers[":event-type"] == "PayloadChunk" then
-- print(msg.body)
ngx.print(msg.body)
ngx.flush(true)
end
end
end
until not chunk

-- TRACING: set KONG_WAITING_TIME stop
local ctx = ngx.ctx
-- setting the latency here is a bit tricky, but because we are not
-- actually proxying, it will not be overwritten
ctx.KONG_WAITING_TIME = get_now() - kong_wait_time_start

ngx.eof()
ngx.exit(ngx.OK)
end

function AWSLambdaHandler:access(conf)
if initialize then
initialize()
end
Expand Down Expand Up @@ -166,94 +373,13 @@ function AWSLambdaHandler:access(conf)
LAMBDA_SERVICE_CACHE:set(cache_key, lambda_service)
end

local upstream_body_json = build_request_payload(conf)

local res, err = lambda_service:invoke({
FunctionName = conf.function_name,
InvocationType = conf.invocation_type,
LogType = conf.log_type,
Payload = upstream_body_json,
Qualifier = conf.qualifier,
})

-- TRACING: set KONG_WAITING_TIME stop
local ctx = ngx.ctx
local lambda_wait_time_total = get_now() - kong_wait_time_start
-- setting the latency here is a bit tricky, but because we are not
-- actually proxying, it will not be overwritten
ctx.KONG_WAITING_TIME = lambda_wait_time_total
kong.ctx.plugin.waiting_time = lambda_wait_time_total

if err then
return error(err)
end

local content = res.body
if res.status >= 400 then
return error(content.Message)
if conf.invoke_mode == "BUFFERED" then
return invoke_buffered(conf, lambda_service)
elseif conf.invoke_mode == "RESPONSE_STREAM" then
return invoke_streaming(conf, lambda_service)
else
return error(fmt("invalid invoke mode (%s)", conf.invoke_mode))
end

local headers = res.headers

-- Remove Content-Length header returned by Lambda service,
-- to make sure returned response length will be correctly calculated
-- afterwards.
headers["Content-Length"] = nil
-- We're responding with the header returned from Lambda service
-- Remove hop-by-hop headers to prevent it from being sent to client
if ngx_var.http2 then
headers["Connection"] = nil
headers["Keep-Alive"] = nil
headers["Proxy-Connection"] = nil
headers["Upgrade"] = nil
headers["Transfer-Encoding"] = nil
end

local status
if conf.is_proxy_integration then
local proxy_response, err = extract_proxy_response(content)
if not proxy_response then
kong.log.err(err)
return kong.response.exit(502, { message = "Bad Gateway",
error = "could not JSON decode Lambda " ..
"function response: " .. err })
end

status = proxy_response.status_code
headers = kong.table.merge(headers, proxy_response.headers)
content = proxy_response.body
end

if not status then
if conf.unhandled_status
and headers["X-Amz-Function-Error"] == "Unhandled"
then
status = conf.unhandled_status

else
status = res.status
end
end

headers = kong.table.merge(headers) -- create a copy of headers

if kong.configuration.enabled_headers[VIA_HEADER] then
headers[VIA_HEADER] = VIA_HEADER_VALUE
end

-- TODO: remove this in the next major release
-- function to remove array_mt metatables from empty tables
-- This is just a backward compatibility code to keep a
-- long-lived behavior that Kong responsed JSON objects
-- instead of JSON arrays for empty arrays.
if conf.empty_arrays_mode == "legacy" then
local ct = headers["Content-Type"]
if ct and ct:lower():match("application/.*json") then
content = remove_array_mt_for_empty_table(content)
end
end

return kong.response.exit(status, content, headers)
end


Expand Down
Loading