Skip to content

Commit

Permalink
Added stream implementation for poll
Browse files Browse the repository at this point in the history
  • Loading branch information
StaticallyTypedAnxiety committed Jul 21, 2024
1 parent d3193fe commit 46b1252
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ futures = "0.3.30"
crossbeam = "0.8.4"
wit-bindgen-rt = { version = "0.26.0", features = ["bitflags"] }


[lib]
crate-type = ["cdylib"]

Expand Down
24 changes: 16 additions & 8 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crossbeam::channel::{Receiver, Sender};
use futures::pin_mut;
use futures::StreamExt;
use std::{
future::Future,
sync::Arc,
Expand All @@ -14,22 +15,29 @@ pub struct WasmRuntimeAsyncEngine {
recv: Receiver<()>,
}

/// the reactor that processes poll submissions.
#[derive(Clone)]
pub struct Reactor;
/// the reactor that processes poll submissions. Still Experimental
#[derive(Default)]
pub struct Reactor {
events: PollTasks,
}

impl Reactor {
///returns a future used to call wasi poll
pub fn poll(polls: Vec<Pollable>) -> PollTasks {
PollTasks::new(polls)
//adds event to the queue
pub fn add_to_queue(&mut self, event_name: String, pollable: Pollable) {
self.events.push(event_name, pollable);
}

//polls event queue to see if any of the events are readycar
pub async fn wait(&mut self) -> Option<Vec<String>> {
self.events.next().await
}
}

impl WasmRuntimeAsyncEngine {
/// function to execute futures
pub fn block_on<K, F: Future<Output = K>, Fun: FnOnce(Reactor) -> F>(async_closure: Fun) -> K {
let reactor = Reactor;
let future = async_closure(reactor);
let future = async_closure(Reactor::default());
pin_mut!(future);
let (sender, recv) = crossbeam::channel::bounded(TASK_QUEUE_BUFFER);
let runtime_engine = WasmRuntimeAsyncEngine {
Expand Down
48 changes: 28 additions & 20 deletions src/poll_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,52 @@
use std::task::{Context, Poll};
use std::{
collections::{BTreeMap},
task::{Context, Poll},
};

use futures::Future;
use futures::Stream;

use crate::bindings::wasi::io::poll::{poll, Pollable};

///Future that is used to poll changes from the host
///Future that is used to poll changes from the host\
#[derive(Default)]
pub struct PollTasks {
pollables: Vec<Pollable>,
processed_polls: Vec<u32>,
pollables: BTreeMap<String, Pollable>,
processed_polls: Vec<String>,
}

impl PollTasks {
pub(crate) fn new(pollables: Vec<Pollable>) -> Self {
Self {
processed_polls: Vec::with_capacity(pollables.len()),
pollables,
}
pub(crate) fn push(&mut self, event_name: String, pollable: Pollable) {
self.pollables.insert(event_name, pollable);
}
}

impl Future for PollTasks {
type Output = Vec<u32>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
impl Stream for PollTasks {
type Item = Vec<String>;

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let pollables = self.get_mut();
let pollable_results = pollables.pollables.iter().collect::<Vec<_>>();
let pollable_results = pollables.pollables.values().collect::<Vec<_>>();
let results_vec = poll(pollable_results.as_slice());
//take all the polls that are not needed away
for index in results_vec {
for (_, key) in results_vec.into_iter().zip(pollables.pollables.keys()) {
//if processed remove it from process queue and add it to processed polls
let _ = pollables.pollables.remove(index as usize);
pollables.processed_polls.push(index);
pollables.processed_polls.push(key.clone());
}

//remove pollables
for key in pollables.processed_polls.iter() {
pollables.pollables.remove(key);
}
//if the pollable set is empty that means we have finished processing everything
if pollables.pollables.is_empty() {
Poll::Ready(pollables.processed_polls.clone())
Poll::Ready(None)
} else {
//still more polls that might need to get processed for a later date
cx.waker().wake_by_ref();
Poll::Pending
if pollables.processed_polls.is_empty() {
std::task::Poll::Pending
} else {
Poll::Ready(Some(pollables.processed_polls.clone()))
}
}
}
}

0 comments on commit 46b1252

Please sign in to comment.