Skip to content

Commit

Permalink
Merge pull request #492 from tock/dev/streaming-process-slice
Browse files Browse the repository at this point in the history
libtock: add util/streaming_process_slice
  • Loading branch information
brghena authored Mar 4, 2025
2 parents 8212d86 + 1a64253 commit 39f83a8
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 0 deletions.
1 change: 1 addition & 0 deletions libtock/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ $(LIBNAME)_SRCS += $(wildcard $($(LIBNAME)_DIR)/sensors/*.c)
$(LIBNAME)_SRCS += $(wildcard $($(LIBNAME)_DIR)/sensors/syscalls/*.c)
$(LIBNAME)_SRCS += $(wildcard $($(LIBNAME)_DIR)/storage/*.c)
$(LIBNAME)_SRCS += $(wildcard $($(LIBNAME)_DIR)/storage/syscalls/*.c)
$(LIBNAME)_SRCS += $(wildcard $($(LIBNAME)_DIR)/util/*.c)

# Temporary hack for alarm
$(LIBNAME)_SRCS += $(wildcard $($(LIBNAME)_DIR)/internal/*.c)
Expand Down
18 changes: 18 additions & 0 deletions libtock/util/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# `libtock/util`

This directory contains utilities related to abstractions provided by
the Tock kernel. Currently, the following utilities exist:

- Streaming Process Slice:
[`streaming_process_slice.h`](./streaming_process_slice.h)

A contract over read-write allows for losslessly streaming data from the Tock
kernel to a userspace process.

Applications like ADC sampling or network stacks require the kernel to provide
a process with a continuous, lossless stream of data from a source that is not
rate-controlled by the process. This utility implements the userspace-side of
a simple protocol to achieve this goal, without requiring kernel-side
buffering and by utilizing the atomic swap semantics of Tock’s allow system
call. For more information on this contract, see
<https://docs.tockos.org/kernel/utilities/streaming_process_slice/struct.streamingprocessslice>
144 changes: 144 additions & 0 deletions libtock/util/streaming_process_slice.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#include "streaming_process_slice.h"
#include <string.h>

static void streaming_process_slice_prepare_header(uint8_t* buf) {
buf[0] = 0; // version[H]
buf[1] = 0; // version[L]
buf[2] = 0; // flags[H]
buf[3] = 0; // flags[L]
buf[4] = 0; // write offset
buf[5] = 0; // write offset
buf[6] = 0; // write offset
buf[7] = 0; // write offset
}

returncode_t streaming_process_slice_init(
streaming_process_slice_state_t* state,
uint32_t driver,
uint32_t allow,
void* buffer_a,
size_t size_a,
void* buffer_b,
size_t size_b) {
// Ensure that both buffers can hold the streaming process slice header:
if (size_a < STREAMING_PROCESS_SLICE_HEADER_LEN || size_b < STREAMING_PROCESS_SLICE_HEADER_LEN) {
return RETURNCODE_ESIZE;
}

state->driver = driver;
state->allow = allow;

// Initially, `buffer_a` is used as the application-owned buffer.
state->app_buffer_ptr = buffer_a;
state->app_buffer_size = size_a;
state->app_buffer_is_b = false; // false -> buffer_a

// Write a streaming process slice header to bufferB, and use it as the
// kernel-owned buffer. Its pointer and length are stored within the kernel's
// allow slot. We currently only support version 0, and don't set the `halt`
// flag:
streaming_process_slice_prepare_header(buffer_b);

allow_rw_return_t allow_res =
allow_readwrite(driver, allow, buffer_b, size_b);
if (!allow_res.success) {
memset(state, 0, sizeof(streaming_process_slice_state_t));
}

return tock_status_to_returncode(allow_res.status);
}

returncode_t streaming_process_slice_get_and_swap(
streaming_process_slice_state_t* state,
uint8_t** buffer,
uint32_t* size,
bool* exceeded) {
// Prepare the current app buffer to be shared with the kernel (writing a
// zeroed-out header):
streaming_process_slice_prepare_header(state->app_buffer_ptr);

// Swap the current app buffer for the kernel buffer:
allow_rw_return_t allow_res =
allow_readwrite(state->driver, state->allow, state->app_buffer_ptr,
state->app_buffer_size);

// Initialize to safe dummy values in case the allow was not successful
uint8_t* ret_buffer = NULL;
uint32_t ret_size = 0;
bool ret_exceeded = false;
if (allow_res.success) {
// Record the new app buffer:
state->app_buffer_ptr = allow_res.ptr;
state->app_buffer_size = allow_res.size;
state->app_buffer_is_b = !state->app_buffer_is_b;

// Return information about the received payload:
ret_buffer = state->app_buffer_ptr + 8;
memcpy(&ret_size, state->app_buffer_ptr + 4, sizeof(uint32_t));
ret_exceeded = (state->app_buffer_ptr[3] & 0x01) == 0x01;
}

// Write return values if provided with non-NULL pointers:
if (buffer != NULL) {
*buffer = ret_buffer;
}
if (size != NULL) {
*size = ret_size;
}
if (exceeded != NULL) {
*exceeded = ret_exceeded;
}

return tock_status_to_returncode(allow_res.status);
}

returncode_t streaming_process_slice_deinit(
streaming_process_slice_state_t* state,
uint8_t** buffer_a,
size_t* size_a,
uint8_t** buffer_b,
size_t* size_b) {
// Safe default values:
uint8_t* ret_buffer_a = NULL;
size_t ret_size_b = 0;
uint8_t* ret_buffer_b = NULL;
size_t ret_size_a = 0;

// Unallow the buffer currently allowed to the kernel:
allow_rw_return_t unallow_res =
allow_readwrite(state->driver, state->allow, NULL, 0);

if (unallow_res.success) {
// The unallow worked, recreate the full, initial buffer from the app and
// kernel halves:
if (state->app_buffer_is_b) {
ret_buffer_a = unallow_res.ptr;
ret_size_a = unallow_res.size;
ret_buffer_b = state->app_buffer_ptr;
ret_size_b = state->app_buffer_size;
} else {
ret_buffer_a = state->app_buffer_ptr;
ret_size_a = state->app_buffer_size;
ret_buffer_b = unallow_res.ptr;
ret_size_b = unallow_res.size;
}

// Wipe the state struct:
memset(state, 0, sizeof(streaming_process_slice_state_t));
}

if (buffer_a != NULL) {
*buffer_a = ret_buffer_a;
}
if (size_a != NULL) {
*size_a = ret_size_a;
}
if (buffer_b != NULL) {
*buffer_b = ret_buffer_b;
}
if (size_b != NULL) {
*size_b = ret_size_b;
}

return tock_status_to_returncode(unallow_res.status);
}
92 changes: 92 additions & 0 deletions libtock/util/streaming_process_slice.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include "../tock.h"

#define STREAMING_PROCESS_SLICE_HEADER_LEN 8

typedef struct {
uint32_t driver;
uint32_t allow;
uint8_t* app_buffer_ptr;
size_t app_buffer_size;
bool app_buffer_is_b;
} streaming_process_slice_state_t;

// Initialize a "streaming process slice" read-write allow slot
//
// This method allows a userspace buffer into a "streaming process slice" allow
// slot, implementing its atomic-swap semantics and header layout. The streaming
// process slice abstraction allows a userspace process to lossessly receive
// data from a kernel capsule. This is done by maintaining two buffers, where at
// any time one of which is owned by the kernel (for writing new, incoming data
// into) and one by the application, to process received data. These buffers are
// atomically swapped by the application, upon receipt of a signal that some
// data has been inserted into the kernel-owned buffer (such as an upcall).
//
// This method abstracts this interface by consuming two buffers, owned by the
// application and kernel respectively. It tracks all necessary state in the
// `streaming_process_slice_state_t` object. This struct is initialized by this
// method.
//
// The passed buffers must be each be able to hold at least the streaming
// process slice headers (`STREAMING_PROCESS_SLICE_HEADER_LEN` bytes), in
// addition to any payload.
//
// In case of an error while allowing the kernel-owned buffer to the specified
// driver and read-write allow slot, this function converts this error-status
// into a returncode using `tock_status_to_returncode` and returns it to the
// caller. When this method returns `RETURNCODE_SUCCESS`, the passed buffers are
// assumed to be owned by this `streaming_process_slice_state_t` and must not be
// used until after a successful call to `streaming_process_slice_deinit`. When
// either buffer is of insufficient size, it returns `RETURNCODE_ESIZE` and does
// not perform any allow operation or initialize the state reference.
returncode_t streaming_process_slice_init(
streaming_process_slice_state_t* state,
uint32_t driver,
uint32_t allow,
void* buffer_a,
size_t size_a,
void* buffer_b,
size_t size_b);

// Swap kernel- for app-owned buffer and get received payload
//
// This method atomically swaps the kernel-owned and application-owned buffers
// backing this streaming process slice. This function will reset the
// application-owned buffers header, applying any flags set in the
// `streaming_process_slice_state_t` and setting the write offset to `0`.
//
// Following the swap operation, when returning `RETURNCODE_SUCCESS`, it
// provides the buffer's payload and any kernel-set flags to the caller through
// the `buffer`, `size`, and `exceeded` arguments respectively. Callers must
// either provide pointers to variables for these values, or set them to `NULL`
// in case they are not interested in any given value.
//
// This function forwards any error from the underlying `allow_readwrite`
// operation in its return value. In case of a return value other than
// `RETURNCODE_SUCCESS`, any values returned in `buffer`, `size` and `exceeded`
// must not be considered valid.
returncode_t streaming_process_slice_get_and_swap(
streaming_process_slice_state_t* state,
uint8_t** buffer,
uint32_t* size,
bool* exceeded);

// Deinitialize an initialized `streaming_process_slice_state_t`
//
// This function returns the buffers passed into `streaming_process_slice_init`
// through the `buffer_a`, `size_a`, `buffer_b` and `size_b` arguments (if not
// set to `NULL` respectively). It ensures that the `buffer_a` and `size_a`
// arguments passed to `streaming_process_slice_init` match those of `buffer_a`
// and `size_a` for this function, i.e., it will not swap `buffer_a` for
// `buffer_b`, regardless of the number of calls to
// `streaming_process_slice_get_and_swap`.
//
// This function forwards any error from the underlying `allow_readwrite`
// operation in its return value. In case of a return value other than
// `RETURNCODE_SUCCESS`, any values returned in `buffer`, `size` and `exceeded`
// must not be considered valid.
returncode_t streaming_process_slice_deinit(
streaming_process_slice_state_t* state,
uint8_t** buffer_a,
size_t* size_a,
uint8_t** buffer_b,
size_t* size_b);

0 comments on commit 39f83a8

Please sign in to comment.