diff --git a/standalone/prb/src/tx.rs b/standalone/prb/src/tx.rs index 86251e5c2b..76df18b6c4 100644 --- a/standalone/prb/src/tx.rs +++ b/standalone/prb/src/tx.rs @@ -636,6 +636,24 @@ impl TxManager { ); self.clone().send_to_queue(pid, tx_payload, desc).await } + pub async fn restart_computing( + self: Arc, + pid: u64, + worker: Sr25519Public, + stake: String, + ) -> Result<()> { + let desc = format!( + "Restart computing for 0x{} with stake of {} in pool #{pid}.", + worker.encode_hex::(), + &stake + ); + let tx_payload = EncodedPayload::new( + "PhalaStakePoolv2", + "restart_computing", + (pid, Encoded(worker.encode()), stake.parse::()?).encode(), + ); + self.clone().send_to_queue(pid, tx_payload, desc).await + } pub async fn stop_computing(self: Arc, pid: u64, worker: Sr25519Public) -> Result<()> { let desc = format!( "Stop computing for 0x{} in pool #{pid}.", diff --git a/standalone/prb/src/worker.rs b/standalone/prb/src/worker.rs index 6b66dba474..d9ea575b95 100644 --- a/standalone/prb/src/worker.rs +++ b/standalone/prb/src/worker.rs @@ -422,6 +422,7 @@ impl WorkerContext { Ok(()) } + #[allow(unused_assignments)] async fn handle_on_preparing(c: WrappedWorkerContext) -> Result<()> { set_worker_message!(c, "Reached latest finalized height, start preparing..."); let (lm, worker, pr) = extract_essential_values!(c); @@ -510,6 +511,7 @@ impl WorkerContext { txm.clone().add_worker(pid, pubkey).await?; } + let mut first_run = true; set_worker_message!(c, "Waiting for session info to update..."); loop { let cc = c.clone(); @@ -530,6 +532,30 @@ impl WorkerContext { set_worker_message!(c, "Worker is cooling down!"); } _ => { + if first_run { + // Check on-chain stake vs. configured stake and update if different + let stake_query = storage( + "PhalaComputation", + "Stakes", + vec![Value::from_bytes(worker_binding.as_ref().unwrap())], + ); + let stake_onchain: Option = fetch_storage_bytes(&api, &stake_query).await?; + if let Some(onchain_stake) = stake_onchain { + info!("Stake: {:?}, {:?} on-chain", &worker.stake.parse::().unwrap(), &onchain_stake); + match onchain_stake { + onchain_stake if onchain_stake < worker.stake.parse::().unwrap() => { + set_worker_message!(c, "Adjusting on-chain stake..."); + txm.clone().restart_computing(pid, pubkey, worker.stake).await?; + Self::set_state(c.clone(), WorkerLifecycleState::Working).await; + } + onchain_stake if onchain_stake > worker.stake.parse::().unwrap() => { + set_worker_message!(c, "Error on-chain stake higher, than configured..."); + } + _ => (), + } + } + first_run = false; + } Self::set_state(c.clone(), WorkerLifecycleState::Working).await; } }