Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async状态机实现 #55

Open
taikulawo opened this issue Jun 18, 2021 · 0 comments
Open

Async状态机实现 #55

taikulawo opened this issue Jun 18, 2021 · 0 comments
Labels

Comments

@taikulawo
Copy link
Collaborator

taikulawo commented Jun 18, 2021

Rust中的状态机实现

我概括一下,引入 Future 必须解决自引用被swap的问题

Pin通过 提供 &Self 来让编译器进行入参检查,mem::swap(& mut self) 需要 mut 类型,所以编译器类型检查失败而退出

https://rust-lang.github.io/async-book/04_pinning/01_chapter.html

Future 任务会跨越线程执行,我们知道Future可以编译成状态机yield执行,那在从Future Pending 到 Done 的过程中需要将全部的状态统一保存起来。

这里最好的办法是 struct,也即 async 中所有的变量都在 struct 上分配,并通过self引用。

整个 Future 执行期间全部的变量都通过 enum 进行保存。

Future 是如何保存执行状态的?

编译之前

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

编译之后

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
// 每次返回的Future都会存放到 enum 结构体中。这样在Future未结束之前Future会在 enum 持久化
enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    // 这些代码并不会执行多次
                    // https://discord.com/channels/500028886025895936/500336333500448798/854955299522084914
                    let when = Instant::now() +
                        Duration::from_millis(10);
                        // 看编译前的代码
                        // 由于when在.await使用,编译器自动将when保存到enum
                        // 所以不用担心async function调用返回后stack里的变量没了
                        // 编译器会分析出来,帮你处理
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

比如如下async代码块

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

下面是desugar的代码

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf
    state: State,
}

enum State { 
    Start, 
    AwaitingReadIntoBuf, 
    Done
}

struct ReadIntoBuf {
    buf: &'a mut [u8] // 可能被实现为原始指针,这儿只是为了方便描述
}

会被rustc编译成Future对象,并且跨.await的变量被封装到struct保存,通过生成state维护状态机的状态

并不是全部的local variable 都会被保存,rustc只会保存await需要的变量,这些变量特点是await返回后就会被销毁,所以需要保存到struct

https://discord.com/channels/500028886025895936/500336333500448798/854955299522084914

由于保存变量状态, poll 中获得这状态通过变量引用访问,所以struct往往会保存变量的引用,这就使得 future 不能被移动,所以编译器会为生成的 future 状态机增加 PhantomPinned,使得 future 是 !Unpin

https://os.phil-opp.com/async-await/#pinning-and-futures

https://www.reddit.com/r/learnrust/comments/fkvr15/why_cant_an_async_block_or_fn_implement_unpin/

也正是由于 future !Unpin,为了再次 poll 这个future,必须重新 Pin 住

所以 shadowsocks-rust 这里重新Pin住

https://github.com/willdeeper/shadowsocks-rust/blob/6ff4f5b04b8e22d36cd54477cfcadf8cb403de21/bin/sslocal.rs#L463

虽然看着 server 是 stack local,但 async 会将其保存到 struct,确保pin的整个过程中server始终存在。

https://os.phil-opp.com/async-await/#stack-pinning-and-pin-mut-t

Future 可能会出现自引用结构,比如 https://os.phil-opp.com/async-await/#self-referential-structs,在await之后获取了array的引用。这种情况下future move会出问题

https://os.phil-opp.com/async-await/#pinning-and-futures

The reason that this method takes self: Pin<&mut Self> instead of the normal &mut self is that future instances created from async/await are often self-referential, as we saw above. By wrapping Self into Pin and letting the compiler opt-out of Unpin for self-referential futures generated from async/await, it is guaranteed that the futures are not moved in memory between poll calls


https://os.phil-opp.com/async-await/#saving-state-1

Language-supported implementations of cooperative tasks are often even able to backup up the required parts of the call stack before pausing. As an example, Rust's async/await implementation stores all local variables that are still needed in an automatically generated struct


Rust没找到在线编辑器看编译后的代码,但stackless理论中,js和rust最接近,后面不不明白,可以看js编译后是如何利用函数闭包保存状态的

js async编译后的状态机

为什么说 Pin 是零开销?

Pin! 的内容会分配到堆上防止移动,对于自引用结构为了防止移动一定会内存分配到堆上,零开销的意思是不会进行多余的堆分配。

mem::swap 要求 参数为 &mut T,只要我们保证不泄漏出 mut 就可保证用户代码不会 move 内存造成不安全编程

除非显式 !Unpin, 否则编译器会自动实现Unpin,也就是Rust里默认变量是可以移动的

比如下面这代码
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=636b71d4acef346a9ce810b735908cb1

https://stackoverflow.com/a/49917903/7529562

Javascript Promise 和 Rust Future 的不同

JavaScript 的 Promise 调用可能会返回新的 Promise,而Rust的Future只会返回 Pending 或 Ready,并不会返回新的 Future

Future在Rust中是块写法

thread::spawn(async {

});

这 async 编译为 Future,Future::poll返回 Pending / Ready,想孵化新的异步任务需调用运行时提供的孵化方法,而不是通过返回 Future

tokio::spawn(async {
    // spawn 孵化新的Future
    let join_hander = tokio::spawn(async {

    });
    // future#poll 会先返回 join_hander 的完成状态
    join_hander.await?;
});

JS中的状态机

我将用 Javascript 来演示 async 如何实现。 async 原理类似 C# 的 async Task,Rust中的 ?. Future

async 本质是依赖 回调函数

async function fetchDataFromRemote(path) {
    const url = 'https://' + path;
    const r1 = await api.get(url)
    const r2 = await api.getFrom(url)
    return [r1,r2]
}

const data = await fetchDataFromRemote('api.github.com/repos/tokio-rs/mini-redis/languages')
fetchDateFromRemote() {
    const url = 'https://' + path;
    let state = 0;
    let finalState = 2;
    let waitFor = null
    switch(state) {
        case 0: {
            waitFor = api.get(url);
            return waitFor;
        }
    }
}

又或者下面从babel拷贝过来的代码

async function getActive(keyId: string) {
    if (typeof itemId === 'string') {
        const r = await (await api.items.get(keyId, itemId)).data;
        setDefault(r.value);
        setFields({
            key: 'value',
            value: r.value
        });
        return;
    }
    const k = await (await api.keys.get(keyId)).data;
    if (Number.parseInt(k?.active) !== -1) {
        const active_item = await (await api.items.get(keyId, itemId)).data
        
        setDefault(active_item.value);
        setFields({
            key: 'value',
            value: active_item.value
        });
    }
}
function getActive(keyId) {
            return __awaiter(this, void 0, void 0, function () {
                var r, k, active_item;
                return __generator(this, function (_a) {
                    switch (_a.label) {
                        case 0:
                            if (!(typeof itemId === 'string')) return [3 /*break*/, 3];
                            return [4 /*yield*/, _api__WEBPACK_IMPORTED_MODULE_5__["default"].items.get(keyId, itemId)];
                        case 1: return [4 /*yield*/, (_a.sent()).data];
                        case 2:
                            r = _a.sent();
                            setDefault(r.value);
                            setFields({
                                key: 'value',
                                value: r.value
                            });
                            return [2 /*return*/];
                        case 3: return [4 /*yield*/, _api__WEBPACK_IMPORTED_MODULE_5__["default"].keys.get(keyId)];
                        case 4: return [4 /*yield*/, (_a.sent()).data];
                        case 5:
                            k = _a.sent();
                            debugger;
                            if (!(Number.parseInt(k === null || k === void 0 ? void 0 : k.active) !== -1)) return [3 /*break*/, 8];
                            return [4 /*yield*/, _api__WEBPACK_IMPORTED_MODULE_5__["default"].items.get(keyId, itemId)];
                        case 6: return [4 /*yield*/, (_a.sent()).data];
                        case 7:
                            active_item = _a.sent();
                            setDefault(active_item.value);
                            setFields({
                                key: 'value',
                                value: active_item.value
                            });
                            _a.label = 8;
                        case 8: return [2 /*return*/];
                    }
                });
            });
        }
@taikulawo taikulawo added the Rust label Jun 18, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant