
Channel

Yuzhen Huang edited this page Nov 29, 2017 · 2 revisions

FlexPS provides a Channel abstraction that lets users exchange data among multiple nodes. It can be used in the data preprocessing stage, e.g., for data repartitioning. With a Channel, users can send data to the destination nodes/threads in a BSP (bulk synchronous parallel) manner.

Usage

  1. Create the Channel at the node level:
const uint32_t num_local_threads = 2;
const uint32_t num_global_threads = num_local_threads * nodes.size();
auto ret = id_mapper.GetChannelThreads(num_local_threads, num_global_threads);
Channel ch(num_local_threads, num_global_threads, ret.first, ret.second, &mailbox);
  2. Then get the local channels:
auto local_channels = ch.GetLocalChannels();

local_channels holds one LocalChannel per local thread that uses the Channel, so its size equals num_local_threads.

  3. In each node, spawn num_local_threads threads to use the local_channels. Check the complete example in examples/channel_example.cpp.
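
The thread-spawning step can be sketched as follows. This is a minimal illustration, not the code from the FlexPS example: FakeLocalChannel, RunOnLocalChannels, and CollectIds are hypothetical names, and FakeLocalChannel mirrors only the documented GetId() accessor so the snippet compiles without FlexPS.

```cpp
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical stand-in for LocalChannel (assumption: only GetId() is mirrored).
class FakeLocalChannel {
 public:
  explicit FakeLocalChannel(uint32_t tid) : tid_(tid) {}
  uint32_t GetId() const { return tid_; }

 private:
  uint32_t tid_;
};

// Spawns one worker thread per local channel, runs `work` on it, then joins all.
template <typename ChannelT, typename Fn>
void RunOnLocalChannels(const std::vector<ChannelT*>& local_channels, Fn work) {
  std::vector<std::thread> workers;
  workers.reserve(local_channels.size());
  for (ChannelT* lc : local_channels) {
    workers.emplace_back([lc, &work] { work(lc); });
  }
  for (auto& t : workers) {
    t.join();
  }
}

// Demo: each thread writes into its own slot, indexed by its channel id.
std::vector<uint32_t> CollectIds() {
  FakeLocalChannel a(0), b(1);
  std::vector<FakeLocalChannel*> channels{&a, &b};
  std::vector<uint32_t> seen(channels.size(), 0);
  RunOnLocalChannels(channels, [&seen](FakeLocalChannel* lc) {
    seen[lc->GetId()] = lc->GetId() + 100;  // distinct slots, so no data race
  });
  return seen;
}
```

With the real library, the vector returned by ch.GetLocalChannels() would presumably take the place of the fake channels, and the worker body would call PushTo() and SyncAndGet().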

API

Selected LocalChannel API:

  uint32_t GetId() const { return tid_; }
  /*
   * Push an SArrayBinStream to remote thread.
   */
  void PushTo(uint32_t id, const SArrayBinStream& bin);
  /*
   * Sync and get messages
   */
  std::vector<SArrayBinStream> SyncAndGet();

Make sure each LocalChannel is used from a local thread of its own node.
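
For a repartition step, records are typically grouped by destination before anything is pushed. The sketch below shows only that grouping. PartitionByKey is a hypothetical helper, the modulo rule is an assumption chosen for illustration, and it is also assumed that destination ids range over 0 to num_global_threads - 1.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Groups keys into one bucket per destination thread.
// Assumption: destination id = key % num_global_threads. Any deterministic
// mapping that every sender agrees on would work equally well.
std::vector<std::vector<uint64_t>> PartitionByKey(
    const std::vector<uint64_t>& keys, uint32_t num_global_threads) {
  assert(num_global_threads > 0);
  std::vector<std::vector<uint64_t>> buckets(num_global_threads);
  for (uint64_t key : keys) {
    buckets[key % num_global_threads].push_back(key);
  }
  return buckets;
}
```

Each bucket i would then be serialized into an SArrayBinStream and sent with PushTo(i, bin), followed by a SyncAndGet() call to collect what other threads sent.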

Channel:

  // The constructor
  Channel(uint32_t num_local_threads, uint32_t num_global_threads,
          std::vector<uint32_t> local_thread_ids, std::unordered_map<uint32_t, uint32_t> id_map,
          AbstractMailbox* mailbox);
  /*
   * Get a vector of local_channels_.
   * The size of the vector is equal to num_local_threads_.
   * This function should be called in the main threads of all the processes.
   */
  std::vector<LocalChannel*> GetLocalChannels();

Work with Engine

Engine provides GetIdMapper() and GetMailbox() to retrieve its id_mapper and mailbox.

To use Channel with Engine, create the Engine and call StartEverything(). Then retrieve the id_mapper and mailbox and use them to construct the Channel.

Engine engine(node, nodes);
engine.StartEverything();
// Retrieve id_mapper and mailbox
auto* id_mapper = engine.GetIdMapper();
auto* mailbox = engine.GetMailbox();
// Create Channel
const uint32_t num_local_threads = 2;
const uint32_t num_global_threads = num_local_threads * nodes.size();
auto ret = id_mapper->GetChannelThreads(num_local_threads, num_global_threads);
Channel ch(num_local_threads, num_global_threads, ret.first, ret.second, mailbox);
// Use Channel as mentioned above.
...
...
id_mapper->ReleaseChannelThreads();
engine.StopEverything();

Notes

The Channel-related implementation is in comm/channel.[ch]pp, comm/local_channel.[ch]pp, and driver/simple_id_mapper.[ch]pp. An example can be found in example/channel_example.cpp.

You may also try Channel together with HDFSManager and KVEngine, but this combination has not been tested yet.

SimpleIdMapper does not currently support creating multiple sets of channel threads, so you can only create and use one set of channels. Remember to call ReleaseChannelThreads() to give the channel threads (queues) back to the id mapper.
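
One way to avoid forgetting the release call is a small scope guard. This is a sketch, not part of FlexPS: ChannelThreadsGuard and FakeIdMapper are hypothetical names, and the only assumption about the real id mapper is the argument-free ReleaseChannelThreads() call shown in the Engine example.

```cpp
#include <cassert>

// Calls ReleaseChannelThreads() on the given id mapper when the guard leaves scope.
template <typename IdMapperT>
class ChannelThreadsGuard {
 public:
  explicit ChannelThreadsGuard(IdMapperT* id_mapper) : id_mapper_(id_mapper) {}
  ~ChannelThreadsGuard() {
    if (id_mapper_ != nullptr) {
      id_mapper_->ReleaseChannelThreads();
    }
  }
  // Non-copyable so the release happens exactly once.
  ChannelThreadsGuard(const ChannelThreadsGuard&) = delete;
  ChannelThreadsGuard& operator=(const ChannelThreadsGuard&) = delete;

 private:
  IdMapperT* id_mapper_;
};

// Hypothetical stand-in for the id mapper; it only counts release calls.
struct FakeIdMapper {
  int release_calls = 0;
  void ReleaseChannelThreads() { ++release_calls; }
};

// Demo: the guard releases once, when its scope closes.
int ReleaseCallsAfterScope() {
  FakeIdMapper mapper;
  {
    ChannelThreadsGuard<FakeIdMapper> guard(&mapper);
    assert(mapper.release_calls == 0);  // nothing released while in use
  }
  return mapper.release_calls;
}
```

If such a guard is used with Engine, its scope should close before engine.StopEverything() is called, matching the order in the Engine example.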

SArrayBinStream is designed to be used in the same way as BinStream in Husky.
