Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
* remove SplitOutputStreamManager
* optimize distributed node test case
  • Loading branch information
JackLau1222 committed Oct 15, 2024
1 parent b21f586 commit 75c2b75
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 293 deletions.
16 changes: 2 additions & 14 deletions bmf/engine/c_engine/include/output_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class OutputStreamManager {

std::vector<int> get_stream_id_list();

virtual int post_process(Task &task);
int post_process(Task &task);

virtual int propagate_packets(int stream_id,
int propagate_packets(int stream_id,
std::shared_ptr<SafeQueue<Packet>> packets);

bool any_of_downstream_full();
Expand All @@ -58,17 +58,5 @@ class OutputStreamManager {
int max_id_;
};

class SplitOutputStreamManager : public OutputStreamManager {
public:
SplitOutputStreamManager(std::vector<StreamConfig> output_streams);

int post_process(Task &task) override;
};

int create_output_stream_manager(
std::string const &manager_type, int node_id,
std::vector<StreamConfig> output_streams,
std::shared_ptr<OutputStreamManager> &output_stream_manager);

END_BMF_ENGINE_NS
#endif // BMF_OUTPUT_STREAM_MANAGER_H
8 changes: 2 additions & 6 deletions bmf/engine/c_engine/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "../include/node.h"
#include "../include/input_stream_manager.h"
#include "../include/output_stream_manager.h"
#include "../include/module_factory.h"
#include "../include/callback_layer.h"

Expand Down Expand Up @@ -89,11 +88,8 @@ Node::Node(int node_id, NodeConfig &node_config, NodeCallBack &node_callback,
// should be set to low
infinity_node_ = module_->is_infinity();

// output_stream_manager_ =
// std::make_shared<OutputStreamManager>(node_config.get_output_streams());
create_output_stream_manager(node_config.get_output_manager(), node_id,
node_config.get_output_streams(),
output_stream_manager_);
output_stream_manager_ =
std::make_shared<OutputStreamManager>(node_config.get_output_streams());

InputStreamManagerCallBack callback;
callback.scheduler_cb = callback_.scheduler_cb;
Expand Down
64 changes: 36 additions & 28 deletions bmf/engine/c_engine/src/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,18 +301,30 @@ NodeConfig create_split_node(int id, StreamConfig input_stream,
{"notify", input_stream.get_notify()}
});

std::ostringstream output_identifier;
output_identifier << "split_module_" << id << "_0";
for (int i = 0; i < dist_nums; i++) {
std::ostringstream output_identifier;
output_identifier << "split_module_" << id << "_" << i;

info["output_streams"].push_back({
{"alias", ""},
{"identifier", output_identifier.str()},
{"notify", ""}
});
}

// std::ostringstream output_identifier;
// output_identifier << "split_module_" << id << "_0";

// info["output_streams"] = nlohmann::json::array();
// info["output_streams"].push_back({
// {"alias", ""},
// {"identifier", output_identifier.str()},
// {"notify", ""}
// });

info["output_streams"] = nlohmann::json::array();
info["output_streams"].push_back({
{"alias", ""},
{"identifier", output_identifier.str()},
{"notify", ""}
});
// info["option"] = option_.json_value_;
info["scheduler"] = scheduler;
info["dist_nums"] = dist_nums;
info["dist_nums"] = 1;
info["input_manager"] = "immediate";

return NodeConfig(info);
Expand Down Expand Up @@ -354,7 +366,7 @@ NodeConfig create_assemble_node(int id, std::vector<StreamConfig> input_streams,
});
// info["option"] = option_.json_value_;
info["scheduler"] = scheduler;
info["dist_nums"] = dist_nums;
info["dist_nums"] = 1;
info["input_manager"] = "immediate";

return NodeConfig(info);
Expand All @@ -374,50 +386,46 @@ void process_distributed_node(std::vector<bmf_engine::NodeConfig> &nodes) {
// repoint to memory address after allocation
node = &nodes[nodes_index];
upstream_node = nullptr;
// creat and insert a split node before the current node
// create and insert split node
auto split_node = create_split_node(nodes.size(),
node->get_input_streams()[0],
nodes.size(), 1);
split_node.set_output_manager("split");
node->change_input_stream_identifier(split_node.output_streams[0].
get_identifier());
// nodes.insert(nodes.begin() + nodes_index, split_node);
nodes.size(), dist_nums);
// split_node.set_output_manager("split");
nodes.push_back(split_node);

node->change_input_stream_identifier(split_node.output_streams[0].
get_identifier());
// store input streams for assemble node
std::vector<StreamConfig> input_streams;
// input_streams.push_back(nodes[nodes_index + 1].output_streams[0]);
input_streams.push_back(node->output_streams[0]);
std::vector<StreamConfig> assemble_input_streams;
assemble_input_streams.push_back(node->output_streams[0]);

// creat and insert copies of the current node
for (int i = 1; i < dist_nums; ++i) {
//node = &nodes[nodes_index + 1];
auto new_node = NodeConfig(*node);
new_node.set_id(nodes.size());
new_node.change_input_stream_identifier(split_node.output_streams[i].
get_identifier());
new_node.change_output_stream_identifier();
new_node.set_dist_nums(1);
new_node.set_scheduler(new_node.get_id());
input_streams.push_back(new_node.output_streams[0]);
// nodes.insert(nodes.begin() + nodes_index + 1 + i, new_node);
assemble_input_streams.push_back(new_node.output_streams[0]);
nodes.push_back(new_node);
}
// nodes[nodes_index + dist_nums].set_dist_nums(1);
node->set_dist_nums(1);

// creat and insert assemble node after the copied nodes
// creat and insert assemble node
auto assemble_node = create_assemble_node(nodes.size(),
input_streams,
assemble_input_streams,
nodes.size(), 1);
// nodes.insert(nodes.begin() + nodes_index + 1 + dist_nums, assemble_node);
nodes.push_back(assemble_node);

// link downstream node's inputstream and assemble node's outputstream
for (auto &tem_node : nodes)
for (auto &input_stream : tem_node.input_streams)
if (input_stream.get_identifier() ==
// nodes[nodes_index + 1].output_streams[0].get_identifier()
node->output_streams[0].get_identifier()
&& tem_node.get_id() != assemble_node.get_id())
node->output_streams[0].get_identifier() &&
tem_node.get_id() != assemble_node.get_id())
tem_node.change_input_stream_identifier((assemble_node.
get_output_streams())[0].
get_identifier());
Expand Down
1 change: 0 additions & 1 deletion bmf/engine/c_engine/src/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,4 @@ int OutputStream::add_upstream_nodes(int node_id) {
}
return 0;
}

END_BMF_ENGINE_NS
28 changes: 0 additions & 28 deletions bmf/engine/c_engine/src/output_stream_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,32 +175,4 @@ int OutputStreamManager::get_outlink_nodes_id(std::vector<int> &nodes_id) {
nodes_id.push_back(it.first);
return 0;
}

SplitOutputStreamManager::SplitOutputStreamManager(
std::vector<StreamConfig> output_streams) : OutputStreamManager(output_streams) {

}

int SplitOutputStreamManager::post_process(Task &task) {
for (auto &t : task.outputs_queue_) {
auto q = std::make_shared<SafeQueue<Packet>>(t.second);
output_streams_[t.first]->add_packets(q);
// split packets to distributed downstream node
output_streams_[t.first]->split_packets();
}
return 0;
}

int create_output_stream_manager(
std::string const &manager_type, int node_id,
std::vector<StreamConfig> output_streams,
std::shared_ptr<OutputStreamManager> &output_stream_manager) {
if (manager_type == "split") {
output_stream_manager = std::make_shared<SplitOutputStreamManager>(output_streams);
} else {
output_stream_manager = std::make_shared<OutputStreamManager>(output_streams);
}
return 0;
}

END_BMF_ENGINE_NS
38 changes: 38 additions & 0 deletions bmf/test/distributed_node/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Define the C++ standard
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED True)

# Set compiler flags
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0 -fPIC")

# Include directories
include_directories(
/root/workspace/bmf_OSPP/output/bmf/include
.
)

# Link directories
link_directories(/root/workspace/bmf_OSPP/output/bmf/lib)

# Source files
set(LIB_SRCS_SPLIT split_module.cpp)
set(LIB_SRCS_COPY copy_module.cpp)
set(LIB_SRCS_ASSEMBLE assemble_module.cpp)
set(MAIN_SRCS cpp_demo.cpp)

# Create shared libraries for copy and assemble modules
add_library(split_module SHARED ${LIB_SRCS_SPLIT})
add_library(copy_module SHARED ${LIB_SRCS_COPY})
add_library(assemble_module SHARED ${LIB_SRCS_ASSEMBLE})

# Link the external libraries to the modules
target_link_libraries(split_module engine bmf_module_sdk)
target_link_libraries(copy_module engine bmf_module_sdk)
target_link_libraries(assemble_module engine bmf_module_sdk)

# Create the executable and link the shared libraries
add_executable(cpp_demo ${MAIN_SRCS})
target_link_libraries(cpp_demo split_module copy_module assemble_module engine bmf_module_sdk)

# Clean up build files
set_directory_properties(PROPERTIES ADDITIONAL_MAKE_CLEAN_FILES "cpp_demo")
59 changes: 0 additions & 59 deletions bmf/test/distributed_node/Makefile

This file was deleted.

Loading

0 comments on commit 75c2b75

Please sign in to comment.