From 769d1b684ed3f52f385248eaca2b9dc3c4e47c26 Mon Sep 17 00:00:00 2001 From: Robby klein Gunnewiek Date: Thu, 11 Jul 2024 14:36:14 +0200 Subject: [PATCH] feat: Google Logging support --- Cargo.lock | 568 +++++++++++++++++++++++++++++--- Cargo.toml | 5 + crates/googlelog/Cargo.toml | 19 ++ crates/googlelog/src/error.rs | 40 +++ crates/googlelog/src/lib.rs | 27 ++ crates/googlelog/src/logger.rs | 315 ++++++++++++++++++ crates/googlelog/src/shipper.rs | 184 +++++++++++ src/args.rs | 36 ++ src/logging.rs | 85 ++++- src/main.rs | 38 ++- 10 files changed, 1255 insertions(+), 62 deletions(-) create mode 100644 crates/googlelog/Cargo.toml create mode 100644 crates/googlelog/src/error.rs create mode 100644 crates/googlelog/src/lib.rs create mode 100644 crates/googlelog/src/logger.rs create mode 100644 crates/googlelog/src/shipper.rs diff --git a/Cargo.lock b/Cargo.lock index 9d2cf6e..960044a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,18 +186,18 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.29", "itoa 1.0.11", "matchit", "memchr", "mime", - "percent-encoding", + "percent-encoding 2.3.1", "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", "tower", "tower-layer", "tower-service", @@ -212,8 +212,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", "mime", "rustversion", "tower-layer", @@ -262,7 +262,7 @@ dependencies = [ "bitflags 2.5.0", "cexpr", "clang-sys", - "itertools", + "itertools 0.12.1", "lazy_static", "lazycell", "log", @@ -364,7 +364,7 @@ dependencies = [ "semver", "serde", "toml", - "url", + "url 2.5.2", ] [[package]] @@ -407,8 +407,10 @@ checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.52.5", ] @@ -435,7 +437,7 @@ dependencies = [ "clap_lex", "indexmap 1.9.3", "once_cell", - "strsim", + "strsim 0.10.0", "termcolor", "textwrap", ] @@ -584,6 +586,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.66", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.66", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -750,7 +787,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ - "percent-encoding", + "percent-encoding 2.3.1", ] [[package]] @@ -896,7 +933,7 @@ dependencies = [ "libc", "libgit2-sys", "log", - "url", + "url 2.5.2", ] [[package]] @@ -905,6 +942,61 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "google-apis-common" +version = "6.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efe58cc89d6670229d0ab84bfc4bbe35d2572571accaf2201526360b358414f" +dependencies = [ + "base64 0.22.1", + "chrono", + "http 0.2.12", + "hyper 0.14.29", + "itertools 0.12.1", + "mime", + "percent-encoding 2.3.1", + "serde", + "serde_json", + "serde_with", + "tokio", + "tower-service", + "url 2.5.2", + "yup-oauth2 9.0.0", +] + +[[package]] +name = "google-logging2" +version = "5.0.5+20240531" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66e7bd976d9c36be1f6992e50f52ab890ef784530772398111a04897cec88281" +dependencies = [ + "anyhow", + "google-apis-common", + "http 0.2.12", + "hyper 0.14.29", + "hyper-rustls 0.25.0", + "itertools 0.13.0", + "mime", + "serde", + "serde_json", + "tokio", + "tower-service", + "url 1.7.2", +] + +[[package]] +name = "googlelog" +version = "0.1.0" +dependencies = [ + "chrono", + "google-logging2", + "reqwest", + "serde_json", + "slog", + "thiserror", + "tokio", +] + [[package]] name = "h2" version = "0.3.26" @@ -916,7 +1008,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -1002,6 +1094,17 @@ dependencies = [ "itoa 1.0.11", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa 1.0.11", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1009,7 +1112,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1042,8 +1168,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.5", "httparse", "httpdate", "itoa 1.0.11", @@ -1055,17 +1181,36 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "itoa 1.0.11", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "log", "rustls 0.20.8", - "rustls-native-certs", + "rustls-native-certs 0.6.2", "tokio", "tokio-rustls 0.23.4", ] @@ -1077,27 +1222,82 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "log", "rustls 0.21.12", - "rustls-native-certs", + "rustls-native-certs 0.6.2", "tokio", "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c78f9338483cb7e630c8474b07268983c6bd5acee012e4211f9f7bb21b070" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.29", + "log", + "rustls 0.22.4", + "rustls-native-certs 0.7.1", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "rustls 0.23.10", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "webpki-roots", +] + [[package]] name = "hyper-timeout" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.29", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.4.1", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1121,6 +1321,23 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + +[[package]] +name = "idna" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "0.5.0" @@ -1139,6 +1356,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1149,6 +1367,7 @@ checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown 0.14.5", + "serde", ] [[package]] @@ -1202,6 +1421,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.8" @@ -1350,6 +1578,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matches" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" + [[package]] name = "matchit" version = "0.7.0" @@ -1607,6 +1841,12 @@ dependencies = [ "stfu8", ] +[[package]] +name = "percent-encoding" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1773,7 +2013,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.66", @@ -1804,6 +2044,53 @@ dependencies = [ "snafu", ] +[[package]] +name = "quinn" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls 0.23.10", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +dependencies = [ + "bytes", + "rand", + "ring 0.17.8", + "rustc-hash", + "rustls 0.23.10", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +dependencies = [ + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -1874,9 +2161,9 @@ dependencies = [ "combine", "dtoa", "itoa 0.4.8", - "percent-encoding", + "percent-encoding 2.3.1", "sha1", - "url", + "url 2.5.2", ] [[package]] @@ -1952,6 +2239,48 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +[[package]] +name = "reqwest" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", + "hyper-rustls 0.27.2", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding 2.3.1", + "pin-project-lite", + "quinn", + "rustls 0.23.10", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", + "url 2.5.2", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -2074,6 +2403,7 @@ dependencies = [ "aws-lc-rs", "log", "once_cell", + "ring 0.17.8", "rustls-pki-types", "rustls-webpki 0.102.4", "subtle", @@ -2092,6 +2422,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2254,6 +2597,48 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa 1.0.11", + "ryu", + "serde", +] + +[[package]] +name = "serde_with" +version = "3.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e73139bc5ec2d45e6c5fd85be5a46949c1c39a4c18e56915f5eb4c12f975e377" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.6", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b80d3d6b56b64335c0180e5ffde23b3c5e08c14c585b51a15bd0e95393f46703" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "sha1" version = "0.6.1" @@ -2439,6 +2824,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum" version = "0.26.2" @@ -2495,6 +2886,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "synstructure" version = "0.13.1" @@ -2683,6 +3080,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -2740,11 +3148,11 @@ dependencies = [ "base64 0.21.7", "bytes", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.5", + "hyper 0.14.29", "hyper-timeout", - "percent-encoding", + "percent-encoding 2.3.1", "pin-project", "prost", "tokio", @@ -2863,8 +3271,9 @@ dependencies = [ "console-subscriber", "flate2", "futures", - "http", - "hyper", + "googlelog", + "http 0.2.12", + "hyper 0.14.29", "hyper-rustls 0.23.2", "lazy_static", "libunftp", @@ -2887,7 +3296,7 @@ dependencies = [ "unftp-sbe-gcs", "unftp-sbe-restrict", "unftp-sbe-rooter", - "url", + "url 2.5.2", ] [[package]] @@ -2932,10 +3341,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdba42e5964ef945842167cab4c10fd12667dc12acbcfc702414f3d0262267ac" dependencies = [ "async-trait", - "hyper", + "hyper 0.14.29", "hyper-rustls 0.24.2", "libunftp", - "percent-encoding", + "percent-encoding 2.3.1", "regex", "serde", "serde_json", @@ -2974,11 +3383,11 @@ dependencies = [ "bytes", "chrono", "futures", - "hyper", + "hyper 0.14.29", "hyper-rustls 0.24.2", "libunftp", "mime", - "percent-encoding", + "percent-encoding 2.3.1", "serde", "serde_json", "time", @@ -2987,7 +3396,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-attributes", - "yup-oauth2", + "yup-oauth2 8.3.2", ] [[package]] @@ -3046,6 +3455,17 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "url" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" +dependencies = [ + "idna 0.1.5", + "matches", + "percent-encoding 1.0.1", +] + [[package]] name = "url" version = "2.5.2" @@ -3053,8 +3473,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", - "idna", - "percent-encoding", + "idna 0.5.0", + "percent-encoding 2.3.1", ] [[package]] @@ -3139,6 +3559,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -3188,6 +3620,15 @@ dependencies = [ "untrusted 0.7.1", ] +[[package]] +name = "webpki-roots" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -3436,6 +3877,16 @@ version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "winx" version = "0.36.3" @@ -3479,12 +3930,39 @@ dependencies = [ "async-trait", "base64 0.21.7", "futures", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "hyper-rustls 0.24.2", - "itertools", + "itertools 0.12.1", + "log", + "percent-encoding 2.3.1", + "rustls 0.22.4", + "rustls-pemfile 1.0.4", + "seahash", + "serde", + "serde_json", + "time", + "tokio", + "tower-service", + "url 2.5.2", +] + +[[package]] +name = "yup-oauth2" +version = "9.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f75463c432f5d4ca9c75047514df3d768f8ac3276ac22c9a6531af6d0a3da7ee" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.7", + "futures", + "http 0.2.12", + "hyper 0.14.29", + "hyper-rustls 0.25.0", + "itertools 0.12.1", "log", - "percent-encoding", + "percent-encoding 2.3.1", "rustls 0.22.4", "rustls-pemfile 1.0.4", "seahash", @@ -3493,7 +3971,7 @@ dependencies = [ "time", "tokio", "tower-service", - "url", + "url 2.5.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9c440cb..e5faabd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,11 @@ documentation = "https://github.com/bolcom/unFTP" path = "crates/redislog" version = "0.1.2" +[dependencies.googlelog] +path = "crates/googlelog" +features = ["shipper"] +version = "0.1.0" + [dependencies] async-trait = "0.1.80" base64 = "0.22.1" diff --git a/crates/googlelog/Cargo.toml b/crates/googlelog/Cargo.toml new file mode 100644 index 0000000..bc76a97 --- /dev/null +++ b/crates/googlelog/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "googlelog" +version = "0.1.0" +edition = "2021" +authors = [ + "Rob klein Gunnewiek " +] + +[features] +shipper = [] + +[dependencies] +google-logging2 = "5.0.5" +serde_json = "1.0.117" +chrono = "0.4.38" +tokio = { version = "1.38.0", features = ["macros", "time", "rt-multi-thread"] } +slog = "2.7.0" +thiserror = "1.0.61" +reqwest = { version = "0.12.5", default-features = false, features = ["rustls-tls", "json"] } \ No newline at end of file diff --git a/crates/googlelog/src/error.rs b/crates/googlelog/src/error.rs new file mode 100644 index 0000000..1718a32 --- /dev/null +++ b/crates/googlelog/src/error.rs @@ -0,0 +1,40 @@ +use thiserror; + +use reqwest::{self, StatusCode}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Failed to read the 'default_labels' object from the JSON file, does it exist by this name?. Parse error: {0}")] + DefaultLabelsError(serde_json::Error), + #[error("Failed to read the 'resource_labels' object from the JSON file, does it exist by this name?. Parse error: {0}")] + ResourceLabelsError(serde_json::Error), + #[error("Serde JSON serialization failed with context '{context}'. Error: {source}")] + ShipperSerializeError { + context: String, + source: serde_json::Error, + }, + #[error("Reqwest error with context '{context}'. Error: {source}")] + ShipperReqwestError { + context: String, + source: reqwest::Error, + }, + #[error("No 'access_token' found in the metadata server response body")] + ShipperTokenNotFound, + #[error("No 'expires_in' found in the metadata server response body")] + ShipperTokenExpiryNotFound, + #[error("unsuccessful HTTP response error with context '{context}'. HTTP status code: '{status}', body: '{body}'")] + HttpResponseError { + context: String, + status: StatusCode, + body: String, + }, +} + +impl From for Error { + fn from(err: reqwest::Error) -> Self { + Self::ShipperReqwestError { + context: "Error sending HTTP request".to_string(), + source: err, + } + } +} diff --git a/crates/googlelog/src/lib.rs b/crates/googlelog/src/lib.rs new file mode 100644 index 0000000..61e291a --- /dev/null +++ b/crates/googlelog/src/lib.rs @@ -0,0 +1,27 @@ +//! An implemention of [`slog::Drain`](https://slog-rs.github.io/slog/slog/trait.Drain.html) for logging to [Google Cloud](https://cloud.google.com/logging). +//! +//! # Usage +//! +//! Warning: Currently, this library only works in the context of [workload identity](https://cloud.google.com/iam/docs/workload-identity-federation). +//! +//! The `googlelog` drain creates log entries compatible with [Google Cloud Logging](https://cloud.google.com/logging). +//! Depending on how you want to ship these logs to the Google Logging API, you can choose from one of the available build methods. +//! +//! Start by configuring the Logger with the builder ([`Builder`](logger::Builder::new)) and selecting the appropriate build method based on your shipping requirements: +//! +//! 1. [`build()`](logger::Builder::build): Receives [`WriteLogEntries`](https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.v2#google.logging.v2.LoggingServiceV2.WriteLogEntries) over a channel and allows you to handle the transportation manually. +//! 2. [`build_with_async_shipper()`](logger::Builder::build_with_async_shipper): Offloads transportation to the [`Shipper`](shipper::Shipper) and its sync-async Bridge in an async context. (Requires the `shipper` feature.) + +//! +//! The [`builder`](struct@logger::Builder) supports several `with_*` methods to customize the log message format, +//! particularly the default labels attached to [log entries](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). +//! +/// Googlelog Error types +pub mod error; + +/// The [`slog::Drain`](https://slog-rs.github.io/slog/slog/trait.Drain.html) Implementation of the slog Drain for [Google Cloud Logging](https://cloud.google.com/logging) +pub mod logger; + +/// An optional async process to ship the log for you +#[cfg(feature = "shipper")] +pub mod shipper; diff --git a/crates/googlelog/src/logger.rs b/crates/googlelog/src/logger.rs new file mode 100644 index 0000000..bf65829 --- /dev/null +++ b/crates/googlelog/src/logger.rs @@ -0,0 +1,315 @@ +#[cfg(feature = "shipper")] +use crate::shipper; + +use crate::error::Error; + +use google_logging2::api::{LogEntry, MonitoredResource, WriteLogEntriesRequest}; + +use slog::{self, Drain, Key, Level, Never, OwnedKVList, Record, KV}; +use std::collections::HashMap; +use std::fmt; +use std::fmt::Write; + +use serde_json::json; + +use std::sync::mpsc::sync_channel; + +use chrono::Utc; + +/// Builder for the [`Logger`] +#[derive(Default, Debug)] +pub struct Builder { + log_name: String, + log_level_label: Option, + resource_type: String, + default_labels: HashMap, + resource_labels: Option>, +} + +/// Main struct for the Google Logger drain +pub struct Logger { + log_name: String, + log_level_label: Option, + default_labels: HashMap, + resource: MonitoredResource, + sync_tx: std::sync::mpsc::SyncSender, +} + +impl Builder { + /// Creates a Builder object. + /// + /// # Parameters + /// - `log_name`: The `logName` string to be used in the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry) + /// - `resource_type`: The required `type` field set in the `resource` [MonitoredResource](https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource) object of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). For example: `k8s_container`. + /// + /// # Example + /// + /// ``` + /// let (drain, _) = logging::Builder::new( + /// "projects/my-gcp-project/logs/my-log-id", + /// "k8s_container", + /// ) + /// .build(); + /// ``` + /// + #[must_use = "The builder must be used"] + pub fn new(log_name: &str, resource_type: &str) -> Self { + Self { + log_name: log_name.to_string(), + resource_type: resource_type.to_string(), + ..Default::default() + } + } + + /// Sets resource labels to be applied. + /// + /// These labels will populate the `labels` field in the `resource` [MonitoredResource](https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource) object of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). + /// + /// # Example + /// + /// ``` + /// let resource_labels = json!( + /// { + /// "pod_name": "dummy-value", + /// "location": "europe-west1-b", + /// "pod_name": env::var("HOSTNAME").unwrap_or_default(), + /// "container_name": "my-app", + /// "project_id": "my-gcp-project", + /// "cluster_name": "my-gke-cluster", + /// "namespace_name": "my-gke-namespace" + /// }); + /// + /// let (drain, _) = logging::Builder::new( + /// "projects/my-gcp-project/logs/my-log-id", + /// "k8s_container", + /// ) + /// .with_resource_labels(resource_labels) + /// .unwrap() + /// .build(); + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if `labels` does not parse as JSON. + #[must_use = "The builder must be used"] + pub fn with_resource_labels( + self, + labels: serde_json::Value, + ) -> Result> { + Ok(Self { + resource_labels: Some( + serde_json::from_value(labels).map_err(Error::ResourceLabelsError)?, + ), + ..self + }) + } + + /// Sets default labels to be applied in the labels field. + /// + /// These will populate the `labels` top level field of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). These labels are added in addition to any labels set in the logger statement. + /// + /// # Example + /// + /// ``` + /// let default_labels = json!( + /// { + /// "application": "my-application", + /// "team": "my-team", + /// "version": "my-app-version", + /// "role": "my-app-role", + /// "environment": "production", + /// "platform": "gcp", + /// }); + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if `labels` does not parse as JSON. + #[must_use = "The builder must be used"] + pub fn with_default_labels(self, labels: serde_json::Value) -> Result { + Ok(Self { + default_labels: serde_json::from_value(labels).map_err(Error::DefaultLabelsError)?, + ..self + }) + } + + /// Sets the label name to store the log level + /// + /// If set, the log level value is added under this label the `labels` top level field of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry) + /// + /// If not set, the log level is not propagated, but you will still have the [severity](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity), which is always there. + #[must_use = "The builder must be used"] + pub fn with_log_level_label(self, log_level_label: &str) -> Self { + Self { + log_level_label: Some(log_level_label.into()), + ..self + } + } + + /// This returns a tuple with a [`Logger`](struct@Logger), which can be passed to the slog root logger [as usual](https://docs.rs/slog/latest/slog/#where-to-start), and a [`std::sync::mpsc::Receiver`] channel. + /// The `Logger` sends the [`WriteLogEntries`](https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.v2#google.logging.v2.LoggingServiceV2.WriteLogEntries) it creates to this channel. + /// + /// For instance you could output these to the console, if you have an external agent that reads the process' output and ships it to Google Logging. + /// + #[must_use = "The logger and receiver must be used to handle logging correctly"] + #[allow(dead_code)] + pub fn build(self) -> (Logger, std::sync::mpsc::Receiver) { + let (sync_tx, sync_rx) = sync_channel::(100); + ( + Logger { + log_name: self.log_name, + log_level_label: self.log_level_label, + default_labels: self.default_labels, + resource: MonitoredResource { + type_: Some(self.resource_type), + labels: self.resource_labels, + }, + sync_tx, + }, + sync_rx, + ) + } + + /// In an async context this 'shipper' sends the log entries directly to the [Google Logging API](https://cloud.google.com/logging/docs/reference/v2/rest). + /// + /// # Example + /// + /// ``` + /// let (drain, shipper) = logging::Builder::new( + /// "projects/my-gcp-project/logs/my-log-id", + /// "k8s_container", + /// ) + /// .with_resource_labels(resource_labels) + /// .unwrap() + /// .build_with_async_shipper(); + /// + /// // Forward messages from the sync channel to the async channel where the + /// // shipper sends it to Google Cloud Logging + /// let bridge = shipper.yield_bridge(); + /// tokio::task::spawn_blocking(move || { + /// bridge.run_sync_to_async_bridge(); + /// }); + /// + /// tokio::spawn(async move { + /// shipper.run_log_shipper().await; + /// }); + /// + /// ``` + #[cfg(feature = "shipper")] + #[must_use = "The logger and shipper must be used to handle logging correctly"] + pub fn build_with_async_shipper(self) -> (Logger, shipper::Shipper) { + let (sync_tx, sync_rx) = sync_channel::(100); + ( + Logger { + log_name: self.log_name, + log_level_label: self.log_level_label, + default_labels: self.default_labels, + resource: MonitoredResource { + type_: Some(self.resource_type), + labels: self.resource_labels, + }, + sync_tx, + }, + shipper::Shipper::new(sync_rx), + ) + } +} + +impl Logger { + // Determine a sensible severity based on the log level + fn get_severity(log_level: Level) -> String { + // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity + match log_level { + Level::Critical => "CRITICAL".into(), + Level::Error => "ERROR".into(), + Level::Warning => "WARNING".into(), + Level::Info => "INFO".into(), + Level::Debug | Level::Trace => "DEBUG".into(), + } + } + + fn construct_log_entry( + &self, + message: &str, + log_level: Level, + serializer: Serializer, + ) -> LogEntry { + let mut labels = self.default_labels.clone(); + + if !serializer.map.is_empty() { + labels.extend(serializer.map); + } + + // We add the log level to the labels if requested + if let Some(label) = &self.log_level_label { + labels.insert(label.clone(), log_level.as_str().to_string()); + } + + let resource = Some(self.resource.clone()); + + // TODO: support both text_payload and json_payload + let json_payload = HashMap::from([("message".to_string(), json!(message))]); + + // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity + LogEntry { + json_payload: Some(json_payload), + labels: Some(labels), + severity: Some(Self::get_severity(log_level)), + timestamp: Some(Utc::now()), + resource, + ..Default::default() + } + } +} + +#[derive(Debug)] +struct Serializer { + map: HashMap, +} + +impl Serializer { + fn new() -> Self { + Self { + map: HashMap::new(), + } + } +} + +impl slog::Serializer for Serializer { + fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result { + let mut value = String::new(); + write!(value, "{val}")?; + self.map.insert(key.into(), value); + Ok(()) + } +} + +impl Drain for Logger { + type Ok = (); + type Err = Never; // TODO: Handle errors + + fn log(&self, record: &Record<'_>, values: &OwnedKVList) -> Result { + let mut serializer = Serializer::new(); + + let kv = record.kv(); + let _ = kv.serialize(record, &mut serializer); + + let _ = values.serialize(record, &mut serializer); + + let log_entry = self.construct_log_entry( + format!("{}", record.msg()).as_str(), + record.level(), + serializer, + ); + + let body = WriteLogEntriesRequest { + log_name: Some(self.log_name.clone()), + entries: Some(vec![log_entry]), + ..Default::default() + }; + + let _ = self.sync_tx.send(body); + + Ok(()) + } +} diff --git a/crates/googlelog/src/shipper.rs b/crates/googlelog/src/shipper.rs new file mode 100644 index 0000000..78b6be3 --- /dev/null +++ b/crates/googlelog/src/shipper.rs @@ -0,0 +1,184 @@ +use std::sync::mpsc as sync_mpsc; +use tokio::sync::mpsc as async_mpsc; + +use google_logging2::api::WriteLogEntriesRequest; + +use chrono::{DateTime, TimeDelta, Utc}; + +use reqwest::{Client, Response}; + +use crate::error::Error; + +/// Token caching +#[derive(Default)] +pub struct Token { + token: Option, + renew_after: DateTime, +} + +async fn get_error_response(response: Response, context: String) -> Error { + let status = response.status(); + + let body = match response.bytes().await { + Ok(bytes) => match serde_json::from_slice::(&bytes) { + Ok(json) => json, + Err(_) => String::from_utf8_lossy(&bytes).to_string(), + }, + Err(e) => format!("could not decode body of HTTP Error response: {e}"), + }; + + Error::HttpResponseError { + context, + status, + body, + } +} + +impl Token { + fn renew_after_from_expires_in(expires_in: u64) -> DateTime { + let renew_after = TimeDelta::seconds((expires_in - 60) as i64); // Ensure safety with the duration conversion + Utc::now() + renew_after + } + + async fn fetch_access_token(&mut self, client: &Client) -> Result { + if let Some(token) = &self.token { + if Utc::now() < self.renew_after { + return Ok(token.clone()); + } + } + + let response = client.get("http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token") + .header("Metadata-Flavor", "Google") + .send() + .await + .map_err(|e| { + Error::ShipperReqwestError { + context: "performing HTTP GET token credentials from metadata server".to_string(), + source: e, + } + })?; + + if response.status().is_success() { + let body = response + .text() + .await + .map_err(|e| Error::ShipperReqwestError { + context: "consuming response body of access token request".to_string(), + source: e, + })?; + let token_data: serde_json::Value = + serde_json::from_str(&body).map_err(|e| Error::ShipperSerializeError { + context: "deserializing token data".to_string(), + source: e, + })?; + let token_str = token_data["access_token"].as_str().unwrap().to_string(); + self.token = Some(token_str.clone()); + self.renew_after = + Self::renew_after_from_expires_in(token_data["expires_in"].as_u64().unwrap()); + Ok(token_str) + } else { + Err(get_error_response(response, "fetching token".to_string()).await) + } + } +} + +/// A sync to async channel bridge. +/// Forwards the log messages from the [Drain's log function](crate::logger::Logger) to the [`Shipper`] +pub struct Bridge { + sync_rx: sync_mpsc::Receiver, + async_tx: async_mpsc::Sender, +} + +impl Bridge { + /// Forwards log messages from the drain to the shipper + /// For usage see the [`example`](crate::logger::Builder::build_with_async_shipper). + pub fn run_sync_to_async_bridge(self) { + while let Ok(message) = self.sync_rx.recv() { + let tx = self.async_tx.clone(); + tokio::task::spawn(async move { + if let Err(e) = tx.send(message).await { + eprintln!("Failed to forward log message to async channel, log message not sent to Google Logger: {}", e); + } + }); + } + } +} + +/// Sends the log messages to the Google Logging API +pub struct Shipper { + client: Client, + token: Token, + sync_rx: Option>, + async_rx: async_mpsc::Receiver, + async_tx: Option>, +} + +impl Shipper { + /// Takes the sync receiver and async sender from the Shipper struct into the [`Bridge`] + /// For usage see the [`example`](crate::logger::Builder::build_with_async_shipper). + pub fn yield_bridge(&mut self) -> Bridge { + match (self.sync_rx.take(), self.async_tx.take()) { + (Some(sync_rx), Some(async_tx)) => Bridge { sync_rx, async_tx }, + (_, _) => panic!("May not happen"), + } + } + + /// Creates a `Shipper` + pub fn new(sync_rx: sync_mpsc::Receiver) -> Self { + let (async_tx, async_rx) = tokio::sync::mpsc::channel::(100); + + Shipper { + client: Client::new(), + token: Token::default(), + sync_rx: Some(sync_rx), + async_rx, + async_tx: Some(async_tx), + } + } + + async fn send_log_entry( + &mut self, + token: &str, + body: WriteLogEntriesRequest, + ) -> Result<(), Error> { + let url = "https://logging.googleapis.com/v2/entries:write".to_string(); + + let response = self + .client + .post(url) + .bearer_auth(token) + .json(&body) + .send() + .await + .map_err(|e| Error::ShipperReqwestError { + context: "performing HTTP POST request to the Google Logging API".to_string(), + source: e, + })?; + let status = response.status(); + if status.is_success() { + Ok(()) + } else { + Err(get_error_response( + response, + "response when sending log entry to Google Logging API".to_string(), + ) + .await) + } + } + + /// The process that receives log entries and sends them to the Google Logging API + pub async fn run_log_shipper(mut self) { + while let Some(log_entry) = self.async_rx.recv().await { + match self.token.fetch_access_token(&self.client).await { + Ok(token) => { + if let Err(e) = self.send_log_entry(&token, log_entry).await { + eprintln!("Failed to send log entry: {}", e); + } + } + Err(e) => { + eprintln!("Failed to fetch access token: {}", e); + } + } + } + } +} diff --git a/src/args.rs b/src/args.rs index 4a14bdf..d45c010 100644 --- a/src/args.rs +++ b/src/args.rs @@ -45,6 +45,10 @@ pub const STORAGE_BACKEND_TYPE: &str = "sbe-type"; pub const USR_JSON_PATH: &str = "usr-json-path"; pub const USR_HTTP_URL: &str = "usr-http-url"; pub const VERBOSITY: &str = "verbosity"; +pub const GLOG_LOGNAME: &str = "log-google-logname"; +pub const GLOG_LEVEL_LABEL: &str = "log-google-level-label"; +pub const GLOG_RESOURCE_TYPE: &str = "log-google-resource-type"; +pub const GLOG_LABELS_FILE: &str = "log-google-labels-file"; #[derive(Debug, EnumString, Display, PartialEq)] #[strum(serialize_all = "lowercase")] @@ -537,4 +541,36 @@ pub(crate) fn clap_app(tmp_dir: &str) -> clap::Command { .env("UNFTP_NTF_PUBSUB_PROJECT") .takes_value(true), ) + .arg( + Arg::new(GLOG_LOGNAME) + .long("log-google-logname") + .value_name("LOG_GOOGLE_LOGNAME") + .help("Required for google logging: The logName to set in the LogEntry records going to Google Logging. See https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry") + .env("UNFTP_GLOG_LOGNAME") + .takes_value(true), + ) + .arg( + Arg::new(GLOG_RESOURCE_TYPE) + .long("log-google-resource-type") + .value_name("LOG_GOOGLE_RESOURCE_TYPE") + .help("Required for google logging: The resource type to add to all Google log entries. E.g.: 'k8s_container'. See See https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource") + .env("UNFTP_GLOG_RESOURCE_TYPE") + .takes_value(true), + ) + .arg( + Arg::new(GLOG_LEVEL_LABEL) + .long("log-google-level-label") + .value_name("LOG_GOOGLE_LEVEL_LABEL") + .help("The name you want for the label in 'labels' that will contain the log level") + .env("UNFTP_GLOG_LEVEL_LABEL") + .takes_value(true), + ) + .arg( + Arg::new(GLOG_LABELS_FILE) + .long("log-google-labels-file") + .value_name("LOG_GOOGLE_LABELS_FILE") + .help("Default labels for 'labels' and 'resource.labels' to add to all Google log entries. See https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry") + .env("UNFTP_GLOG_LABELS_FILE") + .takes_value(true), + ) } diff --git a/src/logging.rs b/src/logging.rs index 1e77a86..3553403 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,10 +1,13 @@ use crate::app; use crate::args; +use crate::args::GLOG_LABELS_FILE; +use crate::args::GLOG_LEVEL_LABEL; +use crate::args::GLOG_LOGNAME; +use crate::args::GLOG_RESOURCE_TYPE; use app::NAME; use args::{INSTANCE_NAME, LOG_LEVEL, REDIS_HOST, REDIS_KEY, REDIS_PORT, VERBOSITY}; use clap::ArgMatches; -use redislog::Builder; use slog::{error, o, Drain, Duplicate, Level, Logger, OwnedKVList, Record}; use slog_async::Async; use slog_redis as redislog; @@ -35,7 +38,9 @@ where } } -pub fn create_logger(arg_matches: &ArgMatches) -> Result { +pub fn create_logger( + arg_matches: &ArgMatches, +) -> Result<(slog::Logger, Option), String> { let min_log_level = match arg_matches.occurrences_of(VERBOSITY) { 0 => Level::Warning, 1 => Level::Info, @@ -62,15 +67,29 @@ pub fn create_logger(arg_matches: &ArgMatches) -> Result { .fuse(); let mut err: Option = None; - let drain = match redis_logger(arg_matches) { - Ok(Some(redis_logger)) => { + + let redis_result = redis_logger(arg_matches); + + let google_result = google_logger(arg_matches); + let mut google_shipper = None; + + let drain = match (redis_result, google_result) { + (Ok(Some(redis_logger)), _) => { let both = Duplicate::new(redis_logger, term_drain).fuse(); Async::new(both.filter_level(min_log_level).fuse()) .build() .fuse() } - Ok(None) => Async::new(term_drain).build().fuse(), - Err(e) => { + (_, Ok(Some(google_logger))) => { + let (drain, shipper) = google_logger; + google_shipper = Some(shipper); + let both = Duplicate::new(drain, term_drain).fuse(); + Async::new(both.filter_level(min_log_level).fuse()) + .build() + .fuse() + } + (Ok(None), Ok(None)) => Async::new(term_drain).build().fuse(), + (Err(e), _) | (_, Err(e)) => { err = e.into(); Async::new(term_drain).build().fuse() } @@ -80,7 +99,7 @@ pub fn create_logger(arg_matches: &ArgMatches) -> Result { if let Some(err_str) = err { error!(log, "Continuing only with terminal logger: {}", err_str) } - Ok(log) + Ok((log, google_shipper)) } fn redis_logger(m: &ArgMatches) -> Result>, String> { @@ -96,7 +115,7 @@ fn redis_logger(m: &ArgMatches) -> Result().unwrap(), @@ -110,3 +129,53 @@ fn redis_logger(m: &ArgMatches) -> Result Err("for the redis logger please specify all --log-redis-* options".to_string()), } } + +fn load_labels_file(file_path: &str, hostname: &str) -> Result { + let contents = std::fs::read_to_string(file_path) + .map_err(|e| format!("could not read file '{}': {}", file_path, e))?; + let input = contents.replace("{{hostname}}", hostname); + serde_json::from_str(input.as_str()) + .map_err(|e| format!("could not parse file {} as json: {}", file_path, e)) +} + +fn google_logger( + m: &ArgMatches, +) -> Result, String> { + match (m.value_of(GLOG_LOGNAME), m.value_of(GLOG_RESOURCE_TYPE)) { + + (Some(logname), Some(resource_type)) => { + let hostname = std::env::var("HOST") + .or_else(|_| std::env::var("HOSTNAME")) + .unwrap_or_default(); + + let (labels_file, level_label) = (m.value_of(GLOG_LABELS_FILE), m.value_of(GLOG_LEVEL_LABEL)); + + let mut builder = googlelog::logger::Builder::new( + logname, + resource_type, + ); + + if let Some(file) = labels_file { + let data = load_labels_file(file, hostname.as_str()).map_err(|e| format!("error loading labels file: {}", e))?; + let default_labels = data["default_labels"].clone(); + let resource_labels = data["resource_labels"].clone(); + + builder = builder + .with_default_labels(default_labels).map_err(|e| format!("error using default labels: {}", e))? + .with_resource_labels(resource_labels).map_err(|e| format!("error using resource labels: {}", e))?; + + } + + if let Some(level_label) = level_label { + builder = builder.with_log_level_label(level_label); + } + + let (drain, shipper) = builder.build_with_async_shipper(); + + Ok(Some((drain, shipper))) + + }, + (None, None) => Ok(None), + _ => Err("To use the google logger please specify all required options (logname + resource type)".to_string()), + } +} diff --git a/src/main.rs b/src/main.rs index 3468360..05c55c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -49,7 +49,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::runtime::Runtime; + #[cfg(feature = "pam_auth")] use unftp_auth_pam as pam; use unftp_sbe_gcs::options::AuthMethod; @@ -823,8 +823,8 @@ async fn main_task( Ok(signal) } -fn run(arg_matches: ArgMatches) -> Result<(), String> { - let root_logger = logging::create_logger(&arg_matches)?; +async fn run(arg_matches: ArgMatches) -> Result<(), String> { + let (root_logger, google_shipper) = logging::create_logger(&arg_matches)?; let log = root_logger.new(o!("module" => "main")); let addr = String::from(arg_matches.value_of(args::BIND_ADDRESS).unwrap()); @@ -847,12 +847,31 @@ fn run(arg_matches: ArgMatches) -> Result<(), String> { "sbe-type" => sbe_type, ); - let runtime = Runtime::new().map_err(|e| format!("could not construct runtime: {}", e))?; + // If logging needs to be sent to Google, we need to start tasks + // to bridge between the sync and async channels, as well as start + // the log shipper. For now this is the only clean way I could + // find to ship from the slog Drain to the Google API + if let Some(mut shipper) = google_shipper { + // This is an sync to async bridge: The drain creates the + // Google LogEntry's, and sends them over the sync + // channel. The bridge receives it and forwards it over the + // async bridge to the shipper. + let bridge = shipper.yield_bridge(); + tokio::task::spawn_blocking(move || { + bridge.run_sync_to_async_bridge(); + }); + + // The shipper does the calls to Google Logging API + tokio::task::spawn(async move { + shipper.run_log_shipper().await; + }); + + info!(log, "Started Google Logger"); + } + // We wait for a signal (HUP, INT, TERM). If the signal is a HUP, // we restart, otherwise we exit the loop and the program ends. - while runtime.block_on(main_task(arg_matches.clone(), &log, &root_logger))? - == ExitSignal("SIG_HUP") - { + while main_task(arg_matches.clone(), &log, &root_logger).await? == ExitSignal("SIG_HUP") { info!(log, "Received SIG_HUP, restarting"); } info!(log, "Exiting..."); @@ -872,7 +891,8 @@ fn get_host_name() -> String { } } -fn main() { +#[tokio::main] +async fn main() { #[cfg(feature = "tokio_console")] { console_subscriber::ConsoleLayer::builder() @@ -885,7 +905,7 @@ fn main() { let tmp_dir = env::temp_dir(); let tmp_dir = tmp_dir.as_path().to_str().unwrap(); let arg_matches = args::clap_app(tmp_dir).get_matches(); - if let Err(e) = run(arg_matches) { + if let Err(e) = run(arg_matches).await { eprintln!("\nError: {}", e); process::exit(1); };