Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

协程支持 #286

Open
3 of 4 tasks
Chronostasys opened this issue May 3, 2023 · 5 comments
Open
3 of 4 tasks

协程支持 #286

Chronostasys opened this issue May 3, 2023 · 5 comments
Assignees
Labels
Milestone

Comments

@Chronostasys
Copy link
Member

Chronostasys commented May 3, 2023

目标实现无栈协程,这需要编译器能自动将异步函数编译为异步状态机

考虑下方函数:

async fn test() {
  let a = 1;
  await some_async_io();
  a = a + 1;
}

在编译的时候,test会变成:

struct test_ctx{
    step:i64;
    a: i64;
}

fn test_init() test_ctx {
  return test_ctx{};
}

fn test_move_next(ctx: test_ctx) bool {
  switch ctx.step {
    case 0:
      ctx.a = 1;
      event_loop_register(some_async_io);
    case 1:
      ctx.a = ctx.a + 1;
  }
  ctx.step += 1;
  return ctx.step == 1;
}

在调用test的时候,生成代码为:

let ctx = test_init();
test_move_next(ctx);

event_loop_register会把异步操作注册到evloop里,在该操作完成之后,evloop会通知调度器,调度器继续执行test_move_next(ctx)

为了保证gc正确性,evloop线程不要跑pl代码或者函数,只跑rust代码。

关于调度器

我们把异步状态机记为Task,那么一个纯异步程序中(main也是async的),实际上所有线程都不是直接跑用户代码,而是运行调度器,而调度器对用户代码进行调度和运行。

下方假设我们使用一个全局的task 队列,那么调度器伪代码如下:

loop {
  let task = queue.dequeue();
  if task != null {
    task.move_next();
  }
}

evloop在一个异步操作完成后,需要queue.enqueue(task)

@Chronostasys Chronostasys converted this from a draft issue May 3, 2023
@Chronostasys Chronostasys added this to the 1.0.0 milestone May 4, 2023
@CjiW
Copy link
Contributor

CjiW commented May 30, 2023

// 处理之后
#[allow(unused)]
fn async_func(reactor: Arc<Mutex<Box<Reactor>>>, start: Instant) -> impl Future<Output = () >{
    enum AsyncFunc {
        State0((Arc<Mutex<Box<Reactor>>>, Instant)),
        State1((Task, Instant)),
        Done,
    }
    impl Future for AsyncFunc {
        type Output = ();
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            loop {
                match *self {
                    Self::State0((ref mut reactor, start)) => {
                        let val = Task::new(reactor.clone(), 3, 1);
                        *self = Self::State1((val, start));
                    },
                    Self::State1((ref mut task, start)) => {
                        match Pin::new(task).poll(cx) {
                            Poll::Ready(val) => {
                                println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
                                *self = Self::Done;
                                return Poll::Ready(());
                            },
                            Poll::Pending => {
                                return Poll::Pending;
                            },
                            
                        }
                    },
                    AsyncFunc::Done => todo!(),
                }
            }
        }
    }
    AsyncFunc::State0((reactor, start))
}
// 处理之前
#[allow(unused)]
async fn func(reactor: Arc<Mutex<Box<Reactor>>>, start: Instant) -> () {
    let val = Task::new(reactor.clone(), 3, 1).await;
    println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
}

fn main() {
    let start = Instant::now();
    let reactor = Reactor::new();

    let fut1 = func(reactor.clone(), start);
    // let fut1 = async_func(reactor.clone(), start);

    let fut2 = async {
        let val = Task::new(reactor.clone(), 1, 2).await;
        println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
    };

    let mainfut = async {
        fut1.await;
        fut2.await;
    };

    block_on(mainfut);
    reactor.lock().map(|mut r| r.close()).unwrap();
}

use std::{
    future::Future, sync::{ mpsc::{channel, Sender}, Arc, Mutex, Condvar},
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem, pin::Pin,
    thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
};
// ============================= Executor ====================================
#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);

impl Parker {
    fn park(&self) {
        let mut resumable = self.0.lock().unwrap();
            while !*resumable {
                resumable = self.1.wait(resumable).unwrap();
            }
        *resumable = false;
    }

    fn unpark(&self) {
        *self.0.lock().unwrap() = true;
        self.1.notify_one();
    }
}

fn block_on<F: Future>(mut future: F) -> F::Output {
    let parker = Arc::new(Parker::default());
    let mywaker = Arc::new(MyWaker { parker: parker.clone() });
    let waker = mywaker_into_waker(Arc::into_raw(mywaker));
    let mut cx = Context::from_waker(&waker);
    
    // SAFETY: we shadow `future` so it can't be accessed again.
    let mut future = unsafe { Pin::new_unchecked(&mut future) }; 
    loop {
        match Future::poll(future.as_mut(), &mut cx) {
            Poll::Ready(val) => break val,
            Poll::Pending => parker.park(),
        };
    }
}
// ====================== Future Impl==============================
#[derive(Clone)]
struct MyWaker {
    parker: Arc<Parker>,
}

#[derive(Clone)]
pub struct Task {
    id: usize,
    reactor: Arc<Mutex<Box<Reactor>>>,
    data: u64,
}

fn mywaker_wake(s: &MyWaker) {
    let waker_arc = unsafe { Arc::from_raw(s) };
    waker_arc.parker.unpark();
}

fn mywaker_clone(s: &MyWaker) -> RawWaker {
    let arc = unsafe { Arc::from_raw(s) };
    std::mem::forget(arc.clone()); // increase ref count
    RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}

const VTABLE: RawWakerVTable = unsafe {
    RawWakerVTable::new(
        |s| mywaker_clone(&*(s as *const MyWaker)),
        |s| mywaker_wake(&*(s as *const MyWaker)),
        |s| mywaker_wake(*(s as *const &MyWaker)),
        |s| drop(Arc::from_raw(s as *const MyWaker)),
    )
};

fn mywaker_into_waker(s: *const MyWaker) -> Waker {
    let raw_waker = RawWaker::new(s as *const (), &VTABLE);
    unsafe { Waker::from_raw(raw_waker) }
}

impl Task {
    fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
        Task { id, reactor, data }
    }
}

impl Future for Task {
    type Output = usize;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut r = self.reactor.lock().unwrap();
        if r.is_ready(self.id) {
            *r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
            Poll::Ready(self.id)
        } else if r.tasks.contains_key(&self.id) {
            r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
            Poll::Pending
        } else {
            r.register(self.data, cx.waker().clone(), self.id);
            Poll::Pending
        }
    }
}
// =============================== Reactor ===================================
enum TaskState {
    Ready,
    NotReady(Waker),
    Finished,
}
struct Reactor {
    dispatcher: Sender<Event>,
    handle: Option<JoinHandle<()>>,
    tasks: HashMap<usize, TaskState>,
}

#[derive(Debug)]
enum Event {
    Close,
    Timeout(u64, usize),
}

impl Reactor {
    fn new() -> Arc<Mutex<Box<Self>>> {
        let (tx, rx) = channel::<Event>();
        let reactor = Arc::new(Mutex::new(Box::new(Reactor {
            dispatcher: tx,
            handle: None,
            tasks: HashMap::new(),
        })));
        
        let reactor_clone = Arc::downgrade(&reactor);
        let handle = thread::spawn(move || {
            let mut handles = vec![];
            for event in rx {
                let reactor = reactor_clone.clone();
                match event {
                    Event::Close => break,
                    Event::Timeout(duration, id) => {
                        let event_handle = thread::spawn(move || {
                            thread::sleep(Duration::from_secs(duration));
                            let reactor = reactor.upgrade().unwrap();
                            reactor.lock().map(|mut r| r.wake(id)).unwrap();
                        });
                        handles.push(event_handle);
                    }
                }
            }
            handles.into_iter().for_each(|handle| handle.join().unwrap());
        });
        reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
        reactor
    }

    fn wake(&mut self, id: usize) {
        let state = self.tasks.get_mut(&id).unwrap();
        match mem::replace(state, TaskState::Ready) {
            TaskState::NotReady(waker) => waker.wake(),
            TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
            _ => unreachable!()
        }
    }

    fn register(&mut self, duration: u64, waker: Waker, id: usize) {
        if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
            panic!("Tried to insert a task with id: '{}', twice!", id);
        }
        self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
    }

    fn close(&mut self) {
        self.dispatcher.send(Event::Close).unwrap();
    }

    fn is_ready(&self, id: usize) -> bool {
        self.tasks.get(&id).map(|state| match state {
            TaskState::Ready => true,
            _ => false,
        }).unwrap_or(false)
    }
}

impl Drop for Reactor {
    fn drop(&mut self) {
        self.handle.take().map(|h| h.join().unwrap()).unwrap();
    }
}

@Chronostasys Chronostasys moved this from 🔖 Ready to 🏗 In progress in pl's backlog Jun 12, 2023
@Chronostasys
Copy link
Member Author

Async to Iterator translate:

async fn test_async() Future<i64> {
    let a = await fn_async();
    return a;
}

desugar

gen fn test_async_iter() Iterator<Poll<i64>> {
    let statemachine = fn_async();
    while statemachine.poll() is Pending {
        let re: Poll<i64> = Pending{};
        yield return re;
    }
    let result = statemachine.poll() as Ready<i64>!;
    let re: Poll<i64> = result;
    while true {
        yield return re
    }
}

struct DefaultFuture<T> {
    v:Iterator<Poll<i64>> ;
}

impl <T> Future<T> for DefaultFuture<T> {
    fn poll() Poll<T> {
        return self.v.next() as Poll<T>;
    }
}

fn test_async() Future<i64> {
    let re = test_async_iter();
    return DefaultFuture<i64> {v:re};
}

@Chronostasys
Copy link
Member Author

Async

和rust不一样,我们的只要跑了async函数就会开始运行

async fn task1:
    ret await task2

async fn task2:
    await task3
    ret task4

let t = await task1
await task5(t)

async fn task5(t):
    await t

async fn task4:
    pass
graph TD
    A[Task1] -->|await| B[Task2]
    B -->|await| C[Task3]
    B --> |start| D[Task4]
    E[Task5] -->|await| D
Loading

execution flow

graph TD
    Executor -->|poll with waker Task1| A[Task1]
    A -->|poll with waker Task1| B[Task2]
    B -->|poll with waker Task1| C[Task3]
    C -.-> |wake| A
    Executor --> |poll with waker Task4| D[Task4]
    Executor -->|poll with waker Task5| E[Task5]
    E -->|poll with waker Task5| D
    D -.-> |wake| E
Loading

@Chronostasys
Copy link
Member Author

Lowering example:

Lowerring

async fn main() Future<i64> {
    let a = await task1();
    return await task2(a);
}
fn main() Future<i64> {
    let ctx = MainGeneratorCtx{};
    return new_future(|wk|=>ctx.poll(wk));
}


struct Future<T> {
    poll_f:|wk:*Waker|=>Poll<T>;
    result: Option<T>;
}

impl <T> Future<T> {
    fn poll(wk:*Waker) Poll<T> {
        if self.result is T {
            return Ready{v: self.result as T!};
        } else {
            let p = (self.poll_f)(wk);
            if p is Ready<T> {
                self.result = Some{(p as Ready<T>!).v};
                self.poll_f = |wk|=>Ready{v: self.result as T!};
            }
            return p;
        }
    }
}

fn new_future<T>(poll_f:|wk:*Waker|=>Poll<T>) Future<T> {
    return Future{
        poll_f: poll_f, result: None{}
    };
}

struct MainGeneratorCtx {
    state: i32;
    a_f: Future<i64>;
    b_f: Future<i64>;
    a: i64;
    b: i64;
}


impl MainGeneratorCtx {
    fn poll(wk:*Waker) Poll<i64> {
        while true {
            if self.state == 0 {
                self.a_f = task1();
                self.state = 1;
                return Pending{};
            } else if self.state == 1 {
                let p = self.a_f.poll(wk);
                if p is Pending {
                    return Pending{};
                } else {
                    self.a = (p as Ready<i64>!).v;
                    self.b_f = task2(self.a);
                    self.state = 2;
                    return Pending{};
                }
            } else if self.state == 2 {
                let p = self.b_f.poll(wk);
                if p is Pending {
                    return Pending{};
                } else {
                    self.b = (p as Ready<i64>!).v;
                    self.state = 3;
                    return Pending{};
                }
            } else{
                return Ready{v: self.b};
            }
        }
    }
}

@CjiW
Copy link
Contributor

CjiW commented Mar 9, 2024

use std::chan;

pub struct Task {
    poll: |Waker|=>void;
}

pub struct Waker {
    wake: |*chan::Chan<Task>|=>void;
    ch: *chan::Chan<Task>;
}

pub struct Pending {}

pub struct Ready<T> {
    v: T;
}

pub type Poll<T> = Ready<T> | Pending;

pub trait Future<T> {
    fn poll(wk:Waker) Poll<T>;
}

pub struct FnFuture<T> {
    poll: |Waker|=>Poll<T>;
    status: Poll<T>;
}

impl <T> Future<T> for FnFuture<T> {
    fn poll(wk: Waker) Poll<T> {
        if self.status is Ready<T> {
            return self.status;
        } else {
            return self.poll(wk);
        }
    }
}

impl <T> FnFuture<T> {
    fn then<U>(f: |T|=>U) FnFuture<U> {
        return FnFuture<U> {
            poll: |wk|=>{
                if self.status is Ready<T> {
                    let v = (self.status as Ready<T>!).v;
                    return Ready<U>{v: f(v)} as Poll<U>;
                } else {
                    let p = self.poll(wk);
                    if p is Ready<T> {
                        let v = (p as Ready<T>!).v;
                        return Ready<U>{v: f(v)} as Poll<U>;
                    } else {
                        return Pending{} as Poll<U>;
                    }
                }
            },
            status: Pending{} as Poll<U>,
        };
    }
}

///==========main==========///

fn main() void {
    let exe = executor(10 as u64);
    let d = delay(5 as u64) as Future<()>;
    let ff = FnFuture<()> {
        poll: |wk|=>{
            return d.poll(wk);
        },
        status: Pending{} as Poll<()>,
    };
    let f = ff.then<i64>(|_a|=>i64{
        return 42;
    });
    exe.spawn<i64>(f as Future<i64>);
    exe.run();
    println!("done!");
    return;
}

///==========Delay============///

pub struct Delay {
    when: u64;
}

pub fn delay(secs: u64) Delay {
    return Delay {
        when: unixtime() + secs as u64
    };
}

impl Future<()> for Delay {
    fn poll(wk:Waker) Poll<()> {
        if unixtime() >= self.when {
            return Ready<()>{};
        } else {
            spawn(||=>{
                let rem = self.when - unixtime();
                if rem > 0 {
                    sleep(rem);
                }
                wk.wake(wk.ch);
                return;
            });
            return Pending{};
        }
    }
}

fn unixtime() u64;


///=========thread============///

fn new_thread(f: *||=>void) void;


pub fn spawn(f:||=>void) void {
    new_thread(&f);
    return;
}

pub fn sleep(secs: u64) void;

///==========executor=========///
pub struct Executor {
    ch: *chan::Chan<Task>;
}

pub fn executor(n: u64) Executor {
    return Executor {
        ch: &chan::channel<Task>(n)
    };
}

impl Executor {
    fn run() void {
        while true {
            let task = self.ch.recv();
            let waker = Waker {
                ch: self.ch,
                wake: |ch|=>void {
                    ch.send(task);
                    return;
                }
            };
            task.poll(waker);
        }
        return;
    }
    fn spawn<T>(f: Future<T>) void {
        let task = Task {
            poll: |wk|=>{
                f.poll(wk);
                return;
            },
        };
        self.ch.send(task);
        return;
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
Status: 🏗 In progress
Development

No branches or pull requests

2 participants