From fc46db465f4c4fb119b3529d144f4074c90c945d Mon Sep 17 00:00:00 2001 From: Charles Schleich Date: Tue, 28 May 2024 11:34:55 +0200 Subject: [PATCH] fix: createdb not functioning due to application/json header in request --- Cargo.lock | 179 +++++++++++++++++++++++++++++++++++++++++--------- v2/Cargo.toml | 16 +++-- v2/src/lib.rs | 171 +++++++++++++++++++++++------------------------ 3 files changed, 244 insertions(+), 122 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3d5e33..24153ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,7 +173,7 @@ dependencies = [ "async-lock", "async-task", "concurrent-queue", - "fastrand", + "fastrand 1.9.0", "futures-lite", "slab", ] @@ -224,7 +224,7 @@ dependencies = [ "log", "parking", "polling", - "rustix", + "rustix 0.37.25", "slab", "socket2 0.4.9", "waker-fn", @@ -252,7 +252,7 @@ dependencies = [ "cfg-if", "event-listener 2.5.3", "futures-lite", - "rustix", + "rustix 0.37.25", "signal-hook", "windows-sys 0.48.0", ] @@ -414,7 +414,7 @@ dependencies = [ "async-lock", "async-task", "atomic-waker", - "fastrand", + "fastrand 1.9.0", "futures-lite", "log", ] @@ -847,6 +847,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + [[package]] name = "fixedbitset" version = "0.4.2" @@ -871,6 +877,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -934,7 +955,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" dependencies = [ - "fastrand", + "fastrand 1.9.0", "futures-core", "futures-io", "memchr", @@ -1273,17 +1294,16 @@ dependencies = [ ] [[package]] -name = "hyper-rustls" -version = "0.24.2" +name = "hyper-tls" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "futures-util", - "http 0.2.12", + "bytes", "hyper", - "rustls 0.21.7", + "native-tls", "tokio", - "tokio-rustls 0.24.1", + "tokio-native-tls", ] [[package]] @@ -1355,9 +1375,8 @@ dependencies = [ [[package]] name = "influxdb2" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e9905b42e1657450f88a3e47a9323d20862a05bf1e09c58d3391ee0916b623f" +version = "0.5.0" +source = "git+https://github.com/Charles-Schleich/influxdb2?branch=fix/content_headers#6cecd9f92acf4ea6d51e6fd89f6be7c0326b78d4" dependencies = [ "base64 0.13.1", "bytes", @@ -1381,8 +1400,7 @@ dependencies = [ [[package]] name = "influxdb2-derive" version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16" +source = "git+https://github.com/Charles-Schleich/influxdb2?branch=fix/content_headers#6cecd9f92acf4ea6d51e6fd89f6be7c0326b78d4" dependencies = [ "itertools", "proc-macro2", @@ -1394,8 +1412,7 @@ dependencies = [ [[package]] name = "influxdb2-structmap" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1408e712051787357e99ff732e44e8833e79cea0fabc9361018abfbff72b6265" +source = "git+https://github.com/Charles-Schleich/influxdb2?branch=fix/content_headers#6cecd9f92acf4ea6d51e6fd89f6be7c0326b78d4" dependencies = [ "chrono", "num-traits", @@ -1569,6 +1586,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "lock_api" version = "0.4.10" @@ -1657,6 +1680,24 @@ dependencies = [ "getrandom 0.2.10", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nix" version = "0.27.1" @@ -1774,12 +1815,50 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.4.2", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.33", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -1983,6 +2062,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + [[package]] name = "pnet_base" version = "0.34.0" @@ -2279,15 +2364,15 @@ dependencies = [ "http 0.2.12", "http-body", "hyper", - "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.7", "rustls-pemfile 1.0.3", "serde", "serde_json", @@ -2295,7 +2380,7 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", - "tokio-rustls 0.24.1", + "tokio-native-tls", "tokio-util", "tower-service", "url", @@ -2303,7 +2388,6 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.25.4", "winreg", ] @@ -2420,10 +2504,23 @@ dependencies = [ "errno", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.8", "windows-sys 0.48.0", ] +[[package]] +name = "rustix" +version = "0.38.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +dependencies = [ + "bitflags 2.4.2", + "errno", + "libc", + "linux-raw-sys 0.4.14", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.18.1" @@ -3081,6 +3178,18 @@ dependencies = [ "libc", ] +[[package]] +name = "tempfile" +version = "3.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +dependencies = [ + "cfg-if", + "fastrand 2.1.0", + "rustix 0.38.34", + "windows-sys 0.52.0", +] + [[package]] name = "thiserror" version = "1.0.48" @@ -3201,6 +3310,16 @@ dependencies = [ "syn 2.0.33", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -3531,6 +3650,12 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -3678,12 +3803,6 @@ dependencies = [ "webpki", ] -[[package]] -name = "webpki-roots" -version = "0.25.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" - [[package]] name = "webpki-roots" version = "0.26.0" diff --git a/v2/Cargo.toml b/v2/Cargo.toml index d83d27d..55fe07d 100644 --- a/v2/Cargo.toml +++ b/v2/Cargo.toml @@ -27,11 +27,17 @@ base64 = { workspace = true } chrono = { version = "0.4.31", features = ["serde"] } futures = "0.3.28" git-version = { workspace = true } -influxdb2 = { version = "0.4.5", features = [ - "rustls", -], default-features = false } -influxdb2-derive = "0.1.1" -influxdb2-structmap = "0.2" +# influxdb2 = { version = "0.5.0", features = [ +# "rustls", +# ], default-features = false } +# influxdb2-derive = "0.1.1" +# influxdb2-structmap = "0.2" +influxdb2 = { git = "https://github.com/Charles-Schleich/influxdb2", branch = "fix/content_headers"} +# Derive and struct map need to also come from the same Repo as the macros are used here +# Otherwise Cargo complaines Type T does not Match Type T because of its origin. +influxdb2-derive = { git = "https://github.com/Charles-Schleich/influxdb2", branch = "fix/content_headers" } +influxdb2-structmap = { git = "https://github.com/Charles-Schleich/influxdb2", branch = "fix/content_headers" } + lazy_static = { workspace = true } num-traits = "0.2" rand = "0.8.5" diff --git a/v2/src/lib.rs b/v2/src/lib.rs index 082e3ef..eba5c83 100644 --- a/v2/src/lib.rs +++ b/v2/src/lib.rs @@ -17,11 +17,9 @@ use async_trait::async_trait; use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine}; use chrono::{NaiveDateTime, SecondsFormat}; use futures::prelude::*; -use influxdb2::api::buckets::ListBucketsRequest; use influxdb2::models::Query; use influxdb2::models::{DataPoint, PostBucketRequest}; use influxdb2::Client; -use influxdb2::FromDataPoint; use zenoh::buffers::ZBuf; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; @@ -222,10 +220,9 @@ impl Volume for InfluxDbVolume { } async fn create_storage(&self, mut config: StorageConfig) -> ZResult> { - let volume_cfg = match config.volume_cfg.as_object() { - Some(v) => v, - None => bail!("InfluxDBv2 backed storages need some volume-specific configuration"), - }; + let volume_cfg = config.volume_cfg.as_object().ok_or_else(|| { + zerror!("InfluxDBv2 backed storages need some volume-specific configuration") + })?; let on_closure = match volume_cfg.get(PROP_STORAGE_ON_CLOSURE) { Some(serde_json::Value::String(x)) if x == "drop_series" => OnClosure::DropSeries, @@ -240,6 +237,7 @@ impl Volume for InfluxDbVolume { ) } }; + let (db, createdb) = match volume_cfg.get(PROP_STORAGE_DB) { Some(serde_json::Value::String(s)) => ( s.clone(), @@ -268,6 +266,8 @@ impl Volume for InfluxDbVolume { Some(creds) => creds, _ => bail!("No credentials specified to access database '{}'", db), }; + + // Client::new can panic: TODO : Switch to libraries without Panics let client = match std::panic::catch_unwind(|| { Client::new(url.clone(), creds.org_id.clone(), creds.token.clone()) }) { @@ -275,25 +275,20 @@ impl Volume for InfluxDbVolume { Err(e) => bail!("Error in creating client for InfluxDBv2 storage: {:?}", e), }; - //check if db exists, if it doesn't create one if user has set createdb=true in config - match async_std::task::block_on(async { is_db_existing(&client, &db).await }) { - Ok(res) => { - if !res && createdb { - // try to create db using user credentials - match async_std::task::block_on(async { - create_db(&self.admin_client, &creds.org_id, &db).await - }) { - Ok(res) => { - if !res { - bail!("Database '{}' wasnt't created in InfluxDBv2 storage", db) - } - } + match does_db_exist(&client, &db).await { + Ok(db_exists) => { + if !db_exists && createdb { + // Try to create db using user credentials + match create_db(&self.admin_client, &creds.org_id, &db).await { + Ok(_) => tracing::info!("Created {db} Influx"), Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e), } + } else if db_exists && createdb { + tracing::warn!("Database '{db}' already exists exists in Influx and config 'create_db'='true'"); } } - Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e), - } + Err(e) => bail!("Failed to get Buckets from InfluxDB : {:?}", e), + }; config .volume_cfg @@ -307,6 +302,7 @@ impl Volume for InfluxDbVolume { Some(creds) => creds, None => bail!("No credentials specified to access database '{}'", db), }; + let admin_client = match std::panic::catch_unwind(|| { Client::new(url.clone(), &admin_creds.org_id, &admin_creds.token) }) { @@ -356,6 +352,53 @@ impl TryFrom<&Properties> for OnClosure { } } +#[derive(Debug, Default)] +struct ZenohPoint { + #[allow(dead_code)] + // NOTE: "kind" is present within InfluxDB and used in query clauses, but not read in Rust... + kind: String, + timestamp: String, + encoding_prefix: i64, //should be u8 but not supported in v2.x so using a workaround + encoding_suffix: String, + base64: bool, + value: String, +} + +// Safest Thing for now is to send back default values if keys dont exist in query +// But a proper solution would be real deserializing of the structure +// Either you get the full data into a Zenoh Point or the function fails to deserialize +// Or we make the fields of a Zenoh Point optional +// I do not like using a Default value and expecting the GenericMap to have the values +// The underlying library influxDB2 must change to support proper Deserialization +// as influxdb2::FromMap should be Failable +impl influxdb2::FromMap for ZenohPoint { + fn from_genericmap(map: influxdb2_structmap::GenericMap) -> Self { + use influxdb2_structmap::value::Value as V; + + let mut z_point = ZenohPoint::default(); + + if let Some(V::String(kind)) = map.get("kind") { + z_point.kind = kind.clone(); + }; + if let Some(V::String(timestamp)) = map.get("timestamp") { + z_point.timestamp = timestamp.clone(); + }; + if let Some(V::Long(encoding_prefix)) = map.get("encoding_prefix") { + z_point.encoding_prefix = encoding_prefix.clone(); + }; + if let Some(V::String(encoding_suffix)) = map.get("encoding_suffix") { + z_point.encoding_suffix = encoding_suffix.clone(); + }; + if let Some(V::Bool(base64)) = map.get("base64") { + z_point.base64 = *base64; + }; + if let Some(V::String(value)) = map.get("value") { + z_point.value = value.clone(); + }; + z_point + } +} + struct InfluxDbStorage { config: StorageConfig, admin_client: Client, @@ -378,41 +421,20 @@ impl InfluxDbStorage { ); // get the value and if it exists then extract the timestamp from it - - #[derive(Debug, Default, FromDataPoint)] - struct ZenohPoint { - #[allow(dead_code)] - // NOTE: "kind" is present within InfluxDB and used in query clauses, but not read in Rust... - kind: String, - timestamp: String, - encoding_prefix: i64, //should be u8 but not supported in v2.x so using a workaround - encoding_suffix: String, - base64: bool, - value: String, - } - let query = Query::new(qs); - let mut query_result: Vec = vec![]; - - match async_std::task::block_on(async { - self.client.query::(Some(query)).await - }) { - Ok(result) => { - query_result = result; - } + let query_result: Vec = match self.client.query::(Some(query)).await + { + Ok(result) => result, Err(e) => { tracing::error!( "Couldn't get data from InfluxDBv2 database {} with error: {} ", db, e ); + return Ok(None); } }; - if query_result.is_empty() { - return Ok(None); - } - match Timestamp::from_str(&query_result[0].timestamp) { Ok(ts) => Ok(Some(ts)), Err(e) => { @@ -520,11 +542,12 @@ impl Storage for InfluxDbStorage { let db = get_db_name(self.config.clone())?; let start_timestamp = NaiveDateTime::UNIX_EPOCH; - let stop_timestamp = NaiveDateTime::from_timestamp_opt( + + let stop_timestamp = chrono::DateTime::from_timestamp( timestamp.get_time().as_secs() as i64, timestamp.get_time().subsec_nanos(), ) - .expect("Couldn't convert uhlc timestamp to naivedatetime"); + .ok_or_else(|| zerror!("delete: timestamp out of range"))?; let predicate = None; //can be specified with tag or field values tracing::debug!( @@ -533,7 +556,7 @@ impl Storage for InfluxDbStorage { ); if let Err(e) = self .client - .delete(&db, start_timestamp, stop_timestamp, predicate) + .delete(&db, start_timestamp, stop_timestamp.naive_utc(), predicate) .await { bail!( @@ -587,20 +610,6 @@ impl Storage for InfluxDbStorage { let db = get_db_name(self.config.clone())?; - // the expected JSon type resulting from the query - // #[derive(Deserialize, Debug)] - #[derive(Debug, Default, FromDataPoint)] - struct ZenohPoint { - #[allow(dead_code)] - // NOTE: "kind" is present within InfluxDB and used in query clauses, but not read in Rust... - kind: String, - timestamp: String, - encoding_prefix: i64, //should be u8 but not supported in v2.x so using a workaround - encoding_suffix: String, - base64: bool, - value: String, - } - #[allow(unused_assignments)] let mut qs: String = String::new(); match timerange_from_parameters(parameters)? { @@ -787,32 +796,21 @@ fn generate_db_name() -> String { format!("zenoh_db_{}", Uuid::new_v4().simple()) } -async fn is_db_existing(client: &Client, db: &str) -> ZResult { - let request = ListBucketsRequest { - name: Some(db.to_owned()), - ..ListBucketsRequest::default() - }; - - let dbs = client.list_buckets(Some(request)).await?.buckets; - if !dbs.is_empty() { - Ok(true) - } else { - Ok(false) - } +async fn does_db_exist(client: &Client, db: &str) -> ZResult { + Ok(client + .list_buckets(None) + .await? + .buckets + .into_iter() + .find(|bucket| bucket.name == db) + .is_some()) } -async fn create_db(client: &Client, org_id: &str, db: &str) -> ZResult { - let result = client - .create_bucket(Some(PostBucketRequest::new( - org_id.to_owned(), - db.to_owned(), - ))) - .await; - match result { - Ok(_) => Ok(true), - Err(_) => Ok(false), //can post error here - } +async fn create_db(client: &Client, org_id: &str, db: &str) -> Result<(), influxdb2::RequestError> { + let post_bucket_options = PostBucketRequest::new(org_id.into(), db.into()); + client.create_bucket(Some(post_bucket_options)).await } + // Returns an InfluxDB regex (see https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#regular-expressions) // corresponding to the list of path expressions. I.e.: // Replace "**" with ".*", "*" with "[^\/]*" and "/" with "\/". @@ -885,7 +883,6 @@ fn write_timeexpr(s: &mut String, t: TimeExpr, i: u64) { use std::fmt::Write; match t { TimeExpr::Fixed(t) => { - // let tm = t + let time_duration = t.duration_since(UNIX_EPOCH).expect("Time went backwards") + Duration::from_nanos(i); //adding 1ns for inclusive timebinding ; let datetime = chrono::DateTime::from_timestamp(