Skip to content

Commit

Permalink
add compose stuff, args, comments, improve err
Browse files Browse the repository at this point in the history
todo/still wrong with this
- mqtt reconnect sucks and there is no disconnect/reconnect printout
- there is no mock stuff yet
  • Loading branch information
jr1221 committed Jul 11, 2024
1 parent 337e84e commit 5280c26
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 51 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ jobs:
PROD=true
BACKEND_URL=http://192.168.100.1:8000
MAP_ACCESS_TOKEN=pk.eyJ1IjoibWNrZWVwIiwiYSI6ImNscXBrcmU1ZTBscWIya284cDFyYjR3Nm8ifQ.6TQHlxhAJzptZyV-W28dnw
- name: Build and push Docker image for scylla server
- name: Build and push Docker image for scylla server rust
uses: docker/[email protected]
with:
context: ./scylla-server
context: ./scylla-server-rust
push: true
target: production
platforms: linux/arm64,linux/amd64
tags: ${{ steps.meta.outputs.tags }}-scylla-server # argos-scylla-server
tags: ${{ steps.meta.outputs.tags }}-scylla-server-rust # argos-scylla-server
labels: ${{ steps.meta.outputs.labels }}
# for caching
# cache-from: type=gha
Expand Down
38 changes: 37 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@ services:
mem_limit: 3gb
stop_grace_period: 1m

scylla-server-rust:
container_name: scylla-server-rust
image: ghcr.io/northeastern-electric-racing/argos:Develop-scylla-server-rust
build:
context: ./scylla-server-rust
restart: unless-stopped
ports:
- 8000:8000
depends_on:
- odyssey-timescale
- siren
environment:
- SOURCE_DATABASE_URL=postgresql://postgres:password@odyssey-timescale:5432/timescaledb
- PROD_SIREN_HOST_URL=siren:1883
- PROD_SCYLLA=true
cpu_shares: 1024
mem_limit: 2gb
stop_grace_period: 10s



client:
container_name: client
restart: unless-stopped
Expand All @@ -25,7 +46,7 @@ services:
context: ./angular-client
args:
PROD: "true"
BACKEND_URL: http://192.168.100.1:8000
BACKEND_URL: http://localhost:8000
MAP_ACCESS_TOKEN: pk.eyJ1IjoibWNrZWVwIiwiYSI6ImNscXBrcmU1ZTBscWIya284cDFyYjR3Nm8ifQ.6TQHlxhAJzptZyV-W28dnw
target: production
dockerfile: Dockerfile
Expand All @@ -34,6 +55,21 @@ services:
cpu_shares: 512
mem_limit: 1gb

siren:
container_name: siren
restart: unless-stopped
image: eclipse-mosquitto:latest
ports:
- 1883:1883
- 9001:9001 # why?
expose:
- 1883
volumes:
- ./siren-base/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
cpu_shares: 2048
mem_limit: 2gb
oom_kill_disable: true



volumes:
Expand Down
25 changes: 25 additions & 0 deletions scylla-server-rust/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# editor things
.idea/
.zed/
.vscode/

# misc things
.DS_Store
.gitignore
*.nix

# python things
pyrightconfig.json
__pycache__/

# Added by cargo (rust things)
/target
build/
dist/
logs/

# prisma
prisma.rs

# protobuf
serverdata.rs
4 changes: 4 additions & 0 deletions scylla-server-rust/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@ FROM rust:latest

WORKDIR /usr/src/myapp

COPY . .
RUN cargo prisma generate
RUN cargo build --release
CMD ["sh", "-c", "cargo prisma migrate deploy && /usr/src/myapp/target/release/scylla-server-rust"]
11 changes: 10 additions & 1 deletion scylla-server-rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,13 @@ docker volume rm argos_db-data
docker compose run -Pd odyssey-timescale
cargo prisma migrate deploy
SOURCE_DATABASE_URL=postgresql://postgres:[email protected]:5432/timescaledb cargo test -- --test-threads=1
```
```

### Deploy this app

Use the docker compose above to build & deploy. Note the CI prebuilds arm64 and amd64 images upon request in the actions tab of this repository's github page.
```
docker compose build
docker compose up # use -d to fork to background
```
A database migration is triggered upon every bootup.
11 changes: 8 additions & 3 deletions scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tower_http::cors::{Any, CorsLayer};
#[tokio::main]
async fn main() {
// create the database stuff
let db: Database = Arc::new(PrismaClient::_builder().build().await.unwrap());
let db: Database = Arc::new(PrismaClient::_builder().build().await.expect("Could not build prisma DB"));

// create the socket stuff
let (socket_layer, io) = SocketIo::new_layer();
Expand Down Expand Up @@ -55,7 +55,12 @@ async fn main() {
));

// create and spawn the mqtt reciever
let (recv, eloop) = MqttReciever::new(tx, "localhost:1883", db.clone()).await;
let (recv, eloop) = MqttReciever::new(
tx,
std::env::var("PROD_SIREN_HOST_URL").unwrap_or("localhost:1883".to_string()),
db.clone(),
)
.await;
tokio::spawn(recv.recieve_mqtt(eloop));

let app = Router::new()
Expand Down Expand Up @@ -95,7 +100,7 @@ async fn main() {
)
.with_state(db.clone());

let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap();
let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.expect("Could not bind to 8000!");
let axum_token = token.clone();
tokio::spawn(async {
axum::serve(listener, app)
Expand Down
68 changes: 47 additions & 21 deletions scylla-server-rust/src/reciever/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,18 @@ use crate::{
Database,
};

/// The upload interval for batch uploads
const UPLOAD_INTERVAL: u64 = 5000;

use super::ClientData;
use super::{ClientData, LocationData};

/// A struct defining an in progress location packet
struct LocLock {
location_name: Option<String>,
points: Option<(f64, f64)>,
radius: Option<f64>,
}

struct Loc {
location_name: String,
lat: f64,
long: f64,
radius: f64,
}

impl LocLock {
pub fn new() -> LocLock {
LocLock {
Expand All @@ -39,22 +34,26 @@ impl LocLock {
}
}

/// Add the location name to the packet
pub fn add_loc_name(&mut self, loc_name: String) {
self.location_name = Some(loc_name);
}

/// Add points to the packet
pub fn add_points(&mut self, lat: f64, long: f64) {
self.points = Some((lat, long));
}

/// Add a radius to the packet
pub fn add_radius(&mut self, radius: f64) {
self.radius = Some(radius);
}

pub fn finalize(&mut self) -> Option<Loc> {
/// Attempt to finalize the packet, returning a location data and clearing this object or None if still in progress
pub fn finalize(&mut self) -> Option<LocationData> {
if self.location_name.is_some() && self.points.is_some() && self.radius.is_some() {
self.clear();
return Some(Loc {
return Some(LocationData {
location_name: self.location_name.clone().unwrap(),
lat: self.points.unwrap().0,
long: self.points.unwrap().1,
Expand All @@ -64,19 +63,28 @@ impl LocLock {
None
}

/// Clear the internal state
fn clear(&mut self) {
self.location_name = None;
self.points = None;
self.radius = None;
}
}

/// A few threads to manage the processing and inserting of special types,
/// upserting of metadata for data, and batch uploading the database
pub struct DbHandler {
/// The list of nodes seen by this instance, used for when to upsert
node_list: Vec<String>,
/// The list of data types seen by this instance, used for when to upsert
datatype_list: Vec<String>,
/// The broadcast channel which provides serial datapoints for processing
reciever: BroadcastReceiver<ClientData>,
/// The database
db: Database,
/// An internal state of an in progress location packet
loc_lock: LocLock,
/// Whether the location has been modified this loop
is_loc: bool,
}

Expand All @@ -94,6 +102,9 @@ impl DbHandler {
}
}

/// This loop handles batch uploading, and has no internal state or requirements
/// It uses the queue from data queue to insert to the database specified
/// On cancellation, will await one final queue message to cleanup anything remaining in the channel
pub async fn batching_loop(
mut data_queue: Receiver<Vec<ClientData>>,
database: Database,
Expand All @@ -119,6 +130,11 @@ impl DbHandler {
}
}

/// A loop which uses self and a sender channel to process data
/// If the data is special, i.e. coordinates, driver, etc. it will store it in its special location of the db immediately
/// For all data points it will add the to the data_channel for batch uploading logic when a certain time has elapsed
/// Before this time the data is stored in an internal queue.
/// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages recieved before cancellation
pub async fn handling_loop(
mut self,
data_channel: Sender<Vec<ClientData>>,
Expand All @@ -136,8 +152,9 @@ impl DbHandler {
},
Ok(msg) = self.reciever.recv() => {

// If the time is greater than upload interval, push to batch upload thread and clear queue
if tokio::time::Instant::now().duration_since(last_time)
> Duration::from_millis(UPLOAD_INTERVAL)
> Duration::from_millis(UPLOAD_INTERVAL) && !data_queue.is_empty()
{
data_channel.send(data_queue.clone()).await.expect("Could not comm data to db thread");
data_queue.clear();
Expand Down Expand Up @@ -166,18 +183,20 @@ impl DbHandler {
self.datatype_list.push(msg.name.clone());
}

// if data has some special meanings, push them to the database immediately, otherwise enter batching logic
// if data has some special meanings, push them to the database immediately, notably no matter what also enter batching logic
match msg.name.as_str() {
"Driver" => {
let _ = driver_service::upsert_driver(
if let Err(err) = driver_service::upsert_driver(
&self.db,
msg.values
.first()
.unwrap_or(&"PizzaTheHut".to_string())
.to_string(),
msg.run_id,
)
.await;
.await {
println!("Driver upsert error: {:?}", err);
}
}
"location" => {
self.loc_lock.add_loc_name(
Expand All @@ -189,15 +208,17 @@ impl DbHandler {
self.is_loc = true;
}
"system" => {
let _ = system_service::upsert_system(
if let Err(err) = system_service::upsert_system(
&self.db,
msg.values
.first()
.unwrap_or(&"PizzaTheHut".to_string())
.to_string(),
msg.run_id,
)
.await;
.await {
println!("System upsert error: {:?}", err);
}
}
"GPS-Location" => {
self.loc_lock.add_points(
Expand All @@ -224,23 +245,28 @@ impl DbHandler {
);
self.is_loc = true;
}
_ => {
data_queue.push(msg.clone());
}
_ => {}
}
// if location has been modified, push a new location of the loc lock object returns Some
if self.is_loc {
if let Some(loc) = self.loc_lock.finalize() {
let _ = location_service::upsert_location(
if let Err(err) = location_service::upsert_location(
&self.db,
loc.location_name,
loc.lat,
loc.long,
loc.radius,
msg.run_id,
)
.await;
.await {
println!("Location upsert error: {:?}", err);
}
}
self.is_loc = false;
}

// no matter what, batch upload the message
data_queue.push(msg);
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions scylla-server-rust/src/reciever/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,12 @@ pub struct ClientData {
#[serde(skip_serializing)]
pub node: String,
}

/// A final location packet
/// This has the purpose of representing the struct for the service layer to unpack for insertion, and therefore is not serialized
struct LocationData {
location_name: String,
lat: f64,
long: f64,
radius: f64,
}
Loading

0 comments on commit 5280c26

Please sign in to comment.