From 4aadb37160e1a07e27803c37e3a7685203727fc9 Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Fri, 1 Mar 2024 15:34:49 -0300 Subject: [PATCH] Pingora tcp proxy (#12) * feat(proxy): added pingora tcp proxy * chore(proxy): adjusted to use prometheus addr * chore(proxy): added metrics to get how many bytes a consumer has been used * chore(proxy): adjusted lint --- Cargo.lock | 956 +++++++++++++++++++++++++++++------ docker/dockerfile.operator | 2 +- docker/dockerfile.proxy | 2 +- operator/yaml/port.yaml | 4 +- proxy/Cargo.toml | 8 +- proxy/examples/manifest.yaml | 4 + proxy/src/auth.rs | 87 ++-- proxy/src/config.rs | 22 +- proxy/src/main.rs | 98 +++- proxy/src/proxy.rs | 220 ++++---- 10 files changed, 1072 insertions(+), 331 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7be4c4..3d6e3dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,8 +39,8 @@ dependencies = [ "encoding_rs", "flate2", "futures-core", - "h2", - "http", + "h2 0.3.24", + "http 0.2.11", "httparse", "httpdate", "itoa", @@ -75,7 +75,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d22475596539443685426b6bdadb926ad0ecaefdfc5fb05e5e3441f15463c511" dependencies = [ "bytestring", - "http", + "http 0.2.11", "regex", "serde", "tracing", @@ -254,6 +254,15 @@ dependencies = [ "libc", ] +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + [[package]] name = "argon2" version = "0.5.3" @@ -266,6 +275,34 @@ dependencies = [ "password-hash", ] +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -277,6 +314,17 @@ dependencies = [ "syn 2.0.50", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi 0.1.19", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -427,6 +475,30 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "ansi_term", + "atty", + "bitflags 1.3.2", + "strsim 0.8.0", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -478,6 +550,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "crypto-common" version = "0.1.6" @@ -488,6 +575,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "daemonize" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab8bfdaacb3c887a54d41bdf48d3af8873b3f5566469f8ba21b92057509f116e" +dependencies = [ + "libc", +] + [[package]] name = "darling" version = "0.20.6" @@ -508,7 +604,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 2.0.50", ] @@ -523,6 +619,22 @@ dependencies = [ "syn 2.0.50", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "serde", + "uuid", +] + [[package]] name = "deranged" version = "0.3.11" @@ -600,22 +712,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "errno" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - -[[package]] -name = "fastrand" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" - [[package]] name = "flate2" version = "1.0.28" @@ -623,6 +719,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", + "libz-ng-sys", "miniz_oxide", ] @@ -783,14 +880,39 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", - "indexmap", + "http 0.2.11", + "indexmap 2.2.3", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap 2.2.3", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.3" @@ -801,12 +923,36 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "hermit-abi" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "home" version = "0.5.9" @@ -816,6 +962,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "http" version = "0.2.11" @@ -827,6 +984,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -834,7 +1002,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] @@ -866,7 +1034,8 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http", + "h2 0.3.24", + "http 0.2.11", "http-body", "httparse", "httpdate", @@ -886,13 +1055,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", + "http 0.2.11", "hyper", "log", - "rustls 0.21.10", + "rustls", "rustls-native-certs", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", ] [[package]] @@ -946,6 +1115,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.2.3" @@ -953,7 +1132,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.3", ] [[package]] @@ -965,6 +1144,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "itoa" version = "1.0.10" @@ -1044,7 +1229,7 @@ dependencies = [ "either", "futures", "home", - "http", + "http 0.2.11", "http-body", "hyper", "hyper-rustls", @@ -1054,12 +1239,12 @@ dependencies = [ "kube-core", "pem", "pin-project", - "rustls 0.21.10", - "rustls-pemfile 1.0.4", + "rustls", + "rustls-pemfile", "secrecy", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.32", "thiserror", "tokio", "tokio-util", @@ -1076,7 +1261,7 @@ checksum = "b5bba93d054786eba7994d03ce522f368ef7d48c88a1826faa28478d85fb63ae" dependencies = [ "chrono", "form_urlencoded", - "http", + "http 0.2.11", "json-patch", "k8s-openapi", "once_cell", @@ -1110,7 +1295,7 @@ dependencies = [ "backoff", "derivative", "futures", - "hashbrown", + "hashbrown 0.14.3", "json-patch", "k8s-openapi", "kube-client", @@ -1144,10 +1329,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] -name = "linux-raw-sys" -version = "0.4.13" +name = "libz-ng-sys" +version = "1.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6409efc61b12687963e602df8ecf70e8ddacf95bc6576bcf16e3ac6328083c5" +dependencies = [ + "cmake", + "libc", +] + +[[package]] +name = "linked-hash-map" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "local-channel" @@ -1182,12 +1377,36 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "memchr" version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1216,21 +1435,15 @@ dependencies = [ ] [[package]] -name = "native-tls" -version = "0.2.11" +name = "nix" +version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +checksum = "fa52e972a9a719cecb6864fb88568781eb706bac2cd1d4f04a648542dbf78069" dependencies = [ - "lazy_static", + "bitflags 1.3.2", + "cfg-if", "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", + "memoffset", ] [[package]] @@ -1264,7 +1477,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.6", "libc", ] @@ -1315,6 +1528,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "300.2.3+3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cff92b6f71555b61bb9315f7c64da3ca43d87531622120fea0195fc761b4843" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.101" @@ -1323,6 +1545,7 @@ checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -1343,7 +1566,7 @@ dependencies = [ "schemars", "serde", "serde_json", - "serde_yaml", + "serde_yaml 0.9.32", "thiserror", "tokio", "tracing", @@ -1499,67 +1722,219 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] -name = "pkg-config" -version = "0.3.30" +name = "pingora" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +checksum = "d69b4841cdc05e8d1bf08a41231fb761ca7aac95fa1a75078b7f8cd23b57309d" +dependencies = [ + "pingora-core", + "pingora-http", + "pingora-timeout", +] [[package]] -name = "powerfmt" -version = "0.2.0" +name = "pingora-core" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +checksum = "6578b0f60416cf3eacce79dc299cab64687b15ae5fdfe14196a7e5c7b7114e26" +dependencies = [ + "ahash", + "async-trait", + "brotli", + "bytes", + "chrono", + "daemonize", + "flate2", + "futures", + "h2 0.4.2", + "http 1.0.0", + "httparse", + "libc", + "log", + "lru", + "nix", + "once_cell", + "openssl-probe", + "parking_lot", + "percent-encoding", + "pingora-error", + "pingora-http", + "pingora-openssl", + "pingora-pool", + "pingora-runtime", + "pingora-timeout", + "prometheus", + "rand", + "regex", + "sentry", + "serde", + "serde_yaml 0.8.26", + "sfv", + "socket2", + "structopt", + "thread_local", + "tokio", + "tokio-test", + "unicase", + "zstd", +] [[package]] -name = "ppv-lite86" -version = "0.2.17" +name = "pingora-error" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "2dadb1b750e79c2691631c911a0cbb9577366d8acb4dc48337c1aee403ec2653" [[package]] -name = "proc-macro2" -version = "1.0.78" +name = "pingora-http" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "f9a47b70126f169be9b5ee3e4824f316fa8414b0ece0eb4eb458eca585f0a3e7" dependencies = [ - "unicode-ident", + "bytes", + "http 1.0.0", + "pingora-error", ] [[package]] -name = "prometheus" -version = "0.13.3" +name = "pingora-openssl" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +checksum = "43b6c21272f80f0cc933f62c075268b0e6ee1a20b3543de1aa9ff8789c562efa" dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", - "parking_lot", - "protobuf", - "thiserror", + "foreign-types", + "libc", + "openssl", + "openssl-src", + "openssl-sys", + "tokio-openssl", ] [[package]] -name = "protobuf" -version = "2.28.0" +name = "pingora-pool" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +checksum = "b4569e3bef52b0abab239a5cf3287c71307615ca61be7fc7799d71fdaab33d81" +dependencies = [ + "crossbeam-queue", + "log", + "lru", + "parking_lot", + "pingora-timeout", + "thread_local", + "tokio", +] [[package]] -name = "proxy" +name = "pingora-runtime" version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0237e88828d809e8d0a5198543d72e540202887f347216fc8504ece862b0ccb" +dependencies = [ + "once_cell", + "rand", + "thread_local", + "tokio", +] + +[[package]] +name = "pingora-timeout" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be182194d34e1b28608eaa49ee0fb86e5b7ab1d21a1d7a2b4d402446fda47e1" dependencies = [ - "dotenv", "futures", + "once_cell", + "parking_lot", + "pin-project-lite", + "thread_local", + "tokio", +] + +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro2" +version = "1.0.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + +[[package]] +name = "proxy" +version = "0.1.0" +dependencies = [ + "async-trait", + "dotenv", "futures-util", - "native-tls", "operator", + "pingora", + "prometheus", "regex", - "rustls 0.22.2", - "rustls-pemfile 2.1.0", "tokio", - "tokio-rustls 0.25.0", "tracing", "tracing-subscriber", ] @@ -1641,6 +2016,47 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "reqwest" +version = "0.11.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.24", + "http 0.2.11", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-rustls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + [[package]] name = "ring" version = "0.17.8" @@ -1656,6 +2072,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rust_decimal" +version = "1.34.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39449a79f45e8da28c57c341891b69a183044b29518bb8f86dbac9df60bb7df" +dependencies = [ + "arrayvec", + "num-traits", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1671,19 +2097,6 @@ dependencies = [ "semver", ] -[[package]] -name = "rustix" -version = "0.38.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" -dependencies = [ - "bitflags 2.4.2", - "errno", - "libc", - "linux-raw-sys", - "windows-sys 0.52.0", -] - [[package]] name = "rustls" version = "0.21.10" @@ -1692,24 +2105,10 @@ checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", "ring", - "rustls-webpki 0.101.7", + "rustls-webpki", "sct", ] -[[package]] -name = "rustls" -version = "0.22.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" -dependencies = [ - "log", - "ring", - "rustls-pki-types", - "rustls-webpki 0.102.2", - "subtle", - "zeroize", -] - [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -1717,7 +2116,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile 1.0.4", + "rustls-pemfile", "schannel", "security-framework", ] @@ -1731,22 +2130,6 @@ dependencies = [ "base64", ] -[[package]] -name = "rustls-pemfile" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c333bb734fcdedcea57de1602543590f545f127dc8b533324318fd492c5c70b" -dependencies = [ - "base64", - "rustls-pki-types", -] - -[[package]] -name = "rustls-pki-types" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048a63e5b3ac996d78d402940b5fa47973d2d080c6c6fffa1d0f19c4445310b7" - [[package]] name = "rustls-webpki" version = "0.101.7" @@ -1757,17 +2140,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "rustls-webpki" -version = "0.102.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" -dependencies = [ - "ring", - "rustls-pki-types", - "untrusted", -] - [[package]] name = "ryu" version = "1.0.17" @@ -1862,6 +2234,86 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" +[[package]] +name = "sentry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904eca4fb30c6112a1dae60c0a9e29cfb42f42129da4260f1ee20e94151b62e3" +dependencies = [ + "httpdate", + "reqwest", + "sentry-backtrace", + "sentry-contexts", + "sentry-core", + "sentry-panic", + "tokio", +] + +[[package]] +name = "sentry-backtrace" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1671189d1b759879fa4bdde46c50a499abb14332ed81f84fc6f60658f41b2fdb" +dependencies = [ + "backtrace", + "lazy_static", + "regex", + "sentry-core", +] + +[[package]] +name = "sentry-contexts" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db80ceff16bb1a4b2689b8758e5e61e405fc4d8ff9f2d1b5b845b76ce37fa34e" +dependencies = [ + "hostname", + "libc", + "rustc_version", + "sentry-core", + "uname", +] + +[[package]] +name = "sentry-core" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c9f509d3959ed4dbbd80ca42572caad682aaa1cdd92c719e0815d0e87f82c96" +dependencies = [ + "lazy_static", + "rand", + "sentry-types", + "serde", + "serde_json", +] + +[[package]] +name = "sentry-panic" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8b442769cc34115f64393f7bfe4f863c3c38749e0c0b9613a7ae25b37c7ba53" +dependencies = [ + "sentry-backtrace", + "sentry-core", +] + +[[package]] +name = "sentry-types" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "254b600e93e9ef00a48382c9f1e86d27884bd9a5489efa4eb9210c20c72e88a6" +dependencies = [ + "debugid", + "getrandom", + "hex", + "serde", + "serde_json", + "thiserror", + "time", + "url", + "uuid", +] + [[package]] name = "serde" version = "1.0.197" @@ -1926,19 +2378,42 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578a7433b776b56a35785ed5ce9a7e777ac0598aac5a6dd1b4b18a307c7fc71b" +dependencies = [ + "indexmap 1.9.3", + "ryu", + "serde", + "yaml-rust", +] + [[package]] name = "serde_yaml" version = "0.9.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" dependencies = [ - "indexmap", + "indexmap 2.2.3", "itoa", "ryu", "serde", "unsafe-libyaml", ] +[[package]] +name = "sfv" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27daf6ed3fc7ffd5ea3ce9f684fe351c47e50f2fdbb6236e2bad0b440dbe408" +dependencies = [ + "data-encoding", + "indexmap 2.2.3", + "rust_decimal", +] + [[package]] name = "sha1" version = "0.10.6" @@ -2010,12 +2485,42 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "structopt" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" +dependencies = [ + "clap", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "subtle" version = "2.5.0" @@ -2045,15 +2550,39 @@ dependencies = [ ] [[package]] -name = "tempfile" -version = "3.10.0" +name = "sync_wrapper" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ - "cfg-if", - "fastrand", - "rustix", - "windows-sys 0.52.0", + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", ] [[package]] @@ -2172,25 +2701,50 @@ dependencies = [ "syn 2.0.50", ] +[[package]] +name = "tokio-openssl" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ffab79df67727f6acf57f1ff743091873c24c579b1e2ce4d8f53e47ded4d63d" +dependencies = [ + "futures-util", + "openssl", + "openssl-sys", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.10", + "rustls", "tokio", ] [[package]] -name = "tokio-rustls" -version = "0.25.0" +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +checksum = "e89b3cbabd3ae862100094ae433e1def582cf86451b4e9bf83aa7ac1d8a7d719" dependencies = [ - "rustls 0.22.2", - "rustls-pki-types", + "async-stream", + "bytes", + "futures-core", "tokio", + "tokio-stream", ] [[package]] @@ -2236,7 +2790,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", + "http 0.2.11", "http-body", "http-range-header", "mime", @@ -2343,6 +2897,24 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "uname" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72f89f0ca32e4db1c04e2a72f5345d59796d4866a1ee0609084569f73683dc8" +dependencies = [ + "libc", +] + +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -2364,6 +2936,18 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + +[[package]] +name = "unicode-width" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" + [[package]] name = "unsafe-libyaml" version = "0.2.10" @@ -2385,6 +2969,17 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", +] + +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +dependencies = [ + "getrandom", + "serde", ] [[package]] @@ -2399,6 +2994,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.4" @@ -2445,6 +3046,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.91" @@ -2474,6 +3087,22 @@ version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" +[[package]] +name = "web-sys" +version = "0.3.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96565907687f7aceb35bc5fc03770a8a0471d82e479f25832f54a0e3f4b28446" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "winapi" version = "0.3.9" @@ -2637,6 +3266,25 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/docker/dockerfile.operator b/docker/dockerfile.operator index 05688c1..10eb44d 100644 --- a/docker/dockerfile.operator +++ b/docker/dockerfile.operator @@ -3,7 +3,7 @@ FROM rust:1.74-slim-buster as build WORKDIR /app RUN apt update -RUN apt install -y pkg-config libssl-dev +RUN apt install -y build-essential pkg-config libssl-dev cmake COPY ./Cargo.toml ./Cargo.toml COPY ./operator ./operator diff --git a/docker/dockerfile.proxy b/docker/dockerfile.proxy index 2c12356..842c165 100644 --- a/docker/dockerfile.proxy +++ b/docker/dockerfile.proxy @@ -3,7 +3,7 @@ FROM rust:1.74-slim-buster as build WORKDIR /app RUN apt update -RUN apt install -y pkg-config libssl-dev +RUN apt install -y build-essential pkg-config libssl-dev cmake COPY ./Cargo.toml ./Cargo.toml COPY ./operator ./operator diff --git a/operator/yaml/port.yaml b/operator/yaml/port.yaml index 16a6ae0..32a8f1b 100644 --- a/operator/yaml/port.yaml +++ b/operator/yaml/port.yaml @@ -9,6 +9,6 @@ metadata: name: mainnet-user namespace: prj-mainnet-test spec: - network: "mainnet" - version: "stable" + network: "preview" + version: "v1" throughputTier: "1" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index ecaa0ad..fbfe0e6 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -7,14 +7,12 @@ edition = "2021" [dependencies] operator = { path = "../operator" } -futures = "0.3.30" -native-tls = "0.2.11" -rustls = "0.22.2" -rustls-pemfile = "2.1.0" tokio = { version = "1.36.0", features = ["full"] } -tokio-rustls = "0.25.0" regex = "1.10.3" dotenv = "0.15.0" tracing = "0.1.40" tracing-subscriber = "0.3.18" futures-util = "0.3.30" +pingora = "0.1.0" +prometheus = "0.13.3" +async-trait = "0.1.77" diff --git a/proxy/examples/manifest.yaml b/proxy/examples/manifest.yaml index fe0d4be..69c1d67 100644 --- a/proxy/examples/manifest.yaml +++ b/proxy/examples/manifest.yaml @@ -233,6 +233,8 @@ spec: env: - name: PROXY_ADDR value: "0.0.0.0:80" + - name: PROMETHEUS_ADDR + value: "0.0.0.0:9187" - name: NODE_PORT value: "80" - name: NODE_DNS @@ -365,6 +367,7 @@ metadata: spec: network: "mainnet" version: "stable" + throughputTier: "1" --- # Cardano Node Port 2 apiVersion: demeter.run/v1alpha1 @@ -375,3 +378,4 @@ metadata: spec: network: "mainnet" version: "stable" + throughputTier: "1" diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index 24cb3b3..ae518e4 100644 --- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use async_trait::async_trait; use futures_util::TryStreamExt; use operator::{ kube::{ @@ -12,57 +13,71 @@ use operator::{ }, CardanoNodePort, }; +use pingora::{server::ShutdownWatch, services::background::BackgroundService}; use tokio::{pin, sync::RwLock}; use tracing::error; use crate::{Consumer, State}; -pub async fn start(state: Arc>) { - let client = Client::try_default() - .await - .expect("failed to create kube client"); - - let api = Api::::all(client.clone()); - update_auth(state.clone(), api.clone()).await; - - let stream = watcher::watcher(api.clone(), Config::default()).touched_objects(); - pin!(stream); +#[derive(Debug)] +pub struct AuthBackgroundService { + state: Arc>, +} +impl AuthBackgroundService { + pub fn new(state: Arc>) -> Self { + Self { state } + } - loop { - let result = stream.try_next().await; + async fn update_auth(&self, api: Api) { + let result = api.list(&ListParams::default()).await; if let Err(err) = result { - error!(error = err.to_string(), "fail crd auth watcher"); - continue; + error!( + error = err.to_string(), + "error to get crds while updating auth keys" + ); + return; } - update_auth(state.clone(), api.clone()).await; + let mut consumers = HashMap::new(); + for crd in result.unwrap().items.iter() { + if crd.status.is_some() { + let network = crd.spec.network.to_string(); + let version = crd.spec.version.clone(); + let auth_token = crd.status.as_ref().unwrap().auth_token.clone(); + let namespace = crd.metadata.namespace.as_ref().unwrap().clone(); + let port_name = crd.name_any(); + + let hash_key = format!("{}.{}.{}", network, version, auth_token); + let consumer = Consumer::new(namespace, port_name); + + consumers.insert(hash_key, consumer); + } + } + self.state.write().await.consumers = consumers; } } -async fn update_auth(state: Arc>, api: Api) { - let result = api.list(&ListParams::default()).await; - if let Err(err) = result { - error!( - error = err.to_string(), - "error to get crds while updating auth keys" - ); - return; - } +#[async_trait] +impl BackgroundService for AuthBackgroundService { + async fn start(&self, mut _shutdown: ShutdownWatch) { + let client = Client::try_default() + .await + .expect("failed to create kube client"); + + let api = Api::::all(client.clone()); + self.update_auth(api.clone()).await; - let mut consumers = HashMap::new(); - for crd in result.unwrap().items.iter() { - if crd.status.is_some() { - let network = crd.spec.network.to_string(); - let version = crd.spec.version.clone(); - let auth_token = crd.status.as_ref().unwrap().auth_token.clone(); - let namespace = crd.metadata.namespace.as_ref().unwrap().clone(); - let port_name = crd.name_any(); + let stream = watcher::watcher(api.clone(), Config::default()).touched_objects(); + pin!(stream); - let hash_key = format!("{}.{}.{}", network, version, auth_token); - let consumer = Consumer::new(namespace, port_name); + loop { + let result = stream.try_next().await; + if let Err(err) = result { + error!(error = err.to_string(), "fail crd auth watcher"); + continue; + } - consumers.insert(hash_key, consumer); + self.update_auth(api.clone()).await; } } - state.write().await.consumers = consumers; } diff --git a/proxy/src/config.rs b/proxy/src/config.rs index cff5c5f..7f679c2 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,24 +1,21 @@ -use std::{env, path::PathBuf}; +use std::env; #[derive(Debug, Clone)] pub struct Config { pub proxy_addr: String, - pub ssl_crt_path: PathBuf, - pub ssl_key_path: PathBuf, + pub prometheus_addr: String, + pub ssl_crt_path: String, + pub ssl_key_path: String, pub node_port: u16, pub node_dns: String, } - impl Config { pub fn new() -> Self { Self { proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"), - ssl_crt_path: env::var("SSL_CRT_PATH") - .map(|e| e.into()) - .expect("SSL_CRT_PATH must be set"), - ssl_key_path: env::var("SSL_KEY_PATH") - .map(|e| e.into()) - .expect("SSL_KEY_PATH must be set"), + prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"), + ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"), + ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"), node_port: env::var("NODE_PORT") .expect("NODE_PORT must be set") .parse() @@ -27,3 +24,8 @@ impl Config { } } } +impl Default for Config { + fn default() -> Self { + Self::new() + } +} diff --git a/proxy/src/main.rs b/proxy/src/main.rs index c98d8a5..14b4483 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -1,6 +1,14 @@ +use std::{collections::HashMap, fmt::Display, sync::Arc}; + +use auth::AuthBackgroundService; use dotenv::dotenv; -use regex::Regex; -use std::{collections::HashMap, error::Error, fmt::Display, sync::Arc}; +use pingora::{ + listeners::Listeners, + server::{configuration::Opt, Server}, + services::{background::background_service, listening::Service}, +}; +use prometheus::{opts, register_int_counter_vec}; +use proxy::ProxyApp; use tokio::sync::RwLock; use tracing::Level; @@ -10,43 +18,60 @@ mod auth; mod config; mod proxy; -#[tokio::main] -async fn main() -> Result<(), Box> { +fn main() { dotenv().ok(); tracing_subscriber::fmt().with_max_level(Level::INFO).init(); - let state = Arc::new(RwLock::new(State::try_new()?)); - let auth = auth::start(state.clone()); - let proxy_server = proxy::start(state.clone()); + let config: Arc = Arc::default(); + let state: Arc> = Arc::default(); + + let opt = Opt::default(); + let mut server = Server::new(Some(opt)).unwrap(); + server.bootstrap(); + + let auth_background_service = background_service( + "K8S Auth Service", + AuthBackgroundService::new(state.clone()), + ); + server.add_service(auth_background_service); - tokio::join!(auth, proxy_server); + let tls_proxy_service = Service::with_listeners( + "TLS Proxy Service".to_string(), + Listeners::tls( + &config.proxy_addr, + &config.ssl_crt_path, + &config.ssl_key_path, + ) + .unwrap(), + Arc::new(ProxyApp::new(config.clone(), state)), + ); + server.add_service(tls_proxy_service); - Ok(()) + let mut prometheus_service_http = + pingora::services::listening::Service::prometheus_http_service(); + prometheus_service_http.add_tcp(&config.prometheus_addr); + server.add_service(prometheus_service_http); + + server.run_forever(); } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct State { - config: Config, - host_regex: Regex, + metrics: Metrics, consumers: HashMap, } impl State { - pub fn try_new() -> Result> { - let config = Config::new(); - let host_regex = Regex::new(r"(dmtr_[\w\d-]+)\.([\w]+)-([\w\d]+).+")?; + pub fn new() -> Self { + let metrics = Metrics::new(); let consumers = HashMap::new(); - Ok(Self { - config, - host_regex, - consumers, - }) + Self { metrics, consumers } } - pub fn is_authenticated(&self, network: &str, version: &str, token: &str) -> bool { + pub fn get_consumer(&self, network: &str, version: &str, token: &str) -> Option { let hash_key = format!("{}.{}.{}", network, version, token); - self.consumers.get(&hash_key).is_some() + self.consumers.get(&hash_key).cloned() } } @@ -68,3 +93,32 @@ impl Display for Consumer { write!(f, "{}.{}", self.namespace, self.port_name) } } + +#[derive(Debug, Clone)] +pub struct Metrics { + total_packages_bytes: prometheus::IntCounterVec, +} +impl Metrics { + pub fn new() -> Self { + let total_packages_bytes = register_int_counter_vec!( + opts!("node_proxy_total_packages_bytes", "Total bytes transferred",), + &["consumer"] + ) + .unwrap(); + + Self { + total_packages_bytes, + } + } + + pub fn count_total_packages_bytes(&self, consumer: &Consumer, value: usize) { + self.total_packages_bytes + .with_label_values(&[&consumer.to_string()]) + .inc_by(value as u64) + } +} +impl Default for Metrics { + fn default() -> Self { + Self::new() + } +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 19acdfc..43743fe 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -1,118 +1,138 @@ -use std::error::Error; -use std::fs::File; -use std::io::BufReader; -use std::net::SocketAddr; -use std::str::FromStr; -use std::sync::Arc; -use tokio::io::copy_bidirectional; -use tokio::net::{lookup_host, TcpListener, TcpStream}; -use tokio::sync::RwLock; -use tokio_rustls::TlsAcceptor; -use tracing::{error, info}; - -use crate::State; - -pub async fn start(rw_state: Arc>) { - let state = rw_state.read().await.clone(); - - let proxy_addr = SocketAddr::from_str(&state.config.proxy_addr).unwrap(); - let listener_result = TcpListener::bind(proxy_addr).await; - if let Err(err) = listener_result { - error!(error = err.to_string(), "fail to bind tcp server listener"); - std::process::exit(1); - } - let listener = listener_result.unwrap(); +use std::{net::SocketAddr, sync::Arc}; + +use async_trait::async_trait; +use pingora::{ + apps::ServerApp, connectors::TransportConnector, protocols::Stream, server::ShutdownWatch, + tls::ssl::NameType, upstreams::peer::BasicPeer, +}; +use regex::Regex; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::lookup_host, + select, + sync::RwLock, +}; +use tracing::error; + +use crate::{config::Config, Consumer, State}; + +pub struct ProxyApp { + client_connector: TransportConnector, + host_regex: Regex, + state: Arc>, + config: Arc, +} + +enum DuplexEvent { + DownstreamRead(usize), + UpstreamRead(usize), +} - let tls_acceptor_result = tls_acceptor(&state); - if let Err(err) = tls_acceptor_result { - error!(error = err.to_string(), "fail to configure tls"); - std::process::exit(1); +impl ProxyApp { + pub fn new(config: Arc, state: Arc>) -> Self { + ProxyApp { + client_connector: TransportConnector::new(None), + host_regex: Regex::new(r"(dmtr_[\w\d-]+)\.([\w]+)-([\w\d]+).+").unwrap(), + config, + state, + } } - let tls_acceptor = tls_acceptor_result.unwrap(); - info!(addr = proxy_addr.to_string(), "proxy listening"); + async fn duplex( + &self, + state: State, + consumer: Consumer, + mut server_session: Stream, + mut client_session: Stream, + ) { + let mut upstream_buf = [0; 1024]; + let mut downstream_buf = [0; 1024]; + loop { + let downstream_read = server_session.read(&mut upstream_buf); + let upstream_read = client_session.read(&mut downstream_buf); + let event: DuplexEvent; + select! { + n = downstream_read => event + = DuplexEvent::DownstreamRead(n.unwrap()), + n = upstream_read => event + = DuplexEvent::UpstreamRead(n.unwrap()), + } - loop { - let accept_result = listener.accept().await; - if let Err(err) = accept_result { - error!(error = err.to_string(), "fail to accept client"); - continue; + match event { + DuplexEvent::DownstreamRead(n) => { + state.metrics.count_total_packages_bytes(&consumer, n); + + client_session.write_all(&upstream_buf[0..n]).await.unwrap(); + client_session.flush().await.unwrap(); + } + DuplexEvent::UpstreamRead(n) => { + state.metrics.count_total_packages_bytes(&consumer, n); + + server_session + .write_all(&downstream_buf[0..n]) + .await + .unwrap(); + server_session.flush().await.unwrap(); + } + } } - let (inbound, _) = accept_result.unwrap(); + } +} - let tls_acceptor = tls_acceptor.clone(); - let state = rw_state.read().await.clone(); +#[async_trait] +impl ServerApp for ProxyApp { + async fn process_new( + self: &Arc, + io: Stream, + _shutdown: &ShutdownWatch, + ) -> Option { + let state = self.state.read().await.clone(); + + let hostname = io.get_ssl()?.servername(NameType::HOST_NAME); + if hostname.is_none() { + error!("hostname is not present in the certificate"); + return None; + } - tokio::spawn(async move { - let tls_stream_result = tls_acceptor.accept(inbound).await; - if tls_stream_result.is_err() { - return; - } - let mut tls_stream = tls_stream_result.unwrap(); + let captures_result = self.host_regex.captures(hostname?); + if captures_result.is_none() { + error!("invalid hostname pattern"); + return None; + } + let captures = captures_result?; - let (_, server_connection) = tls_stream.get_ref(); + let token = captures.get(1)?.as_str().to_string(); + let network = captures.get(2)?.as_str().to_string(); + let version = captures.get(3)?.as_str().to_string(); - let hostname = server_connection.server_name(); - if hostname.is_none() { - error!("hostname is not present in the certificate"); - return; - } + let consumer = state.get_consumer(&network, &version, &token)?; - let captures_result = state.host_regex.captures(hostname.unwrap()); - if captures_result.is_none() { - error!("invalid hostname pattern"); - return; - } + let node_host = format!( + "node-{network}-{version}.{}:{}", + self.config.node_dns, self.config.node_port + ); - let captures = captures_result.unwrap(); - let token = captures.get(1).unwrap().as_str().to_string(); - let network = captures.get(2).unwrap().as_str().to_string(); - let version = captures.get(3).unwrap().as_str().to_string(); + let lookup_result = lookup_host(node_host).await; + if let Err(err) = lookup_result { + error!(error = err.to_string(), "fail to lookup ip"); + return None; + } + let lookup: Vec = lookup_result.unwrap().collect(); + let node_addr = lookup.first()?; - if !state.is_authenticated(&network, &version, &token) { - return; - } + let proxy_to = BasicPeer::new(&node_addr.to_string()); - let node_host = format!( - "node-{network}-{version}.{}:{}", - state.config.node_dns, state.config.node_port - ); - let lookup_result = lookup_host(node_host).await; - if let Err(err) = lookup_result { - error!(error = err.to_string(), "fail to lookup ip"); - return; - } - let lookup: Vec = lookup_result.unwrap().collect(); + let client_session = self.client_connector.new_stream(&proxy_to).await; - let node_addr = lookup.first().unwrap(); - let outbound_result = TcpStream::connect(node_addr).await; - if let Err(err) = outbound_result { - error!(error = err.to_string(), "fail to connect to the node"); - return; + match client_session { + Ok(client_session) => { + self.duplex(state, consumer, io, client_session).await; + None } - let mut outbound = outbound_result.unwrap(); - - if let Err(err) = copy_bidirectional(&mut tls_stream, &mut outbound).await { - error!(error = err.to_string(), "failed to proxy data"); + Err(e) => { + error!("failed to create client session: {}", e); + None } - }); + } } } - -fn tls_acceptor(state: &State) -> Result> { - let certs = rustls_pemfile::certs(&mut BufReader::new(&mut File::open( - &state.config.ssl_crt_path, - )?)) - .collect::, _>>()?; - let private_key = rustls_pemfile::private_key(&mut BufReader::new(&mut File::open( - &state.config.ssl_key_path, - )?))? - .unwrap(); - - let config = rustls::ServerConfig::builder() - .with_no_client_auth() - .with_single_cert(certs, private_key)?; - let acceptor = TlsAcceptor::from(Arc::new(config.clone())); - - Ok(acceptor) -}