From 7f4da55d271b8007b255b326c071a90837c049de Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 13 Oct 2023 16:01:30 +0530 Subject: [PATCH] add live tail support for stream (#516) This PR adds support for live tailing a stream. With this PR, server exposes a gRPC server on a configurable port (default 8001) and uses Arrow Flight as the communication protocol. As live tail is requested for a stream, Server sends events from that steam, before the local staging and parquet conversion. This means, server will use the event record batch to create a flight stream and send over to the client. fixes #503 --- Cargo.lock | 325 +++++++++++++++++++++++++++---- server/Cargo.toml | 10 + server/src/analytics.rs | 2 +- server/src/event.rs | 2 + server/src/handlers.rs | 6 + server/src/handlers/http/oidc.rs | 11 +- server/src/handlers/livetail.rs | 242 +++++++++++++++++++++++ server/src/livetail.rs | 169 ++++++++++++++++ server/src/main.rs | 6 +- server/src/option.rs | 36 ++++ 10 files changed, 767 insertions(+), 42 deletions(-) create mode 100644 server/src/handlers/livetail.rs create mode 100644 server/src/livetail.rs diff --git a/Cargo.lock b/Cargo.lock index 7909059b4..d8e8d403c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe" dependencies = [ - "bitflags", + "bitflags 1.3.2", "bytes", "futures-core", "futures-sink", @@ -47,7 +47,7 @@ dependencies = [ "actix-utils", "ahash 0.7.6", "base64 0.21.0", - "bitflags", + "bitflags 1.3.2", "brotli", "bytes", "bytestring", @@ -184,7 +184,7 @@ dependencies = [ "bytes", "bytestring", "cfg-if", - "cookie", + "cookie 0.16.2", "derive_more", "encoding_rs", "futures-core", @@ -489,6 +489,26 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-flight" +version = "42.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6b1efc6f549b313e66deac3d96db34b416ffdd043a0fe3e0d53be1b5785bb3e" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ipc", + "arrow-schema", + "base64 0.21.0", + "bytes", + "futures", + "paste", + "prost", + "tokio", + "tonic", +] + [[package]] name = "arrow-ipc" version = "42.0.0" @@ -610,13 +630,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.64" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 1.0.107", + "syn 2.0.37", ] [[package]] @@ -636,6 +656,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa 1.0.5", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -691,6 +756,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" + [[package]] name = "blake2" version = "0.10.6" @@ -893,7 +964,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" dependencies = [ "atty", - "bitflags", + "bitflags 1.3.2", "clap_derive 3.2.18", "clap_lex 0.2.4", "indexmap", @@ -909,7 +980,7 @@ version = "4.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f13b9c79b5d1dd500d20ef541215a6423c75829ef43117e1b4d17fd8af0b5d76" dependencies = [ - "bitflags", + "bitflags 1.3.2", "clap_derive 4.1.0", "clap_lex 0.3.1", "is-terminal", @@ -1036,6 +1107,22 @@ dependencies = [ "version_check", ] +[[package]] +name = "cookie" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7efb37c3e1ccb1ff97164ad95ac1606e8ccd35b3fa0a7d99a304c7f4a428cc24" +dependencies = [ + "time 0.3.21", + "version_check", +] + +[[package]] +name = "cookies" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ed970f886b2692d34c1976254bfae22336b12733ecb37ef0fad32128ff4bfb" + [[package]] name = "core-foundation" version = "0.9.3" @@ -1128,7 +1215,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77f67c7faacd4db07a939f55d66a983a5355358a1f17d32cc9a8d01d1266b9ce" dependencies = [ - "bitflags", + "bitflags 1.3.2", "crossterm_winapi", "libc", "mio", @@ -1543,7 +1630,7 @@ version = "23.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619" dependencies = [ - "bitflags", + "bitflags 1.3.2", "rustc_version", ] @@ -1664,7 +1751,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.37", ] [[package]] @@ -1830,15 +1917,24 @@ dependencies = [ [[package]] name = "http" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ "bytes", "fnv", "itoa 1.0.5", ] +[[package]] +name = "http-auth-basic" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd2e17aacf7f4a2428def798e2ff4f4f883c0987bdaf47dd5c8bc027bc9f1ebc" +dependencies = [ + "base64 0.13.1", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1850,6 +1946,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "httparse" version = "1.8.0" @@ -1880,9 +1982,9 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.24" +version = "0.14.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e011372fa0b68db8350aa7a248930ecc7839bf46d8485577d69f117a75f164c" +checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" dependencies = [ "bytes", "futures-channel", @@ -1916,6 +2018,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2262,6 +2376,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" +[[package]] +name = "matchit" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" + [[package]] name = "md-5" version = "0.10.5" @@ -2297,9 +2417,9 @@ dependencies = [ [[package]] name = "mime" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mime_guess" @@ -2372,7 +2492,7 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa52e972a9a719cecb6864fb88568781eb706bac2cd1d4f04a648542dbf78069" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "libc", ] @@ -2561,7 +2681,7 @@ version = "0.10.55" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "foreign-types", "libc", @@ -2698,6 +2818,7 @@ dependencies = [ "anyhow", "argon2", "arrow-array", + "arrow-flight", "arrow-ipc", "arrow-json", "arrow-schema", @@ -2711,6 +2832,8 @@ dependencies = [ "chrono-humanize", "clap 4.1.4", "clokwerk", + "cookie 0.17.0", + "cookies", "crossterm", "datafusion", "derive_more", @@ -2721,11 +2844,13 @@ dependencies = [ "hex", "hostname", "http", + "http-auth-basic", "humantime", "humantime-serde", "itertools 0.10.5", "log", "maplit", + "mime", "nom", "num_cpus", "object_store", @@ -2752,6 +2877,9 @@ dependencies = [ "thread-priority", "tokio", "tokio-stream", + "tonic", + "tonic-web", + "tower-http", "ulid", "uptime_lib", "ureq", @@ -2944,9 +3072,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.58" +version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8" +checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" dependencies = [ "unicode-ident", ] @@ -2957,7 +3085,7 @@ version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69" dependencies = [ - "bitflags", + "bitflags 1.3.2", "byteorder", "hex", "lazy_static", @@ -2981,6 +3109,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.107", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -3039,9 +3190,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.27" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -3082,7 +3233,7 @@ version = "10.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c307f7aacdbab3f0adee67d52739a1d71112cc068d6fab169ddeb18e48877fad" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -3113,7 +3264,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -3264,7 +3415,7 @@ version = "0.36.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", @@ -3385,7 +3536,7 @@ version = "2.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -3678,7 +3829,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.16", + "syn 2.0.37", ] [[package]] @@ -3723,15 +3874,21 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.16" +version = "2.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" +checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sysinfo" version = "0.29.6" @@ -3802,7 +3959,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c56ce92f1285eaaa11fc1a3201e25de97898c50e87caa4c2aee836fe05288de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "libc", "log", @@ -3902,6 +4059,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -3910,7 +4077,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.37", ] [[package]] @@ -4003,6 +4170,98 @@ dependencies = [ "toml_datetime", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-trait", + "axum", + "base64 0.21.0", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-web" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b00ec4842256d1fe0a46176e2ef5bc357664c66e7d91aff5a7d43d83a65f47" +dependencies = [ + "base64 0.21.0", + "bytes", + "futures-core", + "http", + "http-body", + "hyper", + "pin-project", + "tonic", + "tower-http", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.4.0", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" diff --git a/server/Cargo.toml b/server/Cargo.toml index 65134510a..065ab0420 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,6 +24,13 @@ actix-web = { version = "4.3", features = ["rustls"] } actix-cors = "0.6" actix-web-prometheus = { version = "0.1" } actix-web-static-files = "4.0" +mime = "0.3.17" + +### LiveTail server deps +arrow-flight = "42.0.0" +tonic = "0.9.0" +tonic-web = "0.9.0" +tower-http = { version = "0.4.4", features = ["cors"] } ### other dependencies anyhow = { version = "1.0", features = ["backtrace"] } @@ -91,6 +98,9 @@ nom = "7.1.3" humantime = "2.1.0" openid = { version = "0.12.0", default-features = false, features = ["rustls"] } url = "2.4.0" +http-auth-basic = "0.3.3" +cookies = "0.0.1" +cookie = "0.17.0" [build-dependencies] cargo_toml = "0.15" diff --git a/server/src/analytics.rs b/server/src/analytics.rs index ae0529813..e543b3845 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -155,7 +155,7 @@ fn build_metrics() -> HashMap { metrics } -pub async fn init_analytics_scheduler() { +pub fn init_analytics_scheduler() { log::info!("Setting up schedular for anonymous user analytics"); let mut scheduler = AsyncScheduler::new(); diff --git a/server/src/event.rs b/server/src/event.rs index b3877126b..f913251c1 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -63,6 +63,8 @@ impl Event { num_rows, )?; + crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb); + if let Err(e) = metadata::STREAM_INFO .check_alerts(&self.stream_name, self.rb) .await diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 5dfb5e35e..294fc40a8 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -17,9 +17,15 @@ */ pub mod http; +pub mod livetail; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const FILL_NULL_OPTION_KEY: &str = "send_null"; const SEPARATOR: char = '^'; + +const OIDC_SCOPE: &str = "openid profile email"; +const COOKIE_AGE_DAYS: usize = 7; +const SESSION_COOKIE_NAME: &str = "session"; +const USER_COOKIE_NAME: &str = "username"; diff --git a/server/src/handlers/http/oidc.rs b/server/src/handlers/http/oidc.rs index 618805762..55d1d24f6 100644 --- a/server/src/handlers/http/oidc.rs +++ b/server/src/handlers/http/oidc.rs @@ -31,6 +31,7 @@ use ulid::Ulid; use url::Url; use crate::{ + handlers::{COOKIE_AGE_DAYS, OIDC_SCOPE, SESSION_COOKIE_NAME, USER_COOKIE_NAME}, oidc::{Claims, DiscoveredClient}, option::CONFIG, rbac::{ @@ -42,10 +43,6 @@ use crate::{ utils::actix::extract_session_key_from_req, }; -// fetch common personalization scope to determine username. -const SCOPE: &str = "openid profile email"; -const COOKIE_AGE_DAYS: usize = 7; - /// Struct representing query params returned from oidc provider #[derive(Deserialize, Debug)] pub struct Login { @@ -182,7 +179,7 @@ fn redirect_to_oidc( ) -> HttpResponse { let redirect = query.into_inner().redirect.to_string(); let auth_url = oidc_client.auth_url(&Options { - scope: Some(SCOPE.into()), + scope: Some(OIDC_SCOPE.into()), state: Some(redirect), ..Default::default() }); @@ -222,7 +219,7 @@ fn redirect_no_oauth_setup(mut url: Url) -> HttpResponse { } fn cookie_session(id: Ulid) -> Cookie<'static> { - let authorization_cookie = Cookie::build("session", id.to_string()) + let authorization_cookie = Cookie::build(SESSION_COOKIE_NAME, id.to_string()) .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) .same_site(SameSite::Strict) .path("/") @@ -231,7 +228,7 @@ fn cookie_session(id: Ulid) -> Cookie<'static> { } fn cookie_username(username: &str) -> Cookie<'static> { - let authorization_cookie = Cookie::build("username", username.to_string()) + let authorization_cookie = Cookie::build(USER_COOKIE_NAME, username.to_string()) .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) .same_site(SameSite::Strict) .path("/") diff --git a/server/src/handlers/livetail.rs b/server/src/handlers/livetail.rs new file mode 100644 index 000000000..3277161b9 --- /dev/null +++ b/server/src/handlers/livetail.rs @@ -0,0 +1,242 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::net::SocketAddr; + +use arrow_array::RecordBatch; +use arrow_flight::encode::FlightDataEncoderBuilder; +use cookie::Cookie; +use futures::stream::BoxStream; +use futures_util::{Future, StreamExt, TryFutureExt, TryStreamExt}; +use http_auth_basic::Credentials; +use rand::distributions::{Alphanumeric, DistString}; +use tonic::metadata::MetadataMap; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; + +use arrow_flight::{ + flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, + ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, + HandshakeResponse, PutResult, SchemaResult, Ticket, +}; +use tonic_web::GrpcWebLayer; +use tower_http::cors::{Any, CorsLayer}; + +use crate::livetail::{Message, LIVETAIL}; +use crate::metadata::STREAM_INFO; +use crate::option::CONFIG; +use crate::rbac::map::SessionKey; +use crate::rbac::{self, Users}; +use crate::utils; + +use super::SESSION_COOKIE_NAME; + +#[derive(Clone)] +pub struct FlightServiceImpl {} + +#[tonic::async_trait] +impl FlightService for FlightServiceImpl { + type HandshakeStream = BoxStream<'static, Result>; + type ListFlightsStream = BoxStream<'static, Result>; + type DoGetStream = BoxStream<'static, Result>; + type DoPutStream = BoxStream<'static, Result>; + type DoActionStream = BoxStream<'static, Result>; + type ListActionsStream = BoxStream<'static, Result>; + type DoExchangeStream = BoxStream<'static, Result>; + + async fn handshake( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented( + "handshake is disabled in favour of direct authentication and authorization", + )) + } + + async fn list_flights( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement list_flights")) + } + + async fn get_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement get_flight_info")) + } + + async fn get_schema( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement get_schema")) + } + + async fn do_get(&self, req: Request) -> Result, Status> { + let key = extract_session_key(req.metadata())?; + let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket) + .map_err(|err| Status::internal(err.to_string()))?; + let stream = extract_stream(&ticket)?; + log::info!("livetail requested for stream {}", stream); + match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) { + rbac::Response::Authorized => (), + rbac::Response::UnAuthorized => { + return Err(Status::permission_denied( + "user is not authenticated to access this resource", + )) + } + rbac::Response::ReloadRequired => { + return Err(Status::unauthenticated("reload required")) + } + } + + let schema = STREAM_INFO + .schema(stream) + .map_err(|err| Status::failed_precondition(err.to_string()))?; + + let rx = LIVETAIL.new_pipe( + Alphanumeric.sample_string(&mut rand::thread_rng(), 32), + stream.to_string(), + ); + + let adapter_schema = schema.clone(); + let rx = rx.map(move |x| match x { + Message::Record(t) => Ok(utils::arrow::adapt_batch(&adapter_schema, &t)), + Message::Skipped(_) => { + log::warn!("livetail channel capacity is full."); + Ok(RecordBatch::new_empty(adapter_schema.clone())) + } + }); + + let rb_stream = FlightDataEncoderBuilder::new() + .with_schema(schema) + .build(rx); + + let rb_stream = rb_stream.map_err(|err| Status::unknown(err.to_string())); + Ok(Response::new(Box::pin(rb_stream))) + } + + async fn do_put( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Implement do_put")) + } + + async fn do_action( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement do_action")) + } + + async fn list_actions( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement list_actions")) + } + + async fn do_exchange( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Implement do_exchange")) + } +} + +pub fn server() -> impl Future>> + Send { + let mut addr: SocketAddr = CONFIG + .parseable + .address + .parse() + .expect("valid socket address"); + addr.set_port(CONFIG.parseable.grpc_port); + + let service = FlightServiceImpl {}; + + let svc = FlightServiceServer::new(service); + + let cors = CorsLayer::new() + // allow `GET` and `POST` when accessing the resource + .allow_methods(Any) + .allow_headers(Any) + .allow_origin(Any); + // allow requests from any origin + + Server::builder() + .accept_http1(true) + .layer(cors) + .layer(GrpcWebLayer::new()) + .add_service(svc) + .serve(addr) + .map_err(|err| Box::new(err) as Box) +} + +fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> { + body.as_object() + .ok_or(Status::invalid_argument("expected object in request body"))? + .get("stream") + .ok_or(Status::invalid_argument("stream key value is not provided"))? + .as_str() + .ok_or(Status::invalid_argument("stream key value is invalid")) +} + +fn extract_session_key(headers: &MetadataMap) -> Result { + // Extract username and password from the request using basic auth extractor. + let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth { + username: creds.user_id, + password: creds.password, + }); + + if let Some(basic) = basic { + return Ok(basic); + } + + let session = extract_cookie(headers) + .map(|cookie| ulid::Ulid::from_string(cookie.value())) + .transpose() + .map_err(|_| Status::invalid_argument("Cookie is tampered with or invalid"))?; + + if let Some(session) = session { + return Ok(SessionKey::SessionId(session)); + } + + Err(Status::unauthenticated("No authentication method supplied")) +} + +fn extract_basic_auth(header: &MetadataMap) -> Option { + let creds = header + .get("Authorization") + .and_then(|value| value.to_str().ok()) + .and_then(|value| Credentials::from_header(value.to_string()).ok()); + creds +} + +fn extract_cookie(header: &MetadataMap) -> Option { + let cookies = header + .get("Cookies") + .and_then(|value| value.to_str().ok()) + .map(Cookie::split_parse)?; + + cookies + .flatten() + .find(|cookie| cookie.name() == SESSION_COOKIE_NAME) +} diff --git a/server/src/livetail.rs b/server/src/livetail.rs new file mode 100644 index 000000000..aa2b1f153 --- /dev/null +++ b/server/src/livetail.rs @@ -0,0 +1,169 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{ + collections::HashMap, + sync::{Arc, RwLock, Weak}, + task::Poll, +}; + +use futures_util::Stream; +use tokio::sync::mpsc::{ + self, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender, +}; + +use arrow_array::RecordBatch; +use once_cell::sync::Lazy; + +pub static LIVETAIL: Lazy = Lazy::new(LiveTail::default); + +pub type LiveTailRegistry = RwLock>>; + +pub struct LiveTail { + pipes: Arc, +} + +impl LiveTail { + pub fn new_pipe(&self, id: String, stream: String) -> ReceiverPipe { + let (sender, revc) = channel(id, stream.clone(), Arc::downgrade(&self.pipes)); + self.pipes + .write() + .unwrap() + .entry(stream) + .or_default() + .push(sender); + revc + } + + pub fn process(&self, stream_name: &str, rb: &RecordBatch) { + let read = self.pipes.read().unwrap(); + let Some(pipes) = read.get(stream_name) else { + return; + }; + for pipe in pipes { + pipe.send(rb.clone()) + } + } +} + +impl Default for LiveTail { + fn default() -> Self { + Self { + pipes: Arc::new(RwLock::default()), + } + } +} + +#[derive(Debug, PartialEq)] +pub enum Message { + Record(RecordBatch), + Skipped(usize), +} + +// Receiver should swap out channel for a new one when full +pub enum Command { + Skipping(usize), +} + +type Id = String; +pub struct SenderPipe { + pub id: Id, + inner: Sender, + command: UnboundedSender, +} + +impl SenderPipe { + pub fn send(&self, rb: RecordBatch) { + if let Err(TrySendError::Full(rb)) = self.inner.try_send(rb) { + self.command + .send(Command::Skipping(rb.num_rows())) + .expect("receiver is not dropped before sender") + } + } +} + +pub struct ReceiverPipe { + pub id: Id, + pub stream: String, + inner: Receiver, + command: UnboundedReceiver, + _ref: Weak, +} + +fn channel( + id: String, + stream: String, + weak_ptr: Weak, +) -> (SenderPipe, ReceiverPipe) { + let (command_tx, command_rx) = mpsc::unbounded_channel::(); + let (rb_tx, rb_rx) = mpsc::channel::(1000); + + ( + SenderPipe { + id: id.clone(), + inner: rb_tx, + command: command_tx, + }, + ReceiverPipe { + id, + stream, + _ref: weak_ptr, + inner: rb_rx, + command: command_rx, + }, + ) +} + +impl Stream for ReceiverPipe { + type Item = Message; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + match this.command.poll_recv(cx) { + Poll::Ready(Some(Command::Skipping(mut row_count))) => { + while let Poll::Ready(Some(rb)) = this.inner.poll_recv(cx) { + row_count += rb.num_rows(); + } + while let Poll::Ready(Some(Command::Skipping(count))) = this.command.poll_recv(cx) { + row_count += count + } + Poll::Ready(Some(Message::Skipped(row_count))) + } + Poll::Pending => match this.inner.poll_recv(cx) { + Poll::Ready(Some(rb)) => Poll::Ready(Some(Message::Record(rb))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }, + Poll::Ready(None) => Poll::Ready(None), + } + } +} + +// drop sender on map when going out of scope +impl Drop for ReceiverPipe { + fn drop(&mut self) { + if let Some(map) = self._ref.upgrade() { + if let Some(pipes) = map.write().unwrap().get_mut(&self.stream) { + pipes.retain(|x| x.id != self.id) + } + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 4077236e7..95499f621 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -36,6 +36,7 @@ mod analytics; mod banner; mod event; mod handlers; +mod livetail; mod metadata; mod metrics; mod migration; @@ -96,9 +97,11 @@ async fn main() -> anyhow::Result<()> { // all internal data structures populated now. // start the analytics scheduler if enabled if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler().await; + analytics::init_analytics_scheduler(); } + tokio::spawn(handlers::livetail::server()); + let app = handlers::http::run_http(prometheus, CONFIG.parseable.openid.clone()); tokio::pin!(app); loop { @@ -121,6 +124,7 @@ async fn main() -> anyhow::Result<()> { remote_sync_handler.join().unwrap_or(()); (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync(); } + }; } } diff --git a/server/src/option.rs b/server/src/option.rs index 5bc967f89..27a9879ee 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -196,6 +196,12 @@ pub struct Server { /// Open AI access key pub open_ai_key: Option, + /// Livetail port + pub grpc_port: u16, + + /// Livetail channel capacity + pub livetail_channel_capacity: usize, + /// Rows in Parquet Rowgroup pub row_group_size: usize, @@ -250,6 +256,14 @@ impl FromArgMatches for Server { .cloned() .expect("default for send analytics"); self.open_ai_key = m.get_one::(Self::OPEN_AI_KEY).cloned(); + self.grpc_port = m + .get_one::(Self::GRPC_PORT) + .cloned() + .expect("default for livetail port"); + self.livetail_channel_capacity = m + .get_one::(Self::LIVETAIL_CAPACITY) + .cloned() + .expect("default for livetail port"); // converts Gib to bytes before assigning self.query_memory_pool_size = m .get_one::(Self::QUERY_MEM_POOL_SIZE) @@ -314,6 +328,8 @@ impl Server { pub const OPENID_CLIENT_ID: &str = "oidc-client"; pub const OPENID_CLIENT_SECRET: &str = "oidc-client-secret"; pub const OPENID_ISSUER: &str = "oidc-issuer"; + pub const GRPC_PORT: &str = "grpc-port"; + pub const LIVETAIL_CAPACITY: &str = "livetail-capacity"; // todo : what should this flag be pub const QUERY_MEM_POOL_SIZE: &str = "query-mempool-size"; pub const ROW_GROUP_SIZE: &str = "row-group-size"; @@ -457,6 +473,26 @@ impl Server { .value_parser(validation::url) .help("Set host global domain address"), ) + .arg( + Arg::new(Self::GRPC_PORT) + .long(Self::GRPC_PORT) + .env("P_GRPC_PORT") + .value_name("PORT") + .default_value("8001") + .required(false) + .value_parser(value_parser!(u16)) + .help("Set port for livetail arrow flight server"), + ) + .arg( + Arg::new(Self::LIVETAIL_CAPACITY) + .long(Self::LIVETAIL_CAPACITY) + .env("P_LIVETAIL_CAPACITY") + .value_name("NUMBER") + .default_value("1000") + .required(false) + .value_parser(value_parser!(usize)) + .help("Set port for livetail arrow flight server"), + ) .arg( Arg::new(Self::QUERY_MEM_POOL_SIZE) .long(Self::QUERY_MEM_POOL_SIZE)