Make daemons working dir configurable #555

Closed
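Summary: this PR makes the working directory of dora daemons configurable. dora up and dora daemon gain a --working-dir flag (default "."); the daemon canonicalizes the path and reports it to the coordinator when registering; the coordinator keeps a per-machine map of daemon working directories and spawns each dataflow node in the target daemon's own directory, instead of sending a single working directory derived from the dataflow path to every machine. The CI workflow is extended to exercise the flag with the Rust and Python dynamic-dataflow examples.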
11 changes: 8 additions & 3 deletions .github/workflows/ci.yml
@@ -290,6 +290,8 @@ jobs:
sleep 10
dora stop --name ci-rust-test --grace-duration 5s
cd ..
dora destroy
dora up --working-dir examples/rust-dataflow
dora build examples/rust-dataflow/dataflow_dynamic.yml
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic
cargo run -p rust-dataflow-example-sink-dynamic
@@ -319,10 +321,13 @@ jobs:
sleep 10
dora stop --name ci-python-test --grace-duration 5s
pip install opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
python ../examples/python-dataflow/plot_dynamic.py
dora destroy
cd ..
dora up --working-dir examples/python-dataflow
dora start examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
python examples/python-dataflow/plot_dynamic.py
sleep 5
dora stop --name ci-python-test --grace-duration 5s
dora stop --name ci-python-dynamic --grace-duration 5s
dora destroy
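Note: the workflow now restarts the daemon via dora destroy and dora up --working-dir <example dir> before running the dynamic dataflows, presumably so that the daemon's working directory is the example directory itself rather than the repository root.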

clippy:
1 change: 1 addition & 0 deletions Cargo.lock

(diff not rendered: generated file)

32 changes: 20 additions & 12 deletions binaries/cli/src/main.rs
@@ -41,7 +41,6 @@ struct Args {
#[clap(subcommand)]
command: Command,
}

/// dora-rs cli client
#[derive(Debug, clap::Subcommand)]
enum Command {
@@ -87,6 +86,8 @@ enum Command {
/// Use a custom configuration
#[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
config: Option<PathBuf>,
#[clap(long, default_value = ".")]
working_dir: PathBuf,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
@@ -189,6 +190,9 @@ enum Command {
/// Suppresses all log output to stdout.
#[clap(long)]
quiet: bool,

#[clap(long, default_value = ".")]
working_dir: PathBuf,
},
/// Run runtime
Runtime,
@@ -310,8 +314,11 @@ fn run() -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up { config } => {
up::up(config.as_deref())?;
Command::Up {
config,
working_dir,
} => {
up::up(config.as_deref(), working_dir)?;
}
Command::Logs {
dataflow,
@@ -352,13 +359,9 @@ fn run() -> eyre::Result<()> {
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
if !coordinator_addr.is_loopback() {
dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
} else {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
}
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;

let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
@@ -442,7 +445,12 @@ fn run() -> eyre::Result<()> {
machine_id,
run_dataflow,
quiet: _,
working_dir,
} => {
let working_dir = working_dir
.canonicalize()
.context("failed to canonicalize working dir path")?
.to_owned();
let rt = Builder::new_multi_thread()
.enable_all()
.build()
@@ -458,13 +466,13 @@
);
}

Daemon::run_dataflow(&dataflow_path).await
Daemon::run_dataflow(&dataflow_path, working_dir).await
}
None => {
if coordinator_addr.ip() == LOCALHOST {
tracing::info!("Starting in local mode");
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, working_dir).await
}
}
})
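A minimal standalone sketch of the new flag handling in the daemon subcommand (simplified: no clap or eyre, and resolve_working_dir is a hypothetical helper, not part of the PR): --working-dir defaults to "." and is canonicalized to an absolute path up front, so later use of the directory does not depend on the process's current directory. canonicalize also fails if the directory does not exist, which surfaces bad paths early.

use std::path::PathBuf;

fn resolve_working_dir(raw: PathBuf) -> std::io::Result<PathBuf> {
    // Resolves relative components ("." by default) against the current
    // directory and errors if the path does not exist.
    raw.canonicalize()
}

fn main() -> std::io::Result<()> {
    let dir = resolve_working_dir(PathBuf::from("."))?;
    println!("daemon working dir: {}", dir.display());
    Ok(())
}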
8 changes: 5 additions & 3 deletions binaries/cli/src/up.rs
@@ -1,11 +1,12 @@
use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT};
use eyre::Context;
use std::path::PathBuf;
use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
pub(crate) fn up(config_path: Option<&Path>, working_dir: PathBuf) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;
let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into();
let mut session = match connect_to_coordinator(coordinator_addr) {
@@ -26,7 +27,7 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
};

if !daemon_running(&mut *session)? {
start_daemon().wrap_err("failed to start dora-daemon")?;
start_daemon(working_dir).wrap_err("failed to start dora-daemon")?;

// wait a bit until daemon is connected
let mut i = 0;
@@ -93,11 +94,12 @@ fn start_coordinator() -> eyre::Result<()> {
Ok(())
}

fn start_daemon() -> eyre::Result<()> {
fn start_daemon(working_dir: PathBuf) -> eyre::Result<()> {
let mut cmd =
Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
cmd.arg("daemon");
cmd.arg("--quiet");
cmd.arg("--working-dir").arg(working_dir);
cmd.spawn().wrap_err("failed to run `dora daemon`")?;

println!("started dora daemon");
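Follow-up: dora up now forwards the directory to the daemon it spawns through the same --working-dir flag (see cmd.arg("--working-dir") above), which is how the CI invocation dora up --working-dir examples/python-dataflow takes effect. Since the daemon canonicalizes the path at startup, the directory must already exist when dora up runs.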
1 change: 1 addition & 0 deletions binaries/coordinator/Cargo.toml
@@ -25,3 +25,4 @@ futures-concurrency = "7.1.0"
serde_json = "1.0.86"
names = "0.14.0"
ctrlc = "3.2.5"
dirs = "5.0.1"
22 changes: 17 additions & 5 deletions binaries/coordinator/src/lib.rs
@@ -137,6 +137,7 @@ async fn start_inner(
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, Result<(), String>>> = HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();
let mut daemon_working_dirs: HashMap<String, PathBuf> = HashMap::new();

while let Some(event) = events.next().await {
if event.log() {
@@ -168,6 +169,7 @@
mut connection,
dora_version: daemon_version,
listen_port,
working_dir,
} => {
let coordinator_version: &&str = &env!("CARGO_PKG_VERSION");
let version_check = if &daemon_version == coordinator_version {
@@ -207,9 +209,10 @@
"closing previous connection `{machine_id}` on new register"
);
}
daemon_working_dirs.insert(machine_id.clone(), working_dir.clone());
}
(Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
tracing::warn!("failed to register daemon connection and daemon working_dir for machine `{machine_id}`: {err}");
}
(Ok(_), Err(err)) => {
tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
@@ -316,7 +319,6 @@
local_working_dir,
} => {
let name = name.or_else(|| names::Generator::default().next());

let inner = async {
if let Some(name) = name.as_deref() {
// check that name is unique
@@ -329,10 +331,11 @@
}
let dataflow = start_dataflow(
dataflow,
local_working_dir,
name,
&mut daemon_connections,
&clock,
local_working_dir,
&mut daemon_working_dirs,
)
.await?;
Ok(dataflow)
@@ -850,16 +853,24 @@ async fn retrieve_logs(

async fn start_dataflow(
dataflow: Descriptor,
working_dir: PathBuf,
name: Option<String>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
local_working_dir: PathBuf,
daemon_working_dirs: &mut HashMap<String, PathBuf>,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
machines,
nodes,
} = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?;
} = spawn_dataflow(
dataflow,
daemon_connections,
clock,
local_working_dir,
daemon_working_dirs,
)
.await?;
Ok(RunningDataflow {
uuid,
name,
@@ -950,6 +961,7 @@ pub enum DaemonEvent {
machine_id: String,
connection: TcpStream,
listen_port: u16,
working_dir: PathBuf,
},
}

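A condensed sketch of the coordinator-side bookkeeping added in this file (standalone, with a hypothetical machine id and path; the real code lives inside start_inner): on registration the daemon's reported directory is stored per machine id, and at spawn time it is looked up for each target machine, failing if a machine never registered a directory.

use std::{collections::HashMap, path::PathBuf};

fn main() -> Result<(), String> {
    let mut daemon_working_dirs: HashMap<String, PathBuf> = HashMap::new();

    // On DaemonEvent::Register { machine_id, working_dir, .. }:
    daemon_working_dirs.insert("machine-a".to_owned(), PathBuf::from("/opt/dataflows"));

    // On dataflow start, resolve the directory for each target machine:
    for machine in ["machine-a"] {
        let working_dir = daemon_working_dirs
            .get(machine)
            .ok_or_else(|| format!("no daemon working dir for machine `{machine}`"))?;
        println!("spawning dataflow on `{machine}` in {}", working_dir.display());
    }
    Ok(())
}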
2 changes: 2 additions & 0 deletions binaries/coordinator/src/listener.rs
@@ -53,12 +53,14 @@ pub async fn handle_connection(
machine_id,
dora_version,
listen_port,
working_dir,
} => {
let event = DaemonEvent::Register {
dora_version,
machine_id,
connection,
listen_port,
working_dir,
};
let _ = events_tx.send(Event::Daemon(event)).await;
break;
44 changes: 18 additions & 26 deletions binaries/coordinator/src/run/mod.rs
@@ -20,21 +20,12 @@
#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
dataflow: Descriptor,
working_dir: PathBuf,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
working_dir: PathBuf,
daemon_working_dirs: &mut HashMap<String, PathBuf>,
) -> eyre::Result<SpawnedDataflow> {
let remote_machine_id: Vec<_> = daemon_connections
.iter()
.filter_map(|(id, c)| {
if !c.listen_socket.ip().is_loopback() {
Some(id.as_str())
} else {
None
}
})
.collect();
dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?;
dataflow.check(&working_dir)?;

let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));
@@ -49,21 +40,22 @@
.map(|c| (m.clone(), c.listen_socket))
})
.collect::<Result<BTreeMap<_, _>, _>>()?;

let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir,
nodes: nodes.clone(),
machine_listen_ports,
dataflow_descriptor: dataflow,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;

for machine in &machines {
tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`");
let working_dir = daemon_working_dirs
.get(machine)
.ok_or_else(|| eyre!("no daemon working dir for machine `{machine}`"))?
.to_owned();
let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir: working_dir,
// CI annotation (GitHub Actions / Clippy, line 50): redundant field names in struct initialization
nodes: nodes.clone(),
machine_listen_ports: machine_listen_ports.clone(),
dataflow_descriptor: dataflow.clone(),
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;
spawn_dataflow_on_machine(daemon_connections, machine, &message)
.await
.wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?;
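Design note: the SpawnDataflowNodes message was previously serialized once and sent to every machine; it is now built inside the per-machine loop because working_dir differs per daemon, which is why nodes, machine_listen_ports, and dataflow_descriptor are cloned per iteration. The Clippy warning above points at the redundant working_dir: working_dir initializer, which can be shortened to just working_dir.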
4 changes: 3 additions & 1 deletion binaries/daemon/src/coordinator.rs
@@ -8,7 +8,7 @@ use dora_core::{
message::uhlc::HLC,
};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::SocketAddr};
use std::{io::ErrorKind, net::SocketAddr, path::PathBuf};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
@@ -26,6 +27,7 @@ pub async fn register(
machine_id: String,
listen_port: u16,
clock: &HLC,
working_dir: PathBuf,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
.await
Expand All @@ -38,6 +39,7 @@ pub async fn register(
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_port,
working_dir,
},
timestamp: clock.new_timestamp(),
})?;
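This is the daemon-side half of the handshake: the register message now carries the daemon's canonicalized working directory, which the coordinator stores in the daemon_working_dirs map shown in binaries/coordinator/src/lib.rs above.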