diff --git a/bmf/engine/c_engine/include/assemble_module.h b/bmf/engine/c_engine/include/assemble_module.h index 87b338e4..b210876c 100644 --- a/bmf/engine/c_engine/include/assemble_module.h +++ b/bmf/engine/c_engine/include/assemble_module.h @@ -17,7 +17,6 @@ #include #include -#include "safe_queue.h" USE_BMF_SDK_NS class AssembleModule : public Module { @@ -38,7 +37,7 @@ class AssembleModule : public Module { int queue_index_; - std::map>> queue_map_; + std::map>> queue_map_; }; REGISTER_MODULE_CLASS(AssembleModule) diff --git a/bmf/engine/c_engine/src/assemble_module.cpp b/bmf/engine/c_engine/src/assemble_module.cpp index 92b7293d..f4b78058 100644 --- a/bmf/engine/c_engine/src/assemble_module.cpp +++ b/bmf/engine/c_engine/src/assemble_module.cpp @@ -32,10 +32,10 @@ int AssembleModule::process(Task &task) { last_input_num_ = task.get_inputs().size(); // init queue_map_ for (int i = 0; i < last_input_num_; i++) { - std::shared_ptr> tmp_queue = - std::make_shared>(); + std::shared_ptr> tmp_queue = + std::make_shared>(); queue_map_.insert( - std::pair>>( + std::pair>>( i, tmp_queue)); } } @@ -71,18 +71,19 @@ int AssembleModule::process(Task &task) { if (in_eof_[queue_index_] == true) continue; - if (queue->second->pop(packet)) { - task.fill_output_packet(0, packet); - if (packet.timestamp() == BMF_EOF) { - in_eof_[queue_index_] = true; - } - BMFLOG_NODE(BMF_DEBUG, node_id_) - << "get packet :" << packet.timestamp() - << " data:" << packet.type_info().name - << " in queue:" << queue_index_; + packet = queue->second->front(); + queue->second->pop(); - queue_index_ = (queue_index_ + 1) % queue_map_.size(); + task.fill_output_packet(0, packet); + if (packet.timestamp() == BMF_EOF) { + in_eof_[queue_index_] = true; } + BMFLOG_NODE(BMF_DEBUG, node_id_) + << "get packet :" << packet.timestamp() + << " data:" << packet.type_info().name + << " in queue:" << queue_index_; + + queue_index_ = (queue_index_ + 1) % queue_map_.size(); } bool all_eof = true; diff --git a/bmf/engine/c_engine/src/optimizer.cpp b/bmf/engine/c_engine/src/optimizer.cpp index 1c7c5a78..5ce3e273 100644 --- a/bmf/engine/c_engine/src/optimizer.cpp +++ b/bmf/engine/c_engine/src/optimizer.cpp @@ -427,8 +427,8 @@ void process_distributed_node(std::vector &nodes) { node->output_streams[0].get_identifier() && tem_node.get_id() != assemble_node.get_id()) { tem_node.change_input_stream_identifier((assemble_node. - get_output_streams())[0]. - get_identifier()); + get_output_streams())[0]. + get_identifier()); } } }