diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 3a4f47f4..72c42279 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -4,6 +4,33 @@ on: types: [published] jobs: + release-tools: + name: Build tools/${{ matrix.build }} + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + build: [system-stats] + steps: + - uses: actions/checkout@v3 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + override: true + target: x86_64-unknown-linux-musl + + - name: Build binary + working-directory: tools/${{ matrix.build }} + run: cargo build --verbose --release + + - name: Upload release archive + uses: softprops/action-gh-release@v1 + with: + files: tools/${{ matrix.build }}/target/release/${{ matrix.build }} + build-release: name: Build release for ${{ matrix.target }} runs-on: ${{ matrix.os }} @@ -105,6 +132,7 @@ jobs: uses: softprops/action-gh-release@v1 with: files: uplink* + build-release-android: name: Build release for android runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 65e6c9f5..63aa9914 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,19 +65,19 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca87830a3e3fb156dc96cfbd31cb620265dd053be734723f22b760d6cc3c3051" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" [[package]] name = "async-trait" -version = "0.1.76" +version = "0.1.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "531b97fb4cd3dfdce92c35dedbfdc1f0b9d8091c8ca943d6dae340ef5012d514" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -444,7 +444,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -455,7 +455,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -492,7 +492,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -527,7 +527,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -762,7 +762,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -1484,7 +1484,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -1587,7 +1587,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -1733,9 +1733,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.73" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dd5e8a1f1029c43224ad5898e50140c2aebb1705f19e67c918ebf5b9e797fe1" +checksum = "907a61bd0f64c2f29cd1cf1dc34d05176426a3f504a78010f08416ddb7b13708" dependencies = [ "unicode-ident", ] @@ -1748,9 +1748,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.34" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22a37c9326af5ed140c86a46655b5278de879853be5573c01df185b6f49a580a" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -2077,7 +2077,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.20", + "semver 1.0.21", ] [[package]] @@ -2236,9 +2236,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.20" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" [[package]] name = "semver-parser" @@ -2248,29 +2248,29 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.193" +version = "1.0.194" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "0b114498256798c94a0689e1a15fec6005dee8ac1f41de56404b67afc2a4b773" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.194" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] name = "serde_json" -version = "1.0.109" +version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0652c533506ad7a2e353cce269330d6afd8bdfb6d75e0ace5b35aacbd7b9e9" +checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" dependencies = [ "itoa", "ryu", @@ -2279,9 +2279,9 @@ dependencies = [ [[package]] name = "serde_path_to_error" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +checksum = "ebd154a240de39fdebcf5775d2675c204d7c13cf39a4c697be6493c8e734337c" dependencies = [ "itoa", "serde", @@ -2325,7 +2325,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -2571,9 +2571,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.44" +version = "2.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92d27c2c202598d05175a6dd3af46824b7f747f8d8e9b14c623f19fa5069735d" +checksum = "1726efe18f42ae774cc644f330953a5e7b3c3003d3edcecf18850fe9d4dd9afb" dependencies = [ "proc-macro2", "quote", @@ -2699,22 +2699,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.53" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2cd5904763bad08ad5513ddbb12cf2ae273ca53fa9f68e843e236ec6dfccc09" +checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.53" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dcf4a824cce0aeacd6f38ae6f24234c8e80d68632338ebaa1443b5df9e29e19" +checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -2845,7 +2845,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -2978,7 +2978,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", ] [[package]] @@ -3340,7 +3340,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", "wasm-bindgen-shared", ] @@ -3374,7 +3374,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.44", + "syn 2.0.47", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index a973622a..ce7d636f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ exclude = [ "tools/deserialize-backup", "tools/simulator", + "tools/system-stats", "tools/tunshell", ] diff --git a/simulator.sh b/simulator.sh index 85bad046..240a51f7 100755 --- a/simulator.sh +++ b/simulator.sh @@ -83,7 +83,7 @@ download_auth_config() { echo "Downloading config: $url" mkdir -p devices curl --location $url \ - --header 'x-bytebeam-tenant: demo' \ + --header "x-bytebeam-tenant: $BYTEBEAM_TENANT_ID" \ --header "x-bytebeam-api-key: $BYTEBEAM_API_KEY" > devices/device_$id.json } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index e4783797..d692cbc7 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -16,6 +16,8 @@ pub enum Error { NotBackup, #[error("Corrupted backup file")] CorruptedFile, + #[error("Empty write buffer")] + NoWrites, } pub struct Storage { @@ -92,6 +94,14 @@ impl Storage { return Ok(None); } + self.flush() + } + + /// Force flush the contents of write buffer onto disk + pub fn flush(&mut self) -> Result, Error> { + if self.current_write_file.is_empty() { + return Err(Error::NoWrites); + } match &mut self.persistence { Some(persistence) => { let NextFile { mut file, deleted } = persistence.open_next_write_file()?; diff --git a/tools/system-stats/Cargo.lock b/tools/system-stats/Cargo.lock new file mode 100644 index 00000000..654de981 --- /dev/null +++ b/tools/system-stats/Cargo.lock @@ -0,0 +1,841 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi 0.1.19", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + +[[package]] +name = "crossbeam-deque" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "deranged" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" +dependencies = [ + "powerfmt", +] + +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + +[[package]] +name = "futures-core" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" + +[[package]] +name = "futures-macro" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.41", +] + +[[package]] +name = "futures-sink" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" + +[[package]] +name = "futures-task" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" + +[[package]] +name = "futures-util" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +dependencies = [ + "futures-core", + "futures-macro", + "futures-sink", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + +[[package]] +name = "itoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.151" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "memchr" +version = "2.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" + +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.3", + "libc", +] + +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + +[[package]] +name = "object" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro2" +version = "1.0.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rayon" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "ryu" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" + +[[package]] +name = "serde" +version = "1.0.193" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.193" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.41", +] + +[[package]] +name = "serde_json" +version = "1.0.108" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "simplelog" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acee08041c5de3d5048c8b3f6f13fafb3026b24ba43c6a695a0c76179b844369" +dependencies = [ + "log", + "termcolor", + "time", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "socket2" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "structopt" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" +dependencies = [ + "clap", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "sysinfo" +version = "0.26.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c18a6156d1f27a9592ee18c1a846ca8dd5c258b7179fc193ae87c74ebb666f5" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + +[[package]] +name = "system-stats" +version = "0.1.0" +dependencies = [ + "futures-util", + "log", + "serde", + "serde_json", + "simplelog", + "structopt", + "sysinfo", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", +] + +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "thiserror" +version = "1.0.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.41", +] + +[[package]] +name = "time" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +dependencies = [ + "deranged", + "itoa", + "libc", + "num_threads", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +dependencies = [ + "time-core", +] + +[[package]] +name = "tokio" +version = "1.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" +dependencies = [ + "backtrace", + "libc", + "mio", + "num_cpus", + "pin-project-lite", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.41", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "slab", + "tokio", + "tracing", +] + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "unicode-segmentation" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" + +[[package]] +name = "unicode-width" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" + +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/tools/system-stats/Cargo.toml b/tools/system-stats/Cargo.toml new file mode 100644 index 00000000..f5235776 --- /dev/null +++ b/tools/system-stats/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "system-stats" +version = "0.1.0" +edition = "2021" +authors = ["Devdutt Shenoi "] + +[dependencies] +futures-util = { version = "0.3", features = ["sink"] } +log = "0.4" +serde = { version = "1", features = ["derive"] } +serde_json = "1.0" +simplelog = "0.12.0" +structopt = "0.3" +sysinfo = "0.26" +thiserror = "1" +tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] } +tokio-stream = "0.1" +tokio-util = { version = "0.7", features = ["codec", "time"] } \ No newline at end of file diff --git a/tools/system-stats/src/lib.rs b/tools/system-stats/src/lib.rs new file mode 100644 index 00000000..200a10d9 --- /dev/null +++ b/tools/system-stats/src/lib.rs @@ -0,0 +1,636 @@ +use futures_util::SinkExt; +use log::error; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use sysinfo::{ + ComponentExt, CpuExt, DiskExt, NetworkData, NetworkExt, PidExt, ProcessExt, SystemExt, +}; +use tokio::{net::TcpStream, time::interval}; +use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; + +use std::{ + collections::HashMap, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Io error {0}")] + Io(#[from] std::io::Error), + #[error("Lines codec error {0}")] + Codec(#[from] LinesCodecError), + #[error("Serde error {0}")] + Json(#[from] serde_json::error::Error), +} + +#[derive(Debug, Serialize)] +pub struct Payload { + pub stream: String, + pub sequence: u32, + pub timestamp: u64, + #[serde(flatten)] + pub payload: Value, +} + +fn clock() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 +} + +type Pid = u32; + +#[derive(Debug, Default, Serialize, Clone)] +pub struct System { + sequence: u32, + timestamp: u64, + kernel_version: String, + uptime: u64, + no_processes: usize, + /// Average load within one minute. + load_avg_one: f64, + /// Average load within five minutes. + load_avg_five: f64, + /// Average load within fifteen minutes. + load_avg_fifteen: f64, + total_memory: u64, + available_memory: u64, + used_memory: u64, +} + +impl System { + fn init(sys: &sysinfo::System) -> System { + System { + kernel_version: match sys.kernel_version() { + Some(kv) => kv, + None => String::default(), + }, + total_memory: sys.total_memory(), + ..Default::default() + } + } + + fn update(&mut self, sys: &sysinfo::System, timestamp: u64) { + self.sequence += 1; + self.timestamp = timestamp; + self.uptime = sys.uptime(); + self.no_processes = sys.processes().len(); + let sysinfo::LoadAvg { one, five, fifteen } = sys.load_average(); + self.load_avg_one = one; + self.load_avg_five = five; + self.load_avg_fifteen = fifteen; + self.available_memory = sys.available_memory(); + self.used_memory = self.total_memory - self.available_memory; + } +} + +impl From<&System> for Payload { + fn from(value: &System) -> Self { + let System { + sequence, + timestamp, + kernel_version, + uptime, + no_processes, + load_avg_one, + load_avg_five, + load_avg_fifteen, + total_memory, + available_memory, + used_memory, + } = value; + + Payload { + stream: "uplink".to_owned(), + sequence: *sequence, + timestamp: *timestamp, + payload: json!({ + "kernel_version": kernel_version, + "uptime": uptime, + "no_processes": no_processes, + "load_avg_one": load_avg_one, + "load_avg_five": load_avg_five, + "load_avg_fifteen": load_avg_fifteen, + "total_memory": total_memory, + "available_memory": available_memory, + "used_memory": used_memory, + }), + } + } +} + +struct SystemStats { + stat: System, +} + +impl SystemStats { + fn push(&mut self, sys: &sysinfo::System, timestamp: u64) -> Payload { + self.stat.update(sys, timestamp); + + (&self.stat).into() + } +} + +#[derive(Debug, Serialize, Clone)] +struct Network { + sequence: u32, + timestamp: u64, + name: String, + incoming_data_rate: f64, + outgoing_data_rate: f64, + #[serde(skip_serializing)] + timer: Instant, +} + +impl Network { + fn init(name: String) -> Self { + Network { + sequence: 0, + timestamp: 0, + name, + incoming_data_rate: 0.0, + outgoing_data_rate: 0.0, + timer: Instant::now(), + } + } + + /// Update metrics values for network usage over time + fn update(&mut self, data: &NetworkData, timestamp: u64, sequence: u32) { + let update_period = self.timer.elapsed().as_secs_f64(); + // TODO: check if these calculations are correct + self.incoming_data_rate = data.total_received() as f64 / update_period; + self.outgoing_data_rate = data.total_transmitted() as f64 / update_period; + self.timestamp = timestamp; + self.sequence = sequence; + } +} + +impl From<&mut Network> for Payload { + fn from(value: &mut Network) -> Self { + let Network { sequence, timestamp, name, incoming_data_rate, outgoing_data_rate, .. } = + value; + + Payload { + stream: "uplink_network_stats".to_owned(), + sequence: *sequence, + timestamp: *timestamp, + payload: json!({ + "name": name, + "incoming_data_rate": incoming_data_rate, + "outgoing_data_rate": outgoing_data_rate, + }), + } + } +} + +struct NetworkStats { + sequence: u32, + map: HashMap, +} + +impl NetworkStats { + fn push( + &mut self, + net_name: String, + net_data: &sysinfo::NetworkData, + timestamp: u64, + ) -> Payload { + self.sequence += 1; + let net = self.map.entry(net_name.clone()).or_insert_with(|| Network::init(net_name)); + net.update(net_data, timestamp, self.sequence); + + net.into() + } +} + +#[derive(Debug, Serialize, Default, Clone)] +struct Disk { + sequence: u32, + timestamp: u64, + name: String, + total: u64, + available: u64, + used: u64, +} + +impl Disk { + fn init(name: String, disk: &sysinfo::Disk) -> Self { + Disk { name, total: disk.total_space(), ..Default::default() } + } + + fn update(&mut self, disk: &sysinfo::Disk, timestamp: u64, sequence: u32) { + self.total = disk.total_space(); + self.available = disk.available_space(); + self.used = self.total - self.available; + self.timestamp = timestamp; + self.sequence = sequence; + } +} + +impl From<&mut Disk> for Payload { + fn from(value: &mut Disk) -> Self { + let Disk { sequence, timestamp, name, total, available, used } = value; + + Payload { + stream: "uplink_disk_stats".to_owned(), + sequence: *sequence, + timestamp: *timestamp, + payload: json!({ + "name": name, + "total": total, + "available": available, + "used": used, + }), + } + } +} + +struct DiskStats { + sequence: u32, + map: HashMap, +} + +impl DiskStats { + fn push(&mut self, disk_data: &sysinfo::Disk, timestamp: u64) -> Payload { + self.sequence += 1; + let disk_name = disk_data.name().to_string_lossy().to_string(); + let disk = + self.map.entry(disk_name.clone()).or_insert_with(|| Disk::init(disk_name, disk_data)); + disk.update(disk_data, timestamp, self.sequence); + + disk.into() + } +} + +#[derive(Debug, Default, Serialize, Clone)] +struct Processor { + sequence: u32, + timestamp: u64, + name: String, + frequency: u64, + usage: f32, +} + +impl Processor { + fn init(name: String) -> Self { + Processor { name, ..Default::default() } + } + + fn update(&mut self, proc: &sysinfo::Cpu, timestamp: u64, sequence: u32) { + self.frequency = proc.frequency(); + self.usage = proc.cpu_usage(); + self.timestamp = timestamp; + self.sequence = sequence; + } +} + +impl From<&mut Processor> for Payload { + fn from(value: &mut Processor) -> Self { + let Processor { sequence, timestamp, name, frequency, usage } = value; + + Payload { + stream: "uplink_processor_stats".to_owned(), + sequence: *sequence, + timestamp: *timestamp, + payload: json!({ + "name": name, + "frequency": frequency, + "usage": usage, + }), + } + } +} + +struct ProcessorStats { + sequence: u32, + map: HashMap, +} + +impl ProcessorStats { + fn push(&mut self, proc_data: &sysinfo::Cpu, timestamp: u64) -> Payload { + let proc_name = proc_data.name().to_string(); + self.sequence += 1; + let proc = self.map.entry(proc_name.clone()).or_insert_with(|| Processor::init(proc_name)); + proc.update(proc_data, timestamp, self.sequence); + + proc.into() + } +} + +#[derive(Debug, Default, Serialize, Clone)] +struct Component { + sequence: u32, + timestamp: u64, + label: String, + temperature: f32, +} + +impl Component { + fn init(label: String) -> Self { + Component { label, ..Default::default() } + } + + fn update(&mut self, comp: &sysinfo::Component, timestamp: u64, sequence: u32) { + self.temperature = comp.temperature(); + self.timestamp = timestamp; + self.sequence = sequence; + } +} + +impl From<&mut Component> for Payload { + fn from(value: &mut Component) -> Self { + let Component { sequence, timestamp, label, temperature } = value; + + Payload { + stream: "uplink_component_stats".to_owned(), + sequence: *sequence, + timestamp: *timestamp, + payload: json!({ + "label": label, "temperature": temperature + }), + } + } +} + +struct ComponentStats { + sequence: u32, + map: HashMap, +} + +impl ComponentStats { + fn push(&mut self, comp_data: &sysinfo::Component, timestamp: u64) -> Payload { + let comp_label = comp_data.label().to_string(); + self.sequence += 1; + let comp = + self.map.entry(comp_label.clone()).or_insert_with(|| Component::init(comp_label)); + comp.update(comp_data, timestamp, self.sequence); + + comp.into() + } +} + +#[derive(Debug, Default, Serialize, Clone)] +struct Process { + sequence: u32, + timestamp: u64, + pid: Pid, + name: String, + cpu_usage: f32, + mem_usage: u64, + disk_total_written_bytes: u64, + disk_written_bytes: u64, + disk_total_read_bytes: u64, + disk_read_bytes: u64, + start_time: u64, +} + +impl Process { + fn init(pid: Pid, name: String, start_time: u64) -> Self { + Process { pid, name, start_time, ..Default::default() } + } + + fn update(&mut self, proc: &sysinfo::Process, timestamp: u64, sequence: u32) { + let sysinfo::DiskUsage { total_written_bytes, written_bytes, total_read_bytes, read_bytes } = + proc.disk_usage(); + self.disk_total_written_bytes = total_written_bytes; + self.disk_written_bytes = written_bytes; + self.disk_total_read_bytes = total_read_bytes; + self.disk_read_bytes = read_bytes; + self.cpu_usage = proc.cpu_usage(); + self.mem_usage = proc.memory(); + self.timestamp = timestamp; + self.sequence = sequence; + } +} + +impl From<&mut Process> for Payload { + fn from(value: &mut Process) -> Self { + let Process { + sequence, + timestamp, + pid, + name, + cpu_usage, + mem_usage, + disk_total_written_bytes, + disk_written_bytes, + disk_total_read_bytes, + disk_read_bytes, + start_time, + } = value; + + Payload { + stream: "uplink_process_stats".to_owned(), + sequence: *sequence, + timestamp: *timestamp, + payload: json!({ + "pid": pid, + "name": name, + "cpu_usage": cpu_usage, + "mem_usage": mem_usage, + "disk_total_written_bytes": disk_total_written_bytes, + "disk_written_bytes": disk_written_bytes, + "disk_total_read_bytes": disk_total_read_bytes, + "disk_read_bytes": disk_read_bytes, + "start_time": start_time, + }), + } + } +} + +struct ProcessStats { + sequence: u32, + map: HashMap, +} + +impl ProcessStats { + fn push( + &mut self, + id: Pid, + proc_data: &sysinfo::Process, + name: String, + timestamp: u64, + ) -> Payload { + self.sequence += 1; + let proc = + self.map.entry(id).or_insert_with(|| Process::init(id, name, proc_data.start_time())); + proc.update(proc_data, timestamp, self.sequence); + + proc.into() + } +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct Config { + pub process_names: Vec, + pub update_period: u64, +} + +/// Collects and forward system information such as kernel version, memory and disk space usage, +/// information regarding running processes, network and processor usage, etc to an IoT platform. +pub struct StatCollector { + /// Handle to sysinfo struct containing system information. + sys: sysinfo::System, + /// System information values to be serialized. + system: SystemStats, + /// Information about running processes. + processes: ProcessStats, + /// Individual Processor information. + processors: ProcessorStats, + /// Information regarding individual Network interfaces. + networks: NetworkStats, + /// Information regarding individual Disks. + disks: DiskStats, + /// Temperature information from individual components. + components: ComponentStats, + /// System stats configuration. + config: Config, + /// Handle to.send stats as payload onto bridge + client: Framed, +} + +impl StatCollector { + /// Create and initialize a stat collector + pub fn new(config: Config, client: Framed) -> Self { + let mut sys = sysinfo::System::new(); + sys.refresh_disks_list(); + sys.refresh_networks_list(); + sys.refresh_memory(); + sys.refresh_cpu(); + sys.refresh_components(); + + let mut map = HashMap::new(); + for disk_data in sys.disks() { + let disk_name = disk_data.name().to_string_lossy().to_string(); + map.insert(disk_name.clone(), Disk::init(disk_name, disk_data)); + } + let disks = DiskStats { sequence: 0, map }; + + let mut map = HashMap::new(); + for (net_name, _) in sys.networks() { + map.insert(net_name.to_owned(), Network::init(net_name.to_owned())); + } + let networks = NetworkStats { sequence: 0, map }; + + let mut map = HashMap::new(); + for proc in sys.cpus().iter() { + let proc_name = proc.name().to_owned(); + map.insert(proc_name.clone(), Processor::init(proc_name)); + } + let processors = ProcessorStats { sequence: 0, map }; + + let processes = ProcessStats { sequence: 0, map: HashMap::new() }; + let components = ComponentStats { sequence: 0, map: HashMap::new() }; + + let system = SystemStats { stat: System::init(&sys) }; + + StatCollector { + sys, + system, + config, + processes, + disks, + networks, + processors, + components, + client, + } + } + + /// Stat collector execution loop, sleeps for the duation of `config.stats.update_period` in seconds. + /// Update system information values and increment sequence numbers, while.sending to specific data streams. + pub async fn start(mut self) -> Result<(), Error> { + let mut interval = interval(Duration::from_secs(self.config.update_period)); + loop { + interval.tick().await; + + self.update_memory_stats().await?; + self.update_disk_stats().await?; + self.update_network_stats().await?; + self.update_cpu_stats().await?; + self.update_component_stats().await?; + self.update_process_stats().await?; + } + } + + // Refresh memory stats + async fn update_memory_stats(&mut self) -> Result<(), Error> { + self.sys.refresh_memory(); + let timestamp = clock(); + let payload = self.system.push(&self.sys, timestamp); + let payload = serde_json::to_string(&payload)?; + self.client.send(payload).await?; + + Ok(()) + } + + // Refresh disk stats + async fn update_disk_stats(&mut self) -> Result<(), Error> { + self.sys.refresh_disks(); + let timestamp = clock(); + for disk_data in self.sys.disks() { + let payload = self.disks.push(disk_data, timestamp); + let payload = serde_json::to_string(&payload)?; + self.client.send(payload).await?; + } + + Ok(()) + } + + // Refresh network byte rate stats + async fn update_network_stats(&mut self) -> Result<(), Error> { + self.sys.refresh_networks(); + let timestamp = clock(); + for (net_name, net_data) in self.sys.networks() { + let payload = self.networks.push(net_name.to_owned(), net_data, timestamp); + let payload = serde_json::to_string(&payload)?; + self.client.send(payload).await?; + } + + Ok(()) + } + + // Refresh processor stats + async fn update_cpu_stats(&mut self) -> Result<(), Error> { + self.sys.refresh_cpu(); + let timestamp = clock(); + for proc_data in self.sys.cpus().iter() { + let payload = self.processors.push(proc_data, timestamp); + let payload = serde_json::to_string(&payload)?; + self.client.send(payload).await?; + } + + Ok(()) + } + + // Refresh component stats + async fn update_component_stats(&mut self) -> Result<(), Error> { + self.sys.refresh_components(); + let timestamp = clock(); + for comp_data in self.sys.components().iter() { + let payload = self.components.push(comp_data, timestamp); + let payload = serde_json::to_string(&payload)?; + self.client.send(payload).await?; + } + + Ok(()) + } + + // Refresh processes info + // NOTE: This can be further optimized by storing pids of interested processes + // at init and only collecting process information for them instead of iterating + // over all running processes as is being done now. + async fn update_process_stats(&mut self) -> Result<(), Error> { + self.sys.refresh_processes(); + let timestamp = clock(); + for (&id, p) in self.sys.processes() { + let name = p.cmd().get(0).map(|s| s.to_string()).unwrap_or(p.name().to_string()); + + if self.config.process_names.contains(&name) { + let payload = self.processes.push(id.as_u32(), p, name, timestamp); + let payload = serde_json::to_string(&payload)?; + self.client.send(payload).await?; + } + } + + Ok(()) + } +} diff --git a/tools/system-stats/src/main.rs b/tools/system-stats/src/main.rs new file mode 100644 index 00000000..131d0393 --- /dev/null +++ b/tools/system-stats/src/main.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use log::{error, LevelFilter}; +use simplelog::{ + ColorChoice, CombinedLogger, ConfigBuilder, LevelPadding, TermLogger, TerminalMode, +}; +use structopt::StructOpt; +use system_stats::{Config, StatCollector}; +use tokio::net::TcpStream; +use tokio_util::codec::{Framed, LinesCodec}; + +#[derive(StructOpt, Debug)] +#[structopt(name = "simulator", about = "simulates a demo device")] +pub struct CommandLine { + /// uplink port + #[structopt(short = "p", help = "uplink port")] + pub port: u16, + /// log level (v: info, vv: debug, vvv: trace) + #[structopt(short = "v", long = "verbose", parse(from_occurrences))] + pub verbose: u8, + /// name of processes to be monitored + #[structopt(short = "P", help = "processes")] + pub process_names: Vec, + /// time between updates + #[structopt(short = "t", help = "update period", default_value = "30")] + pub update_period: u64, +} + +#[tokio::main] +async fn main() { + let CommandLine { process_names, update_period, port, .. } = init(); + + let addr = format!("localhost:{}", port); + + loop { + let Ok(stream) = TcpStream::connect(&addr).await else { + error!("Uplink is not running, will reconnect after sleeping"); + std::thread::sleep(Duration::from_secs(update_period)); + continue; + }; + let client = Framed::new(stream, LinesCodec::new()); + let config = Config { process_names: process_names.clone(), update_period }; + + let collector = StatCollector::new(config, client); + if let Err(e) = collector.start().await { + error!("Error forwarding stats: {e}"); + } + } +} + +fn init() -> CommandLine { + let commandline: CommandLine = StructOpt::from_args(); + let level = match commandline.verbose { + 0 => LevelFilter::Warn, + 1 => LevelFilter::Info, + 2 => LevelFilter::Debug, + _ => LevelFilter::Trace, + }; + + let mut config = ConfigBuilder::new(); + config + .set_location_level(LevelFilter::Off) + .set_target_level(LevelFilter::Error) + .set_thread_level(LevelFilter::Error) + .set_level_padding(LevelPadding::Right); + + let loggers = TermLogger::new(level, config.build(), TerminalMode::Mixed, ColorChoice::Auto); + CombinedLogger::init(vec![loggers]).unwrap(); + + commandline +} diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index b6fa7763..527763a0 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -16,6 +16,7 @@ use crate::collector::logcat::LogcatConfig; use self::bridge::stream::MAX_BUFFER_SIZE; use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx}; use self::mqtt::CtrlTx as MqttCtrlTx; +use self::serializer::CtrlTx as SerializerCtrlTx; pub mod actions; pub mod bridge; @@ -291,6 +292,7 @@ pub struct CtrlTx { pub actions_lane: ActionsLaneCtrlTx, pub data_lane: DataLaneCtrlTx, pub mqtt: MqttCtrlTx, + pub serializer: SerializerCtrlTx, } impl CtrlTx { @@ -299,6 +301,7 @@ impl CtrlTx { self.actions_lane.trigger_shutdown(), self.data_lane.trigger_shutdown(), self.mqtt.trigger_shutdown(), + self.serializer.trigger_shutdown() ); } } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 9163ef1c..18145dc1 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -2,10 +2,11 @@ mod metrics; use std::collections::{HashMap, VecDeque}; use std::io::{self, Write}; +use std::time::Instant; use std::{sync::Arc, time::Duration}; use bytes::Bytes; -use flume::{Receiver, RecvError, Sender}; +use flume::{bounded, Receiver, RecvError, Sender}; use log::{debug, error, info, trace}; use lz4_flex::frame::FrameEncoder; use rumqttc::*; @@ -58,6 +59,8 @@ pub enum Error { EmptyStorage, #[error("Permission denied while accessing persistence directory \"{0}\"")] Persistence(String), + #[error("Serializer has shutdown after handling crash")] + Shutdown, } #[derive(Debug, PartialEq)] @@ -66,6 +69,7 @@ enum Status { SlowEventloop(Publish, Arc), EventLoopReady, EventLoopCrash(Publish, Arc), + Shutdown, } /// Description of an interface that the [`Serializer`] expects to be provided by the MQTT client to publish the serialized data with. @@ -206,6 +210,19 @@ impl StorageHandler { None } + + fn flush_all(&mut self) { + for (stream_config, storage) in self.map.iter_mut() { + match storage.flush() { + Ok(_) => trace!("Force flushed stream = {} onto disk", stream_config.topic), + Err(storage::Error::NoWrites) => {} + Err(e) => error!( + "Error when force flushing storage = {}; error = {e}", + stream_config.topic + ), + } + } + } } /// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network. @@ -246,6 +263,9 @@ impl StorageHandler { /// but continue trying to publish /// ///``` +/// +/// NOTE: Shutdown mode and crash mode are only different in how they get triggered, +/// but should be considered as interchangeable in the above diagram. /// [`start()`]: Serializer::start /// [`try_publish()`]: AsyncClient::try_publish /// [`publish()`]: AsyncClient::publish @@ -257,6 +277,9 @@ pub struct Serializer { metrics: SerializerMetrics, metrics_tx: Sender, pending_metrics: VecDeque, + /// Control handles + ctrl_rx: Receiver, + ctrl_tx: Sender, } impl Serializer { @@ -269,6 +292,7 @@ impl Serializer { metrics_tx: Sender, ) -> Result, Error> { let storage_handler = StorageHandler::new(config.clone())?; + let (ctrl_tx, ctrl_rx) = bounded(1); Ok(Serializer { config, @@ -278,9 +302,39 @@ impl Serializer { metrics: SerializerMetrics::new("catchup"), metrics_tx, pending_metrics: VecDeque::with_capacity(3), + ctrl_tx, + ctrl_rx, }) } + pub fn ctrl_tx(&self) -> CtrlTx { + CtrlTx { inner: self.ctrl_tx.clone() } + } + + /// Write all data received, from here-on, to disk only, shutdown serializer + /// after handling all data payloads. + fn shutdown(&mut self) -> Result<(), Error> { + debug!("Forced into shutdown mode, writing all incoming data to persistence."); + + loop { + // Collect remaining data packets and write to disk + // NOTE: wait 2s to allow bridge to shutdown and flush leftover data. + let deadline = Instant::now() + Duration::from_secs(2); + let Ok(data) = self.collector_rx.recv_deadline(deadline) else { + self.storage_handler.flush_all(); + return Ok(()); + }; + let stream_config = data.stream_config(); + let publish = construct_publish(data)?; + let storage = self.storage_handler.select(&stream_config); + match write_to_disk(publish, storage) { + Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), + Ok(_) => {} + Err(e) => error!("Shutdown: write error = {:?}", e), + } + } + } + /// Write all data received, from here-on, to disk only. async fn crash( &mut self, @@ -358,6 +412,10 @@ impl Serializer { _ = interval.tick() => { check_metrics(&mut self.metrics, &self.storage_handler); } + // Transition into crash mode when uplink is shutting down + Ok(SerializerShutdown) = self.ctrl_rx.recv_async() => { + break Ok(Status::Shutdown) + } } }; @@ -468,6 +526,10 @@ impl Serializer { _ = interval.tick() => { let _ = check_and_flush_metrics(&mut self.pending_metrics, &mut self.metrics, &self.metrics_tx, &self.storage_handler); } + // Transition into crash mode when uplink is shutting down + Ok(SerializerShutdown) = self.ctrl_rx.recv_async() => { + return Ok(Status::Shutdown) + } } }; @@ -514,6 +576,10 @@ impl Serializer { debug!("Failed to flush serializer metrics (normal). Error = {}", e); } } + // Transition into crash mode when uplink is shutting down + Ok(SerializerShutdown) = self.ctrl_rx.recv_async() => { + return Ok(Status::Shutdown) + } } } } @@ -528,10 +594,17 @@ impl Serializer { Status::SlowEventloop(publish, stream) => self.slow(publish, stream).await?, Status::EventLoopReady => self.catchup().await?, Status::EventLoopCrash(publish, stream) => self.crash(publish, stream).await?, + Status::Shutdown => break, }; status = next_status; } + + self.shutdown()?; + + info!("Serializer has handled all pending packets, shutting down"); + + Ok(()) } } @@ -711,6 +784,22 @@ fn check_and_flush_metrics( Ok(()) } +/// Command to remotely trigger `Serializer` shutdown +pub(crate) struct SerializerShutdown; + +/// Handle to send control messages to `Serializer` +#[derive(Debug, Clone)] +pub struct CtrlTx { + pub(crate) inner: Sender, +} + +impl CtrlTx { + /// Triggers shutdown of `Serializer` + pub async fn trigger_shutdown(&self) { + self.inner.send_async(SerializerShutdown).await.unwrap() + } +} + // TODO(RT): Test cases // - Restart with no internet but files on disk diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 0cb892e2..d16da57b 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -337,6 +337,7 @@ impl Uplink { mqtt_client.clone(), self.serializer_metrics_tx(), )?; + let ctrl_serializer = serializer.ctrl_tx(); // Serializer thread to handle network conditions state machine // and send data to mqtt thread @@ -406,7 +407,12 @@ impl Uplink { }) }); - Ok(CtrlTx { actions_lane: ctrl_actions_lane, data_lane: ctrl_data_lane, mqtt: ctrl_mqtt }) + Ok(CtrlTx { + actions_lane: ctrl_actions_lane, + data_lane: ctrl_data_lane, + mqtt: ctrl_mqtt, + serializer: ctrl_serializer, + }) } pub fn spawn_builtins(&mut self, bridge: &mut Bridge) -> Result<(), Error> { diff --git a/uplink/src/main.rs b/uplink/src/main.rs index a09ece67..4b42ae21 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -200,7 +200,7 @@ fn main() -> Result<(), Error> { uplink.resolve_on_shutdown().await.unwrap(); info!("Uplink shutting down..."); // NOTE: wait 5s to allow serializer to write to network/disk - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(10)).await; }); Ok(()) diff --git a/vd-lib b/vd-lib new file mode 160000 index 00000000..cdaec8b6 --- /dev/null +++ b/vd-lib @@ -0,0 +1 @@ +Subproject commit cdaec8b6dacc3779e063e0ff1241a41a821b0cf2