From 94c53e715756338baefe28714c7a4c1673b8ad16 Mon Sep 17 00:00:00 2001 From: Will Nelson Date: Tue, 24 May 2022 09:14:07 -0700 Subject: [PATCH] Refactor Redis broker --- Cargo.lock | 871 +++++++++++++++++------------------ brokers/Cargo.toml | 8 +- brokers/src/error.rs | 10 +- brokers/src/redis.rs | 421 +++++++++-------- brokers/src/redis/message.rs | 45 +- brokers/src/redis/pubsub.rs | 95 ---- brokers/src/redis/rpc.rs | 29 +- brokers/src/util/stream.rs | 8 +- 8 files changed, 693 insertions(+), 794 deletions(-) delete mode 100644 brokers/src/redis/pubsub.rs diff --git a/Cargo.lock b/Cargo.lock index b1d6405..4a51f0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,7 +21,7 @@ dependencies = [ "amq-protocol-types", "amq-protocol-uri", "cookie-factory", - "nom 7.1.0", + "nom", ] [[package]] @@ -42,7 +42,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "028cb766932137535fe8a320245e385bac379506d7e9e3d0375be069bb6fb0de" dependencies = [ "cookie-factory", - "nom 7.1.0", + "nom", "serde", "serde_json", ] @@ -57,12 +57,6 @@ dependencies = [ "url", ] -[[package]] -name = "arrayvec" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" - [[package]] name = "async-channel" version = "1.6.1" @@ -76,9 +70,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" dependencies = [ "event-listener", ] @@ -97,56 +91,35 @@ dependencies = [ [[package]] name = "async-std" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952" +checksum = "52580991739c5cdb36cde8b2a516371c0a3b70dda36d916cc08b82372916808c" dependencies = [ "async-channel", "async-lock", - "crossbeam-utils 0.8.5", + "crossbeam-utils 0.8.8", "futures-channel", "futures-core", "futures-io", "memchr", "once_cell", - "pin-project-lite 0.2.7", + "pin-project-lite 0.2.9", "pin-utils", "slab", "wasm-bindgen-futures", ] -[[package]] -name = "async-stream" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" -dependencies = [ - "async-stream-impl", - "futures-core", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "async-task" -version = "4.0.3" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" +checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" [[package]] name = "async-trait" -version = "0.1.52" +version = "0.1.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" +checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" dependencies = [ "proc-macro2", "quote", @@ -163,7 +136,7 @@ dependencies = [ "futures-util", "log", "native-tls", - "pin-project 0.4.28", + "pin-project 0.4.29", "tokio 0.2.25", "tokio-native-tls", "tungstenite", @@ -182,15 +155,18 @@ dependencies = [ [[package]] name = "autocfg" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" +checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78" +dependencies = [ + "autocfg 1.1.0", +] [[package]] name = "autocfg" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "base64" @@ -221,9 +197,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.8.0" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c" +checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" [[package]] name = "byteorder" @@ -261,9 +237,9 @@ checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" [[package]] name = "cc" -version = "1.0.72" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" [[package]] name = "cfg-if" @@ -300,20 +276,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "combine" -version = "4.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b2f5d0ee456f3928812dfc8c6d9a1d592b98678f6d56db9b0cd2b7bc6c8db5" -dependencies = [ - "bytes 1.1.0", - "futures-core", - "memchr", - "pin-project-lite 0.2.7", - "tokio 1.15.0", - "tokio-util 0.6.9", -] - [[package]] name = "concurrent-queue" version = "1.2.2" @@ -323,17 +285,6 @@ dependencies = [ "cache-padded", ] -[[package]] -name = "config" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1b9d958c2b1368a663f05538fc1b5975adce1e19f435acceae987aceeeb369" -dependencies = [ - "lazy_static", - "nom 5.1.2", - "serde", -] - [[package]] name = "cookie-factory" version = "0.3.2" @@ -342,9 +293,9 @@ checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" [[package]] name = "core-foundation" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" dependencies = [ "core-foundation-sys", "libc", @@ -358,21 +309,21 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" [[package]] name = "cpufeatures" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469" +checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b" dependencies = [ "libc", ] [[package]] name = "crossbeam-channel" -version = "0.5.1" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils 0.8.5", + "crossbeam-utils 0.8.8", ] [[package]] @@ -392,7 +343,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cfg-if 0.1.10", "crossbeam-utils 0.7.2", "lazy_static", @@ -418,16 +369,16 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cfg-if 0.1.10", "lazy_static", ] [[package]] name = "crossbeam-utils" -version = "0.8.5" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" dependencies = [ "cfg-if 1.0.0", "lazy_static", @@ -435,30 +386,22 @@ dependencies = [ [[package]] name = "deadpool" -version = "0.8.2" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef82259c587bceda08349f28ff00f69ae4c897898f254140af6021eb218e8232" +checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" dependencies = [ "async-trait", - "config", + "deadpool-runtime", "num_cpus", - "serde", - "tokio 1.15.0", + "retain_mut", + "tokio 1.18.2", ] [[package]] -name = "deadpool-redis" -version = "0.9.0" +name = "deadpool-runtime" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f31ab753d882c470926de2b34f42ed03e6e0661b609586825391f6560113db" -dependencies = [ - "async-trait", - "config", - "deadpool", - "log", - "redis", - "serde", -] +checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" [[package]] name = "digest" @@ -476,16 +419,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] -name = "dtoa" -version = "0.4.8" +name = "either" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] name = "encoding_rs" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" dependencies = [ "cfg-if 1.0.0", ] @@ -505,9 +448,18 @@ dependencies = [ [[package]] name = "event-listener" -version = "2.5.1" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + +[[package]] +name = "fastrand" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" +dependencies = [ + "instant", +] [[package]] name = "fnv" @@ -570,9 +522,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" dependencies = [ "futures-channel", "futures-core", @@ -585,9 +537,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", "futures-sink", @@ -595,15 +547,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" [[package]] name = "futures-executor" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" dependencies = [ "futures-core", "futures-task", @@ -612,15 +564,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" [[package]] name = "futures-macro" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" dependencies = [ "proc-macro2", "quote", @@ -629,21 +581,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" [[package]] name = "futures-task" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" [[package]] name = "futures-util" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ "futures-channel", "futures-core", @@ -652,16 +604,16 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.7", + "pin-project-lite 0.2.9", "pin-utils", "slab", ] [[package]] name = "generic-array" -version = "0.14.4" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" +checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803" dependencies = [ "typenum", "version_check", @@ -678,17 +630,6 @@ dependencies = [ "wasi 0.9.0+wasi-snapshot-preview1", ] -[[package]] -name = "getrandom" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "wasi 0.10.2+wasi-snapshot-preview1", -] - [[package]] name = "h2" version = "0.2.7" @@ -726,13 +667,13 @@ dependencies = [ [[package]] name = "http" -version = "0.2.5" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" +checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" dependencies = [ "bytes 1.1.0", "fnv", - "itoa 0.4.8", + "itoa 1.0.2", ] [[package]] @@ -747,9 +688,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.5.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" [[package]] name = "httpdate" @@ -782,8 +723,8 @@ dependencies = [ "httparse", "httpdate", "itoa 0.4.8", - "pin-project 1.0.8", - "socket2", + "pin-project 1.0.10", + "socket2 0.3.19", "tokio 0.2.25", "tower-service", "tracing", @@ -816,11 +757,11 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.7.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "hashbrown", ] @@ -853,9 +794,18 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.3.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" + +[[package]] +name = "itertools" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +dependencies = [ + "either", +] [[package]] name = "itoa" @@ -865,15 +815,15 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] name = "itoa" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" [[package]] name = "js-sys" -version = "0.3.55" +version = "0.3.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" +checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397" dependencies = [ "wasm-bindgen", ] @@ -890,9 +840,9 @@ dependencies = [ [[package]] name = "lapin" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef0a8c145a248b1536cfa06890d480cda7982add59f77973ac7b03db47f349b5" +checksum = "36c0eacc7b8880c2e73ab70e47c9f099ad81af6debde9353fdb11045c0ce716e" dependencies = [ "amq-protocol", "async-task", @@ -900,7 +850,7 @@ dependencies = [ "futures-core", "log", "mio 0.7.14", - "parking_lot 0.11.2", + "parking_lot 0.12.0", "pinky-swear", "serde", ] @@ -911,24 +861,11 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lexical-core" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" -dependencies = [ - "arrayvec", - "bitflags", - "cfg-if 1.0.0", - "ryu", - "static_assertions", -] - [[package]] name = "libc" -version = "0.2.112" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" [[package]] name = "lock_api" @@ -941,18 +878,19 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.5" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" dependencies = [ + "autocfg 1.1.0", "scopeguard", ] [[package]] name = "log" -version = "0.4.14" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if 1.0.0", ] @@ -971,9 +909,9 @@ checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" [[package]] name = "memchr" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "memoffset" @@ -981,7 +919,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -992,9 +930,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "mime_guess" -version = "2.0.3" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" dependencies = [ "mime", "unicase", @@ -1038,6 +976,18 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "mio" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799" +dependencies = [ + "libc", + "log", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys", +] + [[package]] name = "mio-named-pipes" version = "0.1.7" @@ -1093,9 +1043,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.8" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" +checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" dependencies = [ "lazy_static", "libc", @@ -1122,52 +1072,40 @@ dependencies = [ [[package]] name = "nom" -version = "5.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" -dependencies = [ - "lexical-core", - "memchr", - "version_check", -] - -[[package]] -name = "nom" -version = "7.1.0" +version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" dependencies = [ "memchr", "minimal-lexical", - "version_check", ] [[package]] name = "ntapi" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" dependencies = [ "winapi 0.3.9", ] [[package]] name = "num-integer" -version = "0.1.44" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "num-traits", ] [[package]] name = "num-traits" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -1182,9 +1120,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.9.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" +checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" [[package]] name = "opaque-debug" @@ -1194,31 +1132,43 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.38" +version = "0.10.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" +checksum = "fb81a6430ac911acb25fe5ac8f1d2af1b4ea8a4fdfda0f1ee4292af2e2d8eb0e" dependencies = [ "bitflags", "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", + "openssl-macros", "openssl-sys", ] +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.72" +version = "0.9.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cc", "libc", "pkg-config", @@ -1237,13 +1187,12 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.11.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ - "instant", - "lock_api 0.4.5", - "parking_lot_core 0.8.5", + "lock_api 0.4.7", + "parking_lot_core 0.9.3", ] [[package]] @@ -1262,18 +1211,23 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.5" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" dependencies = [ "cfg-if 1.0.0", - "instant", "libc", - "redox_syscall 0.2.10", + "redox_syscall 0.2.13", "smallvec", - "winapi 0.3.9", + "windows-sys", ] +[[package]] +name = "paste" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" + [[package]] name = "pem" version = "0.8.3" @@ -1293,27 +1247,27 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "918192b5c59119d51e0cd221f4d49dde9112824ba717369e903c97d076083d0f" +checksum = "9615c18d31137579e9ff063499264ddc1278e7b1982757ebc111028c4d1dc909" dependencies = [ - "pin-project-internal 0.4.28", + "pin-project-internal 0.4.29", ] [[package]] name = "pin-project" -version = "1.0.8" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" dependencies = [ - "pin-project-internal 1.0.8", + "pin-project-internal 1.0.10", ] [[package]] name = "pin-project-internal" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e" +checksum = "044964427019eed9d49d9d5bbce6047ef18f37100ea400912a9fa4a3523ab12a" dependencies = [ "proc-macro2", "quote", @@ -1322,9 +1276,9 @@ dependencies = [ [[package]] name = "pin-project-internal" -version = "1.0.8" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" dependencies = [ "proc-macro2", "quote", @@ -1339,9 +1293,9 @@ checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" [[package]] name = "pin-project-lite" -version = "0.2.7" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" [[package]] name = "pin-utils" @@ -1351,34 +1305,34 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pinky-swear" -version = "4.4.0" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bf8cda6f8e1500338634e4e3ce90ac59eb7929a1e088b6946c742be1cc44dc1" +checksum = "c5ade3d5e4fa85586b4795e097180fd48447d39fb56e351bd889f1c9664291a8" dependencies = [ "doc-comment", - "parking_lot 0.11.2", + "parking_lot 0.12.0", "tracing", ] [[package]] name = "pkg-config" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" [[package]] name = "ppv-lite86" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed0cfbc8191465bed66e1718596ee0b0b35d5ee1f41c5df2189d0fe8bde535ba" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "proc-macro2" -version = "1.0.34" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f84e92c0f7c9d58328b85a78557813e4bd845130db68d7184635344399423b1" +checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -1389,9 +1343,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.10" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" dependencies = [ "proc-macro2", ] @@ -1402,7 +1356,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" dependencies = [ - "autocfg 0.1.7", + "autocfg 0.1.8", "libc", "rand_chacha 0.1.1", "rand_core 0.4.2", @@ -1421,32 +1375,20 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" dependencies = [ - "getrandom 0.1.16", + "getrandom", "libc", "rand_chacha 0.2.2", "rand_core 0.5.1", "rand_hc 0.2.0", ] -[[package]] -name = "rand" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.3", - "rand_hc 0.3.1", -] - [[package]] name = "rand_chacha" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" dependencies = [ - "autocfg 0.1.7", + "autocfg 0.1.8", "rand_core 0.3.1", ] @@ -1460,16 +1402,6 @@ dependencies = [ "rand_core 0.5.1", ] -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.3", -] - [[package]] name = "rand_core" version = "0.3.1" @@ -1491,16 +1423,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" dependencies = [ - "getrandom 0.1.16", -] - -[[package]] -name = "rand_core" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" -dependencies = [ - "getrandom 0.2.3", + "getrandom", ] [[package]] @@ -1521,15 +1444,6 @@ dependencies = [ "rand_core 0.5.1", ] -[[package]] -name = "rand_hc" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" -dependencies = [ - "rand_core 0.6.3", -] - [[package]] name = "rand_isaac" version = "0.1.1" @@ -1570,7 +1484,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44" dependencies = [ - "autocfg 0.1.7", + "autocfg 0.1.8", "rand_core 0.4.2", ] @@ -1593,58 +1507,57 @@ dependencies = [ ] [[package]] -name = "redis" -version = "0.21.4" +name = "redox_syscall" +version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f23ceed4c0e76b322657c2c3352ea116f9ec60a1a1aefeb3c84ed062c50865b" -dependencies = [ - "async-trait", - "bytes 1.1.0", - "combine", - "dtoa", - "futures-util", - "itoa 0.4.8", - "percent-encoding", - "pin-project-lite 0.2.7", - "tokio 1.15.0", - "tokio-util 0.6.9", - "url", -] +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] -name = "redis-subscribe" -version = "0.2.0" -source = "git+https://github.com/appellation/redis-subscribe?branch=feat/impl-error#8b84ee52e1354f5d802540398240b1ddb53be59c" +name = "redox_syscall" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" dependencies = [ - "async-stream", - "nom 7.1.0", - "rand 0.8.4", - "thiserror", - "tokio 1.15.0", - "tokio-stream", - "tracing", + "bitflags", ] [[package]] -name = "redox_syscall" -version = "0.1.57" +name = "redust" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" +checksum = "79a5d7baf5b0bfff5f3eacc779348a49a70e98aa768df3a3f2102450f7c36cf2" +dependencies = [ + "async-trait", + "bytes 1.1.0", + "deadpool", + "futures 0.3.21", + "pin-project-lite 0.2.9", + "redust-resp", + "serde", + "serde_bytes", + "tokio 1.18.2", + "tokio-util 0.7.2", +] [[package]] -name = "redox_syscall" -version = "0.2.10" +name = "redust-resp" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +checksum = "1edcb33e1771842da0f5ccda6035e6572442c3d04a37eb0d38a68eda6c80df9c" dependencies = [ - "bitflags", + "bytes 1.1.0", + "itertools", + "nom", + "serde", + "serde_bytes", + "thiserror", ] [[package]] name = "regex" -version = "1.5.4" +version = "1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" dependencies = [ "aho-corasick", "memchr", @@ -1653,9 +1566,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.25" +version = "0.6.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" [[package]] name = "remove_dir_all" @@ -1689,7 +1602,7 @@ dependencies = [ "mime_guess", "native-tls", "percent-encoding", - "pin-project-lite 0.2.7", + "pin-project-lite 0.2.9", "serde", "serde_json", "serde_urlencoded", @@ -1702,14 +1615,21 @@ dependencies = [ "winreg", ] +[[package]] +name = "retain_mut" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" + [[package]] name = "rmp" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f55e5fa1446c4d5dd1f5daeed2a4fe193071771a2636274d0d7a3b082aa7ad6" +checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f" dependencies = [ "byteorder", "num-traits", + "paste", ] [[package]] @@ -1727,19 +1647,19 @@ dependencies = [ name = "rustacles-brokers" version = "0.2.0" dependencies = [ - "deadpool-redis", + "bytes 1.1.0", "env_logger", - "futures 0.3.19", + "futures 0.3.21", "lapin", "log", "nanoid", - "pin-project 1.0.8", - "redis", - "redis-subscribe", + "pin-project 1.0.10", + "redust", "rmp-serde", "serde", + "serde_bytes", "thiserror", - "tokio 1.15.0", + "tokio 1.18.2", "tokio-stream", ] @@ -1750,7 +1670,7 @@ dependencies = [ "async-native-tls", "async-tungstenite", "env_logger", - "futures 0.3.19", + "futures 0.3.21", "log", "parking_lot 0.10.2", "reqwest", @@ -1775,18 +1695,18 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" [[package]] name = "schannel" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" dependencies = [ "lazy_static", - "winapi 0.3.9", + "windows-sys", ] [[package]] @@ -1797,9 +1717,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "security-framework" -version = "2.4.2" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87" +checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" dependencies = [ "bitflags", "core-foundation", @@ -1810,9 +1730,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.4.2" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" dependencies = [ "core-foundation-sys", "libc", @@ -1820,18 +1740,27 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.132" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008" +checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212e73464ebcde48d723aa02eb270ba62eff38a9b732df31f33f1b4e145f3a54" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" -version = "1.0.132" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" dependencies = [ "proc-macro2", "quote", @@ -1840,20 +1769,20 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.73" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcbd0344bc6533bc7ec56df11d42fb70f1b912351c0825ccb7211b59d8af7cf5" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" dependencies = [ - "itoa 1.0.1", + "itoa 1.0.2", "ryu", "serde", ] [[package]] name = "serde_repr" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98d0516900518c29efa217c298fa1f4e6c6ffc85ae29fd7f4ee48f176e1a9ed5" +checksum = "a2ad84e47328a31223de7fed7a4f5087f2d6ddfe586cf3ca25b7a165bc0a5aed" dependencies = [ "proc-macro2", "quote", @@ -1862,12 +1791,12 @@ dependencies = [ [[package]] name = "serde_urlencoded" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 0.4.8", + "itoa 1.0.2", "ryu", "serde", ] @@ -1896,15 +1825,15 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" [[package]] name = "smallvec" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "socket2" @@ -1918,27 +1847,31 @@ dependencies = [ ] [[package]] -name = "static_assertions" -version = "1.1.0" +name = "socket2" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi 0.3.9", +] [[package]] name = "syn" -version = "1.0.83" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23a1dfb999630e338648c83e91c59a4e9fb7620f520c3194b6b89e276f2f1959" +checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] name = "tcp-stream" -version = "0.20.9" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a77f06a7c6a1ecff1bbab0743a8beca305ca1ebcbe5d1bc32390e94117859af" +checksum = "c839b9cf24db4225fa445589e014e6ecc4c42ba6ecf5db3e9fe38fbe8ea2377a" dependencies = [ "cfg-if 1.0.0", "mio 0.7.14", @@ -1948,41 +1881,41 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.2.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" dependencies = [ "cfg-if 1.0.0", + "fastrand", "libc", - "rand 0.8.4", - "redox_syscall 0.2.10", + "redox_syscall 0.2.13", "remove_dir_all", "winapi 0.3.9", ] [[package]] name = "termcolor" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" dependencies = [ "winapi-util", ] [[package]] name = "thiserror" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" dependencies = [ "proc-macro2", "quote", @@ -1991,19 +1924,20 @@ dependencies = [ [[package]] name = "time" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", + "wasi 0.10.0+wasi-snapshot-preview1", "winapi 0.3.9", ] [[package]] name = "tinyvec" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" dependencies = [ "tinyvec_macros", ] @@ -2040,16 +1974,16 @@ dependencies = [ [[package]] name = "tokio" -version = "1.15.0" +version = "1.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838" +checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" dependencies = [ - "bytes 1.1.0", "libc", - "memchr", - "mio 0.7.14", + "mio 0.8.3", "num_cpus", - "pin-project-lite 0.2.7", + "once_cell", + "pin-project-lite 0.2.9", + "socket2 0.4.4", "tokio-macros 1.7.0", "winapi 0.3.9", ] @@ -2125,9 +2059,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" dependencies = [ "futures-core", - "pin-project-lite 0.2.7", - "tokio 1.15.0", - "tokio-util 0.6.9", + "pin-project-lite 0.2.9", + "tokio 1.18.2", + "tokio-util 0.6.10", ] [[package]] @@ -2173,16 +2107,30 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ "bytes 1.1.0", "futures-core", "futures-sink", "log", - "pin-project-lite 0.2.7", - "tokio 1.15.0", + "pin-project-lite 0.2.9", + "tokio 1.18.2", +] + +[[package]] +name = "tokio-util" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +dependencies = [ + "bytes 1.1.0", + "futures-core", + "futures-sink", + "pin-project-lite 0.2.9", + "tokio 1.18.2", + "tracing", ] [[package]] @@ -2193,22 +2141,22 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.29" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if 1.0.0", "log", - "pin-project-lite 0.2.7", + "pin-project-lite 0.2.9", "tracing-attributes", "tracing-core", ] [[package]] name = "tracing-attributes" -version = "0.1.18" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" dependencies = [ "proc-macro2", "quote", @@ -2217,9 +2165,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.21" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" dependencies = [ "lazy_static", ] @@ -2230,7 +2178,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 1.0.8", + "pin-project 1.0.10", "tracing", ] @@ -2262,9 +2210,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec" +checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" [[package]] name = "unicase" @@ -2277,9 +2225,15 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.7" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" + +[[package]] +name = "unicode-ident" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" +checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee" [[package]] name = "unicode-normalization" @@ -2290,12 +2244,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-xid" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" - [[package]] name = "url" version = "2.2.2" @@ -2322,9 +2270,9 @@ checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "version_check" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "want" @@ -2344,15 +2292,21 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.78" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" +checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad" dependencies = [ "cfg-if 1.0.0", "serde", @@ -2362,9 +2316,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.78" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" +checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4" dependencies = [ "bumpalo", "lazy_static", @@ -2377,9 +2331,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.28" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" +checksum = "6f741de44b75e14c35df886aff5f1eb73aa114fa5d4d00dcd37b5e01259bf3b2" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -2389,9 +2343,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.78" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" +checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2399,9 +2353,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.78" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" +checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b" dependencies = [ "proc-macro2", "quote", @@ -2412,15 +2366,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.78" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" +checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744" [[package]] name = "web-sys" -version = "0.3.55" +version = "0.3.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" +checksum = "7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283" dependencies = [ "js-sys", "wasm-bindgen", @@ -2469,6 +2423,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + [[package]] name = "winreg" version = "0.7.0" diff --git a/brokers/Cargo.toml b/brokers/Cargo.toml index b5523e4..fe5bda2 100644 --- a/brokers/Cargo.toml +++ b/brokers/Cargo.toml @@ -11,17 +11,17 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -deadpool-redis = { version = "0.9", optional = true } lapin = { version = "1.1", optional = true } +bytes = "1.1" env_logger = "0.7" futures = "0.3" log = "0.4" nanoid = "0.3" pin-project = "1.0" -redis = { version = "0.21", optional = true, default-features = false, features = ["streams"] } -redis-subscribe = { git = "https://github.com/appellation/redis-subscribe", branch = "feat/impl-error", optional = true } +redust = { version = "0.1", features = ["model", "pool"], optional = true } rmp-serde = "0.15" serde = "1.0" +serde_bytes = "0.11" thiserror = "1.0" tokio = "1.0" tokio-stream = { version = "0.1", features = ["sync"] } @@ -32,7 +32,7 @@ features = ["rt-multi-thread", "macros"] [features] amqp-broker = ["lapin"] -redis-broker = ["deadpool-redis", "redis", "redis-subscribe"] +redis-broker = ["redust"] [[example]] name = "amqp_consumer" diff --git a/brokers/src/error.rs b/brokers/src/error.rs index bab1c59..befcc05 100644 --- a/brokers/src/error.rs +++ b/brokers/src/error.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "redis-broker")] -use deadpool_redis::{redis::RedisError, PoolError}; #[cfg(feature = "amqp-broker")] use lapin::Error as LapinError; use std::{io::Error as IoError, result::Result as StdResult}; @@ -31,15 +29,11 @@ pub enum Error { #[cfg(feature = "redis-broker")] #[error("Redis error")] - Redis(#[from] RedisError), + Redis(#[from] redust::Error), #[cfg(feature = "redis-broker")] #[error("Pool error")] - Deadpool(#[from] PoolError), - - #[cfg(feature = "redis-broker")] - #[error("Redis subscribe error")] - RedisSub(#[from] redis_subscribe::Error), + Pool(#[from] redust::pool::PoolError), #[error("MessagePack encode error")] MsgpackEncode(#[from] rmp_serde::encode::Error), diff --git a/brokers/src/redis.rs b/brokers/src/redis.rs index 5f0c338..28e1461 100644 --- a/brokers/src/redis.rs +++ b/brokers/src/redis.rs @@ -1,23 +1,23 @@ use std::{ - fmt::{self, Debug}, - sync::Arc, + borrow::Cow, time::{SystemTime, UNIX_EPOCH}, }; -pub use deadpool_redis; -use deadpool_redis::{ - redis::{ - streams::{StreamRangeReply, StreamReadOptions, StreamReadReply}, - AsyncCommands, FromRedisValue, RedisError, Value, - }, - Connection, Pool, -}; +use bytes::Bytes; use futures::{ - stream::{iter, select_all}, - stream_select, TryStream, TryStreamExt, + stream::{iter, select, select_all}, + StreamExt, TryStream, TryStreamExt, }; use nanoid::nanoid; -use redis::ToRedisArgs; +use redust::{ + model::stream::{ + claim::AutoclaimResponse, + read::{Field, ReadResponse}, + Id, + }, + pool::Pool, + resp::from_data, +}; use serde::{de::DeserializeOwned, Serialize}; use crate::{ @@ -25,92 +25,56 @@ use crate::{ util::stream::repeat_fn, }; -use self::{message::Message, pubsub::BroadcastSub, rpc::Rpc}; +use self::{message::Message, rpc::Rpc}; pub mod message; -pub mod pubsub; pub mod rpc; -const DEFAULT_MAX_CHUNK: usize = 10; -const DEFAULT_BLOCK_INTERVAL: usize = 5000; -const STREAM_DATA_KEY: &'static str = "data"; -const STREAM_TIMEOUT_KEY: &'static str = "timeout_at"; +const DEFAULT_MAX_CHUNK: &[u8] = b"10"; +const DEFAULT_BLOCK_INTERVAL: &[u8] = b"5000"; +const STREAM_DATA_KEY: Field<'static> = Field(Cow::Borrowed(b"data")); +const STREAM_TIMEOUT_KEY: Field<'static> = Field(Cow::Borrowed(b"timeout_at")); /// RedisBroker is internally reference counted and can be safely cloned. +#[derive(Debug, Clone)] pub struct RedisBroker { /// The consumer name of this broker. Should be unique to the container/machine consuming /// messages. - pub name: Arc, + pub name: Bytes, /// The consumer group name. - pub group: Arc, - /// The largest chunk to consume from Redis. This is only exposed for tuning purposes and - /// doesn't affect the public API at all. - pub max_chunk: usize, - /// The maximum time that a broker is assumed to be alive (ms). Messages pending after this - /// time period will be reclaimed by other clients. - pub max_operation_time: usize, + pub group: Bytes, pool: Pool, - pubsub: BroadcastSub, - read_opts: StreamReadOptions, -} - -impl Clone for RedisBroker { - fn clone(&self) -> Self { - Self { - name: self.name.clone(), - group: self.group.clone(), - max_chunk: self.max_chunk, - max_operation_time: self.max_operation_time, - pool: self.pool.clone(), - pubsub: self.pubsub.clone(), - read_opts: Self::make_read_opts(&*self.group, &*self.name), - } - } -} - -impl Debug for RedisBroker { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RedisBroker") - .field("name", &self.name) - .field("group", &self.group) - .field("max_chunk", &self.max_chunk) - .field("max_operation_time", &self.max_operation_time) - .field("pubsub", &self.pubsub) - .field("read_opts", &self.read_opts) - .finish_non_exhaustive() - } } impl RedisBroker { - fn make_read_opts(group: &str, name: &str) -> StreamReadOptions { - StreamReadOptions::default() - .group(group, name) - .count(DEFAULT_MAX_CHUNK) - .block(DEFAULT_BLOCK_INTERVAL) - } - /// Creates a new broker with sensible defaults. - pub fn new(group: impl Into>, pool: Pool, address: &str) -> RedisBroker { + pub fn new(group: impl Into, pool: Pool) -> Self { let group = group.into(); let name = nanoid!(); - let read_opts = RedisBroker::make_read_opts(&*group, &name); - - let pubsub = BroadcastSub::new(address); Self { name: name.into(), group, - max_chunk: DEFAULT_MAX_CHUNK, - max_operation_time: DEFAULT_BLOCK_INTERVAL, pool, - pubsub, - read_opts, } } /// Publishes an event to the broker. Returned value is the ID of the message. - pub async fn publish(&self, event: &str, data: &impl Serialize) -> Result { - self.publish_timeout(event, data, None).await + pub async fn publish(&self, event: impl AsRef<[u8]>, data: &impl Serialize) -> Result { + let serialized_data = rmp_serde::to_vec(data)?; + let mut conn = self.pool.get().await?; + + let data = conn + .cmd([ + b"xadd", + event.as_ref(), + b"*", + &STREAM_DATA_KEY.0, + &serialized_data, + ]) + .await?; + + Ok(from_data(data)?) } pub async fn call( @@ -118,160 +82,194 @@ impl RedisBroker { event: &str, data: &impl Serialize, timeout: Option, - ) -> Result> { - let id = self.publish_timeout(event, data, timeout).await?; + ) -> Result { + let id = if let Some(timeout) = timeout { + self.publish_timeout(event, data, timeout).await? + } else { + self.publish(event, data).await? + }; + let name = format!("{}:{}", event, id); Ok(Rpc { name, - broker: &self, + broker: self.clone(), }) } - async fn publish_timeout( + pub async fn publish_timeout( &self, - event: &str, + event: impl AsRef<[u8]>, data: &impl Serialize, - maybe_timeout: Option, - ) -> Result { + timeout: SystemTime, + ) -> Result { let serialized_data = rmp_serde::to_vec(data)?; - let mut conn = self.get_conn().await?; - - let args = match maybe_timeout { - Some(timeout) => vec![ - (STREAM_DATA_KEY, serialized_data), - ( - STREAM_TIMEOUT_KEY, - timeout - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos() - .to_string() - .into_bytes(), - ), - ], - None => vec![(STREAM_DATA_KEY, serialized_data)], - }; - - Ok(conn.xadd(event, "*", &args).await?) + let mut conn = self.pool.get().await?; + + let timeout_bytes = timeout + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + .to_string() + .into_bytes(); + + let data = conn + .cmd([ + b"xadd", + event.as_ref(), + b"*", + &STREAM_DATA_KEY.0, + &serialized_data, + &STREAM_TIMEOUT_KEY.0, + &timeout_bytes, + ]) + .await?; + + Ok(from_data(data)?) } - pub async fn subscribe(&self, events: &[&str]) -> Result<()> { + pub async fn subscribe(&self, events: impl Iterator) -> Result<()> { + let mut conn = self.pool.get().await?; + for event in events { - let _: Result = self - .get_conn() - .await? - .xgroup_create_mkstream(*event, &*self.group, 0) - .await; + let cmd: &[&[u8]] = &[ + b"xgroup", + b"create", + &*event, + &*self.group, + b"$", + b"mkstream", + ]; + + match conn.cmd(cmd).await { + Ok(_) => (), + Err(redust::Error::Redis(err)) if err.starts_with("BUSYGROUP") => (), + Err(e) => return Err(e.into()), + } } Ok(()) } - async fn get_conn(&self) -> Result { - Ok(self.pool.get().await?) + /// Consume events from the broker. + pub fn consume(&self, events: Vec) -> impl TryStream, Error = Error> + where + V: DeserializeOwned + 'static, + { + let autoclaim = self + .autoclaim_all::(events.clone()) + .into_stream() + .boxed(); + let claim = self.claim::(events).into_stream().boxed(); + + select(autoclaim, claim) } - /// Consume events from the broker. - pub fn consume<'consume, E, V>( - &'consume self, - events: &'consume [E], - ) -> impl TryStream, Error = Error> + 'consume + fn claim(&self, events: Vec) -> impl TryStream, Error = Error> where - E: Into> + Clone + ToRedisArgs + Send + Sync, V: DeserializeOwned, { - let ids = vec![">"; events.len()]; + let this = self.clone(); + let fut_fn = move || { + let this = this.clone(); + let events = events.clone(); - let pool = &self.pool; - let group = self.group.clone(); - let name = self.name.clone(); - let time = self.max_operation_time; + async move { Some(this.get_messages(&events).await) } + }; - let autoclaim_futs = events - .iter() - .cloned() - .map(|event| { - let event = event.into(); - let group = group.clone(); - let name = name.clone(); + repeat_fn(fut_fn).try_flatten() + } + + async fn get_messages( + &self, + events: &[Bytes], + ) -> Result, Error = Error>> + where + V: DeserializeOwned, + { + let this = self.clone(); + let read = self.xreadgroup(events).await?; + + let messages = read.0.into_iter().flat_map(move |(event, entries)| { + let this = this.clone(); + entries.0.into_iter().map(move |(id, entry)| { + Ok(Message::::new( + id, + entry, + Bytes::copy_from_slice(&event.0), + this.clone(), + )) + }) + }); + + Ok::<_, Error>(iter(messages)) + } + async fn xreadgroup(&self, events: &[Bytes]) -> Result, Error> { + let ids = vec![&b">"[..]; events.len()]; + let mut cmd: Vec<&[u8]> = vec![ + b"xreadgroup", + b"group", + &*self.group, + &*self.name, + b"count", + DEFAULT_MAX_CHUNK, + b"block", + DEFAULT_BLOCK_INTERVAL, + b"streams", + ]; + cmd.extend(events.iter().map(|b| &b[..])); + cmd.extend_from_slice(&ids); + + let data = self.pool.get().await?.cmd(cmd).await?; + Ok(from_data(data)?) + } + + async fn xautoclaim(&self, event: &[u8]) -> Result, Error> { + let cmd = [ + b"xautoclaim", + event, + &*self.group, + &*self.name, + DEFAULT_BLOCK_INTERVAL, + b"0-0", + ]; + + let mut conn = self.pool.get().await?; + + let res = conn.cmd(cmd).await?; + Ok(from_data(res)?) + } + + fn autoclaim_all(&self, events: Vec) -> impl TryStream, Error = Error> + where + V: DeserializeOwned, + { + let futs = events + .into_iter() + .map(|event| { + let this = self.clone(); move || { + let this = this.clone(); let event = event.clone(); - let group = group.clone(); - let name = name.clone(); - - async move { - let messages = async move { - let mut conn = pool.get().await?; - let mut cmd = redis::cmd("xautoclaim"); - - cmd.arg(&*event) - .arg(&*group) - .arg(&*name) - .arg(time) - .arg("0-0"); - - let res: Vec = cmd.query_async(&mut conn).await?; - let read = StreamRangeReply::from_redis_value(&res[1])?; - - let messages = read.ids.into_iter().map(move |id| { - Ok::<_, Error>(Message::::new( - id, - group.clone(), - event.clone(), - self.clone(), - )) - }); - - Ok::<_, Error>(iter(messages)) - }; - - Some(messages.await) - } - } - }) - .map(repeat_fn) - .map(|iter| iter.try_flatten()); - let group = group.clone(); - let claim_fut = move || { - let opts = &self.read_opts; - let ids = ids.clone(); - let group = group.clone(); - - async move { - let messages = - async move { - let read: Option = - pool.get().await?.xread_options(&events, &ids, opts).await?; - - let messages = read.map(|reply| reply.keys).into_iter().flatten().flat_map( - move |event| { - let group = group.clone(); - let key = Arc::::from(event.key); - event.ids.into_iter().map(move |id| { - Ok(Message::::new( - id, - group.clone(), - key.clone(), - self.clone(), - )) - }) - }, - ); + let messages = async move { + let read = this.xautoclaim(&event).await?; + + let messages = read.1 .0.into_iter().map(move |(id, data)| { + Ok::<_, Error>(Message::::new(id, data, event.clone(), this.clone())) + }); Ok::<_, Error>(iter(messages)) }; - Some(messages.await) - } - }; - - let autoclaim = select_all(autoclaim_futs); - let claim = repeat_fn(claim_fut).try_flatten(); + async move { Some(messages.await) } + } + }) + .map(repeat_fn) + .map(|iter| iter.try_flatten()); - stream_select!(autoclaim, claim) + select_all(futs) } } @@ -279,9 +277,9 @@ impl RedisBroker { mod test { use std::time::{Duration, SystemTime}; - use deadpool_redis::{Manager, Pool}; + use bytes::Bytes; use futures::TryStreamExt; - use redis::cmd; + use redust::pool::{Manager, Pool}; use tokio::{spawn, try_join}; use super::RedisBroker; @@ -289,24 +287,24 @@ mod test { #[tokio::test] async fn consumes_messages() { let group = "foo"; - let manager = Manager::new("redis://localhost:6379").expect("create manager"); - let pool = Pool::new(manager, 32); - let broker = RedisBroker::new(group, pool, "redis://localhost:6379"); + let manager = Manager::new(([127, 0, 0, 1], 6379).into()); + let pool = Pool::builder(manager).build().expect("pool builder"); + let broker = RedisBroker::new(group, pool); - let events = ["abc"]; + let events = [Bytes::from("abc")]; - broker.subscribe(&events).await.expect("subscribed"); + broker.subscribe(events.iter()).await.expect("subscribed"); broker .publish("abc", &[1u8, 2, 3]) .await .expect("published"); - let mut consumer = broker.consume::<_, Vec>(&events); + let mut consumer = broker.consume::>(events.to_vec()); let msg = consumer .try_next() .await - .expect("message") - .expect("message"); + .expect("read message") + .expect("read message"); msg.ack().await.expect("ack"); assert_eq!(msg.data.expect("data"), vec![1, 2, 3]); @@ -315,19 +313,14 @@ mod test { #[tokio::test] async fn rpc_timeout() { let group = "foo"; - let manager = Manager::new("redis://localhost:6379").expect("create manager"); - let pool = Pool::new(manager, 32); - - let _: () = cmd("FLUSHDB") - .query_async(&mut pool.get().await.expect("redis connection")) - .await - .expect("flush db"); + let manager = Manager::new(([127, 0, 0, 1], 6379).into()); + let pool = Pool::builder(manager).build().expect("pool builder"); - let broker1 = RedisBroker::new(group, pool, "localhost:6379"); + let broker1 = RedisBroker::new(group, pool); let broker2 = broker1.clone(); - let events = ["def"]; - broker1.subscribe(&events).await.expect("subscribed"); + let events = [Bytes::from("def")]; + broker1.subscribe(events.iter()).await.expect("subscribed"); let timeout = Some(SystemTime::now() + Duration::from_millis(500)); @@ -339,7 +332,7 @@ mod test { }); let consume_fut = spawn(async move { - let mut consumer = broker1.consume::<_, Vec>(&events); + let mut consumer = broker1.consume::>(events.to_vec()); let msg = consumer .try_next() .await diff --git a/brokers/src/redis/message.rs b/brokers/src/redis/message.rs index 0adee06..cc4b69b 100644 --- a/brokers/src/redis/message.rs +++ b/brokers/src/redis/message.rs @@ -1,9 +1,11 @@ use std::{ - sync::Arc, + io::Write, + str::from_utf8, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use redis::{streams::StreamId, AsyncCommands}; +use bytes::Bytes; +use redust::model::stream::{read::Entry, Id}; use serde::{de::DeserializeOwned, Serialize}; use crate::error::Result; @@ -11,14 +13,14 @@ use crate::error::Result; use super::{RedisBroker, STREAM_DATA_KEY, STREAM_TIMEOUT_KEY}; /// A message received from the broker. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Message { /// The group this message belongs to. - pub group: Arc, + pub group: Bytes, /// The event this message signals. - pub event: Arc, + pub event: Bytes, /// The ID of this message (generated by Redis). - pub id: String, + pub id: Id, /// The data of this message. Always present unless there is a bug with a client implementation. pub data: Option, /// When this message times out. Clients should cancel work if it is still in progress after @@ -39,19 +41,20 @@ impl Message where V: DeserializeOwned, { - pub(crate) fn new(id: StreamId, group: Arc, event: Arc, broker: RedisBroker) -> Self { - let data = id - .get(STREAM_DATA_KEY) - .and_then(|data: Vec| rmp_serde::from_read_ref(&data).ok()); + pub(crate) fn new(id: Id, entry: Entry, event: Bytes, broker: RedisBroker) -> Self { + let data = entry + .get(&STREAM_DATA_KEY) + .and_then(|value| rmp_serde::from_read_ref(&value.0).ok()); - let timeout_at = id - .get(STREAM_TIMEOUT_KEY) + let timeout_at = entry + .get(&STREAM_TIMEOUT_KEY) + .and_then(|value| from_utf8(&value.0).ok()?.parse().ok()) .map(|timeout| UNIX_EPOCH + Duration::from_nanos(timeout)); Message { - group, + group: broker.group.clone(), event, - id: id.id, + id, data, timeout_at, broker, @@ -67,7 +70,12 @@ impl Message { .pool .get() .await? - .xack(&*self.event, &*self.group, &[&self.id]) + .cmd([ + b"xack", + &*self.event, + &*self.group, + self.id.to_string().as_bytes(), + ]) .await?; Ok(()) @@ -75,13 +83,16 @@ impl Message { /// Reply to this message. pub async fn reply(&self, data: &impl Serialize) -> Result<()> { - let key = format!("{}:{}", self.event, self.id); + let mut key = Vec::new(); + key.copy_from_slice(&self.event); + write!(key, ":{}", self.id)?; + let serialized = rmp_serde::to_vec(data)?; self.broker .pool .get() .await? - .publish(key, serialized) + .cmd([b"publish", key.as_slice(), serialized.as_slice()]) .await?; Ok(()) diff --git a/brokers/src/redis/pubsub.rs b/brokers/src/redis/pubsub.rs deleted file mode 100644 index 95e3560..0000000 --- a/brokers/src/redis/pubsub.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use futures::{future::ready, Stream, StreamExt, TryStream, TryStreamExt}; -use pin_project::{pin_project, pinned_drop}; -use redis_subscribe::{Message, RedisSub}; -use tokio::{spawn, sync::broadcast}; -use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; - -use crate::error::{Error, Result}; - -#[derive(Debug, Clone)] -pub struct BroadcastSub { - pubsub: Arc, - pubsub_msgs: broadcast::Sender>, -} - -impl BroadcastSub { - pub fn new(addr: &str) -> Self { - let pubsub = Arc::new(RedisSub::new(addr)); - - let (tx, _) = broadcast::channel(1); - let task_pubsub = Arc::clone(&pubsub); - let task_tx = tx.clone(); - spawn(async move { - let mut stream = task_pubsub.listen().await.unwrap(); - while let Some(msg) = stream.next().await { - let _ = task_tx.send(Arc::new(msg)); - } - }); - - Self { - pubsub, - pubsub_msgs: tx, - } - } - - pub async fn subscribe( - &self, - channel: String, - ) -> Result> { - self.pubsub.subscribe(channel.clone()).await?; - - let stream = SubStream { - pubsub: Arc::clone(&self.pubsub), - channel: channel.clone(), - msgs: BroadcastStream::new(self.pubsub_msgs.subscribe()), - }; - - Ok(stream.err_into::().try_filter_map(move |msg| { - ready(match &*msg { - Message::Message { - channel: new_ch, - message, - } if &channel == new_ch => Ok(Some(message.clone())), - _ => Ok(None), - }) - })) - } -} - -#[derive(Debug)] -#[pin_project(PinnedDrop)] -struct SubStream { - pubsub: Arc, - channel: String, - #[pin] - msgs: BroadcastStream, -} - -impl Stream for SubStream -where - T: 'static + Clone + Send, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - this.msgs.poll_next(cx) - } -} - -#[pinned_drop] -impl PinnedDrop for SubStream { - fn drop(self: Pin<&mut Self>) { - let client = Arc::clone(&self.pubsub); - let channel = self.channel.clone(); - spawn(async move { - client.unsubscribe(channel).await.unwrap(); - }); - } -} diff --git a/brokers/src/redis/rpc.rs b/brokers/src/redis/rpc.rs index 43baadb..6018e11 100644 --- a/brokers/src/redis/rpc.rs +++ b/brokers/src/redis/rpc.rs @@ -1,5 +1,6 @@ -use futures::TryStreamExt; +use redust::resp::from_data; use serde::de::DeserializeOwned; +use serde_bytes::Bytes; use crate::error::Result; @@ -7,32 +8,30 @@ use super::RedisBroker; /// A Remote Procedure Call. Poll the future returned by `response` to get the response value. #[derive(Debug, Clone)] -pub struct Rpc<'broker> { +pub struct Rpc { pub(crate) name: String, - pub(crate) broker: &'broker RedisBroker, + pub(crate) broker: RedisBroker, } -impl<'broker> PartialEq for Rpc<'broker> { +impl PartialEq for Rpc { fn eq(&self, other: &Self) -> bool { self.name == other.name } } -impl<'broker> Eq for Rpc<'broker> {} +impl Eq for Rpc {} -impl<'broker> Rpc<'broker> { +impl Rpc { pub async fn response(&self) -> Result> where V: DeserializeOwned, { - Ok(self - .broker - .pubsub - .subscribe(self.name.clone()) - .await? - .try_next() - .await? - .map(|msg| rmp_serde::from_read(msg.as_bytes())) - .transpose()?) + let mut conn = self.broker.pool.get().await?; + + conn.cmd(["subscribe", &self.name]).await?; + let data = conn.read_cmd().await?; + + let bytes = from_data::<&Bytes>(data)?; + Ok(rmp_serde::from_read_ref(bytes)?) } } diff --git a/brokers/src/util/stream.rs b/brokers/src/util/stream.rs index 51f83d1..73a9175 100644 --- a/brokers/src/util/stream.rs +++ b/brokers/src/util/stream.rs @@ -1,5 +1,5 @@ -use futures::Stream; -use std::{future::Future, task::Poll}; +use futures::{stream::poll_fn, Stream}; +use std::future::Future; pub fn repeat_fn(mut func: F) -> impl Stream where @@ -8,10 +8,10 @@ where { let mut fut = Box::pin(func()); - futures::stream::poll_fn(move |ctx| { + poll_fn(move |ctx| { let out = fut.as_mut().poll(ctx); - if let Poll::Ready(_) = out { + if out.is_ready() { fut.set(func()); }