Skip to content

Commit

Permalink
Timestamp queue wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Feb 14, 2024
1 parent f112f94 commit 1c84d08
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
defmodule Membrane.TimestampQueue do
@moduledoc """
"""

alias Membrane.{Buffer, Event, Pad, StreamFormat}
alias Membrane.Element.Action

@type buffer_entry :: {:buffer, Buffer.t(), buffer_time :: Membrane.Time.t()}
@type stream_format_entry :: {:stream_format, StreamFormat.t()}
@type event_entry :: {:event, Event.t()}

@type queue_entry :: buffer_entry() | stream_format_entry() | event_entry()

@type pad_queue :: %{
pad_ref: Pad.ref(),
first_buffer_pts: integer(),
qex: Qex.t(),
size_in_unit: non_neg_integer(),
paused_demand?: boolean()
}

@type t :: %__MODULE__{
current_queue_time: Membrane.Time.t() | nil,
pause_demand_boundary: pos_integer() | :infinity,
pause_demand_boundary_unit: :buffers | :bytes,
pad_queues: %{optional(Pad.ref()) => pad_queue()}
}

defstruct [
:current_queue_time,
pause_demand_boundary: :infinity,
pause_demand_boundary_unit: :buffers,
pad_queues: %{}
]

@type option ::
{:pause_demand_boundary, pos_integer() | :infinity}
| {:pause_demand_boundary_unit, :buffers | :bytes}
@type options :: [option]

@spec new(options) :: t()
def new(options \\ []), do: struct!(__MODULE__, options)

@type suggested_action :: Action.pause_auto_demand() | Action.resume_auto_demand()
@type suggested_actions :: [suggested_action()]

@spec feed_with_buffer(t(), Pad.ref(), Buffer.t()) :: {suggested_actions(), t()}
def feed_with_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
get_and_update_in(
timestamp_queue,
[:pad_queues, pad_ref],
&push_to_pad_queue(&1, timestamp_queue, pad_ref, :buffer, buffer)
)
end

defp push_to_pad_queue(nil, timestamp_queue, pad_ref, :buffer, buffer) do
metric = Membrane.Buffer.Metric.from_unit(timestamp_queue.pause_demand_boundary_unit)
buffer_size = metric.buffers_size([buffer])

pad_queue = %{
pad_ref: pad_ref,
fist_buffer_pts: buffer.pts,
qex: Qex.new(buffer: buffer),
size_in_unit: buffer_size,
paused_demand?: false
}

actions_after_feed(pad_queue, timestamp_queue.pause_demand_boundary)
end

# defp push_to_pad_queue(pad_queue, timestamp_queue, pad_ref, :buffer, buffer) do ..

defp actions_after_feed(pad_queue, pause_demand_boundary) do
if not pad_queue.paused_demand? and pad_queue.size_in_unit >= pause_demand_boundary do
{[pause_auto_demand: pad_queue.pad_ref], %{pad_queue | paused_demand?: true}}
else
{[], pad_queue}
end
end

defp get_buffer_time(buffer_pts, first_buffer_pts, current_queue_time) do
buffer_pts - first_buffer_pts + current_queue_time
end
end

0 comments on commit 1c84d08

Please sign in to comment.