From ca4f469cfd49d8df96ce3c7cee3e59ab377910a1 Mon Sep 17 00:00:00 2001 From: mohanson Date: Mon, 4 Mar 2024 14:17:55 +0800 Subject: [PATCH] Just copy codes in it --- Cargo.lock | 4 +- script/Cargo.toml | 2 +- script/src/lib.rs | 3 + script/src/v2_scheduler.rs | 778 +++++++++++++++++++++++++++++++++++++ script/src/v2_syscalls.rs | 529 +++++++++++++++++++++++++ script/src/v2_types.rs | 275 +++++++++++++ 6 files changed, 1588 insertions(+), 3 deletions(-) create mode 100644 script/src/v2_scheduler.rs create mode 100644 script/src/v2_syscalls.rs create mode 100644 script/src/v2_types.rs diff --git a/Cargo.lock b/Cargo.lock index ff99548f85..f828918306 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1733,7 +1733,7 @@ dependencies = [ [[package]] name = "ckb-vm" version = "0.24.9" -source = "git+https://github.com/chenyukang/ckb-vm.git?branch=yukang-local-changes#9cd3c20b009ea8e74a46143cafdde0acac9e323b" +source = "git+https://github.com/libraries/ckb-vm?branch=spawn2#953a9474ce7d5d0d29cc8cd173265a150805a54a" dependencies = [ "byteorder", "bytes", @@ -1750,7 +1750,7 @@ dependencies = [ [[package]] name = "ckb-vm-definitions" version = "0.24.9" -source = "git+https://github.com/chenyukang/ckb-vm.git?branch=yukang-local-changes#9cd3c20b009ea8e74a46143cafdde0acac9e323b" +source = "git+https://github.com/libraries/ckb-vm?branch=spawn2#953a9474ce7d5d0d29cc8cd173265a150805a54a" dependencies = [ "paste", ] diff --git a/script/Cargo.toml b/script/Cargo.toml index 9230b358c1..c595451420 100644 --- a/script/Cargo.toml +++ b/script/Cargo.toml @@ -22,7 +22,7 @@ ckb-traits = { path = "../traits", version = "= 0.114.0-pre" } byteorder = "1.3.1" ckb-types = { path = "../util/types", version = "= 0.114.0-pre" } ckb-hash = { path = "../util/hash", version = "= 0.114.0-pre" } -ckb-vm = { git = "https://github.com/chenyukang/ckb-vm.git", version = "=0.24.9", idefault-features = false, branch = "yukang-local-changes"} +ckb-vm = { git = "https://github.com/libraries/ckb-vm", branch = "spawn2", 
features = ["asm"] } faster-hex = "0.6" ckb-logger = { path = "../util/logger", version = "= 0.114.0-pre", optional = true } serde = { version = "1.0", features = ["derive"] } diff --git a/script/src/lib.rs b/script/src/lib.rs index b24f94db63..02c44f3953 100644 --- a/script/src/lib.rs +++ b/script/src/lib.rs @@ -6,6 +6,9 @@ mod type_id; mod types; mod verify; mod verify_env; +mod v2_scheduler; +mod v2_syscalls; +mod v2_types; pub use crate::error::{ScriptError, TransactionScriptError}; pub use crate::syscalls::spawn::update_caller_machine; diff --git a/script/src/v2_scheduler.rs b/script/src/v2_scheduler.rs new file mode 100644 index 0000000000..6da642000f --- /dev/null +++ b/script/src/v2_scheduler.rs @@ -0,0 +1,778 @@ +use crate::v2_syscalls::INDEX_OUT_OF_BOUND; +use crate::v2_types::PipeIoArgs; +use crate::{ + v2_syscalls::{ + transferred_byte_cycles, MachineContext, INVALID_PIPE, JOIN_FAILURE, OTHER_END_CLOSED, + SUCCESS, + }, + v2_types::{ + DataPieceId, FullSuspendedState, Message, PipeId, RunMode, TxData, VmId, VmState, + FIRST_PIPE_SLOT, FIRST_VM_ID, + }, +}; +use crate::{ScriptVersion, TransactionScriptsVerifier}; +use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider}; +use ckb_types::core::Cycle; +use ckb_vm::{ + bytes::Bytes, + cost_model::estimate_cycles, + elf::parse_elf, + machine::{ + asm::{AsmCoreMachine, AsmMachine}, + CoreMachine, DefaultMachineBuilder, Pause, SupportMachine, + }, + memory::Memory, + registers::A0, + snapshot2::{DataSource, Snapshot2}, + Error, Register, +}; +use std::sync::{Arc, Mutex}; +use std::{ + collections::{BTreeMap, HashMap}, + mem::size_of, +}; + +const ROOT_VM_ID: VmId = FIRST_VM_ID; +const MAX_INSTANTIATED_VMS: usize = 4; + +/// A single Scheduler instance is used to verify a single script +/// within a CKB transaction. 
+/// +/// A scheduler holds & manipulates a core, the scheduler also holds +/// all CKB-VM machines, each CKB-VM machine also gets a mutable reference +/// of the core for IO operations. +pub struct Scheduler< + DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static, +> { + tx_data: TxData
, + // In fact, Scheduler here has the potential to totally replace + // TransactionScriptsVerifier, nonetheless much of current syscall + // implementation is strictly tied to TransactionScriptsVerifier, we + // are using it here to save some extra code. + verifier: TransactionScriptsVerifier
, + + total_cycles: Cycle, + next_vm_id: VmId, + next_pipe_slot: u64, + states: BTreeMap, + pipes: HashMap, + inherited_fd: BTreeMap>, + instantiated: BTreeMap, AsmMachine)>, + suspended: HashMap>, + terminated_vms: HashMap, + + // message_box is expected to be empty before returning from `run` + // function, there is no need to persist messages. + message_box: Arc>>, +} + +impl + Scheduler
+{ + /// Create a new scheduler from empty state + pub fn new(tx_data: TxData
, verifier: TransactionScriptsVerifier
) -> Self { + Self { + tx_data, + verifier, + total_cycles: 0, + next_vm_id: FIRST_VM_ID, + next_pipe_slot: FIRST_PIPE_SLOT, + states: BTreeMap::default(), + pipes: HashMap::default(), + inherited_fd: BTreeMap::default(), + instantiated: BTreeMap::default(), + suspended: HashMap::default(), + message_box: Arc::new(Mutex::new(Vec::new())), + terminated_vms: HashMap::default(), + } + } + + pub fn consumed_cycles(&self) -> Cycle { + self.total_cycles + } + + /// Resume a previously suspended scheduler state + pub fn resume( + tx_data: TxData
, + verifier: TransactionScriptsVerifier
, + full: FullSuspendedState, + ) -> Self { + Self { + tx_data, + verifier, + total_cycles: full.total_cycles, + next_vm_id: full.next_vm_id, + next_pipe_slot: full.next_pipe_slot, + states: full + .vms + .iter() + .map(|(id, state, _)| (*id, state.clone())) + .collect(), + pipes: full.pipes.into_iter().collect(), + inherited_fd: full.inherited_fd.into_iter().collect(), + instantiated: BTreeMap::default(), + suspended: full + .vms + .into_iter() + .map(|(id, _, snapshot)| (id, snapshot)) + .collect(), + message_box: Arc::new(Mutex::new(Vec::new())), + terminated_vms: full.terminated_vms.into_iter().collect(), + } + } + + /// Suspend current scheduler into a serializable full state + pub fn suspend(mut self) -> Result { + let mut vms = Vec::with_capacity(self.states.len()); + let instantiated_ids: Vec<_> = self.instantiated.keys().cloned().collect(); + for id in instantiated_ids { + self.suspend_vm(&id)?; + } + for (id, state) in self.states { + let snapshot = self.suspended.remove(&id).unwrap(); + vms.push((id, state, snapshot)); + } + Ok(FullSuspendedState { + total_cycles: self.total_cycles, + next_vm_id: self.next_vm_id, + next_pipe_slot: self.next_pipe_slot, + vms, + pipes: self.pipes.into_iter().collect(), + inherited_fd: self.inherited_fd.into_iter().collect(), + terminated_vms: self.terminated_vms.into_iter().collect(), + }) + } + + /// This is the only entrypoint for running the scheduler, + /// both newly created instance and resumed instance are supported. + /// It accepts 2 run mode, one can either limit the cycles to execute, + /// or use a pause signal to trigger termination. + /// + /// Only when the execution terminates without VM errors, will this + /// function return an exit code(could still be non-zero) and total + /// consumed cycles. 
+ /// + /// Err would be returned in the following cases: + /// * Cycle limit reached, the returned error would be ckb_vm::Error::CyclesExceeded, + /// * Pause trigger, the returned error would be ckb_vm::Error::Pause, + /// * Other terminating errors + pub fn run(&mut self, mode: RunMode) -> Result<(i8, Cycle), Error> { + if self.states.is_empty() { + // Booting phase, we will need to initialize the first VM. + assert_eq!( + self.boot_vm(&DataPieceId::Program, 0, u64::max_value(), &[])?, + ROOT_VM_ID + ); + } + assert!(self.states.contains_key(&ROOT_VM_ID)); + + let (pause, mut limit_cycles) = match mode { + RunMode::LimitCycles(limit_cycles) => (Pause::new(), limit_cycles), + RunMode::Pause(pause) => (pause, u64::max_value()), + }; + + while self.states[&ROOT_VM_ID] != VmState::Terminated { + let consumed_cycles = self.iterate(pause.clone(), limit_cycles)?; + // Shrink the remaining budget by what this iteration consumed. + limit_cycles = limit_cycles + .checked_sub(consumed_cycles) + .ok_or(Error::CyclesExceeded)?; + } + + // At this point, root VM cannot be suspended + let root_vm = &self.instantiated[&ROOT_VM_ID]; + Ok((root_vm.1.machine.exit_code(), self.total_cycles)) + } + + // This is internal function that does the actual VM execution loop. + // Here both pause signal and limit_cycles are provided so as to simplify + // branches. + fn iterate(&mut self, pause: Pause, limit_cycles: Cycle) -> Result { + // 1. Process all pending VM reads & writes + self.process_io()?; + // 2. 
Run an actual VM + // Find a runnable VM that has the largest ID + let vm_id_to_run = self + .states + .iter() + .rev() + .filter(|(_, state)| matches!(state, VmState::Runnable)) + .map(|(id, _)| *id) + .next(); + if vm_id_to_run.is_none() { + return Err(Error::Unexpected( + "A deadlock situation has been reached!".to_string(), + )); + } + let vm_id_to_run = vm_id_to_run.unwrap(); + // log::debug!("Running VM {}", vm_id_to_run); + let (result, consumed_cycles) = { + self.ensure_vms_instantiated(&[vm_id_to_run])?; + let (context, machine) = self.instantiated.get_mut(&vm_id_to_run).unwrap(); + context.set_base_cycles(self.total_cycles); + machine.set_max_cycles(limit_cycles); + machine.machine.set_pause(pause); + let result = machine.run(); + let consumed_cycles = { + let c = machine.machine.cycles(); + machine.machine.set_cycles(0); + c + }; + // This shall be the only place where total_cycles gets updated + self.total_cycles = self + .total_cycles + .checked_add(consumed_cycles) + .ok_or(Error::CyclesOverflow)?; + (result, consumed_cycles) + }; + // 3. Process message box, update VM states accordingly + self.process_message_box()?; + assert!(self.message_box.lock().expect("lock").is_empty()); + // log::debug!("VM states: {:?}", self.states); + // log::debug!("Pipes and owners: {:?}", self.pipes); + // 4. If the VM terminates, update VMs in join state, also closes its pipes + match result { + Ok(code) => { + // log::debug!("VM {} terminates with code {}", vm_id_to_run, code); + self.terminated_vms.insert(vm_id_to_run, code); + // When root VM terminates, the execution stops immediately, we will purge + // all non-root VMs, and only keep root VM in states. + // When non-root VM terminates, we only purge the VM's own states. 
+ if vm_id_to_run == ROOT_VM_ID { + self.ensure_vms_instantiated(&[vm_id_to_run])?; + self.instantiated.retain(|id, _| *id == vm_id_to_run); + self.suspended.clear(); + self.states.clear(); + self.states.insert(vm_id_to_run, VmState::Terminated); + } else { + let mut joining_vms: Vec<(VmId, u64)> = Vec::new(); + self.states.iter().for_each(|(vm_id, state)| { + if let VmState::Join { + target_vm_id, + exit_code_addr, + } = state + { + if *target_vm_id == vm_id_to_run { + joining_vms.push((*vm_id, *exit_code_addr)); + } + } + }); + // For all joining VMs, update exit code, then mark them as + // runnable state. + for (vm_id, exit_code_addr) in joining_vms { + self.ensure_vms_instantiated(&[vm_id])?; + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine + .machine + .memory_mut() + .store8(&exit_code_addr, &u64::from_i8(code))?; + machine.machine.set_register(A0, SUCCESS as u64); + self.states.insert(vm_id, VmState::Runnable); + } + // Close pipes + self.pipes.retain(|_, vm_id| *vm_id != vm_id_to_run); + // Clear terminated VM states + self.states.remove(&vm_id_to_run); + self.instantiated.remove(&vm_id_to_run); + self.suspended.remove(&vm_id_to_run); + } + Ok(consumed_cycles) + } + Err(Error::External(msg)) if msg == "YIELD" => Ok(consumed_cycles), + Err(e) => Err(e), + } + } + + fn process_message_box(&mut self) -> Result<(), Error> { + let messages: Vec = self.message_box.lock().expect("lock").drain(..).collect(); + for message in messages { + match message { + Message::Spawn(vm_id, args) => { + // All pipes must belong to the correct owner + for pipe in &args.pipes { + if !(self.pipes.contains_key(pipe) && (self.pipes[pipe] == vm_id)) { + return Err(Error::Unexpected(format!( + "VM {} does not own pipe {}!", + vm_id, pipe.0, + ))); + } + } + // TODO: spawn limits + let spawned_vm_id = + self.boot_vm(&args.data_piece_id, args.offset, args.length, &args.argv)?; + // Move passed pipes from spawner to spawnee + for pipe in &args.pipes { + 
self.pipes.insert(*pipe, spawned_vm_id); + } + // here we keep the original version of file descriptors. + // if one fd is moved afterward, this inherited file descriptors doesn't change. + // log::info!( + // "VmId = {} with Inherited file descriptor {:?}", + // spawned_vm_id, + // args.pipes + // ); + self.inherited_fd.insert(spawned_vm_id, args.pipes.clone()); + + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine + .machine + .memory_mut() + .store64(&args.instance_id_addr, &spawned_vm_id)?; + machine.machine.set_register(A0, SUCCESS as u64); + } + } + Message::Join(vm_id, args) => { + if let Some(exit_code) = self.terminated_vms.get(&args.target_id).copied() { + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine + .machine + .memory_mut() + .store8(&args.exit_code_addr, &u64::from_i8(exit_code))?; + machine.machine.set_register(A0, SUCCESS as u64); + self.states.insert(vm_id, VmState::Runnable); + } + continue; + } + if !self.states.contains_key(&args.target_id) { + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine.machine.set_register(A0, JOIN_FAILURE as u64); + } + continue; + } + // Return code will be updated when the joining VM exits + self.states.insert( + vm_id, + VmState::Join { + target_vm_id: args.target_id, + exit_code_addr: args.exit_code_addr, + }, + ); + } + Message::Pipe(vm_id, args) => { + // TODO: pipe limits + let (p1, p2, slot) = PipeId::create(self.next_pipe_slot); + self.next_pipe_slot = slot; + // log::debug!("VM {} creates pipes ({}, {})", vm_id, p1.0, p2.0); + + self.pipes.insert(p1, vm_id); + self.pipes.insert(p2, vm_id); + + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine + .machine + .memory_mut() + .store64(&args.pipe1_addr, &p1.0)?; + machine + .machine + 
.memory_mut() + .store64(&args.pipe2_addr, &p2.0)?; + machine.machine.set_register(A0, SUCCESS as u64); + } + } + Message::PipeRead(vm_id, args) => { + if !(self.pipes.contains_key(&args.pipe) && (self.pipes[&args.pipe] == vm_id)) { + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine.machine.set_register(A0, INVALID_PIPE as u64); + } + continue; + } + if !self.pipes.contains_key(&args.pipe.other_pipe()) { + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine.machine.set_register(A0, OTHER_END_CLOSED as u64); + } + continue; + } + // Return code will be updated when the read operation finishes + self.states.insert( + vm_id, + VmState::WaitForRead { + pipe: args.pipe, + length: args.length, + buffer_addr: args.buffer_addr, + length_addr: args.length_addr, + }, + ); + } + Message::PipeWrite(vm_id, args) => { + if !(self.pipes.contains_key(&args.pipe) && (self.pipes[&args.pipe] == vm_id)) { + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine.machine.set_register(A0, INVALID_PIPE as u64); + } + continue; + } + if !self.pipes.contains_key(&args.pipe.other_pipe()) { + self.ensure_vms_instantiated(&[vm_id])?; + { + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + machine.machine.set_register(A0, OTHER_END_CLOSED as u64); + } + continue; + } + // Return code will be updated when the write operation finishes + self.states.insert( + vm_id, + VmState::WaitForWrite { + pipe: args.pipe, + consumed: 0, + length: args.length, + buffer_addr: args.buffer_addr, + length_addr: args.length_addr, + }, + ); + } + Message::InheritedFileDescriptor(vm_id, args) => { + self.ensure_vms_instantiated(&[vm_id])?; + let (_, machine) = self.instantiated.get_mut(&vm_id).unwrap(); + let PipeIoArgs { + buffer_addr, + length_addr, + .. 
+ } = args; + let input_length = machine + .machine + .inner_mut() + .memory_mut() + .load64(&length_addr)?; + let inherited_fd = &self.inherited_fd[&vm_id]; + let actual_length = inherited_fd.len() as u64; + if buffer_addr == 0 { + if input_length == 0 { + machine + .machine + .inner_mut() + .memory_mut() + .store64(&length_addr, &actual_length)?; + machine.machine.set_register(A0, SUCCESS as u64); + } else { + // TODO: in the previous convention + // https://github.com/nervosnetwork/rfcs/blob/master/rfcs/0009-vm-syscalls/0009-vm-syscalls.md#partial-loading + // this will load data in to address 0 without notice. It is now marked as an error. + machine.machine.set_register(A0, INDEX_OUT_OF_BOUND as u64); + } + continue; + } + let mut buffer_addr2 = buffer_addr; + let copy_length = u64::min(input_length, actual_length); + for i in 0..copy_length { + let fd = inherited_fd[i as usize].0; + machine + .machine + .inner_mut() + .memory_mut() + .store64(&buffer_addr2, &fd)?; + buffer_addr2 += size_of::() as u64; + } + machine + .machine + .inner_mut() + .memory_mut() + .store64(&length_addr, &actual_length)?; + machine.machine.set_register(A0, SUCCESS as u64); + } + } + } + Ok(()) + } + + fn process_io(&mut self) -> Result<(), Error> { + let mut reads: HashMap = HashMap::default(); + let mut closed_pipes: Vec = Vec::new(); + self.states.iter().for_each(|(vm_id, state)| { + if let VmState::WaitForRead { pipe, .. } = state { + if self.pipes.contains_key(&pipe.other_pipe()) { + reads.insert(*pipe, (*vm_id, state.clone())); + } else { + closed_pipes.push(*vm_id); + } + } + }); + let mut pairs: Vec<[(VmId, VmState); 2]> = Vec::new(); + self.states.iter().for_each(|(vm_id, state)| { + if let VmState::WaitForWrite { pipe, .. 
} = state { + if self.pipes.contains_key(&pipe.other_pipe()) { + if let Some((read_vm_id, read_state)) = reads.get(&pipe.other_pipe()) { + pairs.push([(*read_vm_id, read_state.clone()), (*vm_id, state.clone())]); + } + } else { + closed_pipes.push(*vm_id); + } + } + }); + // Finish read / write syscalls for pipes that are closed on the other end + for vm_id in closed_pipes { + // A waiting VM may currently be suspended; instantiate it before touching its memory and registers. + self.ensure_vms_instantiated(&[vm_id])?; + match self.states[&vm_id].clone() { + VmState::WaitForRead { length_addr, .. } => { + let (_, read_machine) = self.instantiated.get_mut(&vm_id).unwrap(); + read_machine + .machine + .memory_mut() + .store64(&length_addr, &0)?; + read_machine.machine.set_register(A0, SUCCESS as u64); + self.states.insert(vm_id, VmState::Runnable); + } + VmState::WaitForWrite { + consumed, + length_addr, + .. + } => { + let (_, write_machine) = self.instantiated.get_mut(&vm_id).unwrap(); + write_machine + .machine + .memory_mut() + .store64(&length_addr, &consumed)?; + write_machine.machine.set_register(A0, SUCCESS as u64); + self.states.insert(vm_id, VmState::Runnable); + } + _ => (), + } + } + // Transferring data from write pipes to read pipes + for [(read_vm_id, read_state), (write_vm_id, write_state)] in pairs { + let VmState::WaitForRead { + length: read_length, + buffer_addr: read_buffer_addr, + length_addr: read_length_addr, + .. 
+ } = read_state + else { + unreachable!() + }; + let VmState::WaitForWrite { + pipe: write_pipe, + mut consumed, + length: write_length, + buffer_addr: write_buffer_addr, + length_addr: write_length_addr, + } = write_state + else { + unreachable!() + }; + + self.ensure_vms_instantiated(&[read_vm_id, write_vm_id])?; + { + let fillable = read_length; + let consumable = write_length - consumed; + let copiable = std::cmp::min(fillable, consumable); + + // Actual data copying + // TODO: charge cycles + let data = self + .instantiated + .get_mut(&write_vm_id) + .unwrap() + .1 + .machine + .memory_mut() + .load_bytes(write_buffer_addr.wrapping_add(consumed), copiable)?; + self.instantiated + .get_mut(&read_vm_id) + .unwrap() + .1 + .machine + .memory_mut() + .store_bytes(read_buffer_addr, &data)?; + + // Read syscall terminates as soon as some data are filled + let (_, read_machine) = self.instantiated.get_mut(&read_vm_id).unwrap(); + read_machine + .machine + .memory_mut() + .store64(&read_length_addr, &copiable)?; + read_machine.machine.set_register(A0, SUCCESS as u64); + self.states.insert(read_vm_id, VmState::Runnable); + + // Write syscall, however, terminates only when all the data + // have been written, or when the pairing read pipe is closed. 
+ consumed += copiable; + if consumed == write_length { + // write VM has fulfilled its write request + let (_, write_machine) = self.instantiated.get_mut(&write_vm_id).unwrap(); + write_machine + .machine + .memory_mut() + .store64(&write_length_addr, &write_length)?; + write_machine.machine.set_register(A0, SUCCESS as u64); + self.states.insert(write_vm_id, VmState::Runnable); + } else { + // Only update write VM state + self.states.insert( + write_vm_id, + VmState::WaitForWrite { + pipe: write_pipe, + consumed, + length: write_length, + buffer_addr: write_buffer_addr, + length_addr: write_length_addr, + }, + ); + } + } + } + Ok(()) + } + + // Ensure VMs are instantiated + fn ensure_vms_instantiated(&mut self, ids: &[VmId]) -> Result<(), Error> { + if ids.len() > MAX_INSTANTIATED_VMS { + return Err(Error::Unexpected(format!( + "At most {} VMs can be instantiated but {} are requested!", + MAX_INSTANTIATED_VMS, + ids.len() + ))); + } + + let mut uninstantiated_ids: Vec = ids + .iter() + .filter(|id| !self.instantiated.contains_key(id)) + .copied() + .collect(); + while (!uninstantiated_ids.is_empty()) && (self.instantiated.len() < MAX_INSTANTIATED_VMS) { + let id = uninstantiated_ids.pop().unwrap(); + self.resume_vm(&id)?; + } + + if !uninstantiated_ids.is_empty() { + // instantiated is a BTreeMap, an iterator on it maintains key order to ensure deterministic behavior + let suspendable_ids: Vec = self + .instantiated + .keys() + .filter(|id| !ids.contains(id)) + .copied() + .collect(); + + assert!(suspendable_ids.len() >= uninstantiated_ids.len()); + for i in 0..uninstantiated_ids.len() { + self.suspend_vm(&suspendable_ids[i])?; + self.resume_vm(&uninstantiated_ids[i])?; + } + } + + Ok(()) + } + + // Resume a suspended VM + fn resume_vm(&mut self, id: &VmId) -> Result<(), Error> { + // log::debug!("Resuming VM: {}", id); + if !self.suspended.contains_key(id) { + return Err(Error::Unexpected(format!("VM {:?} is not suspended!", id))); + } + let snapshot = 
&self.suspended[id]; + let (context, mut machine) = self.create_dummy_vm(id)?; + { + let mut sc = context.snapshot2_context().lock().expect("lock"); + sc.resume(&mut machine.machine, snapshot)?; + } + // TODO: charge cycles + self.instantiated.insert(*id, (context, machine)); + self.suspended.remove(id); + Ok(()) + } + + // Suspend an instantiated VM + fn suspend_vm(&mut self, id: &VmId) -> Result<(), Error> { + // log::debug!("Suspending VM: {}", id); + if !self.instantiated.contains_key(id) { + return Err(Error::Unexpected(format!( + "VM {:?} is not instantiated!", + id + ))); + } + // TODO: charge cycles + let (context, machine) = self.instantiated.get_mut(id).unwrap(); + let snapshot = { + let sc = context.snapshot2_context().lock().expect("lock"); + sc.make_snapshot(&mut machine.machine)? + }; + self.suspended.insert(*id, snapshot); + self.instantiated.remove(id); + Ok(()) + } + + fn boot_vm( + &mut self, + data_piece_id: &DataPieceId, + offset: u64, + length: u64, + args: &[Bytes], + ) -> Result { + // Newly booted VM will be instantiated by default + while self.instantiated.len() >= MAX_INSTANTIATED_VMS { + // instantiated is a BTreeMap, first_entry will maintain key order + let id = *self.instantiated.first_entry().unwrap().key(); + self.suspend_vm(&id)?; + } + + let id = self.next_vm_id; + self.next_vm_id += 1; + let (context, mut machine) = self.create_dummy_vm(&id)?; + { + let mut sc = context.snapshot2_context().lock().expect("lock"); + let (program, _) = sc.data_source().load_data(data_piece_id, offset, length)?; + let metadata = parse_elf::(&program, machine.machine.version())?; + let bytes = machine.load_program_with_metadata(&program, &metadata, args)?; + sc.mark_program(&mut machine.machine, &metadata, data_piece_id, offset)?; + machine + .machine + .add_cycles_no_checking(transferred_byte_cycles(bytes))?; + } + self.instantiated.insert(id, (context, machine)); + self.states.insert(id, VmState::Runnable); + + Ok(id) + } + + // Create a new VM 
instance with syscalls attached + fn create_dummy_vm(&self, id: &VmId) -> Result<(MachineContext
, AsmMachine), Error> { + // The code here looks slightly weird, since I don't want to copy over all syscall + // impls here again. Ideally, this scheduler package should be merged with ckb-script, + // or simply replace ckb-script. That way, the quirks here will be eliminated. + let version = self + .verifier + .select_version(&self.tx_data.script_group.script) + .map_err(|e| Error::Unexpected(format!("Select version error: {:?}", e)))?; + // log::debug!("Creating VM {} using version {:?}", id, version); + let core_machine = AsmCoreMachine::new( + version.vm_isa(), + version.vm_version(), + // We will update max_cycles for each machine when it gets a chance to run + u64::max_value(), + ); + let machine_context = + MachineContext::new(*id, self.message_box.clone(), self.tx_data.clone()); + let machine_builder = DefaultMachineBuilder::new(core_machine) + .instruction_cycle_func(Box::new(estimate_cycles)) + // ckb-vm iterates syscalls in insertion order, by putting + // MachineContext at the first place, we can override other + // syscalls with implementations from MachineContext. For example, + // we can override load_cell_data syscall with a new implementation. 
+ .syscall(Box::new(machine_context.clone())); + let syscalls = self.verifier.generate_syscalls( + // Skip current spawn implementation + if version == ScriptVersion::V2 { + ScriptVersion::V1 + } else { + version + }, + &self.tx_data.script_group, + Default::default(), + ); + let machine_builder = syscalls + .into_iter() + .fold(machine_builder, |builder, syscall| builder.syscall(syscall)); + let default_machine = machine_builder.build(); + Ok((machine_context, AsmMachine::new(default_machine))) + } +} diff --git a/script/src/v2_syscalls.rs b/script/src/v2_syscalls.rs new file mode 100644 index 0000000000..aa256897d7 --- /dev/null +++ b/script/src/v2_syscalls.rs @@ -0,0 +1,529 @@ +// Syscall implementation + +use crate::v2_types::{ + DataPieceId, JoinArgs, Message, PipeArgs, PipeId, PipeIoArgs, SpawnArgs, TxData, VmId, +}; +use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider}; +use ckb_vm::{ + bytes::Bytes, + machine::SupportMachine, + memory::{Memory, FLAG_EXECUTABLE, FLAG_FREEZED}, + registers::{A0, A1, A2, A3, A4, A5, A7}, + snapshot2::{DataSource, Snapshot2Context}, + syscalls::Syscalls, + Error, Register, +}; +use std::sync::{Arc, Mutex}; + +#[derive(Clone)] +pub struct MachineContext< + DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static, +> { + id: VmId, + base_cycles: Arc>, + message_box: Arc>>, + snapshot2_context: Arc>>>, +} + +impl + MachineContext
+{ + pub fn new(id: VmId, message_box: Arc>>, tx_data: TxData
) -> Self { + Self { + id, + base_cycles: Arc::new(Mutex::new(0)), + message_box, + snapshot2_context: Arc::new(Mutex::new(Snapshot2Context::new(tx_data))), + } + } + + pub fn snapshot2_context(&self) -> &Arc>>> { + &self.snapshot2_context + } + + pub fn base_cycles(&self) -> u64 { + *self.base_cycles.lock().expect("lock") + } + + pub fn set_base_cycles(&mut self, base_cycles: u64) { + *self.base_cycles.lock().expect("lock") = base_cycles; + } + + // The different architecture here requires a re-implementation on current + // cycles syscall. + fn current_cycles(&mut self, machine: &mut Mac) -> Result<(), Error> { + let cycles = self + .base_cycles() + .checked_add(machine.cycles()) + .ok_or(Error::CyclesOverflow)?; + machine.set_register(A0, Mac::REG::from_u64(cycles)); + Ok(()) + } + + // Reimplementation of load_cell_data but keep tracks of pages that are copied from + // surrounding transaction data. Those pages do not need to be added to snapshots. + fn load_cell_data(&mut self, machine: &mut Mac) -> Result<(), Error> { + let index = machine.registers()[A3].to_u64(); + let source = machine.registers()[A4].to_u64(); + + let data_piece_id = match DataPieceId::try_from((source, index)) { + Ok(id) => id, + Err(e) => { + // Current implementation would throw an error immediately + // for some source values, but return INDEX_OUT_OF_BOUND error + // for other values. Here for simplicity, we would return + // INDEX_OUT_OF_BOUND error in all cases. 
But the code might + // differ to mimic current on-chain behavior + println!("DataPieceId parsing error: {:?}", e); + machine.set_register(A0, Mac::REG::from_u8(INDEX_OUT_OF_BOUND)); + return Ok(()); + } + }; + + let addr = machine.registers()[A0].to_u64(); + let size_addr = machine.registers()[A1].clone(); + let size = machine.memory_mut().load64(&size_addr)?.to_u64(); + let offset = machine.registers()[A2].to_u64(); + + let mut sc = self.snapshot2_context().lock().expect("lock"); + let (wrote_size, full_size) = + match sc.store_bytes(machine, addr, &data_piece_id, offset, size) { + Ok(val) => val, + Err(Error::External(m)) if m == "INDEX_OUT_OF_BOUND" => { + // This comes from TxData results in an out of bound error, to + // mimic current behavior, we would return INDEX_OUT_OF_BOUND error. + machine.set_register(A0, Mac::REG::from_u8(INDEX_OUT_OF_BOUND)); + return Ok(()); + } + Err(e) => return Err(e), + }; + + machine + .memory_mut() + .store64(&size_addr, &Mac::REG::from_u64(full_size))?; + machine.add_cycles_no_checking(transferred_byte_cycles(wrote_size))?; + machine.set_register(A0, Mac::REG::from_u8(SUCCESS)); + Ok(()) + } + + // Reimplementation of load_cell_data_as_code but keep tracks of pages that are copied from + // surrounding transaction data. Those pages do not need to be added to snapshots. + // + // Different from load_cell_data, this method showcases advanced usage of Snapshot2, where + // one manually does the actual memory copying, then calls track_pages method to setup metadata + // used by Snapshot2. It does not rely on higher level methods provided by Snapshot2. 
+    //
+    // Registers: A0 = destination address, A1 = size of the memory region to
+    // initialize, A2 = offset into the cell data, A3 = number of content bytes,
+    // A4 = index, A5 = source. On return A0 holds SUCCESS, INDEX_OUT_OF_BOUND
+    // or SLICE_OUT_OF_BOUND.
+    fn load_cell_data_as_code<Mac: SupportMachine>(
+        &mut self,
+        machine: &mut Mac,
+    ) -> Result<(), Error> {
+        let addr = machine.registers()[A0].to_u64();
+        let memory_size = machine.registers()[A1].to_u64();
+        let content_offset = machine.registers()[A2].to_u64();
+        let content_size = machine.registers()[A3].to_u64();
+
+        let index = machine.registers()[A4].to_u64();
+        let source = machine.registers()[A5].to_u64();
+
+        let data_piece_id = match DataPieceId::try_from((source, index)) {
+            Ok(id) => id,
+            Err(e) => {
+                // Current implementation would throw an error immediately
+                // for some source values, but return INDEX_OUT_OF_BOUND error
+                // for other values. Here for simplicity, we would return
+                // INDEX_OUT_OF_BOUND error in all cases. But the code might
+                // differ to mimic current on-chain behavior
+                println!("DataPieceId parsing error: {:?}", e);
+                machine.set_register(A0, Mac::REG::from_u8(INDEX_OUT_OF_BOUND));
+                return Ok(());
+            }
+        };
+
+        let mut sc = self.snapshot2_context().lock().expect("lock");
+        // We are using 0..u64::MAX to fetch full cell, there is
+        // also no need to keep the full length value. Since cell's length
+        // is already full length.
+        let (cell, _) = match sc.data_source().load_data(&data_piece_id, 0, u64::MAX) {
+            Ok(val) => val,
+            Err(Error::External(m)) if m == "INDEX_OUT_OF_BOUND" => {
+                // This comes from TxData results in an out of bound error, to
+                // mimic current behavior, we would return INDEX_OUT_OF_BOUND error.
+                machine.set_register(A0, Mac::REG::from_u8(INDEX_OUT_OF_BOUND));
+                return Ok(());
+            }
+            Err(e) => return Err(e),
+        };
+
+        // The requested slice must lie inside the cell data and fit into the
+        // memory region that is about to be initialized.
+        let content_end = content_offset
+            .checked_add(content_size)
+            .ok_or(Error::MemOutOfBound)?;
+        if content_offset >= cell.len() as u64
+            || content_end > cell.len() as u64
+            || content_size > memory_size
+        {
+            machine.set_register(A0, Mac::REG::from_u8(SLICE_OUT_OF_BOUND));
+            return Ok(());
+        }
+
+        // Both bounds were checked against cell.len() above, so the casts to
+        // usize cannot truncate.
+        machine.memory_mut().init_pages(
+            addr,
+            memory_size,
+            FLAG_EXECUTABLE | FLAG_FREEZED,
+            Some(cell.slice((content_offset as usize)..(content_end as usize))),
+            0,
+        )?;
+        sc.track_pages(machine, addr, memory_size, &data_piece_id, content_offset)?;
+
+        machine.add_cycles_no_checking(transferred_byte_cycles(memory_size))?;
+        machine.set_register(A0, Mac::REG::from_u8(SUCCESS));
+        Ok(())
+    }
+
+    // Reimplementing debug syscall for printing debug messages.
+    //
+    // A0 holds the address of a NUL-terminated string. Cycles are charged for
+    // the bytes read before the UTF-8 check, so an invalid string is still paid for.
+    fn debug<Mac: SupportMachine>(&mut self, machine: &mut Mac) -> Result<(), Error> {
+        let addr = machine.registers()[A0].to_u64();
+        // Same NUL-terminated reader that spawn uses for argv.
+        let buffer = load_c_string(machine, addr)?;
+
+        machine.add_cycles_no_checking(transferred_byte_cycles(buffer.len() as u64))?;
+        let s = String::from_utf8(buffer.to_vec())
+            .map_err(|e| Error::External(format!("String from buffer {e:?}")))?;
+        println!("VM {}: {}", self.id, s);
+
+        Ok(())
+    }
+
+    // New, concurrent spawn implementation.
+    //
+    // Registers: A0 = index, A1 = source, A2 = bounds (upper 32 bits: offset,
+    // lower 32 bits: length), A3 = argc, A4 = address of the argv pointer array,
+    // A5 = address of a two-word structure holding the instance id address
+    // (at +0) and the address of a zero-terminated pipe id array (at +8).
+    //
+    // Validation failures are reported through A0 with Ok(()); on success a
+    // Message::Spawn is queued and the call yields to the scheduler.
+    fn spawn<Mac: SupportMachine>(&mut self, machine: &mut Mac) -> Result<(), Error> {
+        let index = machine.registers()[A0].to_u64();
+        let source = machine.registers()[A1].to_u64();
+
+        let data_piece_id = match DataPieceId::try_from((source, index)) {
+            Ok(id) => id,
+            Err(e) => {
+                // Current implementation would throw an error immediately
+                // for some source values, but return INDEX_OUT_OF_BOUND error
+                // for other values. Here for simplicity, we would return
+                // INDEX_OUT_OF_BOUND error in all cases. But the code might
+                // differ to mimic current on-chain behavior
+                println!("DataPieceId parsing error: {:?}", e);
+                machine.set_register(A0, Mac::REG::from_u8(INDEX_OUT_OF_BOUND));
+                return Ok(());
+            }
+        };
+
+        let bounds = machine.registers()[A2].to_u64();
+        let offset = bounds >> 32;
+        // Deliberate truncation: only the low 32 bits carry the length.
+        let length = bounds as u32 as u64;
+
+        let argv = {
+            let argc = machine.registers()[A3].to_u64();
+            let mut argv_addr = machine.registers()[A4].to_u64();
+            // argc is guest-controlled, so it must not size an allocation up
+            // front: Vec::with_capacity panics or aborts on absurd values.
+            // The vector grows only as entries are successfully read.
+            let mut argv_vec = Vec::new();
+            for _ in 0..argc {
+                let target_addr = machine
+                    .memory_mut()
+                    .load64(&Mac::REG::from_u64(argv_addr))?
+                    .to_u64();
+                let cstr = load_c_string(machine, target_addr)?;
+                argv_vec.push(cstr);
+                // Wrapping keeps guest-controlled address math panic-free.
+                argv_addr = argv_addr.wrapping_add(8);
+            }
+            argv_vec
+        };
+
+        let (instance_id_addr, pipes) = {
+            let spgs_addr = machine.registers()[A5].to_u64();
+            let instance_id_addr_addr = spgs_addr;
+            let instance_id_addr = machine
+                .memory_mut()
+                .load64(&Mac::REG::from_u64(instance_id_addr_addr))?
+                .to_u64();
+            let pipes_addr_addr = spgs_addr.wrapping_add(8);
+            let mut pipes_addr = machine
+                .memory_mut()
+                .load64(&Mac::REG::from_u64(pipes_addr_addr))?
+                .to_u64();
+
+            // A null pointer means no pipes are passed; otherwise read pipe
+            // ids until the zero terminator.
+            let mut pipes = vec![];
+            if pipes_addr != 0 {
+                loop {
+                    let pipe = machine
+                        .memory_mut()
+                        .load64(&Mac::REG::from_u64(pipes_addr))?
+                        .to_u64();
+                    if pipe == 0 {
+                        break;
+                    }
+                    pipes.push(PipeId(pipe));
+                    pipes_addr = pipes_addr.wrapping_add(8);
+                }
+            }
+            (instance_id_addr, pipes)
+        };
+
+        // We are fetching the actual cell here for some in-place validation
+        {
+            let sc = self.snapshot2_context().lock().expect("lock");
+            let (_, full_length) = match sc.data_source().load_data(&data_piece_id, 0, 0) {
+                Ok(val) => val,
+                Err(Error::External(m)) if m == "INDEX_OUT_OF_BOUND" => {
+                    // This comes from TxData results in an out of bound error, to
+                    // mimic current behavior, we would return INDEX_OUT_OF_BOUND error.
+                    machine.set_register(A0, Mac::REG::from_u8(INDEX_OUT_OF_BOUND));
+                    return Ok(());
+                }
+                Err(e) => return Err(e),
+            };
+            if offset >= full_length {
+                machine.set_register(A0, Mac::REG::from_u8(SLICE_OUT_OF_BOUND));
+                return Ok(());
+            }
+            // A zero length skips the end check.
+            if length > 0 {
+                let end = offset.checked_add(length).ok_or(Error::MemOutOfBound)?;
+                if end > full_length {
+                    machine.set_register(A0, Mac::REG::from_u8(SLICE_OUT_OF_BOUND));
+                    return Ok(());
+                }
+            }
+        }
+
+        // TODO: charge spawn base cycles
+        self.message_box.lock().expect("lock").push(Message::Spawn(
+            self.id,
+            SpawnArgs {
+                data_piece_id,
+                offset,
+                length,
+                argv,
+                pipes,
+                instance_id_addr,
+            },
+        ));
+
+        // At this point, all execution has been finished, and it is expected
+        // to return Ok(()) denoting success. However we want spawn to yield
+        // its control back to scheduler, so a runnable VM with a higher ID can
+        // start its execution first. That's why we actually return a yield error
+        // here.
+        Err(Error::External("YIELD".to_string()))
+    }
+
+    // Join syscall blocks till the specified VM finishes execution, then
+    // returns with its exit code.
+    //
+    // A0 = id of the VM to wait for, A1 = address passed along as exit_code_addr.
+    fn join<Mac: SupportMachine>(&mut self, machine: &mut Mac) -> Result<(), Error> {
+        let target_id = machine.registers()[A0].to_u64();
+        let exit_code_addr = machine.registers()[A1].to_u64();
+
+        // TODO: charge cycles
+        self.message_box.lock().expect("lock").push(Message::Join(
+            self.id,
+            JoinArgs {
+                target_id,
+                exit_code_addr,
+            },
+        ));
+
+        // Like spawn, join yields control upon success
+        Err(Error::External("YIELD".to_string()))
+    }
+
+    // Fetch current instance ID; the id is returned in A0.
+    fn instance_id<Mac: SupportMachine>(&mut self, machine: &mut Mac) -> Result<(), Error> {
+        // TODO: charge cycles
+        machine.set_register(A0, Mac::REG::from_u64(self.id));
+        Ok(())
+    }
+
+    // Create a pair of pipes.
+    //
+    // A0 is the address forwarded as pipe1_addr; pipe2_addr is the next
+    // 8-byte slot after it. The request is queued and the call yields.
+    fn pipe<Mac: SupportMachine>(&mut self, machine: &mut Mac) -> Result<(), Error> {
+        let pipe1_addr = machine.registers()[A0].to_u64();
+        let pipe2_addr = pipe1_addr.wrapping_add(8);
+
+        // TODO: charge cycles
+        self.message_box.lock().expect("lock").push(Message::Pipe(
+            self.id,
+            PipeArgs {
+                pipe1_addr,
+                pipe2_addr,
+            },
+        ));
+
+        Err(Error::External("YIELD".to_string()))
+    }
+
+    // Write to pipe.
+    //
+    // A0 = buffer address, A1 = address of the u64 length, A2 = pipe id.
+    fn pipe_write<Mac: SupportMachine>(&mut self, machine: &mut Mac) -> Result<(), Error> {
+        let buffer_addr = machine.registers()[A0].to_u64();
+        let length_addr = machine.registers()[A1].to_u64();
+        let length = machine
+            .memory_mut()
+            .load64(&Mac::REG::from_u64(length_addr))?
+            .to_u64();
+        let pipe = PipeId(machine.registers()[A2].to_u64());
+
+        // We can only do basic checks here, when the message is actually processed,
+        // more complete checks will be performed.
+        // We will also leave to the actual write operation to test memory permissions.
+        if !pipe.is_write() {
+            machine.set_register(A0, Mac::REG::from_u8(INVALID_PIPE));
+            return Ok(());
+        }
+
+        // TODO: charge cycles
+        self.message_box
+            .lock()
+            .expect("lock")
+            .push(Message::PipeWrite(
+                self.id,
+                PipeIoArgs {
+                    pipe,
+                    length,
+                    buffer_addr,
+                    length_addr,
+                },
+            ));
+
+        // A0 will be updated once the write operation is fulfilled
+        Err(Error::External("YIELD".to_string()))
+    }
+
+    // Read from pipe.
+    //
+    // A0 = buffer address, A1 = address of the u64 length, A2 = pipe id.
+    fn pipe_read<Mac: SupportMachine>(&mut self, machine: &mut Mac) -> Result<(), Error> {
+        let buffer_addr = machine.registers()[A0].to_u64();
+        let length_addr = machine.registers()[A1].to_u64();
+        let length = machine
+            .memory_mut()
+            .load64(&Mac::REG::from_u64(length_addr))?
+            .to_u64();
+        let pipe = PipeId(machine.registers()[A2].to_u64());
+
+        // We can only do basic checks here, when the message is actually processed,
+        // more complete checks will be performed.
+        // We will also leave to the actual read operation to test memory permissions.
+        if !pipe.is_read() {
+            machine.set_register(A0, Mac::REG::from_u8(INVALID_PIPE));
+            return Ok(());
+        }
+
+        // TODO: charge cycles
+        self.message_box
+            .lock()
+            .expect("lock")
+            .push(Message::PipeRead(
+                self.id,
+                PipeIoArgs {
+                    pipe,
+                    length,
+                    buffer_addr,
+                    length_addr,
+                },
+            ));
+
+        // A0 will be updated once the read operation is fulfilled
+        Err(Error::External("YIELD".to_string()))
+    }
+
+    // Queue a request for the inherited file descriptors.
+    //
+    // A0 = buffer address, A1 = length address. PipeIoArgs is reused as the
+    // payload; its pipe and length fields are placeholders (PipeId(0) and 0).
+    fn inherited_file_descriptors<Mac: SupportMachine>(
+        &mut self,
+        machine: &mut Mac,
+    ) -> Result<(), Error> {
+        let buffer_addr = machine.registers()[A0].to_u64();
+        let length_addr = machine.registers()[A1].to_u64();
+        self.message_box
+            .lock()
+            .expect("lock")
+            .push(Message::InheritedFileDescriptor(
+                self.id,
+                PipeIoArgs {
+                    pipe: PipeId(0),
+                    length: 0,
+                    buffer_addr,
+                    length_addr,
+                },
+            ));
+        Err(Error::External("YIELD".to_string()))
+    }
+}
+
+impl<
+        Mac: SupportMachine,
+        DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static,
+    > Syscalls<Mac> for MachineContext<DL>
+{
+    fn initialize(&mut self, _machine: &mut Mac) -> Result<(), Error> {
+        Ok(())
+    }
+
+    // Dispatches on the syscall number in A7. Returns Ok(true) when the
+    // syscall was handled here and Ok(false) for any other number; errors
+    // from a handler (including the "YIELD" error) are propagated.
+    fn ecall(&mut self, machine: &mut Mac) -> Result<bool, Error> {
+        let code = machine.registers()[A7].to_u64();
+        match code {
+            2042 => self.current_cycles(machine),
+            2091 => self.load_cell_data_as_code(machine),
+            2092 => self.load_cell_data(machine),
+            2177 => self.debug(machine),
+            // The syscall numbers here are picked intentionally to be different
+            // than currently assigned syscall numbers for spawn calls
+            2601 => self.spawn(machine),
+            2602 => self.join(machine),
+            2603 => self.instance_id(machine),
+            2604 => self.pipe(machine),
+            2605 => self.pipe_write(machine),
+            2606 => self.pipe_read(machine),
+            2607 => self.inherited_file_descriptors(machine),
+            _ => return Ok(false),
+        }?;
+        Ok(true)
+    }
+}
+
+// Below are all simple utilities copied over from ckb-script package to
+// ease the implementation.
+
+/// How many bytes can transfer when VM costs one cycle.
+// 0.25 cycles per byte
+const BYTES_PER_CYCLE: u64 = 4;
+
+/// Calculates how many cycles spent to load the specified number of bytes.
+///
+/// The result is `bytes / BYTES_PER_CYCLE` rounded up.
+pub(crate) fn transferred_byte_cycles(bytes: u64) -> u64 {
+    // Ceiling division without the `bytes + BYTES_PER_CYCLE - 1` form, which
+    // overflows for byte counts within BYTES_PER_CYCLE of u64::MAX.
+    // Compiler will optimize the division here to shifts.
+    bytes / BYTES_PER_CYCLE + u64::from(bytes % BYTES_PER_CYCLE != 0)
+}
+
+// Status codes written to A0 by the syscalls above.
+pub(crate) const SUCCESS: u8 = 0;
+pub(crate) const INDEX_OUT_OF_BOUND: u8 = 1;
+pub(crate) const SLICE_OUT_OF_BOUND: u8 = 3;
+pub(crate) const JOIN_FAILURE: u8 = 5;
+pub(crate) const INVALID_PIPE: u8 = 6;
+pub(crate) const OTHER_END_CLOSED: u8 = 7;
+
+// Reads bytes from VM memory starting at `addr` up to (not including) the
+// first zero byte. A failing memory load is returned as the error.
+fn load_c_string<Mac: SupportMachine>(machine: &mut Mac, addr: u64) -> Result<Bytes, Error> {
+    let mut buffer = Vec::new();
+    let mut addr = addr;
+
+    loop {
+        let byte = machine
+            .memory_mut()
+            .load8(&Mac::REG::from_u64(addr))?
+            .to_u8();
+        if byte == 0 {
+            break;
+        }
+        buffer.push(byte);
+        // Wrapping keeps guest-controlled address math panic-free.
+        addr = addr.wrapping_add(1);
+    }
+
+    Ok(Bytes::from(buffer))
+}
diff --git a/script/src/v2_types.rs b/script/src/v2_types.rs
new file mode 100644
index 0000000000..831835dc06
--- /dev/null
+++ b/script/src/v2_types.rs
@@ -0,0 +1,275 @@
+// Core data structures here
+
+use crate::ScriptGroup;
+use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
+use ckb_types::core::{cell::ResolvedTransaction, Cycle};
+use ckb_vm::{
+    bytes::Bytes,
+    machine::Pause,
+    snapshot2::{DataSource, Snapshot2},
+    Error, RISCV_GENERAL_REGISTER_NUMBER,
+};
+use std::mem::size_of;
+use std::sync::Arc;
+
+/// Identifier of a VM instance.
+pub type VmId = u64;
+
+/// Id of the first VM instance.
+pub const FIRST_VM_ID: VmId = 0;
+
+/// Identifier of one end of a pipe. Even ids are read ends and odd ids are
+/// write ends (see `is_read` / `is_write`).
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+pub struct PipeId(pub(crate) u64);
+
+/// First slot handed to `PipeId::create`.
+pub const FIRST_PIPE_SLOT: u64 = 2;
+
+impl PipeId {
+    /// Builds the two ends of a pipe from `slot` and returns them together
+    /// with the next free slot: `(PipeId(slot), PipeId(slot + 1), slot + 2)`.
+    /// With an even `slot` (as `FIRST_PIPE_SLOT` is) the first id is the read
+    /// end and the second the write end.
+    pub fn create(slot: u64) -> (PipeId, PipeId, u64) {
+        (PipeId(slot), PipeId(slot + 1), slot + 2)
+    }
+
+    /// Returns the id obtained by flipping the lowest bit, i.e. the other end
+    /// of a pair produced by `create`.
+    pub fn other_pipe(&self) -> PipeId {
+        PipeId(self.0 ^ 0x1)
+    }
+
+    /// True when the id is even.
+    pub fn is_read(&self) -> bool {
+        self.0 % 2 == 0
+    }
+
+    /// True when the id is odd.
+    pub fn is_write(&self) -> bool {
+        self.0 % 2 == 1
+    }
+}
+
+/// Scheduling state of a single VM instance.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum VmState {
+    Runnable,
+    Terminated,
+    Join {
+        target_vm_id: VmId,
+        exit_code_addr: u64,
+    },
+    WaitForWrite {
+        pipe: PipeId,
+        consumed: u64,
+        length: u64,
+        buffer_addr: u64,
+        length_addr: u64,
+    },
+    WaitForRead {
+        pipe: PipeId,
+        length: u64,
+        buffer_addr: u64,
+        length_addr: u64,
+    },
+}
+
+/// Arguments collected by the spawn syscall.
+#[derive(Clone, Debug)]
+pub struct SpawnArgs {
+    pub data_piece_id: DataPieceId,
+    pub offset: u64,
+    pub length: u64,
+    pub argv: Vec<Bytes>,
+    pub pipes: Vec<PipeId>,
+    pub instance_id_addr: u64,
+}
+
+/// Arguments collected by the join syscall.
+#[derive(Clone, Debug)]
+pub struct JoinArgs {
+    pub target_id: VmId,
+    pub exit_code_addr: u64,
+}
+
+/// Arguments collected by the pipe syscall.
+#[derive(Clone, Debug)]
+pub struct PipeArgs {
+    pub pipe1_addr: u64,
+    pub pipe2_addr: u64,
+}
+
+/// Arguments collected by the pipe read / write syscalls; also reused for
+/// `Message::InheritedFileDescriptor`.
+#[derive(Clone, Debug)]
+pub struct PipeIoArgs {
+    pub pipe: PipeId,
+    pub length: u64,
+    pub buffer_addr: u64,
+    pub length_addr: u64,
+}
+
+/// Requests queued by syscalls; each carries the id of the issuing VM.
+#[derive(Clone, Debug)]
+pub enum Message {
+    Spawn(VmId, SpawnArgs),
+    Join(VmId, JoinArgs),
+    Pipe(VmId, PipeArgs),
+    PipeRead(VmId, PipeIoArgs),
+    PipeWrite(VmId, PipeIoArgs),
+    InheritedFileDescriptor(VmId, PipeIoArgs),
+}
+
+/// Identifies a piece of transaction data that can be loaded into a VM.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum DataPieceId {
+    Program,
+    Input(u32),
+    Output(u32),
+    CellDep(u32),
+    GroupInput(u32),
+    GroupOutput(u32),
+}
+
+/// Converts a `(source, index)` pair taken from syscall registers.
+/// Fails when `index` does not fit in `u32` or `source` is not one of the
+/// values matched below; `DataPieceId::Program` is never produced here.
+impl TryFrom<(u64, u64)> for DataPieceId {
+    type Error = String;
+
+    fn try_from(value: (u64, u64)) -> Result<Self, Self::Error> {
+        let (source, index) = value;
+        let index: u32 =
+            u32::try_from(index).map_err(|e| format!("Error casting index to u32: {}", e))?;
+        match source {
+            1 => Ok(DataPieceId::Input(index)),
+            2 => Ok(DataPieceId::Output(index)),
+            3 => Ok(DataPieceId::CellDep(index)),
+            0x0100000000000001 => Ok(DataPieceId::GroupInput(index)),
+            0x0100000000000002 => Ok(DataPieceId::GroupOutput(index)),
+            _ => Err(format!("Invalid source value: {:#x}", source)),
+        }
+    }
+}
+
+/// Full state representing all VM instances from verifying a CKB script.
+/// It should be serializable to binary formats, while also be able to
+/// fully recover the running environment with the full transaction environment.
+#[derive(Clone, Debug)]
+pub struct FullSuspendedState {
+    pub total_cycles: Cycle,
+    pub next_vm_id: VmId,
+    pub next_pipe_slot: u64,
+    pub vms: Vec<(VmId, VmState, Snapshot2<DataPieceId>)>,
+    pub pipes: Vec<(PipeId, VmId)>,
+    pub inherited_fd: Vec<(VmId, Vec<PipeId>)>,
+    // NOTE(review): the i8 is presumably the VM exit code - confirm.
+    pub terminated_vms: Vec<(VmId, i8)>,
+}
+
+impl FullSuspendedState {
+    /// Estimates the size of the suspended state in bytes by summing the
+    /// `size_of` of the scalar fields, the per-VM snapshot contents and the
+    /// pipe / terminated-VM tables.
+    ///
+    /// NOTE(review): the `size_of` type arguments were reconstructed from
+    /// the field types after being lost in extraction - confirm against the
+    /// upstream source.
+    pub fn size(&self) -> u64 {
+        // total_cycles + next_vm_id + next_pipe_slot
+        (size_of::<Cycle>()
+            + size_of::<VmId>()
+            + size_of::<u64>()
+            + self.vms.iter().fold(0, |mut acc, (_, _, snapshot)| {
+                acc += size_of::<VmId>() + size_of::<VmState>();
+                acc += snapshot.pages_from_source.len()
+                    * (size_of::<u64>()
+                        + size_of::<u8>()
+                        + size_of::<DataPieceId>()
+                        + size_of::<u64>()
+                        + size_of::<u64>());
+                for dirty_page in &snapshot.dirty_pages {
+                    acc += size_of::<u64>() + size_of::<u8>() + dirty_page.2.len();
+                }
+                // NOTE(review): presumably the snapshot's version, registers,
+                // pc, cycles and max_cycles - confirm against Snapshot2.
+                acc += size_of::<u32>()
+                    + RISCV_GENERAL_REGISTER_NUMBER * size_of::<u64>()
+                    + size_of::<u64>()
+                    + size_of::<u64>()
+                    + size_of::<u64>();
+                acc
+            })
+            + (self.pipes.len() * (size_of::<PipeId>() + size_of::<VmId>()))) as u64
+            // NOTE(review): counts one PipeId per entry, regardless of how
+            // many pipe ids the entry's Vec holds - confirm this is intended.
+            + (self.inherited_fd.len() * (size_of::<PipeId>())) as u64
+            + (self.terminated_vms.len() * (size_of::<VmId>() + size_of::<i8>())) as u64
+    }
+}
+
+/// Context data for current running transaction & script
+#[derive(Clone)]
+pub struct TxData<DL> {
+    pub rtx: Arc<ResolvedTransaction>,
+    pub data_loader: DL,
+    // Ideally one might not want to keep program here, since program is totally
+    // deducible from rtx + data_loader, however, for a demo here, program
+    // does help us save some extra coding.
+    pub program: Bytes,
+    pub script_group: Arc<ScriptGroup>,
+}
+
+// NOTE(review): the bounds on DL were lost in extraction; they are restored
+// here to match the bounds used by the Syscalls impl - confirm.
+impl<DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static>
+    DataSource<DataPieceId> for TxData<DL>
+{
+    /// Loads the data identified by `id` and returns `(slice, full_length)`.
+    ///
+    /// `offset` is clamped to the data length. `full_length` is the number of
+    /// bytes available from `offset` to the end of the data. The slice holds
+    /// `min(full_length, length)` bytes, or all `full_length` bytes when
+    /// `length` is 0.
+    ///
+    /// A missing index yields `Error::External("INDEX_OUT_OF_BOUND")`; a cell
+    /// whose data cannot be loaded yields `Error::Unexpected`.
+    fn load_data(&self, id: &DataPieceId, offset: u64, length: u64) -> Result<(Bytes, u64), Error> {
+        match id {
+            DataPieceId::Program => {
+                // This is just a shortcut so we don't have to copy over the logic in extract_script,
+                // ideally you can also only define the rest 5, then figure out a way to convert
+                // script group to the actual cell dep index.
+                Ok(self.program.clone())
+            }
+            DataPieceId::Input(i) => {
+                let cell = self
+                    .rtx
+                    .resolved_inputs
+                    .get(*i as usize)
+                    .ok_or_else(|| Error::External("INDEX_OUT_OF_BOUND".to_string()))?;
+                self.data_loader.load_cell_data(cell).ok_or_else(|| {
+                    Error::Unexpected(format!("Loading input cell #{}'s data failed!", i))
+                })
+            }
+            DataPieceId::Output(i) => self
+                .rtx
+                .transaction
+                .outputs_data()
+                .get(*i as usize)
+                .map(|data| data.raw_data())
+                .ok_or_else(|| Error::External("INDEX_OUT_OF_BOUND".to_string())),
+            DataPieceId::CellDep(i) => {
+                let cell = self
+                    .rtx
+                    .resolved_cell_deps
+                    .get(*i as usize)
+                    .ok_or_else(|| Error::External("INDEX_OUT_OF_BOUND".to_string()))?;
+                self.data_loader.load_cell_data(cell).ok_or_else(|| {
+                    Error::Unexpected(format!("Loading dep cell #{}'s data failed!", i))
+                })
+            }
+            DataPieceId::GroupInput(i) => {
+                // Translate the group-relative index into a transaction input index.
+                let gi = *self
+                    .script_group
+                    .input_indices
+                    .get(*i as usize)
+                    .ok_or_else(|| Error::External("INDEX_OUT_OF_BOUND".to_string()))?;
+                let cell = self
+                    .rtx
+                    .resolved_inputs
+                    .get(gi)
+                    .ok_or_else(|| Error::External("INDEX_OUT_OF_BOUND".to_string()))?;
+                self.data_loader.load_cell_data(cell).ok_or_else(|| {
+                    Error::Unexpected(format!("Loading input cell #{}'s data failed!", gi))
+                })
+            }
+            DataPieceId::GroupOutput(i) => {
+                // Translate the group-relative index into a transaction output index.
+                let gi = *self
+                    .script_group
+                    .output_indices
+                    .get(*i as usize)
+                    .ok_or_else(|| Error::External("INDEX_OUT_OF_BOUND".to_string()))?;
+                self.rtx
+                    .transaction
+                    .outputs_data()
+                    .get(gi)
+                    .map(|data| data.raw_data())
+                    .ok_or_else(|| Error::External("INDEX_OUT_OF_BOUND".to_string()))
+            }
+        }
+        .map(|data| {
+            // Saturating conversions: an `as usize` cast would silently
+            // truncate on targets where usize is narrower than u64. A value
+            // that does not fit in usize is larger than any possible data
+            // length, so it clamps exactly like an oversized value would.
+            let offset = usize::try_from(offset).map_or(data.len(), |o| o.min(data.len()));
+            let full_length = data.len() - offset;
+            let slice_length = match usize::try_from(length) {
+                // A length of 0 requests everything from offset onwards.
+                Ok(0) | Err(_) => full_length,
+                Ok(len) => len.min(full_length),
+            };
+            (
+                data.slice(offset..offset + slice_length),
+                full_length as u64,
+            )
+        })
+    }
+}
+
+/// How a VM run is bounded: by a cycle limit or by a `Pause` signal.
+#[derive(Clone)]
+pub enum RunMode {
+    LimitCycles(Cycle),
+    Pause(Pause),
+}