Skip to content

Commit

Permalink
Add behaviour and dynamic dispatch for (de)payloaders (#147)
Browse files Browse the repository at this point in the history
Co-authored-by: Michał Śledź <[email protected]>
  • Loading branch information
sgfn and mickel8 authored Aug 9, 2024
1 parent f69f0d7 commit 8f7866e
Show file tree
Hide file tree
Showing 15 changed files with 329 additions and 94 deletions.
26 changes: 18 additions & 8 deletions examples/save_to_file/lib/save_to_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule SaveToFile.PeerHandler do
}

alias ExWebRTC.Media.{IVF, Ogg}
alias ExWebRTC.RTP.{Opus, VP8}
alias ExWebRTC.RTP.Depayloader

@behaviour WebSock

Expand Down Expand Up @@ -54,6 +54,7 @@ defmodule SaveToFile.PeerHandler do
video_writer: nil,
video_depayloader: nil,
audio_writer: nil,
audio_depayloader: nil,
frames_cnt: 0
}

Expand Down Expand Up @@ -140,9 +141,11 @@ defmodule SaveToFile.PeerHandler do
timebase_num: 1
)

{:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new()

state = %{
state
| video_depayloader: VP8.Depayloader.new(),
| video_depayloader: video_depayloader,
video_writer: video_writer,
video_track_id: id
}
Expand All @@ -153,8 +156,15 @@ defmodule SaveToFile.PeerHandler do
defp handle_webrtc_msg({:track, %MediaStreamTrack{kind: :audio, id: id}}, state) do
# by default uses 1 mono channel and 48k clock rate
{:ok, audio_writer} = Ogg.Writer.open(@audio_file)
{:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new()

state = %{
state
| audio_depayloader: audio_depayloader,
audio_writer: audio_writer,
audio_track_id: id
}

state = %{state | audio_writer: audio_writer, audio_track_id: id}
{:ok, state}
end

Expand All @@ -166,11 +176,11 @@ defmodule SaveToFile.PeerHandler do

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do
state =
case VP8.Depayloader.write(state.video_depayloader, packet) do
{:ok, video_depayloader} ->
case Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{:ok, vp8_frame, video_depayloader} ->
{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

Expand All @@ -186,10 +196,10 @@ defmodule SaveToFile.PeerHandler do
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
opus_packet = Opus.Depayloader.depayload(packet)
{opus_packet, depayloader} = Depayloader.depayload(state.audio_depayloader, packet)
{:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet)

{:ok, %{state | audio_writer: audio_writer}}
{:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}}
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}
Expand Down
11 changes: 7 additions & 4 deletions examples/send_from_file/lib/send_from_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule SendFromFile.PeerHandler do
}

alias ExWebRTC.Media.{IVF, Ogg}
alias ExWebRTC.RTP.{Opus, VP8}
alias ExWebRTC.RTP.Payloader

@behaviour WebSock

Expand Down Expand Up @@ -60,9 +60,10 @@ defmodule SendFromFile.PeerHandler do
{:ok, _sender} = PeerConnection.add_track(pc, audio_track)

{:ok, _header, video_reader} = IVF.Reader.open(@video_file)
video_payloader = VP8.Payloader.new(800)
{:ok, video_payloader} = @video_codecs |> hd() |> Payloader.new(max_payload_size: 800)

{:ok, audio_reader} = Ogg.Reader.open(@audio_file)
{:ok, audio_payloader} = @audio_codecs |> hd() |> Payloader.new()

state = %{
peer_connection: pc,
Expand All @@ -71,6 +72,7 @@ defmodule SendFromFile.PeerHandler do
video_reader: video_reader,
video_payloader: video_payloader,
audio_reader: audio_reader,
audio_payloader: audio_payloader,
next_video_timestamp: Enum.random(0..@max_rtp_timestamp),
next_audio_timestamp: Enum.random(0..@max_rtp_timestamp),
next_video_sequence_number: Enum.random(0..@max_rtp_seq_no),
Expand Down Expand Up @@ -112,7 +114,7 @@ defmodule SendFromFile.PeerHandler do

case IVF.Reader.next_frame(state.video_reader) do
{:ok, frame} ->
{rtp_packets, payloader} = VP8.Payloader.payload(state.video_payloader, frame.data)
{rtp_packets, payloader} = Payloader.payload(state.video_payloader, frame.data)

# 3_000 = 90_000 (VP8 clock rate) / 30 FPS
next_sequence_number =
Expand Down Expand Up @@ -158,7 +160,7 @@ defmodule SendFromFile.PeerHandler do
# and time spent on reading and parsing the file
Process.send_after(self(), :send_audio, duration)

rtp_packet = Opus.Payloader.payload(packet)
{[rtp_packet], payloader} = Payloader.payload(state.audio_payloader, packet)

rtp_packet = %{
rtp_packet
Expand All @@ -177,6 +179,7 @@ defmodule SendFromFile.PeerHandler do
state = %{
state
| audio_reader: reader,
audio_payloader: payloader,
next_audio_timestamp: next_timestamp,
next_audio_sequence_number: next_sequence_number
}
Expand Down
42 changes: 42 additions & 0 deletions lib/ex_webrtc/rtp/depayloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule ExWebRTC.RTP.Depayloader do
@moduledoc """
RTP depayloader.
It unpacks RTP pakcets into audio/video frames.
"""

alias ExWebRTC.RTPCodecParameters

@opaque depayloader :: struct()

@doc """
Creates a new depayloader that matches the passed codec parameters.
"""
@spec new(RTPCodecParameters.t()) ::
{:ok, depayloader()} | {:error, :no_depayloader_for_codec}
def new(codec_params) do
with {:ok, module} <- to_depayloader_module(codec_params.mime_type) do
depayloader = module.new()
{:ok, depayloader}
end
end

@doc """
Processes binary data from a single RTP packet, and outputs a frame if assembled.
Returns the frame (or `nil` if a frame could not be depayloaded yet)
together with the updated depayloader.
"""
@spec depayload(depayloader(), ExRTP.Packet.t()) :: {binary() | nil, depayloader()}
def depayload(%module{} = depayloader, packet) do
module.depayload(depayloader, packet)
end

defp to_depayloader_module(mime_type) do
case String.downcase(mime_type) do
"video/vp8" -> {:ok, ExWebRTC.RTP.Depayloader.VP8}
"audio/opus" -> {:ok, ExWebRTC.RTP.Depayloader.Opus}
_other -> {:error, :no_depayloader_for_codec}
end
end
end
19 changes: 19 additions & 0 deletions lib/ex_webrtc/rtp/depayloader_behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule ExWebRTC.RTP.Depayloader.Behaviour do
@moduledoc false

@type depayloader :: struct()

@doc """
Creates a new depayloader struct.
"""
@callback new() :: depayloader()

@doc """
Processes binary data from a single RTP packet, and outputs a frame if assembled.
Returns the frame (or `nil` if a frame could not be depayloaded yet)
together with the updated depayloader struct.
"""
@callback depayload(depayloader(), packet :: ExRTP.Packet.t()) ::
{binary() | nil, depayloader()}
end
32 changes: 21 additions & 11 deletions lib/ex_webrtc/rtp/opus/depayloader.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
defmodule ExWebRTC.RTP.Opus.Depayloader do
@moduledoc """
Decapsualtes Opus audio out of RTP packet.
Based on [RFC 7587: RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587).
"""
defmodule ExWebRTC.RTP.Depayloader.Opus do
@moduledoc false
# Decapsualtes Opus audio out of RTP packet.
#
# Based on [RFC 7587: RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587).

alias ExRTP.Packet

@doc """
Takes Opus packet out of an RTP packet.
"""
@spec depayload(Packet.t()) :: binary()
def depayload(%Packet{payload: payload}), do: payload
@behaviour ExWebRTC.RTP.Depayloader.Behaviour

@type t :: %__MODULE__{}

defstruct []

@impl true
@spec new() :: t()
def new() do
%__MODULE__{}
end

@impl true
@spec depayload(t(), Packet.t()) :: {binary(), t()}
def depayload(%__MODULE__{} = depayloader, %Packet{payload: payload}),
do: {payload, depayloader}
end
30 changes: 18 additions & 12 deletions lib/ex_webrtc/rtp/opus/payloader.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
defmodule ExWebRTC.RTP.Opus.Payloader do
@moduledoc """
Encapsulates Opus audio packet into an RTP packet.
defmodule ExWebRTC.RTP.Payloader.Opus do
@moduledoc false
# Encapsulates Opus audio packet into an RTP packet.
#
# Based on [RFC 7587: RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587).

Based on [RFC 7587: RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587).
"""
@behaviour ExWebRTC.RTP.Payloader.Behaviour

@doc """
Packs Opus packet into an RTP packet.
@type t :: %__MODULE__{}

Fields from RTP header like ssrc, timestamp etc. are set to 0.
"""
@spec payload(binary()) :: ExRTP.Packet.t()
def payload(packet) when packet != <<>> do
ExRTP.Packet.new(packet)
defstruct []

@impl true
def new(_max_payload_size) do
%__MODULE__{}
end

@impl true
@spec payload(t(), binary()) :: {[ExRTP.Packet.t()], t()}
def payload(%__MODULE__{} = payloader, packet) when packet != <<>> do
{[ExRTP.Packet.new(packet)], payloader}
end
end
46 changes: 46 additions & 0 deletions lib/ex_webrtc/rtp/payloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule ExWebRTC.RTP.Payloader do
@moduledoc """
RTP payloader.
It packs audio/video frames into one or more RTP packets.
"""

alias ExWebRTC.RTPCodecParameters

@opaque payloader :: struct()

@doc """
Creates a new payloader that matches the passed codec parameters.
Opts:
* max_payload_size - determines the maximum size of a single RTP packet outputted by the payloader.
It must be greater than `100`, and is set to `1000` by default.
"""
@spec new(RTPCodecParameters.t(), max_payload_size: integer()) ::
{:ok, payloader()} | {:error, :no_payloader_for_codec}
def new(codec_params, opts \\ []) do
with {:ok, module} <- to_payloader_module(codec_params.mime_type) do
max_payload_size = opts[:max_payload_size] || 1000
payloader = module.new(max_payload_size)
{:ok, payloader}
end
end

@doc """
Packs a frame into one or more RTP packets.
Returns the packets together with the updated payloader.
"""
@spec payload(payloader(), binary()) :: {[ExRTP.Packet.t()], payloader()}
def payload(%module{} = payloader, frame) do
module.payload(payloader, frame)
end

defp to_payloader_module(mime_type) do
case String.downcase(mime_type) do
"video/vp8" -> {:ok, ExWebRTC.RTP.Payloader.VP8}
"audio/opus" -> {:ok, ExWebRTC.RTP.Payloader.Opus}
_other -> {:error, :no_payloader_for_codec}
end
end
end
17 changes: 17 additions & 0 deletions lib/ex_webrtc/rtp/payloader_behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule ExWebRTC.RTP.Payloader.Behaviour do
@moduledoc false

@type payloader :: struct()

@doc """
Creates a new payloader struct.
"""
@callback new(max_payload_size :: integer()) :: payloader()

@doc """
Packs a frame into one or more RTP packets.
Returns the packets together with the updated payloader struct.
"""
@callback payload(payloader(), frame :: binary()) :: {[ExRTP.Packet.t()], payloader()}
end
Loading

0 comments on commit 8f7866e

Please sign in to comment.