Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PRBv3: Add chain state check & stake update on first run of worker loop #1444

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions standalone/prb/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,24 @@ impl TxManager {
);
self.clone().send_to_queue(pid, tx_payload, desc).await
}
pub async fn restart_computing(
self: Arc<Self>,
pid: u64,
worker: Sr25519Public,
stake: String,
) -> Result<()> {
let desc = format!(
"Restart computing for 0x{} with stake of {} in pool #{pid}.",
worker.encode_hex::<String>(),
&stake
);
let tx_payload = EncodedPayload::new(
"PhalaStakePoolv2",
"restart_computing",
(pid, Encoded(worker.encode()), stake.parse::<u128>()?).encode(),
);
self.clone().send_to_queue(pid, tx_payload, desc).await
}
pub async fn stop_computing(self: Arc<Self>, pid: u64, worker: Sr25519Public) -> Result<()> {
let desc = format!(
"Stop computing for 0x{} in pool #{pid}.",
Expand Down
26 changes: 26 additions & 0 deletions standalone/prb/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ impl WorkerContext {
Ok(())
}

#[allow(unused_assignments)]
async fn handle_on_preparing(c: WrappedWorkerContext) -> Result<()> {
set_worker_message!(c, "Reached latest finalized height, start preparing...");
let (lm, worker, pr) = extract_essential_values!(c);
Expand Down Expand Up @@ -510,6 +511,7 @@ impl WorkerContext {
txm.clone().add_worker(pid, pubkey).await?;
}

let mut first_run = true;
set_worker_message!(c, "Waiting for session info to update...");
loop {
let cc = c.clone();
Expand All @@ -530,6 +532,30 @@ impl WorkerContext {
set_worker_message!(c, "Worker is cooling down!");
}
_ => {
if first_run {
// Check on-chain stake vs. configured stake and update if different
let stake_query = storage(
"PhalaComputation",
"Stakes",
vec![Value::from_bytes(worker_binding.as_ref().unwrap())],
);
let stake_onchain: Option<u128> = fetch_storage_bytes(&api, &stake_query).await?;
if let Some(onchain_stake) = stake_onchain {
info!("Stake: {:?}, {:?} on-chain", &worker.stake.parse::<u128>().unwrap(), &onchain_stake);
match onchain_stake {
onchain_stake if onchain_stake < worker.stake.parse::<u128>().unwrap() => {
set_worker_message!(c, "Adjusting on-chain stake...");
txm.clone().restart_computing(pid, pubkey, worker.stake).await?;
Self::set_state(c.clone(), WorkerLifecycleState::Working).await;
}
onchain_stake if onchain_stake > worker.stake.parse::<u128>().unwrap() => {
set_worker_message!(c, "Error on-chain stake higher, than configured...");
}
_ => (),
}
}
first_run = false;
}
Self::set_state(c.clone(), WorkerLifecycleState::Working).await;
}
}
Expand Down
Loading