Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support keepalive events on load tests #23

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 126 additions & 78 deletions load_test/lib/load_test/user/sse.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule SseUser do
require Logger

alias SseUser.SseConnection

defmodule SseState do
defstruct [
:user_name,
Expand All @@ -13,41 +15,9 @@ defmodule SseUser do
]
end

defp build_headers(context, topic) do
iat = :os.system_time(:second)
exp = iat + context.sse_jwt_expiration

jwt = %{
"iss" => context.sse_jwt_issuer,
"exp" => exp,
"iat" => iat,
"aud" => context.sse_jwt_audience,
"sub" => topic
}

jws = %{
"alg" => "HS256"
}

signed = JOSE.JWT.sign(context.sse_jwt_secret, jws, jwt)
{%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed)

[{["Authorization"], "Bearer #{compact_signed}"}]
end

def run(context, user_name, topic, expected_messages) do
url = context.sse_url

Logger.debug(fn ->
"#{user_name}: Starting SSE client on url #{url}, topic #{topic}, expecting #{length(expected_messages)} messages"
end)

headers = build_headers(context, topic)
http_request_opts = []

{:ok, request_id} =
:httpc.request(:get, {url, headers}, http_request_opts, [{:sync, false}, {:stream, :self}])

state = %SseState{
user_name: user_name,
start_time: :os.system_time(:millisecond),
Expand All @@ -60,42 +30,37 @@ defmodule SseUser do
end
}

# Adding a padding message for the connection message
wait_for_messages(state, request_id, ["" | expected_messages])
end

defp wait_for_messages(state, request_id, [first_message | remaining_messages]) do
Logger.debug(fn -> "#{header(state)} Waiting for message: #{first_message}" end)
SseConnection.start(context, fn -> header(state) end, url, topic)

receive do
{:http, {_, {:error, msg}}} ->
Logger.error("#{header(state)} Http error: #{inspect(msg)}")
:ok = :httpc.cancel_request(request_id)
Stats.inc_msg_received_http_error()
raise("#{header(state)} Http error")

{:http, {_, :stream, msg}} ->
msg = String.trim(msg)
Logger.debug(fn -> "#{header(state)} Received message: #{inspect(msg)}" end)
check_message(state, msg, first_message)

{:http, {_, :stream_start, headers}} ->
{~c"x-sse-server", server} = List.keyfind(headers, ~c"x-sse-server", 0)

{:sse_connected, server, request_id} ->
Logger.info(fn ->
"#{header(state)} Connected, waiting: #{length(remaining_messages) + 1} messages, url #{state.url}, remote server: #{server}"
"#{header(state)} Connected, waiting for messages, url #{state.url}, remote server: #{server}"
end)

state.start_publisher_callback.()

msg ->
Logger.error("#{header(state)} Unexpected message #{inspect(msg)}")
:ok = :httpc.cancel_request(request_id)
raise("#{header(state)} Unexpected message")
wait_for_messages(state, request_id, expected_messages)

{:dead} ->
Logger.error("#{header(state)} SSE connection died")

other_message ->
Logger.error("#{header(state)} Unexpected message: #{inspect(other_message)}")
end
end

defp wait_for_messages(state, request_id, [first_message | remaining_messages]) do
Logger.info(fn -> "#{header(state)} Waiting for message: #{first_message}" end)

receive do
{:sse_event, sse_event} ->
Logger.debug(fn -> "#{header(state)} Received message: #{inspect(sse_event)}" end)
check_message(state, sse_event, first_message)
after
state.sse_timeout ->
Logger.error(
"#{header(state)} Timeout waiting for message (timeout=#{state.sse_timeout}ms), remaining: #{length(remaining_messages) + 1} messages, url #{state.url}"
"#{header(state)} Timeout waiting for message (timeout=#{state.sse_timeout}ms), url #{state.url}"
)

Stats.inc_msg_received_timeout()
Expand All @@ -120,31 +85,114 @@ defmodule SseUser do
end

defp check_message(state, received_message, expected_message) do
clean_received_message = String.replace(received_message, ~r"id: .*\nevent: .*\n", "")
delay = :os.system_time(:millisecond) - String.to_integer(received_message.id)
Stats.observe_propagation(delay)

try do
[_, ts, message, _, _] = String.split(clean_received_message, " ", parts: 5)
current_ts = :os.system_time(:millisecond)
delay = current_ts - String.to_integer(ts)
Stats.observe_propagation(delay)
Logger.info(fn ->
"#{header(state)} Propagation delay for message #{received_message.data} is #{delay}ms"
end)

Logger.debug(fn ->
"#{header(state)} Propagation delay for message #{message} is #{delay}ms"
end)
[_ts, message, _, _] = String.split(received_message.data, " ", parts: 5)

if message == expected_message do
Stats.inc_msg_received_ok()
else
Stats.inc_msg_received_unexpected_message()
if message == expected_message do
Stats.inc_msg_received_ok()
else
Stats.inc_msg_received_unexpected_message()

Logger.error(
"#{header(state)} Received unexpected message on url #{state.url}: #{inspect(received_message)} instead of #{expected_message}"
)
Logger.error(
"#{header(state)} Received unexpected message on url #{state.url}: #{inspect(received_message)} instead of #{expected_message}"
)
end
end

defmodule SseConnection do
# Start the SSE connection in a sub-process to intercept SSE events and only forward application events to the main process
def start(context, log_context, url, topic) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log_context is a static string which contains a timestamp. So we can not pass it as is

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log_context is a static string which contains a timestamp. So we can not pass it as is

sse_process = self()

{:ok, _task} =
Task.start_link(fn ->
Logger.info("Starting SSE client on url #{url}, topic #{topic}")
headers = build_http_headers(context, topic)

http_request_opts = []

{:ok, request_id} =
:httpc.request(:get, {url, headers}, http_request_opts, [
{:sync, false},
{:stream, :self}
])

loop(log_context, request_id, sse_process)
end)
end

defp build_http_headers(context, topic) do
iat = :os.system_time(:second)
exp = iat + context.sse_jwt_expiration

jwt = %{
"iss" => context.sse_jwt_issuer,
"exp" => exp,
"iat" => iat,
"aud" => context.sse_jwt_audience,
"sub" => topic
}

jws = %{
"alg" => "HS256"
}

signed = JOSE.JWT.sign(context.sse_jwt_secret, jws, jwt)
{%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed)

[{["Authorization"], "Bearer #{compact_signed}"}]
end

defp loop(log_context, request_id, main_process) do
receive do
{:http, {_, {:error, msg}}} ->
Logger.error("#{log_context.()} Http error: #{inspect(msg)}")
:ok = :httpc.cancel_request(request_id)
Stats.inc_msg_received_http_error()
raise("#{log_context.()} Http error")

{:http, {_, :stream_start, headers}} ->
{~c"x-sse-server", server} = List.keyfind(headers, ~c"x-sse-server", 0)

send(main_process, {:sse_connected, server, request_id})

{:http, {_, :stream, msg}} ->
sse_event = parse_sse_event(msg)

case sse_event.event do
# Events not part of the application messages, they are filtered out
event_name when event_name in ["timeout", "ping", "reconnect"] ->
Logger.debug("Received technical SSE event: #{event_name}")

# Event part of the application messages, they are forwarded to the main process
_other_event ->
send(main_process, {:sse_event, sse_event})
end

other_message ->
Logger.error("#{log_context.()} Unexpected message #{inspect(other_message)}")
:ok = :httpc.cancel_request(request_id)
Stats.inc_msg_received_http_error()
send(main_process, {:dead})
end
rescue
e ->
Logger.error("#{header(state)} #{inspect(e)}")
Stats.inc_msg_received_error()

loop(log_context, request_id, main_process)
end

defp parse_sse_event(sse_event_chunk) do
String.split(sse_event_chunk, "\n")
|> Enum.reject(fn line -> String.length(String.trim(line)) == 0 end)
|> Enum.map(fn line ->
[key, value] = String.split(line, ~r/\: ?/, parts: 2)
{String.to_atom(key), value}
end)
|> Enum.into(%{})
end
end
end