Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

627 support for streaming via stdio #50

Merged
merged 26 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions examples/file_to_pipe.exs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires to be run within a mix project. Let's add Mix.install so it can be run with elixir file_to_pipe.exs

Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
import Membrane.ChildrenSpec
import Membrane.Testing.Assertions
alias Membrane.Testing.Pipeline
Mix.start()
Mix.shell(Mix.Shell.Quiet)
# if Mix pollutes the logs, consider redirecting its logs by overriding the
# [Mix.Shell](https://hexdocs.pm/mix/Mix.Shell.html) behaviour

# redirect membrane logs to stderr
# currently also requires 'configure: :logger, backends: []' line in config.exs
LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
Mix.install([{:membrane_file_plugin, path: "."}])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be path: "../"? I believe we are in the examples/ subdirectory


[input | _] = System.argv()
defmodule FileExamplePipeline do
use Membrane.Pipeline

spec =
child(%Membrane.File.Source{location: input})
|> child(:sink, %Membrane.File.Sink{location: :stdout})
@impl true
def handle_init(_ctx, %{target: target, input: input}) do
spec =
child(%Membrane.File.Source{location: input})
|> child(:sink, %Membrane.File.Sink{location: :stdout})

{:ok, _supervisor, pipeline} = Pipeline.start(spec: spec)
{[spec: spec], %{target: target}}
end

assert_end_of_stream(pipeline, :sink, :input)
Pipeline.terminate(pipeline)
@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

Membrane.File.Sink.redirect_logs_to_stderr()

[input] = System.argv()

{:ok, _supervisor, pid} =
Membrane.Pipeline.start_link(FileExamplePipeline, %{input: input, target: self()})

receive do
:done -> Membrane.Pipeline.terminate(pid)
end
41 changes: 30 additions & 11 deletions examples/pipe_to_file.exs
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
import Membrane.ChildrenSpec
import Membrane.Testing.Assertions
alias Membrane.Testing.Pipeline
Mix.start()
Mix.shell(Mix.Shell.Quiet)

LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
Mix.install([{:membrane_file_plugin, path: "."}])

[output, chunk_size_str | _] = System.argv()
{chunk_size, ""} = Integer.parse(chunk_size_str)

spec =
child(%Membrane.File.Source{location: :stdin, chunk_size: chunk_size})
|> child(:sink, %Membrane.File.Sink{location: output})
defmodule PipeToFile do
use Membrane.Pipeline

{:ok, _supervisor, pipeline} = Pipeline.start(spec: spec)
@impl true
def handle_init(_ctx, %{target: target, output: output, chunk_size: chunk_size}) do
spec =
child(%Membrane.File.Source{location: :stdin, chunk_size: chunk_size})
|> child(:sink, %Membrane.File.Sink{location: output})

assert_end_of_stream(pipeline, :sink, :input)
Pipeline.terminate(pipeline)
{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

{:ok, _supervisor, pid} =
Membrane.Pipeline.start_link(PipeToFile, %{
target: self(),
output: output,
chunk_size: chunk_size
})

receive do
:done -> Membrane.Pipeline.terminate(pid)
end
15 changes: 12 additions & 3 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ defmodule Membrane.File.Sink do
Can also be used as a pipe to standard output by setting location to :stdout,
though this requires additional configuration.

Pipeline logs are directed to standard output by default. To separate them from the sink's output
we reccomend redirecting the logger to standard error. See 'examples/file_to_pipe.exs'

When `Membrane.File.SeekSinkEvent` is received, the element starts writing buffers starting
from `position`. By default, it overwrites previously stored bytes. You can set `insert?`
field of the event to `true` to start inserting new buffers without overwriting previous ones.
Please note, that inserting requires rewriting the file, what negatively impacts performance.
For more information refer to `Membrane.File.SeekSinkEvent` moduledoc.

Pipeline logs are directed to standard output by default. To separate them from the sink's output
we recommend redirecting the logger to standard error. For simple use cases using the default logger
configuration (like stand-alone scripts) this can be achieved by simply calling redirect_logs_to_stderr/0.
See examples/file_to_pipe.exs for a working example.
"""
use Membrane.Sink

Expand All @@ -26,6 +28,13 @@ defmodule Membrane.File.Sink do

def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any

@spec redirect_logs_to_stderr() :: :ok
def redirect_logs_to_stderr() do
varsill marked this conversation as resolved.
Show resolved Hide resolved
:ok = :logger.remove_handler(:default)
LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
end

@impl true
def handle_init(_ctx, %__MODULE__{location: :stdout}) do
{[],
Expand Down
42 changes: 36 additions & 6 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ defmodule Membrane.File.Plugin.Mixfile do
]
end

def application do
[
extra_applications: []
]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_env), do: ["lib"]

Expand Down Expand Up @@ -84,3 +78,39 @@ defmodule Membrane.File.Plugin.Mixfile do
]
end
end

# Also, a small bug noticed when reproducing on elixir 1.16.0, this time related to mix:
# ```elixir
# require Logger

# Mix.start()
# Mix.shell(Mix.Shell.Quiet)

# Mix.install(
# [
# :ratio,
# :logger_backends
# ],
# force: true
# )
# ```
# ```command
# root@5da670a83b2e:/workspace/membrane/logger_mre# elixirc mre.exs 2> stderr.log
# ==> ratio
# ```
# stderr.log:
# ```log
# Every 2.0s: cat stderr.log 5da670a83b2e: Tue Jan 30 12:35:47 2024

# warning: Ratio.DecimalConversion.decimal_to_rational/1 is undefined (module Ratio.DecimalConversion is not availabl
# e or is yet to be defined)
# │
# 17 │ {ratio, Ratio.DecimalConversion.decimal_to_rational(decimal)}
# │ ~
# │
# └─ lib/ratio/coerce.ex:17:37: Coerce.Implementations.Ratio.Decimal.coerce/2

# both :extra_applications and :applications was found in your mix.exs. You most likely want to remove the :applications
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this one (I believe it's good to describe such problematic cases for instance in the PR's description).
However, we can warn user in the example description, that in case there are some warnings thrown during the compilation, the example might not work.

# key, as all applications are derived from your dependencies
# ```
# It looks like a part of the message is logged to stderr, and the last line to stdout, which, again, was a hindrance for our use case.
20 changes: 10 additions & 10 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
%{
"bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark_parser": {:hex, :earmark_parser, "1.4.37", "2ad73550e27c8946648b06905a57e4d454e4d7229c2dafa72a0348c99d8be5f7", [:mix], [], "hexpm", "6b19783f2802f039806f375610faa22da130b8edc21209d0bff47918bb48360e"},
"credo": {:hex, :credo, "1.7.3", "05bb11eaf2f2b8db370ecaa6a6bda2ec49b2acd5e0418bc106b73b07128c0436", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "35ea675a094c934c22fb1dca3696f3c31f2728ae6ef5a53b5d648c11180a4535"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"},
"membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
"ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"},
Expand Down
44 changes: 23 additions & 21 deletions test/integration/stdio_test.exs
kidq330 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ defmodule Membrane.File.Integration.StdioTest do
%{cmd_out: cmd_out, cmd_err: cmd_err} = _context do
{output, rc} =
System.shell(
"bash -c ' \
set -o pipefail; \
cat #{@input_text_file} | mix run #{@pipe_to_file} #{cmd_out} 2048' \
2> #{cmd_err}",
# MIX_ENV set explicitly to dev so that file_behaviour is not mocked
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
"""
bash -c '
set -o pipefail;
cat #{@input_text_file} | elixir #{@pipe_to_file} #{cmd_out} 2048'
2> #{cmd_err}
"""
|> String.replace("\n", "")
)

Logger.debug("output when running script:")
Expand All @@ -48,11 +49,13 @@ defmodule Membrane.File.Integration.StdioTest do
%{cmd_out: cmd_out, cmd_err: cmd_err} = _context do
{output, rc} =
System.shell(
"bash -c ' \
set -o pipefail; \
cat #{@input_text_file} | mix run #{@pipe_to_file} #{cmd_out} 5' \
2> #{cmd_err}",
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
"""
bash -c '
set -o pipefail;
cat #{@input_text_file} | elixir #{@pipe_to_file} #{cmd_out} 5'
2> #{cmd_err}
"""
|> String.replace("\n", "")
)

Logger.debug("output when running script:")
Expand All @@ -67,23 +70,22 @@ defmodule Membrane.File.Integration.StdioTest do
test "pipeline from file to :stdout works",
%{cmd_err: cmd_err} = _context do
assert {"0123456789", _rc = 0} ==
System.shell("mix run #{@file_to_pipe} #{@input_text_file} \
2> #{cmd_err}",
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
),
System.shell("elixir #{@file_to_pipe} #{@input_text_file} 2> #{cmd_err}"),
File.read!(cmd_err)
end

test "file to stdout to stdin to file works",
%{cmd_out: cmd_out, cmd_err: cmd_err} = _context do
{output, rc} =
System.shell(
"bash -c ' \
set -o pipefail; \
mix run #{@file_to_pipe} #{@input_text_file} \
| mix run #{@pipe_to_file} #{cmd_out} 2048' \
2> #{cmd_err}",
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
"""
bash -c '
set -o pipefail;
elixir #{@file_to_pipe} #{@input_text_file}
| elixir #{@pipe_to_file} #{cmd_out} 2048'
2> #{cmd_err}
"""
|> String.replace("\n", "")
)

Logger.debug("output when running script:")
Expand Down