Skip to content

Commit

Permalink
Merge pull request #75 from pipeless-ai/issue_70
Browse files Browse the repository at this point in the history
fix: Remove managers and unassign pipelines properly
  • Loading branch information
miguelaeh authored Nov 15, 2023
2 parents 13c7709 + 6b7f7a0 commit 406173a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.0.1"
version = "1.0.2"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down
3 changes: 3 additions & 0 deletions pipeless/src/config/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ impl StreamsTable {
pub fn find_by_pipeline_id(&self, pipeline_id: uuid::Uuid) -> Option<&StreamsTableEntry> {
self.table.iter().find(|entry| entry.get_pipeline() == Some(pipeline_id))
}
pub fn find_by_pipeline_id_mut(&mut self, pipeline_id: uuid::Uuid) -> Option<&mut StreamsTableEntry> {
self.table.iter_mut().find(|entry| entry.get_pipeline() == Some(pipeline_id))
}

pub fn update_by_entry_id(&mut self, entry_id: uuid::Uuid, input_uri: &str, output_uri: Option<String>, frame_path: Vec<String>) {
if let Some(entry) = self.table.iter_mut().find(|entry| entry.id == entry_id) {
Expand Down
18 changes: 12 additions & 6 deletions pipeless/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,24 +155,32 @@ pub fn start(
}
}

// When we have a running manager whose pipeline id is not in any entry, that means the entry was deleted, stop the manager
// and remove it from the hash_map to be dropped
let mut manager_to_remove = None;
{
// When we have a running manager whose pipeline id is not in any entry, that means the entry was deleted, stop the manager
let managers_map_guard = running_managers.read().await;
for (pipeline_id, manager) in managers_map_guard.iter() {
let streams_table = streams_table.read().await;
if streams_table.find_by_pipeline_id(*pipeline_id).is_none() {
info!("Stream config entry removed. Stopping associated pipeline");
manager.stop().await;
manager_to_remove = Some(pipeline_id.clone());
}
}
}
if let Some(manager) = manager_to_remove {
let mut managers_map_guard = running_managers.write().await;
managers_map_guard.remove(&manager);
}
}
DispatcherEvent::PipelineFinished(pipeline_id) => {
let mut stream_entry;
let stream_entry;
{
let table_read_guard = streams_table.read().await;
let stream_entry_option = table_read_guard.find_by_pipeline_id(pipeline_id);
let mut table_write_guard = streams_table.write().await;
let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id);
if let Some(entry) = stream_entry_option {
entry.unassign_pipeline();
stream_entry = entry.clone();
} else {
warn!("
Expand All @@ -184,8 +192,6 @@ pub fn start(
}
}

stream_entry.unassign_pipeline();

let using_input_file = stream_entry.get_input_uri().starts_with("file://");
let using_output_file = match stream_entry.get_output_uri() {
Some(uri) => uri.starts_with("file://"),
Expand Down

0 comments on commit 406173a

Please sign in to comment.