Josh Blum edited this page Aug 27, 2013 · 46 revisions
http://i.imgur.com/Bxs2Y.png

The following document demonstrates how to code to the Block API. The block interface is a single, consistent interface for authoring a block, providing access to the work function, stream tags, and message passing. All interfaces exposed by the API are equally available in Python and C++.

The code guide is just a guide to using the API; it does not address how to build or install custom IP. See the module guide for an example of a project that builds and installs a user's custom block IP: https://github.com/guruofquality/gras/wiki/Moduleguide

  • Block: A functional processing unit with inputs and outputs
  • Port: A single input or output of a block
  • Source: A producer of data
  • Sink: A consumer of data
  • Connection: A flow of data from output port to input port
  • Flow graph: A collection of blocks and connections
  • Item: A unit of data. Ex: baseband sample, symbol, fft vector...
  • Stream: A continuous flow of consecutive items
  • Tag: Metadata that decorates an item in a stream
  • Message: Arbitrary objects passed down connections

http://i.imgur.com/7El6W8N.png

A block is an element of computation. The goal of a block is to take input and produce output. Blocks may have any number of inputs and outputs (including none). Inputs and outputs may be buffers, tags, or messages (more on that later).

To create a block, the user creates a class that inherits from gras::Block (C++) or gras.Block (Python). The block hooks into the scheduler by overloading the work() member function. The implementation of work() makes various calls into the block's instance (this/self) to get and set state information about the block (see the section on the work function).

Example making a block in c++:

#include <gras/block.hpp>

struct MyBlock : gras::Block
{
    MyBlock(void) : gras::Block("my block name")
    {
        //init stuff here
        //more on this in the sections below
    }

    void work(const InputItems &ins, const OutputItems &outs)
    {
        //work function
        //more on this in the sections below
    }
};

Example making a block in python:

import gras

class MyBlock(gras.Block):

    def __init__(self):
        gras.Block.__init__(self, "my block name", other_args...)
        #init stuff here
        #more on this in the sections below

    def work(self, ins, outs):
        #work function
        #more on this in the sections below
        pass

A hierarchical block is a collection of blocks in a topology. Like a block, a hierarchical block has inputs and outputs (possibly none). However, unlike a block, it does not implement any processing. A hierarchical block can even contain arbitrary nestings of other hierarchical blocks!

To create a hierarchical block, the user creates a class that inherits from gras::HierBlock (C++) or gras.HierBlock (Python). The user creates blocks and connects them within the hier block to form a meaningful processing topology.

Example making a hierarchical block in c++:

#include <gras/hier_block.hpp>

struct MyHierBlock : gras::HierBlock
{
    MyHierBlock(void) : gras::HierBlock("my hier block name")
    {
        //create blocks used in this topology
        this->block0 = Block0(...);
        this->block1 = Block1(...);

        //connect block0 output port 0 to block 1 input port 0
        this->connect(this->block0, 0, this->block1, 0);

        //connect block0 output port 0 to hier block output port 0
        this->connect(this->block0, 0, *this, 0);

        //connect block1 output port 0 to hier block output port 1
        this->connect(this->block1, 0, *this, 1);
    }

    //class vars here...
};

Some observations:

  • notice the connections to this
    • this hier block has 2 outputs and no inputs
  • in connect, the number is a port index
    • the first number is an output port
    • the second number is an input port

Example making a hierarchical block in python:

import gras

class MyHierBlock(gras.HierBlock):

    def __init__(self):
        gras.HierBlock.__init__(self, "my hier block name")

        #create blocks used in this topology
        self.block0 = Block0(...)
        self.block1 = Block1(...)

        #connect block0 output port 0 to block 1 input port 0
        self.connect((self.block0, 0), (self.block1, 0))

        #connect block0 output port 0 to hier block output port 0
        self.connect((self.block0, 0), (self, 0))

        #connect block1 output port 0 to hier block output port 1
        self.connect((self.block1, 0), (self, 1))

Some observations:

  • notice the connections to self
    • this hier block has 2 outputs and no inputs
  • in connect, each tuple represents a port
    • the first tuple is output block, output index
    • the second tuple is input block, input index

A top block is a combination of a hierarchical block and an execution unit. Just like a hierarchical block, the top block has a topology of blocks inside, made up of regular blocks and hierarchical blocks. However, a top block does not have any input or output ports.

To create a top block, the user creates a class that inherits from gras::TopBlock (C++) or gras.TopBlock (Python). The user creates blocks and connects them within the top block to form a meaningful processing topology. The top block is responsible for the execution of the design/topology, through use of the start()/stop() API calls.

Example making a top block in c++:

#include <gras/top_block.hpp>

struct MyTopBlock : gras::TopBlock
{
    MyTopBlock(void) : gras::TopBlock("my top block name")
    {
        //create a bunch of blocks here
        //call connect on blocks, just like hier block example above
    }

    //class vars here...
};

//constructor
MyTopBlock tb;

//execute design
tb.start();

sleep(10);

//shutdown design
tb.stop();
tb.wait();

Example making a top block in python:

import gras

class MyTopBlock(gras.TopBlock):
    def __init__(self):
        gras.TopBlock.__init__(self, "my top block name")
        #create a bunch of blocks here
        #call connect on blocks, just like hier block example above


#constructor
tb = MyTopBlock()

#execute design
tb.start()

import time
time.sleep(10)

#shutdown design
tb.stop()
tb.wait()

In this section, we will go into more detail about block implementation: block and port initialization, and the processing implementation, the work function.

http://i.imgur.com/BdBe2Ti.png

A block is composed of multiple input and output ports. Each port is used for streaming items of a particular size in bytes. When creating the block, the user must configure the item size for each input and output port.

When configuring ports there is a basic extension rule: if the port at index n is the highest port configured, then all ports with an index greater than n share that port's configuration.

Some example port size configurations in c++:

-- A block with inputs and outputs size float --

MyBlock(void):
    gras::Block("my block")
{
    this->input_config(0).item_size = sizeof(float);
    this->output_config(0).item_size = sizeof(float);
}

-- A block with no inputs and outputs size float --

MySourceBlock(void):
    gras::Block("source block")
{
    this->output_config(0).item_size = sizeof(float);
}

-- A block with 2 inputs (float and double) and 1 output --

MyBlockAgain(void):
    gras::Block("my block again")
{
    this->input_config(0).item_size = sizeof(float);
    this->input_config(1).item_size = sizeof(double);
    this->output_config(0).item_size = sizeof(float);
}

Some observations:

  • The first argument is the index of the port
  • MyBlock can have any number of inputs, all size float
  • MyBlock can have any number of outputs, all size float
  • MySourceBlock has no inputs, input item size not set
  • MyBlockAgain input 0 is size float, input 1 is size double
  • MyBlockAgain inputs 2 and up are also size double

In the python world, we cannot simply specify an item size in bytes. Python needs to understand the actual data type. This is done through setting the signature, which is a list of numpy dtypes.

Some example signatures in python

-- A block with 2 inputs and 1 output --

def __init__(self):
    gras.Block.__init__(self, name="my adder",
        in_sig=[numpy.float32, numpy.float32],
        out_sig=[numpy.float32])

-- A block with no inputs and 1 output --

def __init__(self):
    gras.Block.__init__(self, name="my source",
        in_sig=None,
        out_sig=[numpy.float32])

-- A block with 2 inputs (float and double) and 1 output --

def __init__(self):
    gras.Block.__init__(self, name="my block",
        in_sig=[numpy.float32, numpy.float64],
        out_sig=[numpy.float32])

Some observations:

  • An IO signature is a list of numpy dtypes, not simply a size in bytes
    • dtypes describe both the size and type of the item
  • None may be used when the signature has zero ports
  • The same extension rules also apply to the signature
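As a quick standalone check (plain numpy, no GRAS required), a dtype carries the item size that the C++ examples above configured as a byte count:

```python
import numpy

# Each numpy dtype knows its own size in bytes, which is the
# information the scheduler needs for a port's item size.
assert numpy.dtype(numpy.float32).itemsize == 4
assert numpy.dtype(numpy.float64).itemsize == 8
assert numpy.dtype(numpy.complex64).itemsize == 8
```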
http://i.imgur.com/CQmFLS5.png

To implement processing, the user must write a "work" routine that reads inputs, processes, and writes outputs.

An example work function implementing an adder in c++

void work(const InputItems &ins, const OutputItems &outs)
{
    //min items on input and output ports
    const size_t n = std::min(ins.min(), outs.min());

    //cast buffers
    const float* in0 = ins[0].cast<const float *>();
    const float* in1 = ins[1].cast<const float *>();
    float* out = outs[0].cast<float *>();

    //process data
    for (size_t i = 0; i < n; i++)
    {
        out[i] = in0[i] + in1[i];
    }

    //mark input consumed, output produced
    this->consume(0, n); //consume n items on input port 0
    this->consume(1, n); //consume n items on input port 1
    this->produce(0, n); //produce n items on output port 0
}

Parameter definitions:

  • ins: vector of input buffers, where each element corresponds to an input port
  • outs: vector of output buffers, where each element corresponds to an output port

Some observations:

  • n is determined first, to avoid out-of-bounds access
  • Each buffer must be cast into a usable data type
  • Input buffers must be cast to a const type
  • The produce and consume methods are called for each port

An example work function implementing an adder in python

def work(self, ins, outs):

    #min items on input and output ports
    n = min(len(ins[0]), len(ins[1]), len(outs[0]))

    #process data
    outs[0][:n] = ins[0][:n] + ins[1][:n]

    #mark input consumed, output produced
    self.consume(0, n) #consume n items on input port 0
    self.consume(1, n) #consume n items on input port 1
    self.produce(0, n) #produce n items on output port 0

Parameter definitions:

  • ins: vector of numpy arrays, where each element corresponds to an input port
  • outs: vector of numpy arrays, where each element corresponds to an output port

Some observations:

  • Casting is not necessary thanks to Python's duck typing
  • The addition of numpy arrays implies an element-by-element addition
  • The [:n] slice selects the valid buffer range
  • The length of each array of items can be retrieved with len(...)
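The slicing and element-wise semantics above can be tried outside of GRAS entirely. This sketch uses plain numpy arrays as stand-ins for the buffers the scheduler would pass to work():

```python
import numpy

# Stand-ins for the scheduler-provided buffers: two inputs of
# unequal length and one output buffer.
in0 = numpy.array([1, 2, 3, 4], dtype=numpy.float32)
in1 = numpy.array([10, 20, 30], dtype=numpy.float32)
out = numpy.zeros(8, dtype=numpy.float32)

# Same logic as the adder's work(): clamp n to the shortest
# buffer, then add element-by-element over the valid range only.
n = min(len(in0), len(in1), len(out))
out[:n] = in0[:n] + in1[:n]

# n is 3, and out now begins with [11, 22, 33]
```

In a real block, the calls to consume() and produce() would then report n to the scheduler for each port.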

http://i.imgur.com/KS2V9ah.png

The block factory is really a three-part feature-set:

  • thread-safe method registration and accessor interface
  • factory registration and factory construction interface
  • runtime registration and location of blocks (zeroconf)

This interface can be read about in more detail here, on the feature page for the block factory:

For documentation on building and installing custom IP into the framework:

To register a call into the system, the user simply writes standard accessor and mutator methods, and then makes a function call at constructor time to register these methods.

Example registering methods in c++:

struct MyBlock : gras::Block
{
    MyBlock(void):
        gras::Block("MyBlock"),
        foo(0)
    {
        //other work setup stuff here
        this->register_call("get_foo", &MyBlock::get_foo);
        this->register_call("set_foo", &MyBlock::set_foo);
    }

    void work(const InputItems &ins, const OutputItems &outs)
    {
        //user work function here
        //maybe we do stuff with foo
    }

    size_t get_foo(void)
    {
        return foo;
    }

    void set_foo(const size_t &new_foo)
    {
        foo = new_foo;
    }

    size_t foo;
};

Some observations:

  • Notice the "const size_t &new_foo". The arguments must be const references.
  • Register call uses the "&MyBlock::set_foo" syntax to reference a method.
  • The string name matches the method name here, but this is not a requirement.

Example registering methods in python:

class MyBlock(gras.Block):
    def __init__(self):
        gras.Block.__init__(self, "MyBlock", other params...)
        self.foo = 0 #give foo an initial value
        self.register_call("get_foo", self.get_foo)
        self.register_call("set_foo", self.set_foo)

    def work(self, ins, outs):
        #user work function here
        #maybe we do stuff with foo
        pass

    def get_foo(self):
        return self.foo

    def set_foo(self, new_foo):
        self.foo = new_foo

Some observations:

  • Register call uses the "self.set_foo" syntax to reference a method.
  • The string name matches the method name here, but this is not a requirement.

To access a method, use the x() operator on the block.

Example accessing methods in c++:

MyBlock *my_block = new MyBlock();

//example setting a value
my_block->x("set_foo", 42);

//example getting a value
const size_t my_foo1 = my_block->x<size_t>("get_foo");

Some observations:

  • The first argument is the name of the registered method.
  • No template specifiers are required for input arguments.
  • Return types cannot be inferred, explicit specification required.

Example accessing methods in python:

my_block = MyBlock()

#example setting a value
my_block.set_foo(42)

#example getting a value
my_foo = my_block.get_foo()

Some observations:

  • That was easy!
  • Natural language semantics were used; no .x or string names
    • Python's __getattr__ inserts registered methods into the object.

Users can register factory functions for their blocks into the global registry. Once this is done, blocks can be created programmatically from the factory. This avoids the need to expose public headers and symbols for blocks.

Example registration of a factory in c++:

#include <gras/block.hpp>
#include <gras/factory.hpp>

struct MyBlock : gras::Block
{
    MyBlock(const int foo, const std::string &bar):
        gras::Block("MyBlock")
    {
        //constructor guts here
    }
    //other block stuff here...
};

GRAS_REGISTER_FACTORY2("/package/my_block", MyBlock, int, std::string)

Some observations:

  • The GRAS_REGISTER_FACTORY helper macro calls gras::register_factory at static init time.
    • The number 2 represents the number of arguments to the constructor
    • The data types of the constructor arguments must be specified
  • The name of the registration is done with unix style paths for organization purposes.
  • The macro used in the registration is expected to be at the global scope level.

Example registration of a factory in python:

import gras

class MyBlock(gras.Block):
    def __init__(self, foo, bar):
        gras.Block.__init__(self, 'MyBlock', args...)
        #constructor guts here
    #other block stuff here...

gras.register_factory("/package/my_block", MyBlock)

Some observations:

  • gras.register_factory() is called at module import time.
  • The registration must be at the global scope level to be called on import.
  • The name of the registration is done with unix style paths for organization purposes.

Example call into factory in c++:

gras::Element *my_block;
my_block = gras::make("/package/my_block", 42, "mode");
//do stuff with block...
delete my_block;

Some observations:

  • The gras::make call takes the path, and expected arguments.
  • The unseen template arguments allow for native language semantics.
  • Notice the delete call, ownership of the block goes to the client.
    • Users may use a shared_ptr for automatic deletion of the block.
  • The return type of make is an Element, which is callable interface aware
    • To make gras::Block calls on the pointer, dynamic_cast it first

Example call into factory in python:

my_block = gras.make("/package/my_block", 42, "mode")
#do stuff with block...

Some observations:

  • The gras.make call takes the path, and expected arguments.
  • Ownership of the returned object is handled automatically by Python.

Zero configuration refers to the ability of a topology to self-configure. In practice this means that blocks can register themselves and locate other blocks within a hierarchy. Once a block can be located in the hierarchy, it can be configured by another block using the method access mentioned above. In GRAS, zeroconf can be achieved through the Element tree.

The element tree API consists of two parts:

  • adopt_element() - allows hierarchical elements to be registered into parent/child relationships.
  • locate_element() - find an element within the element tree given a relative or absolute unix-style path.

Example element tree in c++:

MyBlock *my_block0 = new MyBlock();
MyBlock *my_block1 = new MyBlock();
MyHierBlock *my_hb = new MyHierBlock();
MyTopBlock *my_tb = new MyTopBlock();

my_tb->adopt_element("hier", *my_hb);
my_hb->adopt_element("block0", *my_block0);
my_hb->adopt_element("block1", *my_block1);

... later in the code ...

void some_call_in_block1(void)
{
    gras::Element *block0;

    //block1 locating block0 from the element tree (absolute path)
    block0 = this->locate_element("/hier/block0");

    //block1 locating block0 from the element tree (relative path)
    block0 = this->locate_element("../block0");

    //now you can use properties on block0 inside block1
    block0->x("set_foo", 42);
}

Example element tree in python:

my_block0 = MyBlock()
my_block1 = MyBlock()
my_hb = MyHierBlock()
my_tb = MyTopBlock()

my_tb.adopt_element("hier", my_hb)
my_hb.adopt_element("block0", my_block0)
my_hb.adopt_element("block1", my_block1)

... later in the code ...

def some_call_in_block1(self):

    #block1 locating block0 from the element tree (absolute path)
    block0 = self.locate_element("/hier/block0")

    #block1 locating block0 from the element tree (relative path)
    block0 = self.locate_element("../block0")

    #now you can use properties on block0 inside block1
    block0.set_foo(42)

Observations:

  • The examples are drastically oversimplified versus a practical use case.
  • The paths are unix-style, and can be relative to the element, or absolute.
  • block1 gets access to block0 and its properties from inside implementation.

Recommendations:

  • Setup the parent/child relations for hierarchical elements in the same place in the code as the block connection logic.
  • When one block must access another block's properties, it is best to perform the locate once in the notify_active overload.

http://i.imgur.com/cDlhxlO.png

A tag decorates a stream with metadata. A tag is associated with a particular item in a stream, and an item may have more than one tag associated with it. The association of an item and tag is made through an absolute count: every item in a stream has an absolute count, and tags use this count to identify the item in the stream with which they are associated.

A tag has the following members:

  • offset: the unique item count
  • object: a PMC container with metadata contents

Note0: The tags.hpp header also exports a type called StreamTag (a key, value pair). Users may wish to use StreamTag as the tag's object, rather than implement a custom container.

Note1: The PMC type is a highly extensible polymorphic container. More information is available on the PMC wiki page:

Tags can be read from the work function using get_input_tags(). Each input port/stream can have associated tags.

Example reading tags in c++

void work(const InputItems &ins, const OutputItems &outs)
{
    BOOST_FOREACH(const gras::Tag &t, this->get_input_tags(0))
    {
        //do something with t
    }

    //work stuff here...
}

Example reading tags in python

def work(self, ins, outs):
    for t in self.get_input_tags(0):
        #do something with t
        pass

    #work stuff here...

Some observations:

  • The first parameter of get_input_tags() specifies the port index
  • get_input_tags() returns an iterator:
    • The iterator is STL algorithm and container compliant
    • The iterator can be used with BOOST_FOREACH or a for loop

Tags can be written from the work function using post_output_tag(). Each output port/stream can have associated tags.

Example writing tags in c++

void work(const InputItems &ins, const OutputItems &outs)
{
    //create a tag and fill its members
    gras::Tag t;
    t.offset = this->get_produced(0); //absolute count for beginning of outs[0]
    t.object = PMC_M("example_contents");

    //post the tag to the output port 0
    this->post_output_tag(0, t);

    //work stuff here...
}

Some observations:

  • The tag offset is the absolute count of the 0th item of outs[0]
  • get_produced(0) + n is the offset of the nth output item of port 0

Example writing tags in python

def work(self, ins, outs):
    #create a tag and fill its members
    t = gras.Tag()
    t.offset = self.get_produced(0) #absolute count for beginning of outs[0]
    t.object = PMC_M("example_contents")

    #post the tag to the output port 0
    self.post_output_tag(0, t)

    #work stuff here...

Some observations:

  • The tag offset is the absolute count of the 0th item of outs[0]
  • get_produced(0) + n is the offset of the nth output item of port 0

After each work() call, a block will erase and propagate input tags that are past the input consumption boundary. The default behaviour is to broadcast each input tag to every output. In addition, a 1:1 input-to-output ratio is assumed when interpolating the absolute item count to the output port. The user may override this behaviour by overloading propagate_tags().

Example overload of propagate tags in c++

void propagate_tags(const size_t which_input, const TagIter &iter)
{
    BOOST_FOREACH(const gras::Tag &t, iter)
    {
        //drop it, mutate, whatever
        //call post_output_tag(...)
    }
}

Example overload of propagate tags in python

def propagate_tags(self, which_input, iter):
    for t in iter:
        #drop it, mutate, whatever
        #call post_output_tag(...)
        pass

http://i.imgur.com/WTc29Zz.png

A block's ports may also be used for message passing purposes; essentially, this is passing stream tags without the stream. GRAS provides an API to set up message ports, receive messages from upstream providers, and post messages to downstream consumers.

Note0: The tags.hpp header also exports a type called PacketMsg (buffer + metadata). When a message should contain a data buffer, users may wish to use PacketMsg, rather than implement a custom container.

Note1: The PMC type is a highly extensible polymorphic container. More information is available on the PMC wiki page:

Messages can be read from the work function using pop_input_msg(). This method removes a message from the internal message queue on the given port and returns it to the caller.

Example reading messages in c++

void work(const InputItems &ins, const OutputItems &outs)
{
    const PMCC msg = this->pop_input_msg(0);

    //work stuff here...
}

Example reading messages in python

def work(self, ins, outs):
    msg = self.pop_input_msg(0)

    #work stuff here...

Messages can be written from the work function using post_output_msg().

Example writing messages in c++

void work(const InputItems &ins, const OutputItems &outs)
{
    //create a message
    const std::string msg = "example_contents";

    //post the message to the output port 0
    this->post_output_msg(0, msg);

    //work stuff here...
}

Example writing messages in python

def work(self, ins, outs):
    #create a message
    msg = "example_contents"

    #post the message to the output port 0
    self.post_output_msg(0, msg)

    #work stuff here...

This section describes how to use the API to configure buffer input and output requirements, to control the conditions under which work() gets called.

Each port has a configurable reserve requirement. Think of this as the minimum number of items worth of buffering needed for the scheduler to call work(). By default, reserve_items is 1, so work() is only called when all ports have data. See the comments for reserve_items in block.hpp for more.

Example setting the reserve_items in c++

MyBlock(void):
    gras::Block("my block")
{

    //init stuff here...

    //work will not be called until input port 0 has at least 10 items
    this->input_config(0).reserve_items = 10;
}

Example setting the reserve_items in python

def __init__(self, other_params...):
    gras.Block.__init__(self, name="my adder", other_params...)

    #work will not be called until input port 0 has at least 10 items
    self.input_config(0).reserve_items = 10

Rather than setting a reserve (or in conjunction with one), a block may provide dynamic feedback to the scheduler about its input and output requirements. This is done through the mark_input_fail() and mark_output_fail() mechanisms. See the comments for mark fail in block.hpp for more info.

The basic idea is:

  • the scheduler calls work() with buffers that meet the reserve requirements
  • work decides if the input and output buffer sizes are appropriate
  • work marks fail on any ports that are insufficient in size
  • if work marks fail, work should return and not process data
  • the scheduler will call work() again when the failed port's available size increases

Example marking fail in c++

void work(const InputItems &ins, const OutputItems &outs)
{
    //check if input port 1 has enough items, or fail
    if (ins[1].size() < 32)
    {
        this->mark_input_fail(1);
        return;
    }
    //do useful work here...
}

Example marking fail in python

def work(self, ins, outs):

    #check if input port 1 has enough items, or fail
    if len(ins[1]) < 32:
        self.mark_input_fail(1)
        return

    #do useful work here...

http://i.imgur.com/ZWuvx90.png

For more details about the GRAS buffer model, see the buffers feature page: https://github.com/guruofquality/gras/wiki/Zerocopy

The examples below are only in C++. The zero-copy API is also exposed in python; however, if you are writing blocks in python, as you might expect, zero-copy is probably not worth the trouble.

The zero-copy API provides direct access to the underlying input and output buffers presented to the work function. Blocks that do not mutate samples can take advantage of this to achieve zero-copy. This could be a block that simply does passive things with the input, or perhaps a block that only modifies tags. For example: blocks in gnuradio core like head, skiphead, delay....

Example getting the input buffer and posting to output:

void work(const InputItems &ins, const OutputItems &outs)
{
    //get a reference to the input buffer on port 0
    gras::SBuffer buffer = this->get_input_buffer(0);

    //Optional: modify buffer.offset and buffer.length
    //to control what portion of buffer is passed downstream.
    //Otherwise, buffer is ins[0].size()*item_size bytes.

    //Post this buffer to output port 0
    this->post_output_buffer(0, buffer);

    //Normal stuff: consume the input that was used
    this->consume(0, ins[0].size());
}

For more examples see blocks in GREX that use zero-copy:

  • Delay
  • StreamSelector
  • Stream2Datagram
  • Datagram2Stream

The custom buffer queue API allows the block to choose the memory used for its input and/or output buffers, rather than use the buffers provided by the scheduler.

For example: with a DMA device like a DSP unit, or perhaps a GPU library, the memory is chosen by the kernel. By directly using the DMA buffers, the upstream block can place items directly into a DMA buffer; likewise, the downstream block can read items directly from a DMA buffer. This saves the block from the need to memcpy into and out of the DMA buffer.

Example of a block that sources data from DMA:

struct DMASourceBlock : gras::Block
{
    DMASourceBlock(void) : gras::Block("DMASourceBlock")
    {
        //init stuff here
        this->output_config(0).item_size = 4;
    }

    //overload output buffer allocator
    //this method is called when the scheduler allocates output buffers
    BufferQueueSptr output_buffer_allocator(
        const size_t which_output,
        const SBufferConfig &config
    )
    {
        //User should make a custom class that inherits from BufferQueue
        //In this example this class is called MyDMASourceBufferQueue.
        //The config parameter may be used as a hint for buffer size.
        //Or the config parameter may be ignored, as is often the case.
        return BufferQueueSptr(new MyDMASourceBufferQueue(config));
    }

    void work(const InputItems &ins, const OutputItems &outs)
    {
        //do something with the device to wait on a DMA to complete

        //outs[0] is already filled, call produce with the transfer size
        this->produce(0, xfer_bytes/item_size);

        //after work, the scheduler will call pop on your queue
        //do something in your queue to switch to the next buffer

        //once the downstream consumes the buffer
        //the scheduler will call push on your queue
        //do something in your queue to release the memory
    }
};

Example of a block that sinks data to DMA:

struct DMASinkBlock : gras::Block
{
    DMASinkBlock(void) : gras::Block("DMASinkBlock")
    {
        //init stuff here
        this->input_config(0).item_size = 4;
    }

    //overload input buffer allocator
    //this method is called when the scheduler allocates input buffers
    //The upstream block will use this BufferQueue for output instead.
    BufferQueueSptr input_buffer_allocator(
        const size_t which_input,
        const SBufferConfig &config
    )
    {
        //User should make a custom class that inherits from BufferQueue
        //In this example this class is called MyDMASinkBufferQueue.
        //The config parameter may be used as a hint for buffer size.
        //Or the config parameter may be ignored, as is often the case.
        return BufferQueueSptr(new MyDMASinkBufferQueue(config));
    }

    void work(const InputItems &ins, const OutputItems &outs)
    {
        //do something with the device to commit the memory

        //consume the entire buffer
        this->consume(0, ins[0].size());

        //Wait for DMA xfer completion:
        //It is the responsibility of this work function to do the waiting.

        //As an optimization, to pipeline DMA writes,
        //this block may return without waiting on a particular xfer.
        //But this block must hold a buffer reference for any uncompleted xfer.
        //Hint: get a buffer reference with get_input_buffer().
    }

};

Much of the complication has been hidden in overloads of BufferQueue. The user must have a good understanding of the SBuffer and BufferQueue classes. The comments in the work() methods are merely a suggestion.

Not quite zero-copy, but buffer inlining allows for in-place operations where the output buffer and input buffer are the same, for the benefit of the cache. See the comments for InputPortConfig.inline_buffer in block.hpp for more info.

Code example of setting up buffer inlining:

struct MyBlock : gras::Block
{
    MyBlock(void) : gras::Block("MyBlock")
    {
        this->input_config(0).item_size = sizeof(float);
        this->output_config(0).item_size = sizeof(float);

        //modify the config for buffer inlining
        this->input_config(0).inline_buffer = true;
    }

    void work(const InputItems &, const OutputItems &);
};

For more examples, see math blocks in GREX.


http://i.imgur.com/WZjUY.png

For more details about GRAS affinity, see the affinity feature page: https://github.com/guruofquality/gras/wiki/Affinity

GRAS is based on a thread pool model where each block or group of blocks belongs to a thread pool. The user may wish to control which blocks are in what thread pool for various performance reasons. Thread pools can be affinitized to a particular CPU core, particular NUMA node, or core within a NUMA node.

In summary, users can affinitize:

  • which thread pool a block's work executes in
  • which core a thread pool runs on
  • which NUMA node a thread pool runs on
  • or which core in a given NUMA node

Example creating two thread pools in c++:

//create a thread pool with 2 threads
gras::ThreadPoolConfig config0;
config0.thread_count = 2;
gras::ThreadPool tp0(config0);
tp0.set_active();

//all blocks created at this point
//will be operated in thread pool 0

//create a thread pool with 2 threads
//affinitize this pool to NUMA node 3
gras::ThreadPoolConfig config1;
config1.thread_count = 2;
config1.node_mask = (1 << 3);
gras::ThreadPool tp1(config1);
tp1.set_active();

//all blocks created at this point
//will be operated in thread pool 1

Example creating two thread pools in python:

#create a thread pool with 2 threads
config0 = gras.ThreadPoolConfig()
config0.thread_count = 2
tp0 = gras.ThreadPool(config0)
tp0.set_active()

#all blocks created at this point
#will be operated in thread pool 0

#create a thread pool with 2 threads
#affinitize this pool to NUMA node 3
config1 = gras.ThreadPoolConfig()
config1.thread_count = 2
config1.node_mask = (1 << 3)
tp1 = gras.ThreadPool(config1)
tp1.set_active()

#all blocks created at this point
#will be operated in thread pool 1

Some observations:

  • The set_active() call selects the thread pool for subsequently created blocks
  • The NUMA affinity takes a mask, 1 << 3 selects node 3
  • See thread_pool.hpp for more in-depth documentation on API

By default, GRAS allocates memory with the standard malloc/new convention. The user may tell a block to allocate buffer memory so it resides on a particular physical NUMA node. Memory affinity is set through the set_buffer_affinity() method on the block. Simply call set_buffer_affinity(which_node) before the flow graph is started.
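As a configuration sketch only (MyTopBlock and its my_block member are hypothetical placeholders, not part of the GRAS API), buffer affinity might be applied like this before execution:

```python
import gras

# Hypothetical top block; my_block is a placeholder member block.
tb = MyTopBlock()

# Pin my_block's buffer memory to NUMA node 0.
# This must happen before the flow graph is started.
tb.my_block.set_buffer_affinity(0)

tb.start()
```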