Skip to content

Commit

Permalink
reproc(++): Pass I/O timeout via start options.
Browse files Browse the repository at this point in the history
  • Loading branch information
DaanDeMeyer committed Jan 5, 2020
1 parent 3483908 commit a9bdf2f
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 67 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 10.0.1

### reproc

- Pass `timeout` once via `reproc_options` instead of passing it via
`reproc_read`, `reproc_write` and `reproc_drain`.

### reproc++

- Pass `timeout` once via `reproc::options` instead of passing it via
`process::read`, `process::write` and `reproc::drain`.

## 10.0.0

### reproc
Expand Down
2 changes: 1 addition & 1 deletion reproc/examples/drain.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int main(void)
// given string. Passing the same sink to both output streams makes sure the
// output from both streams is combined into a single string.
reproc_sink sink = reproc_sink_string(&output);
r = reproc_drain(process, sink, sink, REPROC_INFINITE);
r = reproc_drain(process, sink, sink);
if (r < 0) {
goto finish;
}
Expand Down
2 changes: 1 addition & 1 deletion reproc/examples/git-status.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int main(void)
// stdout and stderr output in the same string, we pass `NULL` since we
// don't need to know which stream was read from.
uint8_t buffer[4096];
r = reproc_read(process, NULL, buffer, sizeof(buffer), REPROC_INFINITE);
r = reproc_read(process, NULL, buffer, sizeof(buffer));
if (r < 0) {
break;
}
Expand Down
24 changes: 10 additions & 14 deletions reproc/include/reproc/reproc.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ typedef struct reproc_options {
waiting indefinitely for the child process to exit.
*/
reproc_stop_actions stop;
/*!
Maximum time to wait for `reproc_read` or `reproc_write` to complete.
When `timeout` is zero, `reproc_read` and `reproc_write` will wait
indefinitely for any I/O to complete.
*/
int timeout;
} reproc_options;

/*! Allocate a new `reproc_t` instance on the heap. */
Expand Down Expand Up @@ -159,10 +166,6 @@ stderr stream and returns the amount of bytes read.
If `stream` is not `NULL`, it is used to store the stream that was
read from (`REPROC_STREAM_OUT` or `REPROC_STREAM_ERR`).
If no output stream is closed or read from within the given timeout, this
function returns `REPROC_ETIMEDOUT`. If one of the output streams is closed,
`timeout` is reset before waiting again for the other stream.
If both streams are closed by the child process or weren't opened with
`REPROC_REDIRECT_PIPE`, this function returns `REPROC_EPIPE`.
Expand All @@ -173,17 +176,12 @@ Actionable errors:
REPROC_EXPORT int reproc_read(reproc_t *process,
REPROC_STREAM *stream,
uint8_t *buffer,
size_t size,
int timeout);
size_t size);

/*!
Writes `size` bytes from `buffer` to the standard input (stdin) of the child
process.
If no data can be written within the given timeout, this function returns
`REPROC_ETIMEDOUT`. After writing some data, `timeout` is reset before trying to
write again.
(POSIX) By default, writing to a closed stdin pipe terminates the parent process
with the `SIGPIPE` signal. `reproc_write` will only return `REPROC_EPIPE` if
this signal is ignored by the parent process.
Expand All @@ -196,10 +194,8 @@ Actionable errors:
- `REPROC_EPIPE`
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int reproc_write(reproc_t *process,
const uint8_t *buffer,
size_t size,
int timeout);
REPROC_EXPORT int
reproc_write(reproc_t *process, const uint8_t *buffer, size_t size);

/*!
Closes the child process standard stream indicated by `stream`.
Expand Down
5 changes: 1 addition & 4 deletions reproc/include/reproc/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ stdout and stderr respectively. The same sink may be passed to both `out` and
`stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all
output from the previous call to `reproc_drain` one by one.
Each call to `reproc_read` is passed the given timeout. If a call to
`reproc_read` times out, this function returns `REPROC_ETIMEDOUT`.
Note that his function returns 0 instead of `REPROC_EPIPE` when both output
streams of the child process are closed.
Expand All @@ -38,7 +35,7 @@ Actionable errors:
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int
reproc_drain(reproc_t *process, reproc_sink out, reproc_sink err, int timeout);
reproc_drain(reproc_t *process, reproc_sink out, reproc_sink err);

/*!
Stores the output (both stdout and stderr) of a process in `output`.
Expand Down
18 changes: 9 additions & 9 deletions reproc/src/reproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct reproc_t {
struct stdio stdio;
int status;
reproc_stop_actions stop;
int timeout;
};

enum { STATUS_NOT_STARTED = -2, STATUS_IN_PROGRESS = -1 };
Expand Down Expand Up @@ -71,7 +72,8 @@ reproc_t *reproc_new(void)
.stdio = { .in = HANDLE_INVALID,
.out = HANDLE_INVALID,
.err = HANDLE_INVALID },
.status = STATUS_NOT_STARTED };
.status = STATUS_NOT_STARTED,
.timeout = REPROC_INFINITE };

return process;
}
Expand Down Expand Up @@ -118,6 +120,7 @@ int reproc_start(reproc_t *process,
goto finish;
}

process->timeout = options.timeout == 0 ? REPROC_INFINITE : options.timeout;
process->stop = options.stop;

bool is_noop = process->stop.first.action == REPROC_STOP_NOOP &&
Expand Down Expand Up @@ -152,8 +155,7 @@ int reproc_start(reproc_t *process,
int reproc_read(reproc_t *process,
REPROC_STREAM *stream,
uint8_t *buffer,
size_t size,
int timeout)
size_t size)
{
assert_return(process, REPROC_EINVAL);
assert_return(buffer, REPROC_EINVAL);
Expand All @@ -163,7 +165,8 @@ int reproc_read(reproc_t *process,

while (true) {
// Get the first pipe that will have data available to be read.
r = pipe_wait(process->stdio.out, process->stdio.err, &ready, timeout);
r = pipe_wait(process->stdio.out, process->stdio.err, &ready,
process->timeout);
if (r < 0) {
return r;
}
Expand Down Expand Up @@ -194,10 +197,7 @@ int reproc_read(reproc_t *process,
return r; // bytes read
}

int reproc_write(reproc_t *process,
const uint8_t *buffer,
size_t size,
int timeout)
int reproc_write(reproc_t *process, const uint8_t *buffer, size_t size)
{
assert_return(process, REPROC_EINVAL);
assert_return(buffer, REPROC_EINVAL);
Expand All @@ -209,7 +209,7 @@ int reproc_write(reproc_t *process,
int r = -1;

do {
r = pipe_write(process->stdio.in, buffer, size, timeout);
r = pipe_write(process->stdio.in, buffer, size, process->timeout);

if (r == REPROC_EPIPE) {
process->stdio.in = handle_destroy(process->stdio.in);
Expand Down
7 changes: 2 additions & 5 deletions reproc/src/sink.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
#include <stdlib.h>
#include <string.h>

int reproc_drain(reproc_t *process,
reproc_sink out,
reproc_sink err,
int timeout)
int reproc_drain(reproc_t *process, reproc_sink out, reproc_sink err)
{
assert_return(process, REPROC_EINVAL);
assert_return(out.function, REPROC_EINVAL);
Expand All @@ -31,7 +28,7 @@ int reproc_drain(reproc_t *process,

while (true) {
REPROC_STREAM stream = { 0 };
r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer), timeout);
r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer));
if (r < 0) {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion reproc/test/argv.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ int main(void)

char *output = NULL;
reproc_sink sink = reproc_sink_string(&output);
r = reproc_drain(process, sink, sink, REPROC_INFINITE);
r = reproc_drain(process, sink, sink);
assert(r == 0);
assert(output != NULL);

Expand Down
2 changes: 1 addition & 1 deletion reproc/test/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ int main(void)

char *output = NULL;
reproc_sink sink = reproc_sink_string(&output);
r = reproc_drain(process, sink, sink, REPROC_INFINITE);
r = reproc_drain(process, sink, sink);
assert(r == 0);
assert(output != NULL);

Expand Down
10 changes: 5 additions & 5 deletions reproc/test/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ static void io(const char *mode, const char *input, const char *expected)
r = reproc_start(process, argv, (reproc_options){ 0 });
assert(r == 0);

r = reproc_write(process, (uint8_t *) input, strlen(input), REPROC_INFINITE);
r = reproc_write(process, (uint8_t *) input, strlen(input));
assert(r == 0);

r = reproc_close(process, REPROC_STREAM_IN);
assert(r == 0);

char *output = NULL;
reproc_sink sink = reproc_sink_string(&output);
r = reproc_drain(process, sink, sink, REPROC_INFINITE);
r = reproc_drain(process, sink, sink);
assert(r == 0);
assert(output != NULL);

Expand All @@ -47,17 +47,17 @@ static void timeout(void)

const char *argv[3] = { RESOURCE_DIRECTORY "/io", "stdout", NULL };

r = reproc_start(process, argv, (reproc_options){ 0 });
r = reproc_start(process, argv, (reproc_options){ .timeout = 200 });
assert(r == 0);

uint8_t buffer = 0;
r = reproc_read(process, NULL, &buffer, sizeof(buffer), 200);
r = reproc_read(process, NULL, &buffer, sizeof(buffer));
assert(r == REPROC_ETIMEDOUT);

r = reproc_close(process, REPROC_STREAM_IN);
assert(r == 0);

r = reproc_read(process, NULL, &buffer, sizeof(buffer), REPROC_INFINITE);
r = reproc_read(process, NULL, &buffer, sizeof(buffer));
assert(r == REPROC_EPIPE);

reproc_destroy(process);
Expand Down
2 changes: 1 addition & 1 deletion reproc/test/overflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ int main(void)

char *output = NULL;
reproc_sink sink = reproc_sink_string(&output);
r = reproc_drain(process, sink, sink, REPROC_INFINITE);
r = reproc_drain(process, sink, sink);
assert(r >= 0);
assert(output != NULL);

Expand Down
2 changes: 1 addition & 1 deletion reproc/test/working-directory.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ int main(void)

char *output = NULL;
reproc_sink sink = reproc_sink_string(&output);
r = reproc_drain(process, sink, sink, REPROC_INFINITE);
r = reproc_drain(process, sink, sink);
assert(r == 0);

replace(output, '\\', '/');
Expand Down
16 changes: 7 additions & 9 deletions reprocxx/include/reproc++/reproc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ REPROCXX_EXPORT extern const int terminate;
reproc++. */
using milliseconds = std::chrono::duration<int, std::milli>;

REPROCXX_EXPORT extern const milliseconds infinite;

enum class redirect { pipe, inherit, discard };

enum class stop { noop, wait, terminate, kill };
Expand Down Expand Up @@ -65,12 +67,12 @@ struct options {
} redirect = {};

struct stop_actions stop = {};

reproc::milliseconds timeout = reproc::infinite;
};

enum class stream { in, out, err };

REPROCXX_EXPORT extern const milliseconds infinite;

/*! Improves on reproc's API by adding RAII and changing the API of some
functions to be more idiomatic C++. */
class process {
Expand All @@ -91,16 +93,12 @@ class process {
/*! `reproc_read` but returns a tuple of (stream, bytes read, error) and
defaults to waiting indefinitely for each read to complete.*/
REPROCXX_EXPORT std::tuple<stream, size_t, std::error_code>
read(uint8_t *buffer,
size_t size,
reproc::milliseconds timeout = reproc::infinite) noexcept;
read(uint8_t *buffer, size_t size) noexcept;

/*! reproc_write` but defaults to waiting indefinitely for each write to
complete. */
REPROCXX_EXPORT std::error_code
write(const uint8_t *buffer,
size_t size,
reproc::milliseconds timeout = reproc::infinite) noexcept;
REPROCXX_EXPORT std::error_code write(const uint8_t *buffer,
size_t size) noexcept;

REPROCXX_EXPORT std::error_code close(stream stream) noexcept;

Expand Down
7 changes: 2 additions & 5 deletions reprocxx/include/reproc++/sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ bool sink(stream stream, const uint8_t *buffer, size_t size);
```
*/
template <typename Out, typename Err>
std::error_code drain(process &process,
Out &&out,
Err &&err,
reproc::milliseconds timeout = reproc::infinite)
std::error_code drain(process &process, Out &&out, Err &&err)
{
static constexpr uint8_t initial = 0;

Expand All @@ -41,7 +38,7 @@ std::error_code drain(process &process,
stream stream = {};
size_t bytes_read = 0;
std::tie(stream, bytes_read, ec) = process.read(buffer.data(),
buffer.size(), timeout);
buffer.size());
if (ec) {
break;
}
Expand Down
17 changes: 7 additions & 10 deletions reprocxx/src/reproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,26 @@ std::error_code process::start(const arguments &arguments,
{ static_cast<REPROC_REDIRECT>(options.redirect.in),
static_cast<REPROC_REDIRECT>(options.redirect.out),
static_cast<REPROC_REDIRECT>(options.redirect.err) },
reproc_stop_actions_from(options.stop)
reproc_stop_actions_from(options.stop),
options.timeout.count()
};

int r = reproc_start(process_.get(), arguments.data(), reproc_options);

return error_code_from(r);
}

std::tuple<stream, size_t, std::error_code>
process::read(uint8_t *buffer,
size_t size,
reproc::milliseconds timeout) noexcept
std::tuple<stream, size_t, std::error_code> process::read(uint8_t *buffer,
size_t size) noexcept
{
REPROC_STREAM stream = {};
int r = reproc_read(process_.get(), &stream, buffer, size, timeout.count());
int r = reproc_read(process_.get(), &stream, buffer, size);
return { static_cast<enum stream>(stream), r, error_code_from(r) };
}

std::error_code process::write(const uint8_t *buffer,
size_t size,
reproc::milliseconds timeout) noexcept
std::error_code process::write(const uint8_t *buffer, size_t size) noexcept
{
int r = reproc_write(process_.get(), buffer, size, timeout.count());
int r = reproc_write(process_.get(), buffer, size);
return error_code_from(r);
}

Expand Down

0 comments on commit a9bdf2f

Please sign in to comment.