Skip to content

Commit

Permalink
adds /vehiclestatuses endpoint to fms-server
Browse files Browse the repository at this point in the history
  • Loading branch information
eriksven committed Dec 14, 2023
1 parent 83557d1 commit 03a0660
Show file tree
Hide file tree
Showing 4 changed files with 995 additions and 10 deletions.
2 changes: 1 addition & 1 deletion components/fms-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ readme.workspace = true

[dependencies]
async-trait = "0.1"
axum = "0.6"
axum = "0.7.2"
chrono = { workspace = true, features = ["clock", "serde"] }
clap = { workspace = true, features = ["std", "env", "color", "help", "usage", "error-context", "suggestions"] }
const_format = { version = "0.2" }
Expand Down
157 changes: 154 additions & 3 deletions components/fms-server/src/influx_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
//
// SPDX-License-Identifier: Apache-2.0

use chrono::{DateTime, Utc};
use chrono::{DateTime, Utc, NaiveDateTime};
use clap::ArgMatches;
use const_format::formatcp;
use influx_client::connection::InfluxConnection;
use influxrs::InfluxError;
use log::error;

use crate::models::{self, GnssPositionObject, TriggerObject, VehiclePositionObject};
use crate::models::{self, GnssPositionObject, TriggerObject, VehiclePositionObject, VehicleStatusObject, DriverWorkingStateProperty};

const FILTER_FIELDS_POSITION: &str = formatcp!(
r#"filter(fn: (r) => contains(set: ["{}","{}","{}","{}","{}","{}","{}","{}", "{}"], value: r._field))"#,
Expand All @@ -48,14 +49,39 @@ fn unpack_value_i32(value: Option<&String>) -> Option<i32> {
value.and_then(|v| v.parse().ok())
}

fn unpack_value_i64(value: Option<&String>) -> Option<i64> {
value.and_then(|v| v.parse().ok())
}

fn unpack_value_f64(value: Option<&String>) -> Option<f64> {
value.and_then(|v| v.parse().ok())
}

fn unpack_time(value: Option<&String>) -> Option<DateTime<Utc>> {
fn unpack_value_bool(value: Option<&String>) -> Option<bool> {
value.and_then(|v| v.parse().ok())
}

fn unpack_time(value: Option<&String>) -> Option<DateTime<Utc>> {
let timestamp = unpack_value_i64(value)?;
NaiveDateTime::from_timestamp_millis(timestamp)?.and_local_timezone(Utc).latest()
}

fn unpack_driver_working_state(value: Option<&String>) -> Option<DriverWorkingStateProperty> {
if let Some(state) = value {
let working_state = match state.as_str() {
"REST" => Some(DriverWorkingStateProperty::Rest),
"DRIVER_AVAILABLE" => Some(DriverWorkingStateProperty::DriverAvailable),
"WORK" => Some(DriverWorkingStateProperty::Work),
"DRIVE" => Some(DriverWorkingStateProperty::Drive),
"ERROR" => Some(DriverWorkingStateProperty::Error),
"NOT_AVAILABLE" => Some(DriverWorkingStateProperty::NotAvailable),
_ => None,
};
return working_state;
}
None
}

pub struct InfluxReader {
influx_con: InfluxConnection,
}
Expand Down Expand Up @@ -166,4 +192,129 @@ impl InfluxReader {
.collect()
})
}

pub async fn get_vehiclesstatuses(
&self,
start_time: i64,
stop_time: i64,
vin: Option<&String>,
trigger: Option<&String>,
latest_only: Option<bool>,
) -> Result<Vec<models::VehicleStatusObject>, InfluxError> {
// Build Query
let time_filter = format!("range(start: {}, stop: {})", start_time, stop_time);
let vin_filter = match vin {
Some(v) => format!(r#"filter(fn: (r) => r["{}"] == "{}""#, influx_client::TAG_VIN, v),
None => FILTER_TAG_ANY_VIN.to_string(),
};
let trigger_filter = match trigger {
Some(t) => format!(r#"filter(fn: (r) => r["{}"] == "{}")"#, influx_client::TAG_TRIGGER, t),
None => FILTER_TAG_ANY_TRIGGER.to_string(),
};

let mut read_query = influxrs::Query::new(format!(r#"from(bucket: "{}")"#, self.influx_con.bucket))
.then(time_filter)
.then(vin_filter)
.then(trigger_filter)
.then(r#"aggregateWindow(every: 500ms, fn: last, createEmpty: false)"#);
if Some(true) == latest_only {
read_query = read_query
.then(r#"group(columns: ["_measurement", "_field", "vin"], mode:"by")"#)
.then("last()");
}
read_query = read_query
.then(r#"group(columns: ["_field", "trigger", "vin"], mode:"by")"#)
.then(r#"pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")"#)
.then(r#"group(columns: ["_time"], mode:"by")"#);

self.influx_con.client.query(read_query).await
.map_err(|e| {
error!("Error during the query for vehicle statuses: {}", e);
e
})
.map(|measurements| {
measurements
.into_iter()
.filter_map(|entry| {
let vin = entry.get(influx_client::TAG_VIN);
let trigger = entry.get(influx_client::TAG_TRIGGER);
let date_time = entry.get(influx_client::FIELD_CREATED_DATE_TIME);
let unpacked_time = unpack_time(date_time);
match (
vin, trigger, unpacked_time,
) {
(Some(vin), Some(trigger), Some(created_date_time)) => {
let gnss_position = match (
unpack_time(entry.get(influx_client::FIELD_POSITION_DATE_TIME)),
unpack_value_f64(entry.get(influx_client::FIELD_LONGITUDE)),
unpack_value_f64(entry.get(influx_client::FIELD_LATITUDE)),
) {
(Some(position_date_time), Some(longitude), Some(latitude)) => {
Some(GnssPositionObject {
latitude,
longitude,
heading: unpack_value_i32(entry.get(influx_client::FIELD_HEADING)),
altitude: unpack_value_i32(entry.get(influx_client::FIELD_ALTITUDE)),
speed: unpack_value_f64(entry.get(influx_client::FIELD_SPEED)),
position_date_time,
})
}
_ => None,
};

let snapshot_data = Some(models::SnapshotDataObject {
gnss_position,
wheel_based_speed: unpack_value_f64(entry.get(influx_client::FIELD_WHEEL_BASED_SPEED)),
tachograph_speed: unpack_value_f64(entry.get(influx_client::FIELD_TACHOGRAPH_SPEED)),
engine_speed: unpack_value_f64(entry.get(influx_client::FIELD_ENGINE_SPEED)),
electric_motor_speed: None,
fuel_type: None,
fuel_level1: unpack_value_f64(entry.get(influx_client::FIELD_FUEL_LEVEL1)),
fuel_level2: unpack_value_f64(entry.get(influx_client::FIELD_FUEL_LEVEL2)),
catalyst_fuel_level: unpack_value_f64(entry.get(influx_client::FIELD_CATALYST_FUEL_LEVEL)),
driver1_working_state: unpack_driver_working_state(entry.get(influx_client::FIELD_DRIVER1_WORKING_STATE)),
driver2_id: None,
driver2_working_state: unpack_driver_working_state(entry.get(influx_client::FIELD_DRIVER2_WORKING_STATE)),
ambient_air_temperature: unpack_value_f64(entry.get(influx_client::FIELD_AMBIENT_AIR_TEMP)),
parking_brake_switch: unpack_value_bool(entry.get(influx_client::FIELD_PARKING_BREAK_SWITCH)),
hybrid_battery_pack_remaining_charge: None,
battery_pack_charging_status: None,
battery_pack_charging_connection_status: None,
battery_pack_charging_device: None,
battery_pack_charging_power: None,
estimated_time_battery_pack_charging_completed: None,
estimated_distance_to_empty: None,
vehicle_axles: None,
trailers: None,
});

Some(VehicleStatusObject {
vin: vin.to_string(),
trigger_type: TriggerObject::new(
trigger.to_string(),
"RFMS".to_string(),
),
created_date_time,
received_date_time: Utc::now(),
hr_total_vehicle_distance: unpack_value_i64(entry.get(influx_client::FIELD_HR_TOTAL_VEHICLE_DISTANCE)),
total_engine_hours: unpack_value_f64(entry.get(influx_client::FIELD_TOTAL_ENGINE_HOURS)),
total_electric_motor_hours: unpack_value_f64(entry.get(influx_client::FIELD_TOTAL_ELECTRIC_MOTOR_HOURS)),
driver1_id: None,
gross_combination_vehicle_weight: unpack_value_i32(entry.get(influx_client::FIELD_GROSS_COMBINATION_VEHICLE_WEIGHT)),
engine_total_fuel_used: unpack_value_i64(entry.get(influx_client::FIELD_ENGINE_TOTAL_FUEL_USED)),
total_fuel_used_gaseous: None,
total_electric_energy_used: None,
status2_of_doors: None,
door_status: None,
accumulated_data: None,
snapshot_data,
uptime_data: None,
})
}
_ => None,
}
})
.collect()
})
}
}
85 changes: 79 additions & 6 deletions components/fms-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,14 @@ async fn main() {
.route("/", get(root))
.route("/rfms/vehicleposition", get(get_vehicleposition))
.route("/rfms/vehicles", get(get_vehicles))
.route("/rfms/vehiclestatuses", get(get_vehiclesstatuses))
.with_state(influx_reader);
axum::Server::bind(&"0.0.0.0:8081".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
let listener = tokio::net::TcpListener::bind("0.0.0.0:8081").await.unwrap();
axum::serve(listener, app).await.unwrap();
}

async fn root() -> &'static str {
"Welcome to the rFMS server. The following endpoints are implemented: '/rfms/vehicleposition and /rfms/vehicles'"
"Welcome to the rFMS server. The following endpoints are implemented: '/rfms/vehicleposition', '/rfms/vehicles', and '/rfms/vehiclestatuses'"
}

async fn get_vehicleposition(
Expand Down Expand Up @@ -130,7 +129,81 @@ async fn get_vehicles(
Json(result_object)
})
.map_err(|e| {
error!("error retrieving vehicle status: {e}");
error!("error retrieving vehicles: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})
}

fn parse_latest_only(params: &HashMap<String, String>) -> Result<Option<bool>, StatusCode> {
let latest_parameter = params.get("latestOnly");
if let Some(latest_string) = latest_parameter {
let latest_result = latest_string.parse();
if latest_result.is_err() {
return Err(StatusCode::BAD_REQUEST);
}
return Ok(latest_result.ok());
}
return Ok(None);
}

async fn get_vehiclesstatuses(
State(influx_server): State<Arc<InfluxReader>>,
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<serde_json::Value>, StatusCode> {


let start_parameter = params.get("starttime");
let stop_parameter = params.get("stoptime");


let latest_only = match parse_latest_only(&params) {
Ok(value) => value,
Err(status) => return Err(status),
};

if start_parameter.is_none() && latest_only.is_none() {
// rFMS makes it mandatory to either supply the starttime or latestOnly
return Err(StatusCode::BAD_REQUEST);
}

if latest_only.is_some() && (start_parameter.is_some() || stop_parameter.is_some()) {
// rFMS does not allow to set latestOnly and and time at the same time
return Err(StatusCode::BAD_REQUEST);
}

let start_time = start_parameter.map_or(0, |text| {
DateTime::<Utc>::from_str(text).map_or(0, |time| time.timestamp())
});

let stop_time = stop_parameter
.map_or_else(|| Utc::now().timestamp(), |text| {
DateTime::<Utc>::from_str(text).map_or_else(|_| Utc::now().timestamp(), |time| time.timestamp())
});

let vin = params.get("vin");
let trigger_filter = params.get("triggerFilter");


influx_server
.get_vehiclesstatuses(start_time, stop_time, vin, trigger_filter, latest_only)
.await
.map(|vehicles_statuses| {
let response = models::VehicleStatusResponseObjectVehicleStatusResponse {
vehicle_statuses: Some(vehicles_statuses),
};

//TODO for request_server_date_time
// put in start time used in influx query instead of now
let result_object = json!(models::VehicleStatusResponseObject {
vehicle_status_response: response,
more_data_available: false,
more_data_available_link: None,
request_server_date_time: Utc::now()
});
Json(result_object)
})
.map_err(|e| {
error!("error retrieving vehicle statuses: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})
}
Loading

0 comments on commit 03a0660

Please sign in to comment.