Skip to content

Commit

Permalink
Merge branch 'vilayat/task/graceful-shutdown' of https://github.com/b…
Browse files Browse the repository at this point in the history
…ytebeamio/uplink into vilayat/task/re-read-packets
  • Loading branch information
Vilayat-Ali committed Nov 1, 2023
2 parents e83f69d + 3621334 commit 1dda22a
Show file tree
Hide file tree
Showing 15 changed files with 828 additions and 603 deletions.
1,079 changes: 550 additions & 529 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[simulator]
actions = [{ name = "lock" }]
gps_paths = "./paths"
70 changes: 70 additions & 0 deletions notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@

## REPO TOUR

`uplink/base` -> all uplink related stuffs.

`uplink/collector` -> contains built-in applications

### 2 types of app in uplink-

1. doing stuffs using actions (actions are commands that are send by the platform)

2. collecting data from telemetrics data (eg from sensors) CAN

### Uplink Base

- **Bridge**: There are 2 types of lanes in bridge in uplink. `Actions lane` and `Data lane`, respectively. **Actions lane** is used to receive and respond to actions. Whereas **Data lane** is used only to forward or send data.

### Duties of `Action lane`:

- Guard against new actions when there exists another action in execution.

- Receive responses, from the connected application and forwards it if the currently executing action has the same action id.

- If action is not configured, uplink will reject it.

- Sensor recieves responses, [when an action is received by Uplink, it is sending the response that it receives the action]

- Performs timeout of actions that are running or executing for more time than specified or configured.

### Duties of `Data lane`:

- it receives data as `JSON object`, it takes that data object and batches it upto batch size [batch size is customizable]

- separate data into streams

- If a particular stream has not reached batch size, due to slow data generation or any other reason, push the data after a timeout as configured by the user.

### Response types:

- Receive [uplink sends when an action is received through the mqtt connection]

- Running [contains progress information from the application] (possible values from 0 to 100) of 1 byte max

- Completed [sent on completing execution of action]

- Failed [sent when an action can't be/fails to be executed, we can also send backtrace data along with the failed response]

## Platform Behaviour

Resends the action when there is no response.


## Serializer

Writes data into some place that it can read out of later.

User can customize serializer for certain streams.

eg. -

If lots of data is expected on that stream and all of it is important, user can configure the serializer to write it to the disc.

If not much data is expected but all of it is still important then we keep in memory (RAM only).


## Todo

- We are using multi context tokio processes. [go through tokio]
- Go through concurrency & parallelism
- Go through codebase
6 changes: 3 additions & 3 deletions qa-scripts/.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CONSOLED_DOMAIN=
BYTEBEAM_API_KEY=
DEVICE_ID=
CONSOLED_DOMAIN=stage.bytebeam.io
BYTEBEAM_API_KEY=b027b7ab-b294-4ff7-a197-d47e38e55bd1
DEVICE_ID=1001
4 changes: 2 additions & 2 deletions qa-scripts/actions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ actions = [{ name = "send_file" },{ name = "send_script"}]
path = "/var/tmp/downloads"
[simulator]
actions = ["load_file"]
actions = [{ name = "load_file" }]
gps_paths = "./paths/"
[tcpapps.blackhole]
port = 7891
actions = [{ name = "no_response", timeout = 100 }, { name = "restart_response", timeout = 1800 }]
EOF
)" > devices/actions.toml
docker cp devices/actions.toml simulator:/usr/share/bytebeam/uplink/devices/actions.toml
# docker cp devices/actions.toml simulator:/usr/share/bytebeam/uplink/devices/actions.toml

docker exec -it simulator uplink -a /usr/share/bytebeam/uplink/devices/device_$DEVICE_ID.json -c /usr/share/bytebeam/uplink/devices/actions.toml -vv -m uplink::base::bridge

Expand Down
Empty file added questions.md
Empty file.
11 changes: 11 additions & 0 deletions ssl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"project_id": "demo",
"broker": "stage.bytebeam.io",
"port": 8883,
"device_id": "251",
"authentication": {
"ca_certificate": "-----BEGIN CERTIFICATE-----\nMIIFwDCCA6igAwIBAgICB+MwDQYJKoZIhvcNAQELBQAwgYAxDjAMBgNVBAYTBUlu\nZGlhMRIwEAYDVQQIEwlLYXJuYXRha2ExEjAQBgNVBAcTCUJhbmdhbG9yZTEXMBUG\nA1UECRMOU3ViYmlhaCBHYXJkZW4xDzANBgNVBBETBjU2MDAxMTEcMBoGA1UEChMT\nSU9UIEV4cHJlc3MgUHZ0IEx0ZDAeFw0yMjExMTYxMjM3MzFaFw0zMjExMTYxMjM3\nMzFaMIGAMQ4wDAYDVQQGEwVJbmRpYTESMBAGA1UECBMJS2FybmF0YWthMRIwEAYD\nVQQHEwlCYW5nYWxvcmUxFzAVBgNVBAkTDlN1YmJpYWggR2FyZGVuMQ8wDQYDVQQR\nEwY1NjAwMTExHDAaBgNVBAoTE0lPVCBFeHByZXNzIFB2dCBMdGQwggIiMA0GCSqG\nSIb3DQEBAQUAA4ICDwAwggIKAoICAQDjqNe5tifVH0myE4S/GStOXOfzWDFP/7U7\nuCrLhr6kNfcj8CPd+whyGSbbJYRA1XgYXxvkaYUFjUqDnp3lLZ/UYW/lzAMdOSpz\nVmyFOv9Hj7DOF4CTpcDKK8a1FRqcpoTAVyKJD5WphZkpnmhHlpaXIUTRINckQFen\nURX/JdzfqOLN311q176lWOaKfe0wBsGSz7PhdRmZRv2ZtSF+jj4o9IOjowbjTY79\n1LH+oBU8csZ28hi/6Cjd/k5DGv+cNRiWbDyo3+2VyBaaH41EE8U8dj5oDzP/oZxm\n21PUWFwGB/e2WGtF+NDtQCTMILdzOJz9ptYyQMsYtFITUoIdSgY5wTjSggAGCsPS\nGSqqZ3cnBCOqhxZ1rcjHgTgdmqSzOx+Z7INFM2F/m3Zp4eHRa+CYAIjohE4ZdTwz\niEBaRi+tntegzfmCUpYav/Io7wylRyJHODgApyMIKEdjW3GrhONaT+rEyxuFXAoD\nFaxnt5O9X8uHahSvEj2xm6TDoFQh4gyzc8uVB7RhbjFLgAosFeHmYUIc8mLugT/7\nVfRq3BKMmSWOXhd90gcH82lQwAk24TjOhHvMmxBdyODqHvnWeEyN0xxXKC5kf7vI\nxfgEJUc+Aus2Z7rfcGUgSfdyjUCKrLQe61D0nSVWPKgQyTBNb4cThdPC7eqZg35p\n7c49mnrr9QIDAQABo0IwQDAOBgNVHQ8BAf8EBAMCAoQwDwYDVR0TAQH/BAUwAwEB\n/zAdBgNVHQ4EFgQU+VIYsB43s6wECc2ejPJpInKj/rgwDQYJKoZIhvcNAQELBQAD\nggIBAMdA8bvCnJ9LiLvWuTTlO+3XYXAlU9+DBAT35pQnw8GbZKWCVJIzIddInu/L\n2uSsmriA/tJjLnawuHX8cgapM3GrMmJzhHc9lm92hu/qXY0M6A3daDY5p+3k1IeE\nLM4PAU4sa1EJo0C0c0foNBlEMDEZEiV4TXzJ+zrESN2LXii7Dsq9RpBjkmWheEY5\ni0Eue80aCI08BpgvE7KjT8emhhTgqsfh6N2Nbpu0lGzzUJD8dY/kkmhmvXcpA7rZ\n6bA+A1G7Ir7v+khhfo1Vlzl/8sCtDI0U10yhJcZptKDU+5suN/zSwNivgApV4Qxp\nCZwP7OCwc0v2G+oj2THZzM5OyZEdE/HHIkBKFTR4scb12Ew7T5/8KkFCF9wyc1ef\nOCHGGHWIZUBeySVviijEPY/ae6Fcto+8OEbec+Gte3Va4byAujEkHf/lhXPtdfjS\nFjJpLlgrDqO/BQxzGckhtQVV8BjMgLgFNdwUPcv8t2osNWK6KrPgTJCDvh1Z1SsL\nEw0niH9tCVKesw712EuMVkku+VJx7mQi+3nRCYc48F0PcpSRd7FLO4/Yn3t/e0P0\nAHkvroB/Ys+HNHCGt2hjxx6TBPwyIWoThtvRKbcdsnIXQ57nybtSdGx0PUvRRyxL\njxRDE2swAwwkfmJqPDgfwXmTnTLvCRwZ+SItYDv6GCRztQyv\n-----END CERTIFICATE-----\n",
"device_certificate": "-----BEGIN CERTIFICATE-----\nMIIEcjCCAlqgAwIBAgICB+MwDQYJKoZIhvcNAQELBQAwgYAxDjAMBgNVBAYTBUlu\nZGlhMRIwEAYDVQQIEwlLYXJuYXRha2ExEjAQBgNVBAcTCUJhbmdhbG9yZTEXMBUG\nA1UECRMOU3ViYmlhaCBHYXJkZW4xDzANBgNVBBETBjU2MDAxMTEcMBoGA1UEChMT\nSU9UIEV4cHJlc3MgUHZ0IEx0ZDAeFw0yMzEwMTAwNTE4NDVaFw0zMzEwMTAwNTE4\nNDVaMB0xDTALBgNVBAoTBGRlbW8xDDAKBgNVBAMTAzI1MTCCASIwDQYJKoZIhvcN\nAQEBBQADggEPADCCAQoCggEBALUFfFzEb8SDiJWx86tLWX+VXxzGbuIEYJcr4B5i\nykq4HhtG7Lt3DztQGP22Rkfityuz21s/sbCoq18QY6hhcxBEzV1HmK1EniD3KKn5\ncIjSM/psN/+bcwb4z9Oo2/Uzvh9o4ya0LvUkTKpMPH2z8Ftu/56cuVNnrI/ZKlq2\nTdgPp1rFHq3pemqMFs+irr6o1pf+9RHDZrrSIebrqh63+DqSMpBrDrOEMOHSRr2E\ndUvu7C0/Mo85zMeUYzWMy2bO4r8DJa8k5VDPbVrt4z+FQdYeWUwm3x9TkgcnNe3z\npmhb36x+K3L7BecIgtaP75Vk7n2b6+DaQS7pgr5k0dm5AXkCAwEAAaNYMFYwDgYD\nVR0PAQH/BAQDAgWgMBMGA1UdJQQMMAoGCCsGAQUFBwMCMB8GA1UdIwQYMBaAFPlS\nGLAeN7OsBAnNnozyaSJyo/64MA4GA1UdEQQHMAWCAzI1MTANBgkqhkiG9w0BAQsF\nAAOCAgEAQyO+kS/bFSedetdin/fAF64+PaRVvdcqlrttKlVLGoJ7c4VaMcL+ZTNX\nob2AfSG+WkIVpRWD+GgdsT5Ab4ayd7ameHi1ZBwctW+5rvJO9hME71vKO3IXOIhs\n1vuuscGxRvWmRCWiL5jhbeOEg/76K5qBLHpfqsYzIbKk4j4FN5AjsRa8EB6YJZAk\nrck9JXe+u185A7JoilLgjthVbapW2EPvmparUnoF73VxGA0jBQUujK6sCv63fWQX\nb9wo7jvRDcvyso4TkQlUUaoJD3gFmZ5s4g+m8yQTLTi4BKduNQSa5h62/b9mgWS/\neO9TqjjxsAZvRbEmlSH1mwUJIODaCOu688icF3xmKVbLsY+QWnp70rzhveQV304F\njQnh+f/WfkMxCrYHv2euZpqOGZOhe/WMSmBYGAWTyN4tT6SbgBNNNrx7uDrRwXVI\nhcpf+d0TOj4Q8OonKN4zvG1N5xz8RRRWgMt0DQLqWVwSMzciYIZDmpviMdZtfVuO\npHLa7f0PGxOapRFRt0IVX/JEVi9FamsyUSkhPJRuZGwHMsCLl1Ebq++1pZ9LMIbs\n9JuTUtlap0QA4p9A2ytKIKdyjXbfNL9r5Nk7fpUQqvcMa/eVAdt/Vg5hVV7/Uzbm\nEk13R/EoZNJW6XxCiY4LaTRICdM1QpUIeAsHv7rDKQHe8FuwBzg=\n-----END CERTIFICATE-----\n",
"device_private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEAtQV8XMRvxIOIlbHzq0tZf5VfHMZu4gRglyvgHmLKSrgeG0bs\nu3cPO1AY/bZGR+K3K7PbWz+xsKirXxBjqGFzEETNXUeYrUSeIPcoqflwiNIz+mw3\n/5tzBvjP06jb9TO+H2jjJrQu9SRMqkw8fbPwW27/npy5U2esj9kqWrZN2A+nWsUe\nrel6aowWz6KuvqjWl/71EcNmutIh5uuqHrf4OpIykGsOs4Qw4dJGvYR1S+7sLT8y\njznMx5RjNYzLZs7ivwMlryTlUM9tWu3jP4VB1h5ZTCbfH1OSByc17fOmaFvfrH4r\ncvsF5wiC1o/vlWTufZvr4NpBLumCvmTR2bkBeQIDAQABAoIBABFQIQDOWNI2sk1+\n44syfbDSHHKoF9rOCF7s91ZwpGBZE6gQNHwJew91La62TCp76IDphsVHZjs7JojF\nDIMCLvsTWYDeZB/OPEFy4hS/op+zWv1MdF6iA3JbQaVAtSjSdhCS+4asHQGdIyf2\neqKbHxXfm9sTqjXr6Hkpj/91CCTt0SngvY4PnDh0DomluQo/eDWe+59yv5lh//nn\ntdcNI/bssqBat+NuEI1BxAlljd4xQpiGTUEAhz9Y/Tted1jBcJmxDVSsbd3bTI6x\n02s+7KHbzwtFZv2zUi6q68o90nKpn6x5qZzW8ryGhT8vSl/2l6LypAnNG1pKF+Q3\nepblcjECgYEA2QDc01bJWsc2BReeffKFKb2JPE24TU5/6uqTwMK0NYqZa0Ia7mnA\nrrr+vGxHQ7NCdUTCWshLqut8TMEn/6JW0JW3hfT9Vlp5cAk3KhVb3MYbcdzJHuIm\n5xqzPKaKye9WvKLMo+LfqHddnuTZKqghaw+HjcZCaYIed8B61tXtU58CgYEA1Y1K\ngKXSbg2D2pf8PTb2Uk1XcpS7k8gRyqz1aQBn8jmqTD7mvi2/bMcpF9RcGcE5lWKm\nwZZWMD3Hhdun1HUz4/aVV+/+nMFbSDlwD7TuGLysxX3jS3dnKKMOGBu5X81Q2ErS\nxYqHiGyDHoYx4tqZoB7Ambc3u+1D2OgMVj3KU+cCgYEA0xbyuKZ4klSsqMF0DGJI\nxPS5ta6ItLLC3jq35c/Ay0B6wwp39ilrJM2EFLhjjminIjnzsambabVOob9XcGVz\ne83Xy3goo8K41JIyVj6WsKkJc3ZpMiCd5Ttk1X/QDGVEg5T96gI8vLz4liNZWRRa\nBbICk5gjbE0F5EmZdLh+91MCgYA+57zfgnknE/IYLRcPTSZn/gJZbTiIArJC+onb\n4VO9RYFFrm237a/qsgwHyfVLXODCgWnPsqI22avxYU/hxJiBeTxqx/Y/ZLGn58t7\nxPcx8DoQftG6hDSuAsH/FgOYFdnPrNLDf9gGtrg1AKCnS29ffE+NuCvBvtVyioA/\n0znLVwKBgEd+SBECep+95LBK1tCe4o46oPJWBzUK+nGed1JqMkaLnTTt7LwVigtJ\nyIc6HU8kRCE46YMMDsVlaCGjWIg4lNiiXfUK8wSWHNP7UW/EIBuL+DgbwsjsqYbB\nlLUh/mLnvpXqPzuIJvf+LJwA9iUy35oTcfmBaMQ9whQFSu2Qf6gl\n-----END RSA PRIVATE KEY-----\n"
}
}
14 changes: 10 additions & 4 deletions uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ tokio-stream = "0.1"
# binary utils
anyhow = "1"
axum = "0.6"
config = { version = "0.13.2", default-features = false, features = ["toml", "json"] }
config = { version = "0.13.2", default-features = false, features = [
"toml",
"json",
] }
structopt = "0.3"

# serializer
Expand All @@ -30,8 +33,8 @@ storage = { path = "../storage" }
# logging
log = "0.4"
regex = "1.7.1"
tracing = { version="0.1", features=["log"] }
tracing-subscriber = { version="=0.3.14", features=["env-filter"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "=0.3.14", features = ["env-filter"] }

# built-in collectors
# tunshell
Expand All @@ -44,7 +47,10 @@ rand = "0.8"
fs2 = "0.4"
futures-util = "0.3"
human_bytes = "0.4"
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }
reqwest = { version = "0.11", default-features = false, features = [
"stream",
"rustls-tls",
] }
# systemstats
sysinfo = "0.26"
# logcat
Expand Down
30 changes: 26 additions & 4 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use flume::{Receiver, Sender};
use log::error;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::join;
Expand All @@ -20,7 +21,7 @@ pub use self::{
data_lane::{DataBridge, DataBridgeTx},
};

use super::Compression;
use super::{mqtt::MqttShutdown, Compression};
pub use metrics::StreamMetrics;

pub trait Point: Send + Debug {
Expand Down Expand Up @@ -74,6 +75,7 @@ pub(crate) struct DataBridgeShutdown;
pub struct Bridge {
pub(crate) data: DataBridge,
pub(crate) actions: ActionsBridge,
pub(crate) mqtt_shutdown: Sender<MqttShutdown>,
}

impl Bridge {
Expand All @@ -83,15 +85,20 @@ impl Bridge {
metrics_tx: Sender<StreamMetrics>,
actions_rx: Receiver<Action>,
shutdown_handle: Sender<()>,
mqtt_shutdown: Sender<MqttShutdown>,
) -> Self {
let data = DataBridge::new(config.clone(), package_tx.clone(), metrics_tx.clone());
let actions =
ActionsBridge::new(config, package_tx, actions_rx, shutdown_handle, metrics_tx);
Self { data, actions }
Self { data, actions, mqtt_shutdown }
}

pub fn tx(&self) -> BridgeTx {
BridgeTx { data: self.data.tx(), actions: self.actions.tx() }
BridgeTx {
data: self.data.tx(),
actions: self.actions.tx(),
mqtt_shutdown: self.mqtt_shutdown.clone(),
}
}

pub fn register_action_route(
Expand All @@ -115,6 +122,7 @@ impl Bridge {
pub struct BridgeTx {
pub data: DataBridgeTx,
pub actions: ActionsBridgeTx,
pub mqtt_shutdown: Sender<MqttShutdown>,
}

impl BridgeTx {
Expand All @@ -131,6 +139,20 @@ impl BridgeTx {
}

pub async fn trigger_shutdown(&self) {
join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown());
join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown(), async {
if let Err(e) = self.mqtt_shutdown.send_async(MqttShutdown).await {
error!("Failed to trigger mqtt shutdown. Error = {e}")
}
});
}

/*
TODO:
Description: Shutdown MQTT connection with broker when getting signals for exit/quit
Create a directory inside the persistence directory
- save inflight data packets
*/
}
Loading

0 comments on commit 1dda22a

Please sign in to comment.