From 80fce36918623fe04308a92c02b80eaaa57a744b Mon Sep 17 00:00:00 2001 From: Pure White Date: Fri, 20 Oct 2023 14:55:33 +0800 Subject: [PATCH] feat: use afit and rpitit to optimize Service (#239) --- Cargo.lock | 462 ++++++++---------- Cargo.toml | 3 +- README-zh_cn.md | 12 +- README.md | 10 +- examples/src/compression/grpc_client.rs | 2 - examples/src/compression/grpc_server.rs | 2 - examples/src/hello/grpc_client.rs | 2 - examples/src/hello/grpc_server.rs | 2 - examples/src/hello/thrift_client.rs | 2 - examples/src/hello/thrift_server.rs | 2 - examples/src/loadbalance/grpc_client.rs | 2 - examples/src/loadbalance/grpc_server.rs | 2 - examples/src/multiplex/grpc_client.rs | 2 - examples/src/multiplex/grpc_server.rs | 2 - examples/src/streaming/grpc_client.rs | 2 - examples/src/streaming/grpc_server.rs | 2 - examples/src/unknown/thrift_client.rs | 2 - examples/src/unknown/thrift_server.rs | 2 - examples/volo-gen/src/lib.rs | 2 - rust-toolchain.toml | 3 +- volo-build/Cargo.toml | 4 +- volo-build/src/grpc_backend.rs | 18 +- volo-build/src/thrift_backend.rs | 10 +- volo-cli/Cargo.toml | 4 +- .../src/templates/grpc/rust-toolchain_toml | 3 +- volo-cli/src/templates/grpc/src/bin/server_rs | 2 +- volo-cli/src/templates/grpc/src/lib_rs | 2 +- .../src/templates/grpc/volo-gen/src/lib_rs | 2 +- .../src/templates/thrift/rust-toolchain_toml | 3 +- .../src/templates/thrift/src/bin/server_rs | 2 +- volo-cli/src/templates/thrift/src/lib_rs | 2 +- .../src/templates/thrift/volo-gen/src/lib_rs | 2 +- volo-grpc/Cargo.toml | 4 +- volo-grpc/src/client/meta.rs | 138 +++--- volo-grpc/src/client/mod.rs | 25 +- volo-grpc/src/layer/cross_origin.rs | 23 +- volo-grpc/src/layer/grpc_timeout.rs | 21 +- volo-grpc/src/layer/loadbalance/mod.rs | 70 ++- volo-grpc/src/layer/user_agent.rs | 21 +- volo-grpc/src/lib.rs | 1 - volo-grpc/src/server/meta.rs | 176 ++++--- volo-grpc/src/server/router.rs | 29 +- volo-grpc/src/server/service.rs | 73 ++- volo-grpc/src/transport/client.rs | 166 +++---- volo-macros/Cargo.toml | 2 +- volo-thrift/Cargo.toml | 5 +- volo-thrift/src/client/layer/timeout.rs | 57 +-- volo-thrift/src/client/mod.rs | 52 +- volo-thrift/src/lib.rs | 1 - volo-thrift/src/server.rs | 1 - volo-thrift/src/transport/multiplex/client.rs | 71 ++- volo-thrift/src/transport/pingpong/client.rs | 73 ++- .../src/transport/pool/make_transport.rs | 7 +- volo/Cargo.toml | 2 +- volo/src/client.rs | 29 +- volo/src/discovery/mod.rs | 17 +- volo/src/lib.rs | 1 - volo/src/loadbalance/consistent_hash.rs | 73 ++- volo/src/loadbalance/layer.rs | 14 +- volo/src/loadbalance/mod.rs | 11 +- volo/src/loadbalance/random.rs | 56 +-- 61 files changed, 779 insertions(+), 1014 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a184fb66..ff97f985 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -43,9 +43,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -57,15 +57,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" dependencies = [ "utf8parse", ] @@ -81,9 +81,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys", @@ -114,7 +114,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -136,18 +136,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -206,21 +206,21 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" @@ -248,9 +248,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", @@ -260,9 +260,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.3" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84ed82781cea27b43c9b106a979fe450a13a31aab0500595fb3fc06616de08e6" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.2" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -290,7 +290,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -325,6 +325,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -340,16 +350,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -390,10 +390,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.2", "lock_api", "once_cell", - "parking_lot_core 0.9.8", + "parking_lot_core 0.9.9", ] [[package]] @@ -479,25 +479,14 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "errno-dragonfly", "libc", "windows-sys", ] -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "event-listener" version = "3.0.0" @@ -541,9 +530,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "faststr" @@ -563,9 +552,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", "miniz_oxide", @@ -652,7 +641,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -738,9 +727,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" [[package]] name = "heck" @@ -759,9 +748,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "hex" @@ -870,16 +859,16 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" dependencies = [ "android_system_properties", "core-foundation-sys", "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows", + "windows-core", ] [[package]] @@ -913,12 +902,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.2", ] [[package]] @@ -940,17 +929,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys", -] - [[package]] name = "ipnet" version = "2.8.0" @@ -964,7 +942,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", - "rustix 0.38.13", + "rustix", "windows-sys", ] @@ -977,15 +955,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.9" @@ -1009,15 +978,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libm" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "linked-hash-map" @@ -1038,21 +1007,15 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - -[[package]] -name = "linux-raw-sys" -version = "0.4.7" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" [[package]] name = "lock_api" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" dependencies = [ "autocfg", "scopeguard", @@ -1075,15 +1038,15 @@ dependencies = [ [[package]] name = "matchit" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "memoffset" @@ -1140,9 +1103,8 @@ dependencies = [ [[package]] name = "motore" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "110e416ff69dbeaf664c3cfe17800058a6636115dcefe7d8176a429eabc3a3c4" +version = "0.4.0" +source = "git+https://github.com/cloudwego/motore?branch=main#f840541d077130b2b7d1e61250575d28363b234e" dependencies = [ "futures", "motore-macros", @@ -1153,9 +1115,8 @@ dependencies = [ [[package]] name = "motore-macros" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d94e11446e793bddb50dfb5d129cca79f0ca3ed515e0d1914e8d8ae55716e9" +version = "0.4.0" +source = "git+https://github.com/cloudwego/motore?branch=main#f840541d077130b2b7d1e61250575d28363b234e" dependencies = [ "proc-macro2", "quote", @@ -1174,7 +1135,7 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "cfg-if", "libc", "memoffset", @@ -1211,9 +1172,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", "libm", @@ -1247,7 +1208,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1307,7 +1268,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.8", + "parking_lot_core 0.9.9", ] [[package]] @@ -1326,13 +1287,13 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.8" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.3.5", + "redox_syscall 0.4.1", "smallvec", "windows-targets", ] @@ -1362,7 +1323,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.0", + "indexmap 2.0.2", ] [[package]] @@ -1395,7 +1356,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1439,7 +1400,7 @@ dependencies = [ "faststr", "fxhash", "heck 0.4.1", - "itertools 0.10.5", + "itertools", "lazy_static", "normpath", "paste", @@ -1455,7 +1416,7 @@ dependencies = [ "scoped-tls", "serde", "serde_yaml", - "syn 2.0.33", + "syn 2.0.38", "toml", "tracing", "tracing-subscriber", @@ -1486,7 +1447,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1529,28 +1490,28 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.67" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] [[package]] name = "proptest" -version = "1.2.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e35c06b98bf36aba164cc17cb25f7e232f5c4aeea73baa14b8a9f0d92dbfa65" +checksum = "7c003ac8c77cb07bb74f5f198bce836a689bcd5a42574612bf14d17bfd08c20e" dependencies = [ "bit-set", - "bitflags 1.3.2", - "byteorder", + "bit-vec", + "bitflags 2.4.1", "lazy_static", "num-traits", "rand", "rand_chacha", "rand_xorshift", - "regex-syntax 0.6.29", + "regex-syntax 0.7.5", "rusty-fork", "tempfile", "unarray", @@ -1648,9 +1609,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" dependencies = [ "either", "rayon-core", @@ -1658,14 +1619,12 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" dependencies = [ - "crossbeam-channel", "crossbeam-deque", "crossbeam-utils", - "num_cpus", ] [[package]] @@ -1686,6 +1645,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_users" version = "0.4.3" @@ -1699,14 +1667,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -1720,13 +1688,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -1741,11 +1709,17 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64 0.21.4", "bytes", @@ -1769,6 +1743,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-rustls", "tower-service", @@ -1776,7 +1751,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots 0.25.2", + "webpki-roots", "winreg", ] @@ -1818,28 +1793,14 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.37.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys", -] - -[[package]] -name = "rustix" -version = "0.38.13" +version = "0.38.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" +checksum = "67ce50cb2e16c2903e30d1cbccfd8387a74b9d4c938b6a4c5ec6cc7556f7a8a0" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "errno", "libc", - "linux-raw-sys 0.4.7", + "linux-raw-sys", "windows-sys", ] @@ -1851,7 +1812,7 @@ checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", "ring", - "rustls-webpki 0.101.5", + "rustls-webpki", "sct", ] @@ -1866,19 +1827,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.100.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" -dependencies = [ - "ring", - "untrusted", -] - -[[package]] -name = "rustls-webpki" -version = "0.101.5" +version = "0.101.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed" +checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" dependencies = [ "ring", "untrusted", @@ -1964,28 +1915,28 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.18" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2026,7 +1977,7 @@ version = "0.9.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a49e178e4452f45cb61d0cd8cebc1b0fafd3e41929e996cef79aa3aca91f574" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "itoa", "ryu", "serde", @@ -2035,9 +1986,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -2068,9 +2019,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" @@ -2117,15 +2068,36 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.33" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.8.0" @@ -2135,47 +2107,47 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall 0.3.5", - "rustix 0.38.13", + "rustix", "windows-sys", ] [[package]] name = "termcolor" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64" dependencies = [ "winapi-util", ] [[package]] name = "terminal_size" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237" +checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" dependencies = [ - "rustix 0.37.23", + "rustix", "windows-sys", ] [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2205,9 +2177,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -2240,7 +2212,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2266,9 +2238,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -2306,7 +2278,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -2347,11 +2319,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -2360,20 +2331,20 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -2475,20 +2446,20 @@ dependencies = [ [[package]] name = "ureq" -version = "2.7.1" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b11c96ac7ee530603dcdf68ed1557050f374ce55a5a07193ebf8cbc9f8927e9" +checksum = "f5ccd538d4a604753ebc2f17cd9946e89b77bf87f6a8e2309667c6f2e87855e3" dependencies = [ "base64 0.21.4", "flate2", "log", "once_cell", "rustls", - "rustls-webpki 0.100.3", + "rustls-webpki", "serde", "serde_json", "url", - "webpki-roots 0.23.1", + "webpki-roots", ] [[package]] @@ -2522,7 +2493,7 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "volo" -version = "0.5.5" +version = "0.8.0" dependencies = [ "async-broadcast", "async-trait", @@ -2548,13 +2519,13 @@ dependencies = [ [[package]] name = "volo-build" -version = "0.6.2" +version = "0.8.0" dependencies = [ "anyhow", "async-trait", "dirs", "heck 0.4.1", - "itertools 0.11.0", + "itertools", "lazy_static", "nom", "normpath", @@ -2575,13 +2546,13 @@ dependencies = [ [[package]] name = "volo-cli" -version = "0.6.2" +version = "0.8.0" dependencies = [ "anyhow", "clap", "colored", "heck 0.4.1", - "itertools 0.11.0", + "itertools", "lazy_static", "log", "normpath", @@ -2613,7 +2584,7 @@ dependencies = [ [[package]] name = "volo-grpc" -version = "0.6.0" +version = "0.8.0" dependencies = [ "anyhow", "async-stream", @@ -2651,11 +2622,11 @@ version = "0.0.0" [[package]] name = "volo-macros" -version = "0.3.0" +version = "0.8.0" [[package]] name = "volo-thrift" -version = "0.7.3" +version = "0.8.0" dependencies = [ "anyhow", "async-trait", @@ -2734,7 +2705,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -2768,7 +2739,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2789,15 +2760,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-roots" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" -dependencies = [ - "rustls-webpki 0.100.3", -] - [[package]] name = "webpki-roots" version = "0.25.2" @@ -2813,7 +2775,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.13", + "rustix", ] [[package]] @@ -2834,9 +2796,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" dependencies = [ "winapi", ] @@ -2848,10 +2810,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows" -version = "0.48.0" +name = "windows-core" +version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ "windows-targets", ] @@ -2924,9 +2886,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.15" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc" +checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index f12dff64..ca510712 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,8 @@ pilota = { git = "https://github.com/cloudwego/pilota", branch = "main" } pilota-build = { git = "https://github.com/cloudwego/pilota", branch = "main" } pilota-thrift-parser = { git = "https://github.com/cloudwego/pilota", branch = "main" } -motore = "0.3" +# motore = "0.4" +motore = { git = "https://github.com/cloudwego/motore", branch = "main" } metainfo = "0.7" diff --git a/README-zh_cn.md b/README-zh_cn.md index 46146887..f9848dd7 100644 --- a/README-zh_cn.md +++ b/README-zh_cn.md @@ -11,9 +11,9 @@ [English](README.md) | 中文 -Volo 是字节跳动服务框架团队研发的 **高性能**、**可扩展性强** 的 Rust RPC 框架,使用了 Rust 最新的 GAT 特性。 +Volo 是字节跳动服务框架团队研发的 **高性能**、**可扩展性强** 的 Rust RPC 框架,使用了 Rust 最新的 AFIT 和 RPITIT 特性。 -Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 GAT 设计。 +Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 AFIT 和 RPITIT 设计。 ## 概览 @@ -30,11 +30,11 @@ Volo 主要包含 6 个 crate 库: ### 特点 -#### 使用 GAT 特性 +#### 使用 AFIT 和 RPITIT 特性 -Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 GAT 设计。 +Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 AFIT 和 RPITIT 设计。 -通过 GAT,我们可以避免很多不必要的 Box 内存分配,以及提升易用性,给用户提供更友好的编程接口和更符合人体工程学的编程范式。 +通过 RPITIT,我们可以避免很多不必要的 Box 内存分配,以及提升易用性,给用户提供更友好的编程接口和更符合人体工程学的编程范式。 #### 高性能 @@ -77,7 +77,7 @@ Volo-gRPC: https://www.cloudwego.io/zh/docs/volo/volo-grpc/getting-started/ ## 相关生态 - [Volo-rs][volo-rs]: Volo 的相关生态,包含了 Volo 的许多组件 -- [Motore][motore]: Volo 参考 Tower 设计的,使用了 GAT 的 middleware 抽象层 +- [Motore][motore]: Volo 参考 Tower 设计的,使用了 AFIT 和 RPITIT 的 middleware 抽象层 - [Pilota][pilota]: Volo 使用的 Thrift 与 Protobuf 编译器及编解码的纯 Rust 实现(不依赖 protoc) - [Metainfo][metainfo]: Volo 用于进行元信息透传的组件,期望定义一套元信息透传的标准 diff --git a/README.md b/README.md index dbd971bc..c4611e48 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ English | [中文](README-zh_cn.md) Volo is a **high-performance** and **strong-extensibility** Rust RPC framework that helps developers build microservices. -Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by GAT. +Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by AFIT and RPITIT. ## Overview @@ -30,11 +30,11 @@ Volo mainly consists of six crates: ### Features -#### Powered by GAT +#### Powered by AFIT and RPITIT -Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by GAT. +Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by AFIT and RPITIT. -Through GAT, we can avoid many unnecessary `Box` memory allocations, improve ease of use, and provide users with a more friendly programming interface and a more ergonomic programming paradigm. +Through RPITIT, we can avoid many unnecessary `Box` memory allocations, improve ease of use, and provide users with a more friendly programming interface and a more ergonomic programming paradigm. #### High Performance @@ -77,7 +77,7 @@ See [Examples][examples]. ## Related Projects - [Volo-rs][volo-rs]: The volo ecosystem which contains a lot of useful components. -- [Motore][motore]: Middleware abstraction layer powered by GAT. +- [Motore][motore]: Middleware abstraction layer powered by AFIT and RPITIT. - [Pilota][pilota]: A thrift and protobuf implementation in pure rust with high performance and extensibility. - [Metainfo][metainfo]: Transmissing metainfo across components. diff --git a/examples/src/compression/grpc_client.rs b/examples/src/compression/grpc_client.rs index 403d84e6..e7f702c3 100644 --- a/examples/src/compression/grpc_client.rs +++ b/examples/src/compression/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/compression/grpc_server.rs b/examples/src/compression/grpc_server.rs index 6d80bfd1..9bc42e96 100644 --- a/examples/src/compression/grpc_server.rs +++ b/examples/src/compression/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use volo_grpc::{ diff --git a/examples/src/hello/grpc_client.rs b/examples/src/hello/grpc_client.rs index 006cdf7e..3e722dfc 100644 --- a/examples/src/hello/grpc_client.rs +++ b/examples/src/hello/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/hello/grpc_server.rs b/examples/src/hello/grpc_server.rs index a44cbed2..9829d17a 100644 --- a/examples/src/hello/grpc_server.rs +++ b/examples/src/hello/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use volo_grpc::server::{Server, ServiceBuilder}; diff --git a/examples/src/hello/thrift_client.rs b/examples/src/hello/thrift_client.rs index ad1e2af0..379178d0 100644 --- a/examples/src/hello/thrift_client.rs +++ b/examples/src/hello/thrift_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/hello/thrift_server.rs b/examples/src/hello/thrift_server.rs index 367c59b5..142b0be2 100644 --- a/examples/src/hello/thrift_server.rs +++ b/examples/src/hello/thrift_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; pub struct S; diff --git a/examples/src/loadbalance/grpc_client.rs b/examples/src/loadbalance/grpc_client.rs index 21901331..8574309b 100644 --- a/examples/src/loadbalance/grpc_client.rs +++ b/examples/src/loadbalance/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::{ cell::RefCell, hash::{Hash, Hasher}, diff --git a/examples/src/loadbalance/grpc_server.rs b/examples/src/loadbalance/grpc_server.rs index 5ba8ec93..83f2b788 100644 --- a/examples/src/loadbalance/grpc_server.rs +++ b/examples/src/loadbalance/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use tokio::task; diff --git a/examples/src/multiplex/grpc_client.rs b/examples/src/multiplex/grpc_client.rs index 55a2e57e..7f7cc492 100644 --- a/examples/src/multiplex/grpc_client.rs +++ b/examples/src/multiplex/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/multiplex/grpc_server.rs b/examples/src/multiplex/grpc_server.rs index 7fc4d859..cd01914d 100644 --- a/examples/src/multiplex/grpc_server.rs +++ b/examples/src/multiplex/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use volo_grpc::server::{Server, ServiceBuilder}; diff --git a/examples/src/streaming/grpc_client.rs b/examples/src/streaming/grpc_client.rs index 8d0f8a18..01fc9fe8 100644 --- a/examples/src/streaming/grpc_client.rs +++ b/examples/src/streaming/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use async_stream::stream; diff --git a/examples/src/streaming/grpc_server.rs b/examples/src/streaming/grpc_server.rs index 377ed132..21e2eeaa 100644 --- a/examples/src/streaming/grpc_server.rs +++ b/examples/src/streaming/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use tokio::sync::mpsc; diff --git a/examples/src/unknown/thrift_client.rs b/examples/src/unknown/thrift_client.rs index 5a0d1d10..ad08531f 100644 --- a/examples/src/unknown/thrift_client.rs +++ b/examples/src/unknown/thrift_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/unknown/thrift_server.rs b/examples/src/unknown/thrift_server.rs index c9bb3fe1..5255bde1 100644 --- a/examples/src/unknown/thrift_server.rs +++ b/examples/src/unknown/thrift_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; pub struct S; diff --git a/examples/volo-gen/src/lib.rs b/examples/volo-gen/src/lib.rs index 97e9cdd6..987d7b91 100644 --- a/examples/volo-gen/src/lib.rs +++ b/examples/volo-gen/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - mod gen { volo::include_service!("thrift_gen.rs"); volo::include_service!("proto_gen.rs"); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8e47c085..ecd40730 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,3 @@ [toolchain] -# TODO: we can remove this toolchain file when TAIT hits stable. -# Related issue: https://github.com/rust-lang/rust/issues/63063. +# TODO: we can remove this toolchain file when AFIT and RPITIT hits stable. channel = "nightly" diff --git a/volo-build/Cargo.toml b/volo-build/Cargo.toml index a135e098..7a095944 100644 --- a/volo-build/Cargo.toml +++ b/volo-build/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-build" -version = "0.6.2" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -17,7 +17,7 @@ keywords = ["thrift", "grpc", "protobuf", "volo", "build"] maintenance = { status = "actively-developed" } [dependencies] -volo = { version = "0.5", path = "../volo" } +volo = { version = "0.8", path = "../volo" } pilota-build.workspace = true diff --git a/volo-build/src/grpc_backend.rs b/volo-build/src/grpc_backend.rs index ff5581cf..1c43550b 100644 --- a/volo-build/src/grpc_backend.rs +++ b/volo-build/src/grpc_backend.rs @@ -547,20 +547,14 @@ impl CodegenBackend for VoloGrpcBackend { {{ type Response = ::volo_grpc::Response<{resp_enum_name_send}>; type Error = ::volo_grpc::status::Status; - type Future<'cx> = impl ::std::future::Future> + 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut ::volo_grpc::context::ServerContext, req: ::volo_grpc::Request<{req_enum_name_recv}>) -> Self::Future<'cx> - where - 's: 'cx, - {{ + async fn call<'s, 'cx>(&'s self, cx: &'cx mut ::volo_grpc::context::ServerContext, req: ::volo_grpc::Request<{req_enum_name_recv}>) -> ::std::result::Result {{ let inner = self.inner.clone(); - async move {{ - match cx.rpc_info.method().unwrap().as_str() {{ - {req_matches} - path => {{ - let path = path.to_string(); - Err(::volo_grpc::Status::unimplemented(::std::format!("Unimplemented http path: {{}}", path))) - }} + match cx.rpc_info.method().unwrap().as_str() {{ + {req_matches} + path => {{ + let path = path.to_string(); + Err(::volo_grpc::Status::unimplemented(::std::format!("Unimplemented http path: {{}}", path))) }} }} }} diff --git a/volo-build/src/thrift_backend.rs b/volo-build/src/thrift_backend.rs index 6e9ea759..3388aa7c 100644 --- a/volo-build/src/thrift_backend.rs +++ b/volo-build/src/thrift_backend.rs @@ -588,13 +588,9 @@ impl pilota_build::CodegenBackend for VoloThriftBackend { type Response = {res_send_name}; type Error = ::anyhow::Error; - type Future<'cx> = impl ::std::future::Future> + 'cx; - - fn call<'cx, 's>(&'s self, _cx: &'cx mut ::volo_thrift::context::ServerContext, req: {req_recv_name}) -> Self::Future<'cx> where 's:'cx {{ - async move {{ - match req {{ - {handler} - }} + async fn call<'s, 'cx>(&'s self, _cx: &'cx mut ::volo_thrift::context::ServerContext, req: {req_recv_name}) -> ::std::result::Result {{ + match req {{ + {handler} }} }} }}"# diff --git a/volo-cli/Cargo.toml b/volo-cli/Cargo.toml index 6e86a51a..3acfa44c 100644 --- a/volo-cli/Cargo.toml +++ b/volo-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-cli" -version = "0.6.2" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -22,7 +22,7 @@ keywords = ["thrift", "grpc", "protobuf", "volo", "cli"] maintenance = { status = "actively-developed" } [dependencies] -volo-build = { version = "0.6", path = "../volo-build" } +volo-build = { version = "0.8", path = "../volo-build" } pilota-thrift-parser.workspace = true anyhow.workspace = true diff --git a/volo-cli/src/templates/grpc/rust-toolchain_toml b/volo-cli/src/templates/grpc/rust-toolchain_toml index 8e47c085..ecd40730 100644 --- a/volo-cli/src/templates/grpc/rust-toolchain_toml +++ b/volo-cli/src/templates/grpc/rust-toolchain_toml @@ -1,4 +1,3 @@ [toolchain] -# TODO: we can remove this toolchain file when TAIT hits stable. -# Related issue: https://github.com/rust-lang/rust/issues/63063. +# TODO: we can remove this toolchain file when AFIT and RPITIT hits stable. channel = "nightly" diff --git a/volo-cli/src/templates/grpc/src/bin/server_rs b/volo-cli/src/templates/grpc/src/bin/server_rs index 4e37b629..5d92fed9 100644 --- a/volo-cli/src/templates/grpc/src/bin/server_rs +++ b/volo-cli/src/templates/grpc/src/bin/server_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + use std::net::SocketAddr; diff --git a/volo-cli/src/templates/grpc/src/lib_rs b/volo-cli/src/templates/grpc/src/lib_rs index 879cf755..e4c010c0 100644 --- a/volo-cli/src/templates/grpc/src/lib_rs +++ b/volo-cli/src/templates/grpc/src/lib_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + pub struct S; diff --git a/volo-cli/src/templates/grpc/volo-gen/src/lib_rs b/volo-cli/src/templates/grpc/volo-gen/src/lib_rs index 38503f80..fe51797c 100644 --- a/volo-cli/src/templates/grpc/volo-gen/src/lib_rs +++ b/volo-cli/src/templates/grpc/volo-gen/src/lib_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + mod gen {{ volo::include_service!("volo_gen.rs"); diff --git a/volo-cli/src/templates/thrift/rust-toolchain_toml b/volo-cli/src/templates/thrift/rust-toolchain_toml index 8e47c085..ecd40730 100644 --- a/volo-cli/src/templates/thrift/rust-toolchain_toml +++ b/volo-cli/src/templates/thrift/rust-toolchain_toml @@ -1,4 +1,3 @@ [toolchain] -# TODO: we can remove this toolchain file when TAIT hits stable. -# Related issue: https://github.com/rust-lang/rust/issues/63063. +# TODO: we can remove this toolchain file when AFIT and RPITIT hits stable. channel = "nightly" diff --git a/volo-cli/src/templates/thrift/src/bin/server_rs b/volo-cli/src/templates/thrift/src/bin/server_rs index 8089a522..b9b255e3 100644 --- a/volo-cli/src/templates/thrift/src/bin/server_rs +++ b/volo-cli/src/templates/thrift/src/bin/server_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + use std::net::SocketAddr; diff --git a/volo-cli/src/templates/thrift/src/lib_rs b/volo-cli/src/templates/thrift/src/lib_rs index 879cf755..e4c010c0 100644 --- a/volo-cli/src/templates/thrift/src/lib_rs +++ b/volo-cli/src/templates/thrift/src/lib_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + pub struct S; diff --git a/volo-cli/src/templates/thrift/volo-gen/src/lib_rs b/volo-cli/src/templates/thrift/volo-gen/src/lib_rs index 38503f80..fe51797c 100644 --- a/volo-cli/src/templates/thrift/volo-gen/src/lib_rs +++ b/volo-cli/src/templates/thrift/volo-gen/src/lib_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + mod gen {{ volo::include_service!("volo_gen.rs"); diff --git a/volo-grpc/Cargo.toml b/volo-grpc/Cargo.toml index 074a72f5..0146787c 100644 --- a/volo-grpc/Cargo.toml +++ b/volo-grpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-grpc" -version = "0.6.0" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -19,7 +19,7 @@ maintenance = { status = "actively-developed" } [dependencies] pilota.workspace = true -volo = { version = "0.5", path = "../volo" } +volo = { version = "0.8", path = "../volo" } motore = { workspace = true, features = ["tower"] } metainfo.workspace = true diff --git a/volo-grpc/src/client/meta.rs b/volo-grpc/src/client/meta.rs index 49f8a806..45975cf9 100644 --- a/volo-grpc/src/client/meta.rs +++ b/volo-grpc/src/client/meta.rs @@ -1,6 +1,5 @@ use std::{net::SocketAddr, str::FromStr}; -use futures::Future; use metainfo::{Backward, Forward}; use volo::{context::Context, Service}; @@ -36,95 +35,86 @@ where type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ClientContext, mut volo_req: Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let metadata = volo_req.metadata_mut(); - _ = metainfo::METAINFO.with(|metainfo| { - let metainfo = metainfo.borrow_mut(); - - // persistents for multi-hops - if let Some(ap) = metainfo.get_all_persistents() { - for (key, value) in ap { - let key = metainfo::HTTP_PREFIX_PERSISTENT.to_owned() + key; - metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); - } - } - - // transients for one-hop - if let Some(at) = metainfo.get_all_transients() { - for (key, value) in at { - let key = metainfo::HTTP_PREFIX_TRANSIENT.to_owned() + key; - metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); - } + ) -> Result { + let metadata = volo_req.metadata_mut(); + _ = metainfo::METAINFO.with(|metainfo| { + let metainfo = metainfo.borrow_mut(); + + // persistents for multi-hops + if let Some(ap) = metainfo.get_all_persistents() { + for (key, value) in ap { + let key = metainfo::HTTP_PREFIX_PERSISTENT.to_owned() + key; + metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); } + } - // caller - if let Some(caller) = cx.rpc_info.caller.as_ref() { - metadata.insert(SOURCE_SERVICE, caller.service_name().parse()?); + // transients for one-hop + if let Some(at) = metainfo.get_all_transients() { + for (key, value) in at { + let key = metainfo::HTTP_PREFIX_TRANSIENT.to_owned() + key; + metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); } - - // callee - if let Some(callee) = cx.rpc_info.callee.as_ref() { - metadata.insert(DESTINATION_SERVICE, callee.service_name().parse()?); - if let Some(method) = cx.rpc_info.method() { - metadata.insert(DESTINATION_METHOD, method.parse()?); - } + } + + // caller + if let Some(caller) = cx.rpc_info.caller.as_ref() { + metadata.insert(SOURCE_SERVICE, caller.service_name().parse()?); + } + + // callee + if let Some(callee) = cx.rpc_info.callee.as_ref() { + metadata.insert(DESTINATION_SERVICE, callee.service_name().parse()?); + if let Some(method) = cx.rpc_info.method() { + metadata.insert(DESTINATION_METHOD, method.parse()?); } + } - Ok::<(), Status>(()) - }); + Ok::<(), Status>(()) + }); - let mut volo_resp = self.inner.call(cx, volo_req).await?; + let mut volo_resp = self.inner.call(cx, volo_req).await?; - let metadata = volo_resp.metadata_mut(); - _ = metainfo::METAINFO.with(|metainfo| { - let mut metainfo = metainfo.borrow_mut(); + let metadata = volo_resp.metadata_mut(); + _ = metainfo::METAINFO.with(|metainfo| { + let mut metainfo = metainfo.borrow_mut(); - // callee - if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { - let maybe_addr = ad.to_str()?.parse::(); - if let (Some(callee), Ok(addr)) = - (cx.rpc_info_mut().callee.as_mut(), maybe_addr) - { - callee.set_address(volo::net::Address::from(addr)); - } + // callee + if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { + let maybe_addr = ad.to_str()?.parse::(); + if let (Some(callee), Ok(addr)) = (cx.rpc_info_mut().callee.as_mut(), maybe_addr) { + callee.set_address(volo::net::Address::from(addr)); } - - // backward - let mut vec = Vec::with_capacity(metadata.len()); - for key_and_value in metadata.iter() { - match key_and_value { - KeyAndValueRef::Ascii(k, v) => { - let k = k.as_str(); - let v = v.to_str()?; - if k.starts_with(metainfo::HTTP_PREFIX_BACKWARD) { - vec.push(k.to_owned()); - metainfo.strip_http_prefix_and_set_backward_downstream( - k.to_owned(), - v.to_owned(), - ); - } + } + + // backward + let mut vec = Vec::with_capacity(metadata.len()); + for key_and_value in metadata.iter() { + match key_and_value { + KeyAndValueRef::Ascii(k, v) => { + let k = k.as_str(); + let v = v.to_str()?; + if k.starts_with(metainfo::HTTP_PREFIX_BACKWARD) { + vec.push(k.to_owned()); + metainfo.strip_http_prefix_and_set_backward_downstream( + k.to_owned(), + v.to_owned(), + ); } - _ => unreachable!(), } + _ => unreachable!(), } - for k in vec { - metadata.remove(k); - } + } + for k in vec { + metadata.remove(k); + } - Ok::<(), Status>(()) - }); + Ok::<(), Status>(()) + }); - Ok(volo_resp) - } + Ok(volo_resp) } } diff --git a/volo-grpc/src/client/mod.rs b/volo-grpc/src/client/mod.rs index 534d7fd8..6ea2dc3b 100644 --- a/volo-grpc/src/client/mod.rs +++ b/volo-grpc/src/client/mod.rs @@ -11,7 +11,6 @@ mod meta; use std::{cell::RefCell, marker::PhantomData, sync::Arc, time::Duration}; pub use callopt::CallOpt; -use futures::Future; pub use meta::MetaService; use motore::{ layer::{Identity, Layer, Stack}, @@ -399,13 +398,10 @@ where Service, Response = Response> + 'static + Send + Clone + Sync, <>::Service as Service>>::Error: Into, - for<'cx> <>::Service as Service>>::Future<'cx>: - Send, IL: Layer>>, IL::Service: Service, Response = Response> + 'static + Send + Clone + Sync, >>::Error: Into, - for<'cx> >>::Future<'cx>: Send, OL: Layer< BoxCloneService< @@ -421,7 +417,6 @@ where OL::Service: Service, Response = Response> + 'static + Send + Clone + Sync, >>::Error: Send + Into, - for<'cx> >>::Future<'cx>: Send, T: 'static + Send, { /// Builds a new [`Client`]. @@ -509,17 +504,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { $e } + ) -> Result { + $e } } @@ -536,17 +527,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx>( + async fn call<'cx>( $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - Self: 'cx, - { - async move { $e } + ) -> Result { + $e } } }; diff --git a/volo-grpc/src/layer/cross_origin.rs b/volo-grpc/src/layer/cross_origin.rs index 7dfe2627..304f8903 100644 --- a/volo-grpc/src/layer/cross_origin.rs +++ b/volo-grpc/src/layer/cross_origin.rs @@ -1,4 +1,3 @@ -use futures::Future; use http::{Request, Uri}; use motore::Service; @@ -18,20 +17,18 @@ impl AddOrigin { impl Service> for AddOrigin where - T: Service>, - ReqBody: 'static, + T: Service> + Send + Sync, + ReqBody: Send + 'static, + Cx: Send, { type Response = T::Response; type Error = T::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Cx: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> Result { // split the header and body let (mut head, body) = req.into_parts(); @@ -49,6 +46,6 @@ where let request = Request::from_parts(head, body); // call inner Service - self.inner.call(cx, request) + self.inner.call(cx, request).await } } diff --git a/volo-grpc/src/layer/grpc_timeout.rs b/volo-grpc/src/layer/grpc_timeout.rs index 6c41e47b..55ee019d 100644 --- a/volo-grpc/src/layer/grpc_timeout.rs +++ b/volo-grpc/src/layer/grpc_timeout.rs @@ -88,20 +88,18 @@ fn try_parse_client_timeout( impl Service> for GrpcTimeout where - S: Service, Error = Status>, - ReqBody: 'static, + Cx: Send, + S: Service, Error = Status> + Send + Sync, + ReqBody: 'static + Send, { type Response = S::Response; type Error = Status; - type Future<'cx> = ResponseFuture> + 'cx> - where - Self: 'cx, - Cx: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: hyper::Request) -> Self::Future<'cx> - where - 's: 'cx, - { + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: hyper::Request, + ) -> Result { // parse the client_timeout let client_timeout = try_parse_client_timeout(req.headers()).unwrap_or_else(|_| { tracing::trace!("[VOLO] error parsing grpc-timeout header"); @@ -126,6 +124,7 @@ where inner: self.inner.call(cx, req), sleep: pined_sleep, } + .await } } diff --git a/volo-grpc/src/layer/loadbalance/mod.rs b/volo-grpc/src/layer/loadbalance/mod.rs index b2c7dab2..598ecb72 100644 --- a/volo-grpc/src/layer/loadbalance/mod.rs +++ b/volo-grpc/src/layer/loadbalance/mod.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, future::Future, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; use motore::Service; use tracing::warn; @@ -79,7 +79,6 @@ where D: Discover, LB: LoadBalance, S: Service> + 'static + Send + Sync, - for<'cx> S::Future<'cx>: Send, LoadBalanceError: Into, S::Error: Debug, T: Send + 'static, @@ -88,49 +87,44 @@ where type Error = S::Error; - type Future<'cx> = impl Future> + Send + 'cx - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> Result { debug_assert!( cx.rpc_info().callee.is_some(), "must set callee endpoint before load balance service" ); - async move { - let callee = cx.rpc_info().callee().volo_unwrap(); - - let mut picker = match &callee.address { - None => self - .load_balance - .get_picker(callee, &self.discover) - .await - .map_err(|err| err.into())?, - _ => { - return self.service.call(cx, req).await.map_err(Into::into); - } - }; - - if let Some(addr) = picker.next() { - if let Some(callee) = cx.rpc_info_mut().callee_mut() { - callee.address = Some(addr.clone()) - } + let callee = cx.rpc_info().callee().volo_unwrap(); + + let mut picker = match &callee.address { + None => self + .load_balance + .get_picker(callee, &self.discover) + .await + .map_err(|err| err.into())?, + _ => { + return self.service.call(cx, req).await.map_err(Into::into); + } + }; - return match self.service.call(cx, req).await { - Ok(resp) => Ok(resp), - Err(err) => { - warn!("[VOLO] call endpoint: {:?} error: {:?}", addr, err); - Err(err) - } - }; - } else { - warn!("[VOLO] zero call count, call info: {:?}", cx.rpc_info()); + if let Some(addr) = picker.next() { + if let Some(callee) = cx.rpc_info_mut().callee_mut() { + callee.address = Some(addr.clone()) } - Err(LoadBalanceError::Retry).map_err(|err| err.into())? + + return match self.service.call(cx, req).await { + Ok(resp) => Ok(resp), + Err(err) => { + warn!("[VOLO] call endpoint: {:?} error: {:?}", addr, err); + Err(err) + } + }; + } else { + warn!("[VOLO] zero call count, call info: {:?}", cx.rpc_info()); } + Err(LoadBalanceError::Retry).map_err(|err| err.into())? } } diff --git a/volo-grpc/src/layer/user_agent.rs b/volo-grpc/src/layer/user_agent.rs index aecbe0a6..3e6bc796 100644 --- a/volo-grpc/src/layer/user_agent.rs +++ b/volo-grpc/src/layer/user_agent.rs @@ -1,4 +1,3 @@ -use futures::Future; use http::{header::USER_AGENT, HeaderValue, Request}; use motore::Service; @@ -29,24 +28,22 @@ impl UserAgent { impl Service> for UserAgent where - T: Service>, - ReqBody: 'static, + T: Service> + Send + Sync, + ReqBody: Send + 'static, + Cx: Send, { type Response = T::Response; type Error = T::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Cx: 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, mut req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + mut req: Request, + ) -> Result { req.headers_mut() .insert(USER_AGENT, self.user_agent.clone()); - self.inner.call(cx, req) + self.inner.call(cx, req).await } } diff --git a/volo-grpc/src/lib.rs b/volo-grpc/src/lib.rs index 0290d133..1a353ecd 100644 --- a/volo-grpc/src/lib.rs +++ b/volo-grpc/src/lib.rs @@ -2,7 +2,6 @@ html_logo_url = "https://github.com/cloudwego/volo/raw/main/.github/assets/logo.png?sanitize=true" )] #![cfg_attr(not(doctest), doc = include_str!("../README.md"))] -#![feature(impl_trait_in_assoc_type)] pub mod body; pub mod client; diff --git a/volo-grpc/src/server/meta.rs b/volo-grpc/src/server/meta.rs index 78794450..0703489b 100644 --- a/volo-grpc/src/server/meta.rs +++ b/volo-grpc/src/server/meta.rs @@ -1,6 +1,5 @@ use std::{cell::RefCell, net::SocketAddr, str::FromStr, sync::Arc}; -use futures::Future; use metainfo::{Backward, Forward}; use volo::{ context::{Context, Endpoint}, @@ -51,112 +50,111 @@ where type Error = Status; - type Future<'cx> = impl Future> + Send + 'cx; - - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ServerContext, req: hyper::Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { + ) -> Result { let peer_addr = self.peer_addr.clone(); - metainfo::METAINFO.scope(RefCell::new(metainfo::MetaInfo::default()), async move { - cx.rpc_info.method = Some(FastStr::new(req.uri().path())); + metainfo::METAINFO + .scope(RefCell::new(metainfo::MetaInfo::default()), async move { + cx.rpc_info.method = Some(FastStr::new(req.uri().path())); - let mut volo_req = Request::from_http(req); + let mut volo_req = Request::from_http(req); - let metadata = volo_req.metadata_mut(); + let metadata = volo_req.metadata_mut(); - status_to_http!(metainfo::METAINFO.with(|metainfo| { - let mut metainfo = metainfo.borrow_mut(); + status_to_http!(metainfo::METAINFO.with(|metainfo| { + let mut metainfo = metainfo.borrow_mut(); - // caller - if let Some(source_service) = metadata.remove(SOURCE_SERVICE) { - let source_service = Arc::::from(source_service.to_str()?); - let mut caller = Endpoint::new(source_service.into()); - if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { - let addr = ad.to_str()?.parse::(); - if let Ok(addr) = addr { - caller.set_address(volo::net::Address::from(addr)); + // caller + if let Some(source_service) = metadata.remove(SOURCE_SERVICE) { + let source_service = Arc::::from(source_service.to_str()?); + let mut caller = Endpoint::new(source_service.into()); + if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { + let addr = ad.to_str()?.parse::(); + if let Ok(addr) = addr { + caller.set_address(volo::net::Address::from(addr)); + } } + if caller.address.is_none() { + caller.address = peer_addr; + } + cx.rpc_info_mut().caller = Some(caller); } - if caller.address.is_none() { - caller.address = peer_addr; + + // callee + if let Some(destination_service) = metadata.remove(DESTINATION_SERVICE) { + let destination_service = Arc::::from(destination_service.to_str()?); + let callee = Endpoint::new(destination_service.into()); + cx.rpc_info_mut().callee = Some(callee); } - cx.rpc_info_mut().caller = Some(caller); - } - - // callee - if let Some(destination_service) = metadata.remove(DESTINATION_SERVICE) { - let destination_service = Arc::::from(destination_service.to_str()?); - let callee = Endpoint::new(destination_service.into()); - cx.rpc_info_mut().callee = Some(callee); - } - - // persistent and transient - let mut vec = Vec::with_capacity(metadata.len()); - for key_and_value in metadata.iter() { - match key_and_value { - KeyAndValueRef::Ascii(k, v) => { - let k = k.as_str(); - let v = v.to_str()?; - if k.starts_with(metainfo::HTTP_PREFIX_PERSISTENT) { - vec.push(k.to_owned()); - metainfo.strip_http_prefix_and_set_persistent( - k.to_owned(), - v.to_owned(), - ); - } else if k.starts_with(metainfo::HTTP_PREFIX_TRANSIENT) { - vec.push(k.to_owned()); - metainfo - .strip_http_prefix_and_set_upstream(k.to_owned(), v.to_owned()); + + // persistent and transient + let mut vec = Vec::with_capacity(metadata.len()); + for key_and_value in metadata.iter() { + match key_and_value { + KeyAndValueRef::Ascii(k, v) => { + let k = k.as_str(); + let v = v.to_str()?; + if k.starts_with(metainfo::HTTP_PREFIX_PERSISTENT) { + vec.push(k.to_owned()); + metainfo.strip_http_prefix_and_set_persistent( + k.to_owned(), + v.to_owned(), + ); + } else if k.starts_with(metainfo::HTTP_PREFIX_TRANSIENT) { + vec.push(k.to_owned()); + metainfo.strip_http_prefix_and_set_upstream( + k.to_owned(), + v.to_owned(), + ); + } } + _ => unreachable!(), } - _ => unreachable!(), } - } - for k in vec { - metadata.remove(k); - } - - Ok::<(), Status>(()) - })); - - let volo_resp = match self.inner.call(cx, volo_req).await { - Ok(resp) => resp, - Err(err) => { - return Ok(err.into().to_http()); - } - }; - - let (mut metadata, extensions, message) = volo_resp.into_parts(); - - status_to_http!(metainfo::METAINFO.with(|metainfo| { - let metainfo = metainfo.borrow_mut(); - - // backward - if let Some(at) = metainfo.get_all_backward_transients() { - for (key, value) in at { - let key = metainfo::HTTP_PREFIX_BACKWARD.to_owned() + key; - metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); + for k in vec { + metadata.remove(k); + } + + Ok::<(), Status>(()) + })); + + let volo_resp = match self.inner.call(cx, volo_req).await { + Ok(resp) => resp, + Err(err) => { + return Ok(err.into().to_http()); + } + }; + + let (mut metadata, extensions, message) = volo_resp.into_parts(); + + status_to_http!(metainfo::METAINFO.with(|metainfo| { + let metainfo = metainfo.borrow_mut(); + + // backward + if let Some(at) = metainfo.get_all_backward_transients() { + for (key, value) in at { + let key = metainfo::HTTP_PREFIX_BACKWARD.to_owned() + key; + metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); + } } - } - Ok::<(), Status>(()) - })); + Ok::<(), Status>(()) + })); - let mut resp = hyper::Response::new(message); - *resp.headers_mut() = metadata.into_headers(); - *resp.extensions_mut() = extensions; - resp.headers_mut().insert( - http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/grpc"), - ); + let mut resp = hyper::Response::new(message); + *resp.headers_mut() = metadata.into_headers(); + *resp.extensions_mut() = extensions; + resp.headers_mut().insert( + http::header::CONTENT_TYPE, + http::header::HeaderValue::from_static("application/grpc"), + ); - Ok(resp) - }) + Ok(resp) + }) + .await } } diff --git a/volo-grpc/src/server/router.rs b/volo-grpc/src/server/router.rs index fb84041a..0eb0fa3b 100644 --- a/volo-grpc/src/server/router.rs +++ b/volo-grpc/src/server/router.rs @@ -3,7 +3,6 @@ use std::{ sync::atomic::{AtomicU32, Ordering}, }; -use futures::Future; use fxhash::FxHashMap; use http_body::Body as HttpBody; use motore::{BoxCloneService, Service}; @@ -93,24 +92,20 @@ where { type Response = Response; type Error = Status; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut ServerContext, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let path = cx.rpc_info.method.as_ref().unwrap(); - match self.node.at(path) { - Ok(match_) => { - let id = match_.value; - let route = self.routes.get(id).volo_unwrap().clone(); - route.call(cx, req).await - } - Err(err) => Err(Status::unimplemented(err.to_string())), + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut ServerContext, + req: Request, + ) -> Result { + let path = cx.rpc_info.method.as_ref().unwrap(); + match self.node.at(path) { + Ok(match_) => { + let id = match_.value; + let route = self.routes.get(id).volo_unwrap().clone(); + route.call(cx, req).await } + Err(err) => Err(Status::unimplemented(err.to_string())), } } } diff --git a/volo-grpc/src/server/service.rs b/volo-grpc/src/server/service.rs index 8d3dd212..91481538 100644 --- a/volo-grpc/src/server/service.rs +++ b/volo-grpc/src/server/service.rs @@ -1,6 +1,5 @@ use std::marker::PhantomData; -use futures::Future; use motore::{ layer::{Identity, Layer, Stack}, service::Service, @@ -126,52 +125,44 @@ where { type Response = Response; type Error = Status; - type Future<'cx> = impl Future> + Send + 'cx - where - Self: 'cx; - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ServerContext, req: Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let (metadata, extensions, body) = req.into_parts(); - let send_compression = CompressionEncoding::from_accept_encoding_header( - metadata.headers(), - &self.rpc_config.send_compressions, + ) -> Result { + let (metadata, extensions, body) = req.into_parts(); + let send_compression = CompressionEncoding::from_accept_encoding_header( + metadata.headers(), + &self.rpc_config.send_compressions, + ); + + let recv_compression = CompressionEncoding::from_encoding_header( + metadata.headers(), + &self.rpc_config.accept_compressions, + )?; + + let message = T::from_body( + cx.rpc_info.method.as_deref(), + body, + Kind::Request, + recv_compression, + )?; + + let volo_req = Request::from_parts(metadata, extensions, message); + + let volo_resp = self.inner.call(cx, volo_req).await.map_err(Into::into)?; + + let mut resp = volo_resp.map(|message| Body::new(message.into_body(send_compression))); + + if let Some(encoding) = send_compression { + resp.metadata_mut().insert( + ENCODING_HEADER, + MetadataValue::unchecked_from_header_value(encoding.into_header_value()), ); + }; - let recv_compression = CompressionEncoding::from_encoding_header( - metadata.headers(), - &self.rpc_config.accept_compressions, - )?; - - let message = T::from_body( - cx.rpc_info.method.as_deref(), - body, - Kind::Request, - recv_compression, - )?; - - let volo_req = Request::from_parts(metadata, extensions, message); - - let volo_resp = self.inner.call(cx, volo_req).await.map_err(Into::into)?; - - let mut resp = volo_resp.map(|message| Body::new(message.into_body(send_compression))); - - if let Some(encoding) = send_compression { - resp.metadata_mut().insert( - ENCODING_HEADER, - MetadataValue::unchecked_from_header_value(encoding.into_header_value()), - ); - }; - - Ok(resp) - } + Ok(resp) } } diff --git a/volo-grpc/src/transport/client.rs b/volo-grpc/src/transport/client.rs index 089ae4d9..9fc51ee9 100644 --- a/volo-grpc/src/transport/client.rs +++ b/volo-grpc/src/transport/client.rs @@ -1,6 +1,5 @@ use std::{io, marker::PhantomData}; -use futures::Future; use http::{ header::{CONTENT_TYPE, TE}, HeaderValue, @@ -76,103 +75,94 @@ where type Error = Status; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ClientContext, volo_req: Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { + ) -> Result { let mut http_client = self.http_client.clone(); - async move { - // SAFETY: parameters controlled by volo-grpc are guaranteed to be valid. - // get the call address from the context - let target = cx - .rpc_info - .callee() - .volo_unwrap() - .address() - .ok_or_else(|| { - io::Error::new(std::io::ErrorKind::InvalidData, "address is required") - })?; - - let (metadata, extensions, message) = volo_req.into_parts(); - let path = cx.rpc_info.method().volo_unwrap(); - let rpc_config = cx.rpc_info.config().volo_unwrap(); - let accept_compressions = &rpc_config.accept_compressions; - - // select the compression algorithm with the highest priority by user's config - let send_compression = rpc_config - .send_compressions - .as_ref() - .map(|config| config[0]); - - let body = hyper::Body::wrap_stream(message.into_body(send_compression)); - - let mut req = hyper::Request::new(body); - *req.version_mut() = http::Version::HTTP_2; - *req.method_mut() = http::Method::POST; - *req.uri_mut() = build_uri(target, path); - *req.headers_mut() = metadata.into_headers(); - *req.extensions_mut() = extensions; - req.headers_mut() - .insert(TE, HeaderValue::from_static("trailers")); + // SAFETY: parameters controlled by volo-grpc are guaranteed to be valid. + // get the call address from the context + let target = cx + .rpc_info + .callee() + .volo_unwrap() + .address() + .ok_or_else(|| { + io::Error::new(std::io::ErrorKind::InvalidData, "address is required") + })?; + + let (metadata, extensions, message) = volo_req.into_parts(); + let path = cx.rpc_info.method().volo_unwrap(); + let rpc_config = cx.rpc_info.config().volo_unwrap(); + let accept_compressions = &rpc_config.accept_compressions; + + // select the compression algorithm with the highest priority by user's config + let send_compression = rpc_config + .send_compressions + .as_ref() + .map(|config| config[0]); + + let body = hyper::Body::wrap_stream(message.into_body(send_compression)); + + let mut req = hyper::Request::new(body); + *req.version_mut() = http::Version::HTTP_2; + *req.method_mut() = http::Method::POST; + *req.uri_mut() = build_uri(target, path); + *req.headers_mut() = metadata.into_headers(); + *req.extensions_mut() = extensions; + req.headers_mut() + .insert(TE, HeaderValue::from_static("trailers")); + req.headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static("application/grpc")); + + // insert compression headers + if let Some(send_compression) = send_compression { req.headers_mut() - .insert(CONTENT_TYPE, HeaderValue::from_static("application/grpc")); - - // insert compression headers - if let Some(send_compression) = send_compression { - req.headers_mut() - .insert(ENCODING_HEADER, send_compression.into_header_value()); - } - if let Some(accept_compressions) = accept_compressions { - if !accept_compressions.is_empty() { - if let Some(header_value) = accept_compressions[0] - .into_accept_encoding_header_value(accept_compressions) - { - req.headers_mut() - .insert(ACCEPT_ENCODING_HEADER, header_value); - } + .insert(ENCODING_HEADER, send_compression.into_header_value()); + } + if let Some(accept_compressions) = accept_compressions { + if !accept_compressions.is_empty() { + if let Some(header_value) = + accept_compressions[0].into_accept_encoding_header_value(accept_compressions) + { + req.headers_mut() + .insert(ACCEPT_ENCODING_HEADER, header_value); } } + } - // call the service through hyper client - let resp = http_client - .ready() - .await - .map_err(|err| Status::from_error(err.into()))? - .call(req) - .await - .map_err(|err| Status::from_error(err.into()))?; - - let status_code = resp.status(); - let headers = resp.headers(); - - if let Some(status) = Status::from_header_map(headers) { - if status.code() != Code::Ok { - return Err(status); - } + // call the service through hyper client + let resp = http_client + .ready() + .await + .map_err(|err| Status::from_error(err.into()))? + .call(req) + .await + .map_err(|err| Status::from_error(err.into()))?; + + let status_code = resp.status(); + let headers = resp.headers(); + + if let Some(status) = Status::from_header_map(headers) { + if status.code() != Code::Ok { + return Err(status); } - - let accept_compression = CompressionEncoding::from_encoding_header( - headers, - &rpc_config.accept_compressions, - )?; - - let (parts, body) = resp.into_parts(); - - let body = U::from_body( - Some(path), - body, - Kind::Response(status_code), - accept_compression, - )?; - let resp = hyper::Response::from_parts(parts, body); - Ok(Response::from_http(resp)) } + + let accept_compression = + CompressionEncoding::from_encoding_header(headers, &rpc_config.accept_compressions)?; + + let (parts, body) = resp.into_parts(); + + let body = U::from_body( + Some(path), + body, + Kind::Response(status_code), + accept_compression, + )?; + let resp = hyper::Response::from_parts(parts, body); + Ok(Response::from_http(resp)) } } diff --git a/volo-macros/Cargo.toml b/volo-macros/Cargo.toml index 2fafc2a1..5fe89b41 100644 --- a/volo-macros/Cargo.toml +++ b/volo-macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-macros" -version = "0.3.0" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true diff --git a/volo-thrift/Cargo.toml b/volo-thrift/Cargo.toml index c29a9ed3..c73f006e 100644 --- a/volo-thrift/Cargo.toml +++ b/volo-thrift/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-thrift" -version = "0.7.3" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -18,7 +18,7 @@ keywords = ["async", "rpc", "thrift"] maintenance = { status = "actively-developed" } [dependencies] -volo = { version = "0.5", path = "../volo" } +volo = { version = "0.8", path = "../volo" } pilota.workspace = true motore.workspace = true metainfo.workspace = true @@ -47,6 +47,7 @@ tokio = { workspace = true, features = [ tracing.workspace = true [features] +default = [] # multiplex is unstable and we don't provide backward compatibility multiplex = [] # unsafe-codec can achieve better performance for thrift binary protocol, but may cause undefined behavior diff --git a/volo-thrift/src/client/layer/timeout.rs b/volo-thrift/src/client/layer/timeout.rs index 30739cdd..2377506c 100644 --- a/volo-thrift/src/client/layer/timeout.rs +++ b/volo-thrift/src/client/layer/timeout.rs @@ -1,8 +1,6 @@ //! Applies a timeout to request //! if the inner service's call does not complete within specified timeout, the response will be //! aborted. - -use futures::Future; use motore::{layer::Layer, service::Service}; use tracing::warn; @@ -23,39 +21,36 @@ where type Error = crate::Error; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut ClientContext, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - if let Some(config) = cx.rpc_info.config() { - match config.rpc_timeout() { - Some(duration) => { - let start = std::time::Instant::now(); - match tokio::time::timeout(duration, self.inner.call(cx, req)).await { - Ok(r) => r.map_err(Into::into), - Err(_) => { - let msg = format!( - "[VOLO] thrift rpc call timeout, rpcinfo: {:?}, elpased: \ - {:?}, timeout config: {:?}", - cx.rpc_info, - start.elapsed(), - duration - ); - warn!(msg); - Err(crate::Error::Transport( - std::io::Error::new(std::io::ErrorKind::TimedOut, msg).into(), - )) - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut ClientContext, + req: Req, + ) -> Result { + if let Some(config) = cx.rpc_info.config() { + match config.rpc_timeout() { + Some(duration) => { + let start = std::time::Instant::now(); + match tokio::time::timeout(duration, self.inner.call(cx, req)).await { + Ok(r) => r.map_err(Into::into), + Err(_) => { + let msg = format!( + "[VOLO] thrift rpc call timeout, rpcinfo: {:?}, elpased: {:?}, \ + timeout config: {:?}", + cx.rpc_info, + start.elapsed(), + duration + ); + warn!(msg); + Err(crate::Error::Transport( + std::io::Error::new(std::io::ErrorKind::TimedOut, msg).into(), + )) } } - None => self.inner.call(cx, req).await.map_err(Into::into), } - } else { - unreachable!("rpc_info.config should never be None") + None => self.inner.call(cx, req).await.map_err(Into::into), } + } else { + unreachable!("rpc_info.config should never be None") } } } diff --git a/volo-thrift/src/client/mod.rs b/volo-thrift/src/client/mod.rs index 02e1f4ed..fb2704b1 100644 --- a/volo-thrift/src/client/mod.rs +++ b/volo-thrift/src/client/mod.rs @@ -11,7 +11,6 @@ use std::{ sync::{atomic::AtomicI32, Arc}, }; -use futures::Future; use motore::{ layer::{Identity, Layer, Stack}, service::{BoxCloneService, Service}, @@ -474,21 +473,18 @@ where type Error = Error; - type Future<'cx> = impl Future> + 'cx + Send where Self:'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut ClientContext, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let msg = ThriftMessage::mk_client_msg(cx, Ok(req))?; - let resp = self.inner.call(cx, msg).await; - match resp { - Ok(Some(ThriftMessage { data: Ok(data), .. })) => Ok(Some(data)), - Ok(Some(ThriftMessage { data: Err(e), .. })) => Err(e), - Err(e) => Err(e), - Ok(None) => Ok(None), - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut ClientContext, + req: Req, + ) -> Result { + let msg = ThriftMessage::mk_client_msg(cx, Ok(req))?; + let resp = self.inner.call(cx, msg).await; + match resp { + Ok(Some(ThriftMessage { data: Ok(data), .. })) => Ok(Some(data)), + Ok(Some(ThriftMessage { data: Err(e), .. })) => Err(e), + Err(e) => Err(e), + Ok(None) => Ok(None), } } } @@ -512,21 +508,17 @@ where + Send + Clone + Sync, - for<'cx> <>::Service as Service>::Future<'cx>: - Send, Req: EntryMessage + Send + 'static + Sync + Clone, Resp: EntryMessage + Send + 'static, IL: Layer>, IL::Service: Service> + Sync + Clone + Send + 'static, >::Error: Send + Into, - for<'cx> >::Future<'cx>: Send, MkT: MakeTransport, MkC: MakeCodec + Sync, OL: Layer, Error>>, OL::Service: Service> + 'static + Send + Clone + Sync, - for<'cx> >::Future<'cx>: Send, >::Error: Send + Sync + Into, { /// Build volo client. @@ -692,17 +684,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { $e } + ) -> Result { + $e } } @@ -720,17 +708,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx>( + async fn call<'cx>( $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - Self: 'cx, - { - async move { $e } + ) -> Result { + $e } } }; diff --git a/volo-thrift/src/lib.rs b/volo-thrift/src/lib.rs index 33da6c56..415a65f9 100644 --- a/volo-thrift/src/lib.rs +++ b/volo-thrift/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] #![doc( html_logo_url = "https://github.com/cloudwego/volo/raw/main/.github/assets/logo.png?sanitize=true" )] diff --git a/volo-thrift/src/server.rs b/volo-thrift/src/server.rs index 42b94500..44a5d1d7 100644 --- a/volo-thrift/src/server.rs +++ b/volo-thrift/src/server.rs @@ -176,7 +176,6 @@ impl Server { MkC: MakeCodec, L::Service: Service + Send + 'static + Sync, >::Error: Into + Send, - for<'cx> >::Future<'cx>: Send, S: Service + Send + 'static, S::Error: Into + Send, Req: EntryMessage + Send + 'static, diff --git a/volo-thrift/src/transport/multiplex/client.rs b/volo-thrift/src/transport/multiplex/client.rs index e1068df9..d6acd514 100644 --- a/volo-thrift/src/transport/multiplex/client.rs +++ b/volo-thrift/src/transport/multiplex/client.rs @@ -1,6 +1,5 @@ use std::{io, marker::PhantomData}; -use futures::Future; use motore::service::{Service, UnaryService}; use pilota::thrift::TransportErrorKind; use volo::{ @@ -64,19 +63,16 @@ where { type Response = ThriftTransport; type Error = io::Error; - type Future<'s> = impl Future> + 's; - fn call(&self, target: Address) -> Self::Future<'_> { + async fn call(&self, target: Address) -> Result { let make_transport = self.make_transport.clone(); - async move { - let (rh, wh) = make_transport.make_transport(target.clone()).await?; - Ok(ThriftTransport::new( - rh, - wh, - self.make_codec.clone(), - target, - )) - } + let (rh, wh) = make_transport.make_transport(target.clone()).await?; + Ok(ThriftTransport::new( + rh, + wh, + self.make_codec.clone(), + target, + )) } } @@ -132,39 +128,32 @@ where type Error = crate::Error; - type Future<'cx> = impl Future> + Send + 'cx where Self:'cx; - - fn call<'cx, 's>( + async fn call<'cx, 's>( &'s self, cx: &'cx mut ClientContext, req: ThriftMessage, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let rpc_info = &cx.rpc_info; - let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { - let msg = format!("address is required, rpcinfo: {:?}", rpc_info); - crate::Error::Transport(io::Error::new(io::ErrorKind::InvalidData, msg).into()) - })?; - let oneway = cx.message_type == TMessageType::OneWay; - cx.stats.record_make_transport_start_at(); - let transport = self.make_transport.call(target).await?; - cx.stats.record_make_transport_end_at(); - let resp = transport.send(cx, req, oneway).await; - if let Ok(None) = resp { - if !oneway { - return Err(Error::Transport(pilota::thrift::TransportError::new( - TransportErrorKind::EndOfFile, - format!("an unexpected end of file from server, cx: {:?}", cx), - ))); - } + ) -> Result { + let rpc_info = &cx.rpc_info; + let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { + let msg = format!("address is required, rpcinfo: {:?}", rpc_info); + crate::Error::Transport(io::Error::new(io::ErrorKind::InvalidData, msg).into()) + })?; + let oneway = cx.message_type == TMessageType::OneWay; + cx.stats.record_make_transport_start_at(); + let transport = self.make_transport.call(target).await?; + cx.stats.record_make_transport_end_at(); + let resp = transport.send(cx, req, oneway).await; + if let Ok(None) = resp { + if !oneway { + return Err(Error::Transport(pilota::thrift::TransportError::new( + TransportErrorKind::EndOfFile, + format!("an unexpected end of file from server, cx: {:?}", cx), + ))); } - if cx.transport.should_reuse && resp.is_ok() { - transport.reuse(); - } - resp } + if cx.transport.should_reuse && resp.is_ok() { + transport.reuse(); + } + resp } } diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index 3aae4fff..498c5dad 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -1,6 +1,5 @@ use std::{io, marker::PhantomData}; -use futures::Future; use motore::service::{Service, UnaryService}; use pilota::thrift::{TransportError, TransportErrorKind}; use volo::{ @@ -51,15 +50,12 @@ where { type Response = ThriftTransport; type Error = io::Error; - type Future<'s> = impl Future> + 's; #[inline] - fn call(&self, target: Address) -> Self::Future<'_> { + async fn call(&self, target: Address) -> Result { let make_transport = self.make_transport.clone(); - async move { - let (rh, wh) = make_transport.make_transport(target).await?; - Ok(ThriftTransport::new(rh, wh, self.make_codec.clone())) - } + let (rh, wh) = make_transport.make_transport(target).await?; + Ok(ThriftTransport::new(rh, wh, self.make_codec.clone())) } } @@ -112,47 +108,40 @@ where type Error = crate::Error; - type Future<'cx> = impl Future> + Send + 'cx where Self:'cx; - #[inline] - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ClientContext, req: ThriftMessage, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let rpc_info = &cx.rpc_info; - let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { - TransportError::from(io::Error::new( - io::ErrorKind::InvalidData, - format!("address is required, rpc_info: {:?}", rpc_info), - )) - })?; - let oneway = cx.message_type == TMessageType::OneWay; - cx.stats.record_make_transport_start_at(); - let mut transport = self.make_transport.call(target).await?; - cx.stats.record_make_transport_end_at(); - let resp = transport.send(cx, req, oneway).await; - if let Ok(None) = resp { - if !oneway { - return Err(crate::Error::Transport( - pilota::thrift::TransportError::new( - TransportErrorKind::EndOfFile, - format!( - "an unexpected end of file from server, rpc_info: {:?}", - cx.rpc_info - ), + ) -> Result { + let rpc_info = &cx.rpc_info; + let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { + TransportError::from(io::Error::new( + io::ErrorKind::InvalidData, + format!("address is required, rpc_info: {:?}", rpc_info), + )) + })?; + let oneway = cx.message_type == TMessageType::OneWay; + cx.stats.record_make_transport_start_at(); + let mut transport = self.make_transport.call(target).await?; + cx.stats.record_make_transport_end_at(); + let resp = transport.send(cx, req, oneway).await; + if let Ok(None) = resp { + if !oneway { + return Err(crate::Error::Transport( + pilota::thrift::TransportError::new( + TransportErrorKind::EndOfFile, + format!( + "an unexpected end of file from server, rpc_info: {:?}", + cx.rpc_info ), - )); - } + ), + )); } - if cx.transport.should_reuse && resp.is_ok() { - transport.reuse(); - } - resp } + if cx.transport.should_reuse && resp.is_ok() { + transport.reuse(); + } + resp } } diff --git a/volo-thrift/src/transport/pool/make_transport.rs b/volo-thrift/src/transport/pool/make_transport.rs index d460d445..15c27adf 100644 --- a/volo-thrift/src/transport/pool/make_transport.rs +++ b/volo-thrift/src/transport/pool/make_transport.rs @@ -2,7 +2,6 @@ use std::{fmt::Debug, hash::Hash}; -use futures::Future; use motore::{service::UnaryService, BoxError}; use super::{Pool, Poolable, Pooled}; @@ -55,10 +54,8 @@ where type Error = BoxError; - type Future<'cx> = impl Future> + 'cx; - - fn call(&self, key: Key) -> Self::Future<'_> { + async fn call(&self, key: Key) -> Result { let mt = self.inner.clone(); - async move { self.pool.get(key, mt).await } + self.pool.get(key, mt).await } } diff --git a/volo/Cargo.toml b/volo/Cargo.toml index c33304fe..e1b39b04 100644 --- a/volo/Cargo.toml +++ b/volo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo" -version = "0.5.5" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true diff --git a/volo/src/client.rs b/volo/src/client.rs index de4c1223..14498406 100644 --- a/volo/src/client.rs +++ b/volo/src/client.rs @@ -26,16 +26,12 @@ pub trait OneShotService { /// Errors produced by the service. type Error; - /// The future response value. - type Future<'cx>: Future> + Send + 'cx - where - Cx: 'cx, - Self: 'cx; - /// Process the request and return the response asynchronously. - fn call<'cx>(self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - Self: 'cx; + fn call<'cx>( + self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future> + Send; } impl OneShotService for WithOptService @@ -44,22 +40,17 @@ where Opt: 'static + Send + Sync + Apply, Req: 'static + Send, S: Service + 'static + Sync + Send, - for<'cx> S::Future<'cx>: Send, { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + Send + 'cx - where - Cx: 'cx, - Self: 'cx; - #[inline] - fn call<'cx>(self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - Self: 'cx, - { + fn call<'cx>( + self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { async move { self.opt.apply(cx)?; self.inner.call(cx, req).await diff --git a/volo/src/discovery/mod.rs b/volo/src/discovery/mod.rs index 0f4a2c11..d2e05371 100644 --- a/volo/src/discovery/mod.rs +++ b/volo/src/discovery/mod.rs @@ -31,11 +31,12 @@ pub trait Discover: Send + Sync + 'static { type Key: Hash + PartialEq + Eq + Send + Sync + Clone + 'static; /// `Error` is the discovery error. type Error: Into; - /// `DiscFut` is a Future object which returns a discovery result. - type DiscFut<'future>: Future>, Self::Error>> + Send + 'future; /// `discover` allows to request an endpoint and return a discover future. - fn discover<'s>(&'s self, endpoint: &'s Endpoint) -> Self::DiscFut<'s>; + fn discover<'s>( + &'s self, + endpoint: &'s Endpoint, + ) -> impl Future>, Self::Error>> + Send; /// `key` should return a key suitable for cache. fn key(&self, endpoint: &Endpoint) -> Self::Key; /// `watch` should return a [`async_broadcast::Receiver`] which can be used to subscribe @@ -150,10 +151,9 @@ impl From> for StaticDiscover { impl Discover for StaticDiscover { type Key = (); type Error = Infallible; - type DiscFut<'a> = impl Future>, Self::Error>> + 'a; - fn discover(&self, _: &Endpoint) -> Self::DiscFut<'_> { - async { Ok(self.instances.clone()) } + async fn discover<'s>(&'s self, _: &'s Endpoint) -> Result>, Self::Error> { + Ok(self.instances.clone()) } fn key(&self, _: &Endpoint) -> Self::Key {} @@ -172,10 +172,9 @@ pub struct DummyDiscover; impl Discover for DummyDiscover { type Key = (); type Error = Infallible; - type DiscFut<'a> = impl Future>, Self::Error>> + 'a; - fn discover(&self, _: &Endpoint) -> Self::DiscFut<'_> { - async { Ok(vec![]) } + async fn discover<'s>(&'s self, _: &'s Endpoint) -> Result>, Self::Error> { + Ok(vec![]) } fn key(&self, _: &Endpoint) {} diff --git a/volo/src/lib.rs b/volo/src/lib.rs index 920fce1a..70f1b467 100644 --- a/volo/src/lib.rs +++ b/volo/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] #![doc( html_logo_url = "https://github.com/cloudwego/volo/raw/main/.github/assets/logo.png?sanitize=true" )] diff --git a/volo/src/loadbalance/consistent_hash.rs b/volo/src/loadbalance/consistent_hash.rs index 43c6e3b1..4fb80bf4 100644 --- a/volo/src/loadbalance/consistent_hash.rs +++ b/volo/src/loadbalance/consistent_hash.rs @@ -1,4 +1,4 @@ -use std::{cmp::min, collections::HashSet, future::Future, hash::Hash, sync::Arc}; +use std::{cmp::min, collections::HashSet, hash::Hash, sync::Arc}; use dashmap::{mapref::entry::Entry, DashMap}; @@ -249,51 +249,40 @@ where D: Discover, { type InstanceIter = InstancePicker; - - type GetFut<'future> = - impl Future> + Send + 'future - where - Self: 'future; - - fn get_picker<'future>( + async fn get_picker<'future>( &'future self, endpoint: &'future Endpoint, discover: &'future D, - ) -> Self::GetFut<'future> - where - Self: 'future, - { - async move { - let request_hash = metainfo::METAINFO - .try_with(|m| m.borrow().get::().copied()) - .map_err(|_| LoadBalanceError::MissRequestHash)?; - if request_hash.is_none() { - return Err(LoadBalanceError::MissRequestHash); - } - let request_hash = request_hash.unwrap(); - let key = discover.key(endpoint); - let weighted_list = match self.router.entry(key) { - Entry::Occupied(e) => e.get().clone(), - Entry::Vacant(e) => { - let instances = Arc::new( - self.build_weighted_instances( - discover - .discover(endpoint) - .await - .map_err(|err| err.into())?, - ), - ); - e.insert(instances).value().clone() - } - }; - Ok(InstancePicker { - shared_instances: weighted_list, - request_hash, - last_pick: None, - used: HashSet::new(), - replicas: self.option.replicas, - }) + ) -> Result { + let request_hash = metainfo::METAINFO + .try_with(|m| m.borrow().get::().copied()) + .map_err(|_| LoadBalanceError::MissRequestHash)?; + if request_hash.is_none() { + return Err(LoadBalanceError::MissRequestHash); } + let request_hash = request_hash.unwrap(); + let key = discover.key(endpoint); + let weighted_list = match self.router.entry(key) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let instances = Arc::new( + self.build_weighted_instances( + discover + .discover(endpoint) + .await + .map_err(|err| err.into())?, + ), + ); + e.insert(instances).value().clone() + } + }; + Ok(InstancePicker { + shared_instances: weighted_list, + request_hash, + last_pick: None, + used: HashSet::new(), + replicas: self.option.replicas, + }) } fn rebalance(&self, changes: Change<::Key>) { diff --git a/volo/src/loadbalance/layer.rs b/volo/src/loadbalance/layer.rs index a6b22f10..2c5a7e1d 100644 --- a/volo/src/loadbalance/layer.rs +++ b/volo/src/loadbalance/layer.rs @@ -52,20 +52,16 @@ where LoadBalanceError: Into, S::Error: Debug + Retryable, Req: Clone + Send + Sync + 'static, - for<'cx> S::Future<'cx>: Send, { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + Send + 'cx - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { debug_assert!( cx.rpc_info().callee.is_some(), "must set callee endpoint before load balance service" diff --git a/volo/src/loadbalance/mod.rs b/volo/src/loadbalance/mod.rs index 84b8ab72..cc0b55d5 100644 --- a/volo/src/loadbalance/mod.rs +++ b/volo/src/loadbalance/mod.rs @@ -23,22 +23,13 @@ where /// `InstanceIter` is an iterator of [`crate::discovery::Instance`]. type InstanceIter: Iterator + Send; - /// `GetFut` is the return type of `get_picker`. - type GetFut<'future>: Future> - + Send - + 'future - where - Self: 'future; - /// `get_picker` allows to get an instance iterator of a specified endpoint from self or /// service discovery. fn get_picker<'future>( &'future self, endpoint: &'future Endpoint, discover: &'future D, - ) -> Self::GetFut<'future> - where - Self: 'future; + ) -> impl Future> + Send; /// `rebalance` is the callback method be used in service discovering subscription. fn rebalance(&self, changes: Change); } diff --git a/volo/src/loadbalance/random.rs b/volo/src/loadbalance/random.rs index 3d3dcced..13ca735e 100644 --- a/volo/src/loadbalance/random.rs +++ b/volo/src/loadbalance/random.rs @@ -1,5 +1,5 @@ use core::cell::OnceCell; -use std::{future::Future, hash::Hash, sync::Arc}; +use std::{hash::Hash, sync::Arc}; use dashmap::{mapref::entry::Entry, DashMap}; use rand::Rng; @@ -117,41 +117,31 @@ where { type InstanceIter = InstancePicker; - type GetFut<'future> = - impl Future> + Send + 'future - where - Self: 'future; - - fn get_picker<'future>( + async fn get_picker<'future>( &'future self, endpoint: &'future Endpoint, discover: &'future D, - ) -> Self::GetFut<'future> - where - Self: 'future, - { - async { - let key = discover.key(endpoint); - let weighted_list = match self.router.entry(key) { - Entry::Occupied(e) => e.get().clone(), - Entry::Vacant(e) => { - let instances = Arc::new(WeightedInstances::from( - discover - .discover(endpoint) - .await - .map_err(|err| err.into())?, - )); - e.insert(instances).value().clone() - } - }; - let sum_of_weights = weighted_list.sum_of_weights; - Ok(InstancePicker { - owned_instances: OnceCell::new(), - last_pick: None, - shared_instances: weighted_list, - sum_of_weights, - }) - } + ) -> Result { + let key = discover.key(endpoint); + let weighted_list = match self.router.entry(key) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let instances = Arc::new(WeightedInstances::from( + discover + .discover(endpoint) + .await + .map_err(|err| err.into())?, + )); + e.insert(instances).value().clone() + } + }; + let sum_of_weights = weighted_list.sum_of_weights; + Ok(InstancePicker { + owned_instances: OnceCell::new(), + last_pick: None, + shared_instances: weighted_list, + sum_of_weights, + }) } fn rebalance(&self, changes: Change) {