diff --git a/Cargo.lock b/Cargo.lock index d3ebf88..a5de54a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "atomic_refcell" version = "0.1.13" @@ -31,9 +37,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backtrace" -version = "0.3.69" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", "cc", @@ -46,9 +52,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.5" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "bitflags" @@ -67,9 +73,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.14.0" +version = "3.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" [[package]] name = "byteorder" @@ -79,18 +85,15 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "cc" -version = "1.0.83" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" -dependencies = [ - "libc", -] +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" [[package]] name = "cfg-if" @@ -100,9 +103,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "cpufeatures" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" dependencies = [ "libc", ] @@ -119,9 +122,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "digest" @@ -148,6 +151,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "fastrand" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +dependencies = [ + "getrandom", +] + [[package]] name = "fnv" version = "1.0.7" @@ -156,18 +168,18 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -175,15 +187,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -192,21 +204,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", "futures-macro", @@ -229,46 +241,35 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.1.16" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", "js-sys", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] -[[package]] -name = "getrandom" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.11.0+wasi-snapshot-preview1", -] - [[package]] name = "gimli" -version = "0.28.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "h2" -version = "0.3.22" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" dependencies = [ "bytes", "fnv", "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -278,9 +279,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" [[package]] name = "headers" @@ -291,7 +292,7 @@ dependencies = [ "base64", "bytes", "headers-core", - "http", + "http 0.2.12", "httpdate", "mime", "sha1", @@ -303,20 +304,31 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.12", ] [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "http" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] [[package]] name = "http" -version = "0.2.11" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -325,12 +337,12 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] @@ -348,22 +360,22 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2", "tokio", "tower-service", "tracing", @@ -372,9 +384,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -382,9 +394,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown", @@ -392,24 +404,24 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.65" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" dependencies = [ "wasm-bindgen", ] [[package]] name = "libc" -version = "0.2.150" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "lock_api" @@ -423,15 +435,15 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "memchr" -version = "2.6.4" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "mime" @@ -451,22 +463,22 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] [[package]] name = "mio" -version = "0.8.9" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", - "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys", + "wasi", + "windows-sys 0.48.0", ] [[package]] @@ -478,7 +490,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 0.2.12", "httparse", "log", "memchr", @@ -499,18 +511,18 @@ dependencies = [ [[package]] name = "object" -version = "0.32.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "parking_lot" @@ -532,29 +544,29 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project" -version = "1.1.3" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.3" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", @@ -581,35 +593,22 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc", -] - [[package]] name = "rand" version = "0.8.5" @@ -617,18 +616,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", + "rand_chacha", + "rand_core", ] [[package]] @@ -638,16 +627,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", + "rand_core", ] [[package]] @@ -656,16 +636,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.11", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", + "getrandom", ] [[package]] @@ -694,9 +665,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.15" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "scoped-tls" @@ -712,18 +683,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.192" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.192" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", @@ -732,9 +703,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.108" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -793,28 +764,18 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" - -[[package]] -name = "socket2" -version = "0.4.10" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -825,9 +786,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "syn" -version = "2.0.39" +version = "2.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "002a1b3dbf967edfafc32655d0f377ab0bb7b994aa1d32c8cc7e9b8bf3ebb8f0" dependencies = [ "proc-macro2", "quote", @@ -836,18 +797,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", @@ -871,9 +832,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -883,9 +844,9 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.5", + "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -901,9 +862,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" dependencies = [ "futures-core", "pin-project-lite", @@ -919,7 +880,19 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.20.1", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", ] [[package]] @@ -950,9 +923,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -964,9 +949,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" @@ -977,10 +962,29 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", - "rand 0.8.5", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", "sha1", "thiserror", "url", @@ -1004,9 +1008,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" @@ -1016,18 +1020,18 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" dependencies = [ "tinyvec", ] [[package]] name = "url" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", "idna", @@ -1065,7 +1069,7 @@ dependencies = [ "futures-channel", "futures-util", "headers", - "http", + "http 0.2.12", "hyper", "log", "mime", @@ -1080,18 +1084,12 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-stream", - "tokio-tungstenite", + "tokio-tungstenite 0.20.1", "tokio-util", "tower-service", "tracing", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1100,9 +1098,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.88" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1110,9 +1108,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.88" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" dependencies = [ "bumpalo", "log", @@ -1125,9 +1123,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.88" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1135,9 +1133,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.88" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", @@ -1148,39 +1146,26 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.88" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] -name = "winapi" -version = "0.3.9" +name = "windows-sys" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", + "windows-targets 0.48.5", ] -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - [[package]] name = "windows-sys" -version = "0.48.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.4", ] [[package]] @@ -1189,13 +1174,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +dependencies = [ + "windows_aarch64_gnullvm 0.52.4", + "windows_aarch64_msvc 0.52.4", + "windows_i686_gnu 0.52.4", + "windows_i686_msvc 0.52.4", + "windows_x86_64_gnu 0.52.4", + "windows_x86_64_gnullvm 0.52.4", + "windows_x86_64_msvc 0.52.4", ] [[package]] @@ -1204,36 +1204,72 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -1241,25 +1277,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] -name = "y-sync" -version = "0.4.0" +name = "windows_x86_64_msvc" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e3675a497cde881a71e7e5c2ae1d087dfc7733ddece9b24a9a61408e969d3b" -dependencies = [ - "futures-util", - "thiserror", - "tokio", - "yrs", -] +checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" [[package]] name = "yrs" -version = "0.17.1" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f2fbea97ed32722d4f09fcf1aace9daf36e8b2e72b1f605d5a1bee575fa0da" +checksum = "a4058d69bbbc97181d53d9d093a4b892001b84601f2fc4e27f48c8862bc8b369" dependencies = [ + "arc-swap", "atomic_refcell", - "rand 0.7.3", + "fastrand", "serde", "serde_json", "smallstr", @@ -1269,15 +1300,16 @@ dependencies = [ [[package]] name = "yrs-warp" -version = "0.6.1" +version = "0.7.0" dependencies = [ + "bytes", "futures-util", - "log", "serde", "serde_json", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", + "tokio-util", + "tracing", "warp", - "y-sync", "yrs", ] diff --git a/Cargo.toml b/Cargo.toml index 409907b..16f549b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,15 +13,16 @@ readme = "./README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -yrs = "0.17" -y-sync = { version = "0.4", features = ["net"] } +yrs = "0.18.2" warp = "0.3" futures-util = { version = "0.3", features = ["sink"] } -tokio = { version = "1.34", features = ["rt","net","sync","macros"] } +tokio = { version = "1.34", features = ["rt", "net", "sync", "macros"] } serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" -log = "0.4" +tracing = { version = "0.1", features = ["log"] } +tokio-util = { version = "0.7.10", features = ["codec"] } [dev-dependencies] -tokio-tungstenite = "0.20" -tokio = { version = "1", features = ["full"] } \ No newline at end of file +tokio-tungstenite = "0.21" +tokio = { version = "1", features = ["full"] } +bytes = "1.6" \ No newline at end of file diff --git a/examples/code-mirror/main.rs b/examples/code-mirror/main.rs index 84cd2ee..877619a 100644 --- a/examples/code-mirror/main.rs +++ b/examples/code-mirror/main.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; use warp::ws::{WebSocket, Ws}; use warp::{Filter, Rejection, Reply}; -use y_sync::awareness::Awareness; -use y_sync::net::BroadcastGroup; +use yrs::sync::Awareness; use yrs::{Doc, Text, Transact}; +use yrs_warp::broadcast::BroadcastGroup; use yrs_warp::ws::{WarpSink, WarpStream}; use yrs_warp::AwarenessRef; diff --git a/src/broadcast.rs b/src/broadcast.rs new file mode 100644 index 0000000..54d60ff --- /dev/null +++ b/src/broadcast.rs @@ -0,0 +1,376 @@ +#![allow(dead_code)] +use crate::AwarenessRef; +use futures_util::{SinkExt, StreamExt}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::broadcast::error::SendError; +use tokio::sync::broadcast::{channel, Receiver, Sender}; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use yrs::encoding::write::Write; +use yrs::sync::protocol::{MSG_SYNC, MSG_SYNC_UPDATE}; +use yrs::sync::{DefaultProtocol, Error, Message, Protocol, SyncMessage}; +use yrs::updates::decoder::Decode; +use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; +use yrs::Update; + +/// A broadcast group can be used to propagate updates produced by yrs [yrs::Doc] and [Awareness] +/// structures in a binary form that conforms to a y-sync protocol. +/// +/// New receivers can subscribe to a broadcasting group via [BroadcastGroup::subscribe] method. +pub struct BroadcastGroup { + awareness_sub: yrs::Subscription, + doc_sub: yrs::Subscription, + awareness_ref: AwarenessRef, + sender: Sender>, + receiver: Receiver>, + awareness_updater: JoinHandle<()>, +} + +unsafe impl Send for BroadcastGroup {} +unsafe impl Sync for BroadcastGroup {} + +impl BroadcastGroup { + /// Creates a new [BroadcastGroup] over a provided `awareness` instance. All changes triggered + /// by this awareness structure or its underlying document will be propagated to all subscribers + /// which have been registered via [BroadcastGroup::subscribe] method. + /// + /// The overflow of the incoming events that needs to be propagates will be buffered up to a + /// provided `buffer_capacity` size. + pub async fn new(awareness: AwarenessRef, buffer_capacity: usize) -> Self { + let (sender, receiver) = channel(buffer_capacity); + let awareness_c = Arc::downgrade(&awareness); + let mut lock = awareness.write().await; + let sink = sender.clone(); + let doc_sub = { + lock.doc_mut() + .observe_update_v1(move |_txn, u| { + // we manually construct msg here to avoid update data copying + let mut encoder = EncoderV1::new(); + encoder.write_var(MSG_SYNC); + encoder.write_var(MSG_SYNC_UPDATE); + encoder.write_buf(&u.update); + let msg = encoder.to_vec(); + if let Err(_e) = sink.send(msg) { + // current broadcast group is being closed + } + }) + .unwrap() + }; + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let sink = sender.clone(); + let awareness_sub = lock.on_update(move |e| { + let added = e.added(); + let updated = e.updated(); + let removed = e.removed(); + let mut changed = Vec::with_capacity(added.len() + updated.len() + removed.len()); + changed.extend_from_slice(added); + changed.extend_from_slice(updated); + changed.extend_from_slice(removed); + + if let Err(_) = tx.send(changed) { + tracing::warn!("failed to send awareness update"); + } + }); + drop(lock); + let awareness_updater = tokio::task::spawn(async move { + while let Some(changed_clients) = rx.recv().await { + if let Some(awareness) = awareness_c.upgrade() { + let awareness = awareness.read().await; + match awareness.update_with_clients(changed_clients) { + Ok(update) => { + if let Err(_) = sink.send(Message::Awareness(update).encode_v1()) { + tracing::warn!("couldn't broadcast awareness update"); + } + } + Err(e) => { + tracing::warn!("error while computing awareness update: {}", e) + } + } + } else { + return; + } + } + }); + BroadcastGroup { + awareness_ref: awareness, + awareness_updater, + sender, + receiver, + awareness_sub, + doc_sub, + } + } + + /// Returns a reference to an underlying [Awareness] instance. + pub fn awareness(&self) -> &AwarenessRef { + &self.awareness_ref + } + + /// Broadcasts user message to all active subscribers. Returns error if message could not have + /// been broadcasted. + pub fn broadcast(&self, msg: Vec) -> Result<(), SendError>> { + self.sender.send(msg)?; + Ok(()) + } + + /// Subscribes a new connection - represented by `sink`/`stream` pair implementing a futures + /// Sink and Stream protocols - to a current broadcast group. + /// + /// Returns a subscription structure, which can be dropped in order to unsubscribe or awaited + /// via [Subscription::completed] method in order to complete of its own volition (due to + /// an internal connection error or closed connection). + pub fn subscribe(&self, sink: Arc>, stream: Stream) -> Subscription + where + Sink: SinkExt> + Send + Sync + Unpin + 'static, + Stream: StreamExt, E>> + Send + Sync + Unpin + 'static, + >>::Error: std::error::Error + Send + Sync, + E: std::error::Error + Send + Sync + 'static, + { + self.subscribe_with(sink, stream, DefaultProtocol) + } + + /// Subscribes a new connection - represented by `sink`/`stream` pair implementing a futures + /// Sink and Stream protocols - to a current broadcast group. + /// + /// Returns a subscription structure, which can be dropped in order to unsubscribe or awaited + /// via [Subscription::completed] method in order to complete of its own volition (due to + /// an internal connection error or closed connection). + /// + /// Unlike [BroadcastGroup::subscribe], this method can take [Protocol] parameter that allows to + /// customize the y-sync protocol behavior. + pub fn subscribe_with( + &self, + sink: Arc>, + mut stream: Stream, + protocol: P, + ) -> Subscription + where + Sink: SinkExt> + Send + Sync + Unpin + 'static, + Stream: StreamExt, E>> + Send + Sync + Unpin + 'static, + >>::Error: std::error::Error + Send + Sync, + E: std::error::Error + Send + Sync + 'static, + P: Protocol + Send + Sync + 'static, + { + let sink_task = { + let sink = sink.clone(); + let mut receiver = self.sender.subscribe(); + tokio::spawn(async move { + while let Ok(msg) = receiver.recv().await { + let mut sink = sink.lock().await; + if let Err(e) = sink.send(msg).await { + println!("broadcast failed to sent sync message"); + return Err(Error::Other(Box::new(e))); + } + } + Ok(()) + }) + }; + let stream_task = { + let awareness = self.awareness().clone(); + tokio::spawn(async move { + while let Some(res) = stream.next().await { + let msg = Message::decode_v1(&res.map_err(|e| Error::Other(Box::new(e)))?)?; + let reply = Self::handle_msg(&protocol, &awareness, msg).await?; + match reply { + None => {} + Some(reply) => { + let mut sink = sink.lock().await; + sink.send(reply.encode_v1()) + .await + .map_err(|e| Error::Other(Box::new(e)))?; + } + } + } + Ok(()) + }) + }; + + Subscription { + sink_task, + stream_task, + } + } + + async fn handle_msg( + protocol: &P, + awareness: &AwarenessRef, + msg: Message, + ) -> Result, Error> { + match msg { + Message::Sync(msg) => match msg { + SyncMessage::SyncStep1(state_vector) => { + let awareness = awareness.read().await; + protocol.handle_sync_step1(&*awareness, state_vector) + } + SyncMessage::SyncStep2(update) => { + let mut awareness = awareness.write().await; + let update = Update::decode_v1(&update)?; + protocol.handle_sync_step2(&mut *awareness, update) + } + SyncMessage::Update(update) => { + let mut awareness = awareness.write().await; + let update = Update::decode_v1(&update)?; + protocol.handle_sync_step2(&mut *awareness, update) + } + }, + Message::Auth(deny_reason) => { + let awareness = awareness.read().await; + protocol.handle_auth(&*awareness, deny_reason) + } + Message::AwarenessQuery => { + let awareness = awareness.read().await; + protocol.handle_awareness_query(&*awareness) + } + Message::Awareness(update) => { + let mut awareness = awareness.write().await; + protocol.handle_awareness_update(&mut *awareness, update) + } + Message::Custom(tag, data) => { + let mut awareness = awareness.write().await; + protocol.missing_handle(&mut *awareness, tag, data) + } + } + } +} + +impl Drop for BroadcastGroup { + fn drop(&mut self) { + self.awareness_updater.abort(); + } +} + +/// A subscription structure returned from [BroadcastGroup::subscribe], which represents a +/// subscribed connection. It can be dropped in order to unsubscribe or awaited via +/// [Subscription::completed] method in order to complete of its own volition (due to an internal +/// connection error or closed connection). +#[derive(Debug)] +pub struct Subscription { + sink_task: JoinHandle>, + stream_task: JoinHandle>, +} + +impl Subscription { + /// Consumes current subscription, waiting for it to complete. If an underlying connection was + /// closed because of failure, an error which caused it to happen will be returned. + /// + /// This method doesn't invoke close procedure. If you need that, drop current subscription instead. + pub async fn completed(self) -> Result<(), Error> { + let res = select! { + r1 = self.sink_task => r1, + r2 = self.stream_task => r2, + }; + res.map_err(|e| Error::Other(e.into()))? + } +} + +#[cfg(test)] +mod test { + use crate::broadcast::BroadcastGroup; + use futures_util::{ready, SinkExt, StreamExt}; + use std::collections::HashMap; + use std::pin::Pin; + use std::sync::Arc; + use std::task::{Context, Poll}; + use tokio::sync::{Mutex, RwLock}; + use tokio_util::sync::PollSender; + use yrs::sync::awareness::AwarenessUpdateEntry; + use yrs::sync::{Awareness, AwarenessUpdate, Error, Message, SyncMessage}; + use yrs::updates::decoder::Decode; + use yrs::updates::encoder::Encode; + use yrs::{Doc, StateVector, Text, Transact}; + + #[derive(Debug)] + pub struct ReceiverStream { + inner: tokio::sync::mpsc::Receiver, + } + + impl ReceiverStream { + /// Create a new `ReceiverStream`. + pub fn new(recv: tokio::sync::mpsc::Receiver) -> Self { + Self { inner: recv } + } + } + + impl futures_util::Stream for ReceiverStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match ready!(self.inner.poll_recv(cx)) { + None => Poll::Ready(None), + Some(v) => Poll::Ready(Some(Ok(v))), + } + } + } + + fn test_channel(capacity: usize) -> (PollSender>, ReceiverStream>) { + let (s, r) = tokio::sync::mpsc::channel::>(capacity); + let s = PollSender::new(s); + let r = ReceiverStream::new(r); + (s, r) + } + + #[tokio::test] + async fn broadcast_changes() -> Result<(), Box> { + let doc = Doc::with_client_id(1); + let text = doc.get_or_insert_text("test"); + let awareness = Arc::new(RwLock::new(Awareness::new(doc))); + let group = BroadcastGroup::new(awareness.clone(), 1).await; + + let (server_sender, mut client_receiver) = test_channel(1); + let (mut client_sender, server_receiver) = test_channel(1); + let _sub1 = group.subscribe(Arc::new(Mutex::new(server_sender)), server_receiver); + + // check update propagation + { + let a = awareness.write().await; + text.push(&mut a.doc().transact_mut(), "a"); + } + let msg = client_receiver.next().await; + let msg = msg.map(|x| Message::decode_v1(&x.unwrap()).unwrap()); + assert_eq!( + msg, + Some(Message::Sync(SyncMessage::Update(vec![ + 1, 1, 1, 0, 4, 1, 4, 116, 101, 115, 116, 1, 97, 0, + ]))) + ); + + // check awareness update propagation + { + let mut a = awareness.write().await; + a.set_local_state(r#"{"key":"value"}"#) + } + + let msg = client_receiver.next().await; + let msg = msg.map(|x| Message::decode_v1(&x.unwrap()).unwrap()); + assert_eq!( + msg, + Some(Message::Awareness(AwarenessUpdate { + clients: HashMap::from([( + 1, + AwarenessUpdateEntry { + clock: 1, + json: r#"{"key":"value"}"#.to_string(), + }, + )]), + })) + ); + + // check sync state request/response + { + client_sender + .send(Message::Sync(SyncMessage::SyncStep1(StateVector::default())).encode_v1()) + .await?; + let msg = client_receiver.next().await; + let msg = msg.map(|x| Message::decode_v1(&x.unwrap()).unwrap()); + assert_eq!( + msg, + Some(Message::Sync(SyncMessage::SyncStep2(vec![ + 1, 1, 1, 0, 4, 1, 4, 116, 101, 115, 116, 1, 97, 0, + ]))) + ); + } + + Ok(()) + } +} diff --git a/src/conn.rs b/src/conn.rs new file mode 100644 index 0000000..e6f38cb --- /dev/null +++ b/src/conn.rs @@ -0,0 +1,530 @@ +#![allow(dead_code)] +use futures_util::sink::SinkExt; +use futures_util::StreamExt; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::sync::{Arc, Weak}; +use std::task::{Context, Poll}; +use tokio::spawn; +use tokio::sync::{Mutex, RwLock}; +use tokio::task::JoinHandle; +use yrs::encoding::read::Cursor; +use yrs::sync::Awareness; +use yrs::sync::{DefaultProtocol, Error, Message, MessageReader, Protocol, SyncMessage}; +use yrs::updates::decoder::{Decode, DecoderV1}; +use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; +use yrs::Update; + +/// Connection handler over a pair of message streams, which implements a Yjs/Yrs awareness and +/// update exchange protocol. +/// +/// This connection implements Future pattern and can be awaited upon in order for a caller to +/// recognize whether underlying websocket connection has been finished gracefully or abruptly. +#[derive(Debug)] +pub struct Connection { + processing_loop: JoinHandle>, + awareness: Arc>, + inbox: Arc>, + _stream: PhantomData, +} + +impl Connection +where + Sink: SinkExt, Error = E> + Send + Sync + Unpin + 'static, + E: Into + Send + Sync, +{ + pub async fn send(&self, msg: Vec) -> Result<(), Error> { + let mut inbox = self.inbox.lock().await; + match inbox.send(msg).await { + Ok(_) => Ok(()), + Err(err) => Err(err.into()), + } + } + + pub async fn close(self) -> Result<(), E> { + let mut inbox = self.inbox.lock().await; + inbox.close().await + } + + pub fn sink(&self) -> Weak> { + Arc::downgrade(&self.inbox) + } +} + +impl Connection +where + Stream: StreamExt, E>> + Send + Sync + Unpin + 'static, + Sink: SinkExt, Error = E> + Send + Sync + Unpin + 'static, + E: Into + Send + Sync, +{ + /// Wraps incoming [WebSocket] connection and supplied [Awareness] accessor into a new + /// connection handler capable of exchanging Yrs/Yjs messages. + /// + /// While creation of new [WarpConn] always succeeds, a connection itself can possibly fail + /// while processing incoming input/output. This can be detected by awaiting for returned + /// [WarpConn] and handling the awaited result. + pub fn new(awareness: Arc>, sink: Sink, stream: Stream) -> Self { + Self::with_protocol(awareness, sink, stream, DefaultProtocol) + } + + /// Returns an underlying [Awareness] structure, that contains client state of that connection. + pub fn awareness(&self) -> &Arc> { + &self.awareness + } + + /// Wraps incoming [WebSocket] connection and supplied [Awareness] accessor into a new + /// connection handler capable of exchanging Yrs/Yjs messages. + /// + /// While creation of new [WarpConn] always succeeds, a connection itself can possibly fail + /// while processing incoming input/output. This can be detected by awaiting for returned + /// [WarpConn] and handling the awaited result. + pub fn with_protocol

( + awareness: Arc>, + sink: Sink, + mut stream: Stream, + protocol: P, + ) -> Self + where + P: Protocol + Send + Sync + 'static, + { + let sink = Arc::new(Mutex::new(sink)); + let inbox = sink.clone(); + let loop_sink = Arc::downgrade(&sink); + let loop_awareness = Arc::downgrade(&awareness); + let processing_loop: JoinHandle> = spawn(async move { + // at the beginning send SyncStep1 and AwarenessUpdate + let payload = { + let awareness = loop_awareness.upgrade().unwrap(); + let mut encoder = EncoderV1::new(); + let awareness = awareness.read().await; + protocol.start(&awareness, &mut encoder)?; + encoder.to_vec() + }; + if !payload.is_empty() { + if let Some(sink) = loop_sink.upgrade() { + let mut s = sink.lock().await; + if let Err(e) = s.send(payload).await { + return Err(e.into()); + } + } else { + return Ok(()); // parent ConnHandler has been dropped + } + } + + while let Some(input) = stream.next().await { + match input { + Ok(data) => { + if let Some(mut sink) = loop_sink.upgrade() { + if let Some(awareness) = loop_awareness.upgrade() { + match Self::process(&protocol, &awareness, &mut sink, data).await { + Ok(()) => { /* continue */ } + Err(e) => { + return Err(e); + } + } + } else { + return Ok(()); // parent ConnHandler has been dropped + } + } else { + return Ok(()); // parent ConnHandler has been dropped + } + } + Err(e) => return Err(e.into()), + } + } + + Ok(()) + }); + Connection { + processing_loop, + awareness, + inbox, + _stream: PhantomData::default(), + } + } + + async fn process( + protocol: &P, + awareness: &Arc>, + sink: &mut Arc>, + input: Vec, + ) -> Result<(), Error> { + let mut decoder = DecoderV1::new(Cursor::new(&input)); + let reader = MessageReader::new(&mut decoder); + for r in reader { + let msg = r?; + if let Some(reply) = handle_msg(protocol, &awareness, msg).await? { + let mut sender = sink.lock().await; + if let Err(e) = sender.send(reply.encode_v1()).await { + println!("connection failed to send back the reply"); + return Err(e.into()); + } else { + println!("connection send back the reply"); + } + } + } + Ok(()) + } +} + +impl Unpin for Connection {} + +impl Future for Connection { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.processing_loop).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Other(e.into()))), + Poll::Ready(Ok(r)) => Poll::Ready(r), + } + } +} + +pub async fn handle_msg( + protocol: &P, + a: &Arc>, + msg: Message, +) -> Result, Error> { + match msg { + Message::Sync(msg) => match msg { + SyncMessage::SyncStep1(sv) => { + let awareness = a.read().await; + protocol.handle_sync_step1(&awareness, sv) + } + SyncMessage::SyncStep2(update) => { + let mut awareness = a.write().await; + protocol.handle_sync_step2(&mut awareness, Update::decode_v1(&update)?) + } + SyncMessage::Update(update) => { + let mut awareness = a.write().await; + protocol.handle_update(&mut awareness, Update::decode_v1(&update)?) + } + }, + Message::Auth(reason) => { + let awareness = a.read().await; + protocol.handle_auth(&awareness, reason) + } + Message::AwarenessQuery => { + let awareness = a.read().await; + protocol.handle_awareness_query(&awareness) + } + Message::Awareness(update) => { + let mut awareness = a.write().await; + protocol.handle_awareness_update(&mut awareness, update) + } + Message::Custom(tag, data) => { + let mut awareness = a.write().await; + protocol.missing_handle(&mut awareness, tag, data) + } + } +} + +#[cfg(test)] +mod test { + use crate::broadcast::BroadcastGroup; + use crate::conn::Connection; + use bytes::{Bytes, BytesMut}; + use futures_util::SinkExt; + use std::net::SocketAddr; + use std::str::FromStr; + use std::sync::Arc; + use std::time::Duration; + use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; + use tokio::net::{TcpListener, TcpSocket}; + use tokio::sync::{Mutex, Notify, RwLock}; + use tokio::task; + use tokio::task::JoinHandle; + use tokio::time::{sleep, timeout}; + use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite, LengthDelimitedCodec}; + use yrs::sync::{Awareness, Error, Message, SyncMessage}; + use yrs::updates::encoder::Encode; + use yrs::{Doc, GetString, Subscription, Text, Transact}; + + #[derive(Debug, Default)] + struct YrsCodec(LengthDelimitedCodec); + + impl Encoder> for YrsCodec { + type Error = Error; + + fn encode(&mut self, item: Vec, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.0.encode(Bytes::from(item), dst)?; + Ok(()) + } + } + + impl Decoder for YrsCodec { + type Item = Vec; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if let Some(bytes) = self.0.decode(src)? { + Ok(Some(bytes.freeze().to_vec())) + } else { + Ok(None) + } + } + } + + type WrappedStream = FramedRead; + type WrappedSink = FramedWrite; + + async fn start_server( + addr: SocketAddr, + bcast: BroadcastGroup, + ) -> Result, Box> { + let server = TcpListener::bind(addr).await?; + Ok(tokio::spawn(async move { + let mut subscribers = Vec::new(); + while let Ok((stream, _)) = server.accept().await { + let (reader, writer) = stream.into_split(); + let stream = WrappedStream::new(reader, YrsCodec::default()); + let sink = WrappedSink::new(writer, YrsCodec::default()); + let sub = bcast.subscribe(Arc::new(Mutex::new(sink)), stream); + subscribers.push(sub); + } + })) + } + + async fn client( + addr: SocketAddr, + doc: Doc, + ) -> Result, Box> { + let stream = TcpSocket::new_v4()?.connect(addr).await?; + let (reader, writer) = stream.into_split(); + let stream: WrappedStream = WrappedStream::new(reader, YrsCodec::default()); + let sink: WrappedSink = WrappedSink::new(writer, YrsCodec::default()); + Ok(Connection::new( + Arc::new(RwLock::new(Awareness::new(doc))), + sink, + stream, + )) + } + + fn create_notifier(doc: &Doc) -> (Arc, Subscription) { + let n = Arc::new(Notify::new()); + let sub = { + let n = n.clone(); + doc.observe_update_v1(move |_, _| n.notify_waiters()) + .unwrap() + }; + (n, sub) + } + + const TIMEOUT: Duration = Duration::from_secs(5); + + #[tokio::test] + async fn change_introduced_by_server_reaches_subscribed_clients( + ) -> Result<(), Box> { + let server_addr = SocketAddr::from_str("127.0.0.1:6600").unwrap(); + let doc = Doc::with_client_id(1); + let text = doc.get_or_insert_text("test"); + let awareness = Arc::new(RwLock::new(Awareness::new(doc))); + let bcast = BroadcastGroup::new(awareness.clone(), 10).await; + let _server = start_server(server_addr.clone(), bcast).await?; + + let doc = Doc::new(); + let (n, _sub) = create_notifier(&doc); + let c1 = client(server_addr.clone(), doc).await?; + + { + let lock = awareness.write().await; + text.push(&mut lock.doc().transact_mut(), "abc"); + } + + timeout(TIMEOUT, n.notified()).await?; + + { + let awareness = c1.awareness().read().await; + let doc = awareness.doc(); + let text = doc.get_or_insert_text("test"); + let str = text.get_string(&doc.transact()); + assert_eq!(str, "abc".to_string()); + } + + Ok(()) + } + + #[tokio::test] + async fn subscribed_client_fetches_initial_state() -> Result<(), Box> { + let server_addr = SocketAddr::from_str("127.0.0.1:6601").unwrap(); + let doc = Doc::with_client_id(1); + let text = doc.get_or_insert_text("test"); + + text.push(&mut doc.transact_mut(), "abc"); + + let awareness = Arc::new(RwLock::new(Awareness::new(doc))); + let bcast = BroadcastGroup::new(awareness.clone(), 10).await; + let _server = start_server(server_addr.clone(), bcast).await?; + + let doc = Doc::new(); + let (n, _sub) = create_notifier(&doc); + let c1 = client(server_addr.clone(), doc).await?; + + timeout(TIMEOUT, n.notified()).await?; + + { + let awareness = c1.awareness().read().await; + let doc = awareness.doc(); + let text = doc.get_or_insert_text("test"); + let str = text.get_string(&doc.transact()); + assert_eq!(str, "abc".to_string()); + } + + Ok(()) + } + + #[tokio::test] + async fn changes_from_one_client_reach_others() -> Result<(), Box> { + let server_addr = SocketAddr::from_str("127.0.0.1:6602").unwrap(); + let doc = Doc::with_client_id(1); + let _text = doc.get_or_insert_text("test"); + + let awareness = Arc::new(RwLock::new(Awareness::new(doc))); + let bcast = BroadcastGroup::new(awareness.clone(), 10).await; + let _server = start_server(server_addr.clone(), bcast).await?; + + let d1 = Doc::with_client_id(2); + let c1 = client(server_addr.clone(), d1).await?; + // by default changes made by document on the client side are not propagated automatically + let _sub11 = { + let sink = c1.sink(); + let a = c1.awareness().write().await; + let doc = a.doc(); + doc.observe_update_v1(move |_, e| { + let update = e.update.to_owned(); + if let Some(sink) = sink.upgrade() { + task::spawn(async move { + let msg = Message::Sync(SyncMessage::Update(update)).encode_v1(); + let mut sink = sink.lock().await; + sink.send(msg).await.unwrap(); + }); + } + }) + .unwrap() + }; + + let d2 = Doc::with_client_id(3); + let (n2, _sub2) = create_notifier(&d2); + let c2 = client(server_addr.clone(), d2).await?; + + { + let a = c1.awareness().write().await; + let doc = a.doc(); + let text = doc.get_or_insert_text("test"); + text.push(&mut doc.transact_mut(), "def"); + } + + timeout(TIMEOUT, n2.notified()).await?; + + { + let awareness = c2.awareness.read().await; + let doc = awareness.doc(); + let text = doc.get_or_insert_text("test"); + let str = text.get_string(&doc.transact()); + assert_eq!(str, "def".to_string()); + } + + Ok(()) + } + + #[tokio::test] + async fn client_failure_doesnt_affect_others() -> Result<(), Box> { + let server_addr = SocketAddr::from_str("127.0.0.1:6604").unwrap(); + let doc = Doc::with_client_id(1); + let _ = doc.get_or_insert_text("test"); + + let awareness = Arc::new(RwLock::new(Awareness::new(doc))); + let bcast = BroadcastGroup::new(awareness.clone(), 10).await; + let _server = start_server(server_addr.clone(), bcast).await?; + + let d1 = Doc::with_client_id(2); + let c1 = client(server_addr.clone(), d1).await?; + // by default changes made by document on the client side are not propagated automatically + let _sub11 = { + let sink = c1.sink(); + let a = c1.awareness().write().await; + let doc = a.doc(); + doc.observe_update_v1(move |_, e| { + let update = e.update.to_owned(); + if let Some(sink) = sink.upgrade() { + task::spawn(async move { + let msg = Message::Sync(SyncMessage::Update(update)).encode_v1(); + let mut sink = sink.lock().await; + sink.send(msg).await.unwrap(); + }); + } + }) + .unwrap() + }; + + let d2 = Doc::with_client_id(3); + let (n2, sub2) = create_notifier(&d2); + let c2 = client(server_addr.clone(), d2).await?; + + let d3 = Doc::with_client_id(4); + let (n3, sub3) = create_notifier(&d3); + let c3 = client(server_addr.clone(), d3).await?; + + { + let a = c1.awareness().write().await; + let doc = a.doc(); + let text = doc.get_or_insert_text("test"); + text.push(&mut doc.transact_mut(), "abc"); + } + + // on the first try both C2 and C3 should receive the update + //timeout(TIMEOUT, n2.notified()).await.unwrap(); + //timeout(TIMEOUT, n3.notified()).await.unwrap(); + sleep(TIMEOUT).await; + + { + let awareness = c2.awareness.read().await; + let doc = awareness.doc(); + let text = doc.get_or_insert_text("test"); + let str = text.get_string(&doc.transact()); + assert_eq!(str, "abc".to_string()); + } + { + let awareness = c3.awareness.read().await; + let doc = awareness.doc(); + let text = doc.get_or_insert_text("test"); + let str = text.get_string(&doc.transact()); + assert_eq!(str, "abc".to_string()); + } + + // drop client, causing abrupt ending + drop(c3); + drop(n3); + drop(sub3); + // C2 notification subscription has been realized, we need to refresh it + drop(n2); + drop(sub2); + + let (n2, _sub2) = { + let a = c2.awareness().write().await; + let doc = a.doc(); + create_notifier(doc) + }; + + { + let a = c1.awareness().write().await; + let doc = a.doc(); + let text = doc.get_or_insert_text("test"); + text.push(&mut doc.transact_mut(), "def"); + } + + timeout(TIMEOUT, n2.notified()).await.unwrap(); + + { + let awareness = c2.awareness.read().await; + let doc = awareness.doc(); + let text = doc.get_or_insert_text("test"); + let str = text.get_string(&doc.transact()); + assert_eq!(str, "abcdef".to_string()); + } + + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index ce1727f..ff8cb38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use tokio::sync::RwLock; -use y_sync::awareness::Awareness; +pub mod broadcast; +pub mod conn; pub mod signaling; pub mod ws; -pub type BroadcastGroup = y_sync::net::BroadcastGroup; -pub type AwarenessRef = Arc>; +pub type AwarenessRef = Arc>; diff --git a/src/signaling.rs b/src/signaling.rs index f61cc8b..6c44e70 100644 --- a/src/signaling.rs +++ b/src/signaling.rs @@ -58,10 +58,10 @@ impl SignalingService { let topics = self.0.read().await; if let Some(subs) = topics.get(topic) { let client_count = subs.len(); - log::info!("publishing message to {client_count} clients: {msg:?}"); + tracing::info!("publishing message to {client_count} clients: {msg:?}"); for sub in subs { if let Err(e) = sub.try_send(msg.clone()).await { - log::info!("failed to send {msg:?}: {e}"); + tracing::info!("failed to send {msg:?}: {e}"); failed.push(sub.clone()); } } @@ -83,7 +83,7 @@ impl SignalingService { if let Some(subs) = topics.remove(topic) { for sub in subs { if let Err(e) = sub.close().await { - log::warn!("failed to close connection on topic '{topic}': {e}"); + tracing::warn!("failed to close connection on topic '{topic}': {e}"); } } } @@ -101,7 +101,7 @@ impl SignalingService { for conn in all_conns { if let Err(e) = conn.close().await { - log::warn!("failed to close connection: {e}"); + tracing::warn!("failed to close connection: {e}"); } } @@ -217,7 +217,7 @@ async fn process_msg( if !topic_names.is_empty() { let mut topics = topics.write().await; for topic in topic_names { - log::trace!("subscribing new client to '{topic}'"); + tracing::trace!("subscribing new client to '{topic}'"); if let Some((key, _)) = topics.get_key_value(topic) { state.subscribed_topics.insert(key.clone()); let subs = topics.get_mut(topic).unwrap(); @@ -239,7 +239,7 @@ async fn process_msg( let mut topics = topics.write().await; for topic in topic_names { if let Some(subs) = topics.get_mut(topic) { - log::trace!("unsubscribing client from '{topic}'"); + tracing::trace!("unsubscribing client from '{topic}'"); subs.remove(ws); } } @@ -251,10 +251,14 @@ async fn process_msg( let topics = topics.read().await; if let Some(receivers) = topics.get(topic) { let client_count = receivers.len(); - log::trace!("publishing on {client_count} clients at '{topic}': {json}"); + tracing::trace!( + "publishing on {client_count} clients at '{topic}': {json}" + ); for receiver in receivers.iter() { if let Err(e) = receiver.try_send(Message::text(json)).await { - log::info!("failed to publish message {json} on '{topic}': {e}"); + tracing::info!( + "failed to publish message {json} on '{topic}': {e}" + ); failed.push(receiver.clone()); } } diff --git a/src/ws.rs b/src/ws.rs index acdb7e7..306c28b 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -1,13 +1,11 @@ +use crate::conn::Connection; +use crate::AwarenessRef; use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{Stream, StreamExt}; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::sync::RwLock; use warp::ws::{Message, WebSocket}; -use y_sync::awareness::Awareness; -use y_sync::net::Connection; -use y_sync::sync::Error; +use yrs::sync::Error; /// Connection Wrapper over a [WebSocket], which implements a Yjs/Yrs awareness and update exchange /// protocol. @@ -19,7 +17,7 @@ use y_sync::sync::Error; pub struct WarpConn(Connection); impl WarpConn { - pub fn new(awareness: Arc>, socket: WebSocket) -> Self { + pub fn new(awareness: AwarenessRef, socket: WebSocket) -> Self { let (sink, stream) = socket.split(); let conn = Connection::new(awareness, WarpSink(sink), WarpStream(stream)); WarpConn(conn) @@ -52,7 +50,7 @@ impl core::future::Future for WarpConn { /// use tokio::task::JoinHandle; /// use warp::{Filter, Rejection, Reply}; /// use warp::ws::{WebSocket, Ws}; -/// use yrs_warp::BroadcastGroup; +/// use yrs_warp::broadcast::BroadcastGroup; /// use yrs_warp::ws::{WarpSink, WarpStream}; /// /// async fn start_server( @@ -154,7 +152,7 @@ impl futures_util::Sink> for WarpSink { /// use tokio::task::JoinHandle; /// use warp::{Filter, Rejection, Reply}; /// use warp::ws::{WebSocket, Ws}; -/// use yrs_warp::BroadcastGroup; +/// use yrs_warp::broadcast::BroadcastGroup; /// use yrs_warp::ws::{WarpSink, WarpStream}; /// /// async fn start_server( @@ -222,6 +220,8 @@ impl Stream for WarpStream { #[cfg(test)] mod test { + use crate::broadcast::BroadcastGroup; + use crate::conn::Connection; use crate::ws::{WarpSink, WarpStream}; use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{ready, SinkExt, Stream, StreamExt}; @@ -240,12 +240,9 @@ mod test { use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use warp::ws::{WebSocket, Ws}; use warp::{Filter, Rejection, Reply, Sink}; - use y_sync::awareness::Awareness; - use y_sync::net::BroadcastGroup; - use y_sync::net::Connection; - use y_sync::sync::Error; + use yrs::sync::{Awareness, Error}; use yrs::updates::encoder::Encode; - use yrs::{Doc, GetString, Text, Transact, UpdateSubscription}; + use yrs::{Doc, GetString, Subscription, Text, Transact}; async fn start_server( addr: &str, @@ -358,7 +355,7 @@ mod test { )) } - fn create_notifier(doc: &Doc) -> (Arc, UpdateSubscription) { + fn create_notifier(doc: &Doc) -> (Arc, Subscription) { let n = Arc::new(Notify::new()); let sub = { let n = n.clone(); @@ -371,24 +368,23 @@ mod test { const TIMEOUT: Duration = Duration::from_secs(5); #[tokio::test] - async fn change_introduced_by_server_reaches_subscribed_clients( - ) -> Result<(), Box> { + async fn change_introduced_by_server_reaches_subscribed_clients() { let doc = Doc::with_client_id(1); let text = doc.get_or_insert_text("test"); let awareness = Arc::new(RwLock::new(Awareness::new(doc))); let bcast = BroadcastGroup::new(awareness.clone(), 10).await; - let server = start_server("0.0.0.0:6600", Arc::new(bcast)).await?; + let _server = start_server("0.0.0.0:6600", Arc::new(bcast)).await.unwrap(); let doc = Doc::new(); - let (n, sub) = create_notifier(&doc); - let c1 = client("ws://localhost:6600/my-room", doc).await?; + let (n, _sub) = create_notifier(&doc); + let c1 = client("ws://localhost:6600/my-room", doc).await.unwrap(); { let lock = awareness.write().await; text.push(&mut lock.doc().transact_mut(), "abc"); } - timeout(TIMEOUT, n.notified()).await?; + timeout(TIMEOUT, n.notified()).await.unwrap(); { let awareness = c1.awareness().read().await; @@ -397,12 +393,10 @@ mod test { let str = text.get_string(&doc.transact()); assert_eq!(str, "abc".to_string()); } - - Ok(()) } #[tokio::test] - async fn subscribed_client_fetches_initial_state() -> Result<(), Box> { + async fn subscribed_client_fetches_initial_state() { let doc = Doc::with_client_id(1); let text = doc.get_or_insert_text("test"); @@ -410,13 +404,13 @@ mod test { let awareness = Arc::new(RwLock::new(Awareness::new(doc))); let bcast = BroadcastGroup::new(awareness.clone(), 10).await; - let server = start_server("0.0.0.0:6601", Arc::new(bcast)).await?; + let _server = start_server("0.0.0.0:6601", Arc::new(bcast)).await.unwrap(); let doc = Doc::new(); - let (n, sub) = create_notifier(&doc); - let c1 = client("ws://localhost:6601/my-room", doc).await?; + let (n, _sub) = create_notifier(&doc); + let c1 = client("ws://localhost:6601/my-room", doc).await.unwrap(); - timeout(TIMEOUT, n.notified()).await?; + timeout(TIMEOUT, n.notified()).await.unwrap(); { let awareness = c1.awareness().read().await; @@ -425,33 +419,30 @@ mod test { let str = text.get_string(&doc.transact()); assert_eq!(str, "abc".to_string()); } - - Ok(()) } #[tokio::test] - async fn changes_from_one_client_reach_others() -> Result<(), Box> { + async fn changes_from_one_client_reach_others() { let doc = Doc::with_client_id(1); - let text = doc.get_or_insert_text("test"); + let _ = doc.get_or_insert_text("test"); let awareness = Arc::new(RwLock::new(Awareness::new(doc))); let bcast = BroadcastGroup::new(awareness.clone(), 10).await; - let server = start_server("0.0.0.0:6602", Arc::new(bcast)).await?; + let _server = start_server("0.0.0.0:6602", Arc::new(bcast)).await.unwrap(); let d1 = Doc::with_client_id(2); - let c1 = client("ws://localhost:6602/my-room", d1).await?; + let c1 = client("ws://localhost:6602/my-room", d1).await.unwrap(); // by default changes made by document on the client side are not propagated automatically - let sub11 = { + let _sub11 = { let sink = c1.sink(); let a = c1.awareness().write().await; let doc = a.doc(); - doc.observe_update_v1(move |txn, e| { + doc.observe_update_v1(move |_, e| { let update = e.update.to_owned(); if let Some(sink) = sink.upgrade() { task::spawn(async move { - let msg = - y_sync::sync::Message::Sync(y_sync::sync::SyncMessage::Update(update)) - .encode_v1(); + let msg = yrs::sync::Message::Sync(yrs::sync::SyncMessage::Update(update)) + .encode_v1(); let mut sink = sink.lock().await; sink.send(msg).await.unwrap(); }); @@ -461,8 +452,8 @@ mod test { }; let d2 = Doc::with_client_id(3); - let (n2, sub2) = create_notifier(&d2); - let c2 = client("ws://localhost:6602/my-room", d2).await?; + let (n2, _sub2) = create_notifier(&d2); + let c2 = client("ws://localhost:6602/my-room", d2).await.unwrap(); { let a = c1.awareness().write().await; @@ -471,7 +462,7 @@ mod test { text.push(&mut doc.transact_mut(), "def"); } - timeout(TIMEOUT, n2.notified()).await?; + timeout(TIMEOUT, n2.notified()).await.unwrap(); { let awareness = c2.awareness().read().await; @@ -480,33 +471,30 @@ mod test { let str = text.get_string(&doc.transact()); assert_eq!(str, "def".to_string()); } - - Ok(()) } #[tokio::test] - async fn client_failure_doesnt_affect_others() -> Result<(), Box> { + async fn client_failure_doesnt_affect_others() { let doc = Doc::with_client_id(1); - let text = doc.get_or_insert_text("test"); + let _text = doc.get_or_insert_text("test"); let awareness = Arc::new(RwLock::new(Awareness::new(doc))); let bcast = BroadcastGroup::new(awareness.clone(), 10).await; - let server = start_server("0.0.0.0:6603", Arc::new(bcast)).await?; + let _server = start_server("0.0.0.0:6603", Arc::new(bcast)).await.unwrap(); let d1 = Doc::with_client_id(2); - let c1 = client("ws://localhost:6603/my-room", d1).await?; + let c1 = client("ws://localhost:6603/my-room", d1).await.unwrap(); // by default changes made by document on the client side are not propagated automatically - let sub11 = { + let _sub11 = { let sink = c1.sink(); let a = c1.awareness().write().await; let doc = a.doc(); - doc.observe_update_v1(move |txn, e| { + doc.observe_update_v1(move |_, e| { let update = e.update.to_owned(); if let Some(sink) = sink.upgrade() { task::spawn(async move { - let msg = - y_sync::sync::Message::Sync(y_sync::sync::SyncMessage::Update(update)) - .encode_v1(); + let msg = yrs::sync::Message::Sync(yrs::sync::SyncMessage::Update(update)) + .encode_v1(); let mut sink = sink.lock().await; sink.send(msg).await.unwrap(); }); @@ -517,11 +505,11 @@ mod test { let d2 = Doc::with_client_id(3); let (n2, sub2) = create_notifier(&d2); - let c2 = client("ws://localhost:6603/my-room", d2).await?; + let c2 = client("ws://localhost:6603/my-room", d2).await.unwrap(); let d3 = Doc::with_client_id(4); let (n3, sub3) = create_notifier(&d3); - let c3 = client("ws://localhost:6603/my-room", d3).await?; + let c3 = client("ws://localhost:6603/my-room", d3).await.unwrap(); { let a = c1.awareness().write().await; @@ -558,7 +546,7 @@ mod test { drop(n2); drop(sub2); - let (n2, sub2) = { + let (n2, _sub2) = { let a = c2.awareness().write().await; let doc = a.doc(); create_notifier(doc) @@ -580,7 +568,5 @@ mod test { let str = text.get_string(&doc.transact()); assert_eq!(str, "abcdef".to_string()); } - - Ok(()) } }