Skip to content

Commit

Permalink
Track recordings
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn committed Dec 18, 2024
1 parent 2ef1582 commit ff8a388
Show file tree
Hide file tree
Showing 4 changed files with 367 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lib/ex_webrtc/media/opus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule ExWebRTC.Media.Opus do
Returns the duration in milliseconds.
"""
@spec duration(binary()) :: {:ok, float()} | {:error, term()}
@spec duration(binary()) :: {:ok, number()} | {:error, term()}
def duration(<<config::5, rest::bitstring>>) do
with {:ok, frame_count} <- get_frame_count(rest) do
{:ok, frame_count * get_frame_duration(config)}
Expand Down
193 changes: 193 additions & 0 deletions lib/ex_webrtc/recorder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
defmodule ExWebRTC.Recorder do
@moduledoc """
Saves received RTP packets to a file for later processing/analysis.
Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them.
"""

use GenServer

alias ExWebRTC.MediaStreamTrack

require Logger

@default_base_dir "./recordings"

@typedoc """
Options that can be passed to `start_link/1`.
* `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
* `on_start` - Callback that will be executed just after the Recorder is (re)started.
It should return the initial list of tracks to be added.
"""
@type option ::
{:base_dir, String.t()}
| {:on_start, (-> [MediaStreamTrack.t()])}

@type options :: [option()]

# Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
@doc false
@spec child_spec(list()) :: Supervisor.child_spec()
def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, args}
}
end

@doc """
Starts a new `ExWebRTC.Recorder` process.
`ExWebRTC.Recorder` is a `GenServer` under the hood, thus this function allows for
passing the generic `t:GenServer.options/0` as an argument.
"""
@spec start(options(), GenServer.options()) :: GenServer.on_start()
def start(recorder_opts \\ [], gen_server_opts \\ []) do
GenServer.start(__MODULE__, recorder_opts, gen_server_opts)
end

@doc """
Starts a new `ExWebRTC.Recorder` process.
Works identically to `start/2`, but links to the calling process.
"""
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts)
end

@doc """
Adds new tracks to the recording.
"""
@spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok | :error
def add_tracks(recorder, tracks) do
# XXX need?
try do
GenServer.call(recorder, {:add_tracks, tracks})
catch
_exit_or_error, _e ->
Logger.error("Recorder is down, not adding tracks")
:error
end
end

@doc """
Records a received packet on the given track.
"""
@spec record(
GenServer.server(),
MediaStreamTrack.id(),
MediaStreamTrack.rid() | nil,
ExRTP.Packet.t()
) :: :ok
def record(recorder, track_id, rid, %ExRTP.Packet{} = packet) do
recv_time = System.monotonic_time(:millisecond)
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
end

@impl true
def init(config) do
base_dir =
(config[:base_dir] || @default_base_dir)
|> Path.join(UUID.uuid4())
|> Path.expand()

:ok = File.mkdir_p!(base_dir)
Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")

state = %{
base_dir: base_dir,
tracks: %{}
}

if config[:on_start] == nil do
{:ok, state}
else
{:ok, state, {:continue, {:on_start, config[:on_start]}}}
end
end

@impl true
def handle_continue({:on_start, on_start}, state) do
tracks = on_start.()

if Enum.empty?(tracks) do
{:noreply, state}
else
state = do_add_tracks(tracks, state)
{:noreply, state}
end
end

@impl true
def handle_call({:add_tracks, tracks}, _from, state) do
state = do_add_tracks(tracks, state)
{:reply, :ok, state}
end

@impl true
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
when is_map_key(state.tracks, track_id) do
%{file: file, rid_map: rid_map} = state.tracks[track_id]

case rid_map do
%{^rid => rid_idx} ->
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))

_other ->
Logger.warning("""
Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
""")
end

{:noreply, state}
end

@impl true
def handle_cast({:record, track_id, _rid, _recv_time, _packet}, state) do
Logger.warning("""
Tried to save packet for unknown track id. Ignoring. Track id: #{inspect(track_id)}.\
""")

{:noreply, state}
end

@impl true
def handle_info(_msg, state) do
{:noreply, state}
end

defp do_add_tracks(tracks, state) do
start_time = DateTime.utc_now()

tracks =
Map.new(tracks, fn track ->
path = Path.join(state.base_dir, "#{track.id}.rtpx")
file = File.open!(path, [:write])
rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new()

{track.id,
%{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}}
end)

state = %{state | tracks: Map.merge(state.tracks, tracks)}
report_path = Path.join(state.base_dir, "report.json")

report =
Map.new(state.tracks, fn {id, track} ->
track = Map.delete(track, :file)
{id, track}
end)

:ok = File.write!(report_path, Jason.encode!(report))

%{state | tracks: tracks}
end

defp serialize_packet(packet, rid_idx, recv_time) do
packet = ExRTP.Packet.encode(packet)
packet_size = byte_size(packet)
<<rid_idx::8, recv_time::64, packet_size::32, packet::binary>>
end
end
171 changes: 171 additions & 0 deletions lib/ex_webrtc/recorder/converter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
defmodule ExWebRTC.Recorder.Converter do
@moduledoc """
Processes RTP packet files saved by `Recorder`.
At the moment, `Converter` works only with VP8 video and Opus audio.
"""

require Logger

alias ExWebRTC.RTP.JitterBuffer.PacketStore
alias ExWebRTC.RTPCodecParameters
alias ExWebRTC.RTP.Depayloader
alias ExWebRTC.Media.{IVF, Ogg}

# TODO: Allow changing these values
@ivf_header_opts [
# <<fourcc::little-32>> = "VP80"
fourcc: 808_996_950,
height: 720,
width: 1280,
num_frames: 1024,
timebase_denum: 24,
timebase_num: 1
]

# TODO: Support codecs other than VP8/Opus
@video_codec_params %RTPCodecParameters{
payload_type: 96,
mime_type: "video/VP8",
clock_rate: 90_000
}

@audio_codec_params %RTPCodecParameters{
payload_type: 111,
mime_type: "audio/opus",
clock_rate: 48_000,
channels: 2
}

@default_output_path "./converter_output"

@doc """
Convert the saved dumps of tracks in the report to IVF and Ogg files.
"""
@spec convert!(Path.t(), Path.t()) :: :ok | no_return()
def convert!(report_path, output_path \\ @default_output_path) do
report_path =
report_path
|> Path.expand()
|> then(
&if(File.dir?(&1),
do: Path.join(&1, "report.json"),
else: &1
)
)

output_path = Path.expand(output_path)
File.mkdir_p!(output_path)

report =
report_path
|> File.read!()
|> Jason.decode!()

for {id, track} <- report do
%{
"path" => path,
"kind" => kind,
"rid_map" => rid_map
} = track

file = File.open!(path)

packets =
read_packets(file, Map.new(rid_map, fn {_rid, rid_idx} -> {rid_idx, %PacketStore{}} end))

case kind do
"video" ->
convert_video_track(id, rid_map, output_path, packets)

"audio" ->
{:ok, writer} =
output_path
|> Path.join("#{id}.ogg")
|> Ogg.Writer.open()

{:ok, depayloader} = Depayloader.new(@audio_codec_params)
do_convert_audio_track(packets |> Map.values() |> hd(), depayloader, writer)
end
end

:ok
end

defp convert_video_track(id, rid_map, output_path, packets) do
for {rid, rid_idx} <- rid_map do
filename = if rid == "nil", do: "#{id}.ivf", else: "#{id}_#{rid}.ivf"

{:ok, writer} =
output_path
|> Path.join(filename)
|> IVF.Writer.open(@ivf_header_opts)

{:ok, depayloader} = Depayloader.new(@video_codec_params)
do_convert_video_track(packets[rid_idx], depayloader, writer)
end
end

defp do_convert_video_track(packets, depayloader, writer, frames_cnt \\ 0)
defp do_convert_video_track([], _depayloader, writer, _frames_cnt), do: IVF.Writer.close(writer)

defp do_convert_video_track([packet | rest], depayloader, writer, frames_cnt) do
case Depayloader.depayload(depayloader, packet) do
{nil, depayloader} ->
do_convert_video_track(rest, depayloader, writer, frames_cnt)

{vp8_frame, depayloader} ->
frame = %IVF.Frame{timestamp: frames_cnt, data: vp8_frame}
{:ok, writer} = IVF.Writer.write_frame(writer, frame)
do_convert_video_track(rest, depayloader, writer, frames_cnt + 1)
end
end

defp do_convert_audio_track([], _depayloader, writer), do: Ogg.Writer.close(writer)

defp do_convert_audio_track([packet | rest], depayloader, writer) do
{opus_packet, depayloader} = Depayloader.depayload(depayloader, packet)
{:ok, writer} = Ogg.Writer.write_packet(writer, opus_packet)
do_convert_audio_track(rest, depayloader, writer)
end

defp read_packets(file, stores) do
case read_packet(file) do
{:ok, rid_idx, packet} ->
stores = Map.update!(stores, rid_idx, &insert_packet_to_store(&1, packet))
read_packets(file, stores)

{:error, :not_enough_data} ->
Logger.warning("Error decoding RTP packet")
read_packets(file, stores)

:eof ->
Map.new(stores, fn {rid_idx, store} ->
{rid_idx, store |> PacketStore.dump() |> Enum.reject(&is_nil/1)}
end)
end
end

defp read_packet(file) do
case IO.binread(file, 13) do
<<rid_idx::8, _recv_time::64, packet_size::32>> ->
with {:ok, packet} <- file |> IO.binread(packet_size) |> ExRTP.Packet.decode() do
{:ok, rid_idx, packet}
end

:eof ->
:eof
end
end

defp insert_packet_to_store(store, packet) do
case PacketStore.insert(store, packet) do
{:ok, store} ->
store

{:error, :late_packet} ->
Logger.warning("Decoded late RTP packet")
store
end
end
end
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.31.2", "8b06d0a5ac69e1a54df35519c951f1f44a7b7ca9a5bb7a260cd8a174d6322ece", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "317346c14febaba9ca40fd97b5b5919f7751fb85d399cc8e7e8872049f37e0af"},
"ex_dtls": {:hex, :ex_dtls, "0.16.0", "3ae38025ccc77f6db573e2e391602fa9bbc02253c137d8d2d59469a66cbe806b", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2a4e30d74c6ddf95cc5b796423293c06a0da295454c3823819808ff031b4b361"},
"ex_ice": {:hex, :ex_ice, "0.9.0", "d1a7e31b9cc52faf668f001f870344d3f9094955bafb6af62d84b7b4c2dd6b36", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.2.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f256faeb9cc5409d2177e68918198c7ef64372a729c8e1590699546554419aa"},
"ex_ice": {:hex, :ex_ice, "0.9.2", "7f5513416a8fe33b36d988dd30d6bb79ddd7cfa408e09e2e3d3e3a97e075614d", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.2.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "70f688582dbe36a82cf8bbedf5adb2f0b89996620e229213bd7ff9a9b642e571"},
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
Expand All @@ -23,7 +23,7 @@
"excoveralls": {:hex, :excoveralls, "0.18.3", "bca47a24d69a3179951f51f1db6d3ed63bca9017f476fe520eb78602d45f7756", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "746f404fcd09d5029f1b211739afb8fb8575d775b21f6a3908e7ce3e640724c6"},
"file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
"finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"},
"hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"},
"hpax": {:hex, :hpax, "1.0.1", "c857057f89e8bd71d97d9042e009df2a42705d6d690d54eca84c8b29af0787b0", [:mix], [], "hexpm", "4e2d5a4f76ae1e3048f35ae7adb1641c36265510a2d4638157fbcb53dda38445"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
Expand Down

0 comments on commit ff8a388

Please sign in to comment.