Skip to content

Commit

Permalink
Some fixes for Zenoh variant
Browse files Browse the repository at this point in the history
Also updated protobuf dependency to version 3.5 to remove reference to
unsupported lint from generated code.
  • Loading branch information
sophokles73 committed Oct 10, 2024
1 parent e86089a commit b6ccb6d
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 147 deletions.
19 changes: 10 additions & 9 deletions components/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ fms-proto = { path = "fms-proto" }
influx-client = { path = "influx-client", default-features = false }
influxrs = { version = "2.0" }
log = { version = "0.4" }
protobuf = { version = "3.3" }
protobuf-codegen = { version = "3.3" }
protobuf = { version = "3.5" }
protobuf-codegen = { version = "3.5" }
protoc-bin-vendored = { version = "3.0" }
# prost has no features
prost = { version = "0.12" }
Expand Down
1 change: 1 addition & 0 deletions components/fms-consumer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ clap = { workspace = true, features = [
"error-context",
"suggestions",
] }
duration-str = { workspace = true }
env_logger = { workspace = true }
fms-proto = { workspace = true }
futures = { version = "0.3" }
Expand Down
147 changes: 85 additions & 62 deletions components/fms-consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ fn parse_zenoh_args(args: &ArgMatches) -> Config {
.set_enabled(Some(*values))
.unwrap();
}

if let Some(values) = args.get_one::<Duration>("session-timeout") {
let millis = u64::try_from(values.as_millis()).unwrap_or(u64::MAX);
config.scouting.set_timeout(Some(millis)).unwrap();
}
config
}

Expand Down Expand Up @@ -310,10 +313,20 @@ async fn run_async_processor_zenoh(args: &ArgMatches) {
let config = parse_zenoh_args(zenoh_args);

info!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).res().await.unwrap_or_else(|e| {
error!("failed to open Zenoh session: {e}");
process::exit(1);
});

info!("Declaring Subscriber on '{}'...", &KEY_EXPR);
let subscriber = session.declare_subscriber(KEY_EXPR).res().await.unwrap();
let subscriber = session
.declare_subscriber(KEY_EXPR)
.res()
.await
.unwrap_or_else(|e| {
error!("failed to create Zenoh subscriber: {e}");
process::exit(1);
});
loop {
select!(
sample = subscriber.recv_async() => {
Expand Down Expand Up @@ -342,69 +355,79 @@ pub async fn main() {
.subcommand_required(true)
.subcommand(
Command::new(SUBCOMMAND_HONO)
.about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API").arg(
Arg::new(PARAM_KAFKA_PROPERTIES_FILE)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long(PARAM_KAFKA_PROPERTIES_FILE)
.help("The path to a file containing Kafka client properties for connecting to the Kafka broker(s).")
.action(ArgAction::Set)
.value_name("PATH")
.env("KAFKA_PROPERTIES_FILE")
.required(true),
)
.arg(
Arg::new(PARAM_KAFKA_TOPIC_NAME)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long(PARAM_KAFKA_TOPIC_NAME)
.alias("topic")
.help("The name of the Kafka topic to consume VSS data from.")
.value_name("TOPIC")
.required(true)
.env("KAFKA_TOPIC_NAME"),
),
.about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API")
.arg(
Arg::new(PARAM_KAFKA_PROPERTIES_FILE)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long(PARAM_KAFKA_PROPERTIES_FILE)
.help("The path to a file containing Kafka client properties for connecting to the Kafka broker(s).")
.action(ArgAction::Set)
.value_name("PATH")
.env("KAFKA_PROPERTIES_FILE")
.required(true),
)
.arg(
Arg::new(PARAM_KAFKA_TOPIC_NAME)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long(PARAM_KAFKA_TOPIC_NAME)
.alias("topic")
.help("The name of the Kafka topic to consume VSS data from.")
.value_name("TOPIC")
.required(true)
.env("KAFKA_TOPIC_NAME"),
),
)
.subcommand(
Command::new(SUBCOMMAND_ZENOH)
.about("Forwards VSS data to an Influx DB server from Eclipse Zenoh")
.arg(
Arg::new("mode")
.value_parser(clap::value_parser!(WhatAmI))
.long("mode")
.short('m')
.help("The Zenoh session mode (peer by default).")
.required(false),
)
.arg(
Arg::new("connect")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("connect")
.short('e')
.help("Endpoints to connect to.")
.required(false),
)
.arg(
Arg::new("listen")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("listen")
.short('l')
.help("Endpoints to listen on.")
.required(false),
)
.arg(
Arg::new("no-multicast-scouting")
.long("no-multicast-scouting")
.help("Disable the multicast-based scouting mechanism.")
.action(clap::ArgAction::SetFalse)
.required(false),
)
.arg(
Arg::new("config")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("config")
.short('c')
.help("A configuration file.")
.required(false),
),
.arg(
Arg::new("mode")
.value_parser(clap::value_parser!(WhatAmI))
.long("mode")
.short('m')
.help("The Zenoh session mode (peer by default).")
.required(false),
)
.arg(
Arg::new("connect")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("connect")
.short('e')
.help("Endpoints to connect to.")
.required(false),
)
.arg(
Arg::new("listen")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("listen")
.short('l')
.help("Endpoints to listen on.")
.required(false),
)
.arg(
Arg::new("no-multicast-scouting")
.long("no-multicast-scouting")
.help("Disable the multicast-based scouting mechanism.")
.action(clap::ArgAction::SetFalse)
.required(false),
)
.arg(
Arg::new("config")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("config")
.short('c')
.help("A configuration file.")
.required(false),
)
.arg(
Arg::new("session-timeout")
.value_parser(|s: &str| duration_str::parse(s))
.long("session-timeout")
.help("The time to wait for establishment of a Zenoh session, e.g. 10s.")
.value_name("DURATION_SPEC")
.required(false)
.default_value("20s")
),
);

let args = parser.get_matches();
Expand Down
21 changes: 0 additions & 21 deletions components/fms-forwarder/src/mqtt_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const PARAM_CA_PATH: &str = "ca-path";
const PARAM_DEVICE_CERT: &str = "device-cert";
const PARAM_DEVICE_KEY: &str = "device-key";
const PARAM_ENABLE_HOSTNAME_VERIFICATION: &str = "enable-hostname-verification";
const PARAM_MQTT_CLIENT_ID: &str = "mqtt-client-id";
const PARAM_MQTT_URI: &str = "mqtt-uri";
const PARAM_MQTT_USERNAME: &str = "mqtt-username";
const PARAM_MQTT_PASSWORD: &str = "mqtt-password";
Expand All @@ -40,7 +39,6 @@ const PARAM_TRUST_STORE_PATH: &str = "trust-store-path";
///
/// | Long Name | Environment Variable | Default Value |
/// |------------------------------|------------------------------|---------------|
/// | mqtt-client-id | MQTT_CLIENT_ID | - |
/// | mqtt-uri | MQTT_URI | - |
/// | mqtt-username | MQTT_USERNAME | - |
/// | mqtt-password | MQTT_PASSWORD | - |
Expand All @@ -52,15 +50,6 @@ const PARAM_TRUST_STORE_PATH: &str = "trust-store-path";
///
pub fn add_command_line_args(command: Command) -> Command {
command
.arg(
Arg::new(PARAM_MQTT_CLIENT_ID)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long(PARAM_MQTT_CLIENT_ID)
.help("The client identifier to use in the MQTT Connect Packet.")
.value_name("ID")
.required(false)
.env("MQTT_CLIENT_ID"),
)
.arg(
Arg::new(PARAM_MQTT_URI)
.value_parser(clap::builder::NonEmptyStringValueParser::new())
Expand Down Expand Up @@ -142,7 +131,6 @@ pub fn add_command_line_args(command: Command) -> Command {
pub struct MqttConnection {
pub mqtt_client: AsyncClient,
pub uri: String,
pub client_id: String,
}

impl MqttConnection {
Expand Down Expand Up @@ -221,17 +209,12 @@ impl MqttConnection {
pub async fn new(args: &ArgMatches) -> Result<Self, Box<dyn std::error::Error>> {
let connect_options = MqttConnection::get_connect_options(args)?;
let mqtt_uri = args.get_one::<String>(PARAM_MQTT_URI).unwrap().to_owned();
let client_id = args
.get_one::<String>(PARAM_MQTT_CLIENT_ID)
.unwrap_or(&"".to_string())
.to_owned();
info!("connecting to MQTT endpoint at {}", mqtt_uri);
match CreateOptionsBuilder::new()
.server_uri(&mqtt_uri)
.max_buffered_messages(50)
.send_while_disconnected(true)
.delete_oldest_messages(true)
.client_id(&client_id)
.create_client()
{
Err(e) => {
Expand All @@ -247,7 +230,6 @@ impl MqttConnection {
Ok(MqttConnection {
mqtt_client: client,
uri: mqtt_uri,
client_id,
})
}
}
Expand Down Expand Up @@ -289,9 +271,6 @@ mod tests {
matches.get_one::<String>(super::PARAM_MQTT_URI).unwrap(),
"mqtts://non-existing.host.io"
);
assert!(matches
.get_one::<String>(super::PARAM_MQTT_CLIENT_ID)
.is_none());
assert!(matches
.get_one::<String>(super::PARAM_MQTT_USERNAME)
.is_none());
Expand Down
7 changes: 0 additions & 7 deletions components/fms-server/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub mod vehicle;

/// This description is placed here due to limitations of describing references in OpenAPI Property __driverId__: The driver id of driver. (independant whether it is driver or Co-driver) This is only set if the TriggerType = DRIVER_LOGIN, DRIVER_LOGOUT, DRIVER_1_WORKING_STATE_CHANGED or DRIVER_2_WORKING_STATE_CHANGED For DRIVER_LOGIN it is the id of the driver that logged in For DRIVER_LOGOUT it is the id of the driver that logged out For DRIVER_1_WORKING_STATE_CHANGED it is the id of driver 1 For DRIVER_2_WORKING_STATE_CHANGED it is the id of driver 2 Property __tellTaleInfo__: The tell tale(s) that triggered this message. This is only set if the TriggerType = TELL_TALE
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))]
pub struct TriggerObject {
/// Trigger types for Context=RFMS: TIMER - Data was sent due to a timer trigger. (Timer value set outside rFMS scope) IGNITION_ON - Data was sent due to an ignition on IGNITION_OFF - Data was sent due to an ignition off PTO_ENABLED - Data was sent due to that a PTO was enabled, will be sent for each PTO that gets enabled PTO_DISABLED - Data was sent due to that a PTO was disabled, will be sent for each PTO that gets disabled. DRIVER_LOGIN - Data was sent due to a successful driver login. DRIVER_LOGOUT - Data was sent due to a driver logout TELL_TALE - Data was sent due to that at least one tell tale changed state ENGINE_ON - Data was sent due to an engine on. For electric motor crank is on ENGINE_OFF - Data was sent due to an engine off. For electric motor crank is off DRIVER_1_WORKING_STATE_CHANGED - Data was sent due to that driver 1 changed working state DRIVER_2_WORKING_STATE_CHANGED - Data was sent due to that driver 2 changed working state DISTANCE_TRAVELLED - Data was sent due to that a set distance was travelled. (Distance set outside rFMS scope) FUEL_TYPE_CHANGE - Data was sent due to that the type of fuel currently being utilized by the vehicle changed PARKING_BRAKE_SWITCH_CHANGE - Data was sent due to that the parking brake state has changed BATTERY_PACK_CHARGING_STATUS_CHANGE - Data was sent due to a change in the battery pack charging status. BATTERY_PACK_CHARGING_CONNECTION_STATUS_CHANGE - Data was sent due to a change in the battery pack charging connection status. TRAILER_CONNECTED - One or several trailers were connected TRAILER_DISCONNECTED - One or several trailers were disconnected
#[serde(rename = "triggerType")]
Expand Down Expand Up @@ -79,7 +78,6 @@ impl TriggerObject {
}

#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))]
pub struct DriverIdObject {
#[serde(rename = "tachoDriverIdentification")]
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -91,7 +89,6 @@ pub struct DriverIdObject {
}

#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))]
pub struct TellTaleObject {
// Note: inline enums are not fully supported by openapi-generator
#[serde(rename = "tellTale")]
Expand All @@ -110,7 +107,6 @@ pub struct TellTaleObject {

/// Additional information can be provided if the trigger type is BATTERY_PACK_CHARGING_STATUS_CHANGE.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))]
pub struct TriggerObjectChargingStatusInfo {
/// CHARGING_STARTED - Charging has started CHARGING_COMPLETED - Charging is completed CHARGING_INTERRUPTED - Charging has been interrupted (no error) ERROR - An error occurred when charging ESTIMATED_COMPLETION_TIME_CHANGED - The estimated time for completed charging has changed. (Threshold is outside scope of rFMS) TIMER - A predefined time has passed since last charge status update. (Frequency is outside the scope of rFMS) CHARGING_LEVEL - The charging level has reached a predefined level. (Charging levels are outside the scope of rFMS)
// Note: inline enums are not fully supported by openapi-generator
Expand All @@ -126,7 +122,6 @@ pub struct TriggerObjectChargingStatusInfo {

/// Additional information can be provided if the trigger type is BATTERY_PACK_CHARGING_CONNECTION_STATUS_CHANGE.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))]
pub struct TriggerObjectChargingConnectionStatusInfo {
/// CONNECTING - Vehicle is being connected to a charger CONNECTED - Vehicle is connected to a charger DISCONNECTING - Vehicle is being disconnected from the charger DISCONNECTED - Vehicle is not connected to a charger ERROR - An error occurred
// Note: inline enums are not fully supported by openapi-generator
Expand All @@ -141,7 +136,6 @@ pub struct TriggerObjectChargingConnectionStatusInfo {
}

#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))]
pub struct DriverIdObjectTachoDriverIdentification {
/// The unique identification of a driver in a Member State. This fields is formatted according the definition for driverIdentification in COMMISSION REGULATION (EC) No 1360/2002 Annex 1b
#[serde(rename = "driverIdentification")]
Expand Down Expand Up @@ -169,7 +163,6 @@ pub struct DriverIdObjectTachoDriverIdentification {
}

#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))]
pub struct DriverIdObjectOemDriverIdentification {
/// Contains an optional id type (e.g. pin, USB, encrypted EU id...)
#[serde(rename = "idType")]
Expand Down
Loading

0 comments on commit b6ccb6d

Please sign in to comment.