Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New feature: Local distribution capability #129

Merged
merged 8 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bmf/demo/video_enhance/enhance_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def run():
option={
"fp32": True,
"output_scale": 2,
"dist_nums": 3
},
entry="enhance_module.EnhanceModule",
input_manager="immediate",
Expand Down
45 changes: 45 additions & 0 deletions bmf/engine/c_engine/include/assemble_module.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 Babit Authors
*
* This file is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This file is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*/

#ifndef BMF_ASSEMBLE_MODULE_H
#define BMF_ASSEMBLE_MODULE_H

#include <bmf/sdk/module.h>
#include <bmf/sdk/module_registry.h>

USE_BMF_SDK_NS
class AssembleModule : public Module {
public:
AssembleModule(int node_id, JsonParam json_param);

int reset();

int process(Task &task);

int close();

std::map<int, bool> in_eof_;

int last_input_num_;

int last_output_num_;

int queue_index_;

std::map<int, std::shared_ptr<std::queue<Packet>>> queue_map_;
};

REGISTER_MODULE_CLASS(AssembleModule)

#endif // BMF_PASS_THROUGH_MODULE_H
24 changes: 24 additions & 0 deletions bmf/engine/c_engine/include/graph_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ModuleConfig {

ModuleConfig(JsonParam &module_config);

ModuleConfig(const ModuleConfig &other);

std::string get_module_name();
std::string get_module_type();
std::string get_module_path();
Expand Down Expand Up @@ -67,8 +69,12 @@ class StreamConfig {

StreamConfig(JsonParam &stream_config);

StreamConfig(const StreamConfig &other);

std::string get_alias();

void set_identifier(std::string i);

std::string get_identifier();

std::string get_notify();
Expand Down Expand Up @@ -96,6 +102,8 @@ class NodeMetaInfo {

NodeMetaInfo(nlohmann::json &node_meta);

NodeMetaInfo(const NodeMetaInfo &other);

int32_t get_premodule_id();

int32_t get_bundle();
Expand Down Expand Up @@ -129,6 +137,8 @@ class NodeConfig {

NodeConfig(JsonParam &node_config);

NodeConfig(const NodeConfig &other);

ModuleConfig get_module_info();

NodeMetaInfo get_node_meta();
Expand All @@ -145,12 +155,24 @@ class NodeConfig {

void add_output_stream(StreamConfig output_stream);

void change_input_stream_identifier(std::string identifier);

void change_output_stream_identifier(size_t order = 0);

std::string get_input_manager();

void set_id(int id);

int get_id();

void set_scheduler(int scheduler);

int get_scheduler();

void set_dist_nums(int dist_nums);

int get_dist_nums();

std::string get_alias();

std::string get_action();
Expand All @@ -169,6 +191,8 @@ class NodeConfig {
std::vector<StreamConfig> output_streams;
JsonParam option;
int scheduler;
// distributed_nums
int dist_nums = 1;
std::string input_manager = "immediate";
std::string alias;
std::string action;
Expand Down
5 changes: 5 additions & 0 deletions bmf/engine/c_engine/include/optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ StreamConfig has_circle(std::vector<NodeConfig> opt_nodes,
NodeConfig merged_node, std::map<int, bool> &rec_stack);
StreamConfig find_first_circle_node(std::vector<NodeConfig> opt_nodes,
NodeConfig merged_node);
NodeConfig create_split_node(int id, StreamConfig input_stream,
int scheduler, int dist_nums);
NodeConfig create_assemble_node(int id, std::vector<StreamConfig> input_streams,
int scheduler, int dist_nums);
void process_distributed_node(std::vector<NodeConfig> &nodes);
void optimize(std::vector<NodeConfig> &nodes);
void merge_subgraph(GraphConfig &main_config, GraphConfig &sub_config,
int sub_node_id);
Expand Down
43 changes: 43 additions & 0 deletions bmf/engine/c_engine/include/split_module.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2024 Babit Authors
*
* This file is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This file is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*/

#ifndef BMF_SPLIT_MODULE_H
#define BMF_SPLIT_MODULE_H

#include <bmf/sdk/module.h>
#include <bmf/sdk/module_registry.h>

USE_BMF_SDK_NS
class SplitModule : public Module {
public:
SplitModule(int node_id, JsonParam json_param);

int reset();

int process(Task &task);

int close();

bool in_eof_;

int last_input_num_;

int last_output_num_;

int stream_index_;
};

REGISTER_MODULE_CLASS(SplitModule)

#endif // BMF_PASS_THROUGH_MODULE_H
107 changes: 107 additions & 0 deletions bmf/engine/c_engine/src/assemble_module.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2024 Babit Authors
*
* This file is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This file is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*/

#include "../include/assemble_module.h"
#include <bmf/sdk/log.h>

AssembleModule::AssembleModule(int node_id, JsonParam json_param)
: Module(node_id, json_param) {
BMFLOG_NODE(BMF_INFO, node_id_) << "assemble module";
last_input_num_ = 0;
last_output_num_ = 0;
queue_index_ = 0;
return;
}

int AssembleModule::process(Task &task) {
if (task.get_inputs().size() != last_input_num_) {
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "Input Queue size changed from " << last_input_num_ << " to "
<< task.get_inputs().size();
last_input_num_ = task.get_inputs().size();
// init queue_map_
for (int i = 0; i < last_input_num_; i++) {
std::shared_ptr<std::queue<Packet>> tmp_queue =
std::make_shared<std::queue<Packet>>();
queue_map_.insert(
std::pair<int, std::shared_ptr<std::queue<Packet>>>(
i, tmp_queue));
}
}
if (task.get_outputs().size() != last_output_num_) {
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "Output Queue size changed from " << last_output_num_ << " to "
<< task.get_outputs().size();
last_output_num_ = task.get_outputs().size();
}

if (in_eof_.size() != task.get_inputs().size()) {
in_eof_.clear();
for (auto input_queue : task.get_inputs())
in_eof_[input_queue.first] = false;
}
// assemble data from multi input queue
auto tem_queue = task.get_inputs();
// cache pkts into queue_map_
for (size_t i = 0; i < tem_queue.size(); i++) {
while (!tem_queue[i]->empty()) {
auto q = tem_queue[i];
Packet pkt = q->front();
q->pop();
queue_map_[i]->push(pkt);
}
}

while (!queue_map_[queue_index_]->empty()) {
// pass through pkts
Packet packet;
auto queue = queue_map_.find(queue_index_);

if (in_eof_[queue_index_] == true)
continue;

packet = queue->second->front();
queue->second->pop();

task.fill_output_packet(0, packet);
if (packet.timestamp() == BMF_EOF) {
in_eof_[queue_index_] = true;
}
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "get packet :" << packet.timestamp()
<< " data:" << packet.type_info().name
<< " in queue:" << queue_index_;

queue_index_ = (queue_index_ + 1) % queue_map_.size();
}

bool all_eof = true;
for (auto f_eof : in_eof_) {
if (f_eof.second == false) {
all_eof = false;
break;
}
}
if (all_eof)
task.set_timestamp(DONE);

return 0;
}

int AssembleModule::reset() {
in_eof_.clear();
return 0;
}

int AssembleModule::close() { return 0; }
Loading
Loading