Skip to content

Commit

Permalink
Do not use rx time for ordering anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
jurriaan committed Jun 13, 2024
1 parent 6635007 commit 30a2d8b
Showing 1 changed file with 5 additions and 27 deletions.
32 changes: 5 additions & 27 deletions src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,14 @@ async fn process_mesh_packet(
let _ = sqlx::query!(
"UPDATE nodes
SET last_rx_time = ?, last_rx_snr = ?, last_rx_rssi = ?, last_hop_start = ?, last_hop_limit = ?, updated_at = ?
WHERE node_id = ? AND (last_rx_time IS NULL OR last_rx_time < ?)",
WHERE node_id = ?",
rx_time_nanos, // New last_rx_time value
packet.rx_snr, // New last_rx_snr value
packet.rx_rssi, // New last_rx_rssi value
packet.hop_start, // New last_hop_start value
packet.hop_limit, // New last_hop_limit value
now,
packet.from, // node_id condition
rx_time_nanos // Comparison value for last_rx_time
)
.execute(&mut **txn)
.await?;
Expand Down Expand Up @@ -237,21 +236,15 @@ async fn handle_position_payload(
.fetch_one(&mut **txn)
.await?;

let rx_time_nanos = packet.rx_time as i64 * 1_000_000_000;

let _ = sqlx::query!(
"UPDATE nodes
SET latitude = ?, longitude = ?, altitude = ?, last_position_id = ?
WHERE node_id = ? AND (last_position_id IS NULL OR NOT EXISTS (
SELECT 1 FROM mesh_packets
JOIN positions ON positions.mesh_packet_id = mesh_packets.id
WHERE positions.id = nodes.last_position_id AND mesh_packets.rx_time > ?))",
WHERE node_id = ?",
latitude,
longitude,
position_payload.altitude,
result.id,
packet.from,
rx_time_nanos
)
.execute(&mut **txn)
.await?;
Expand Down Expand Up @@ -302,7 +295,6 @@ async fn handle_telemetry_payload(
) -> Result<(), anyhow::Error> {
if let Ok(telemetry_payload) = Telemetry::decode(&*data.payload) {
let time = none_if_default(telemetry_payload.time).map(|time| time as i64 * 1000000000);
let rx_time = packet.rx_time as i64 * 1_000_000_000;

match telemetry_payload.variant {
Some(telemetry::Variant::DeviceMetrics(device_metrics_payload)) => {
Expand All @@ -326,18 +318,14 @@ async fn handle_telemetry_payload(
// Update nodes with device metrics
let _ = sqlx::query!(
"UPDATE nodes SET battery_level = ?, voltage = ?, air_util_tx = ?, channel_utilization = ?, uptime_seconds = ?, last_device_metrics_id = ?
WHERE node_id = ? AND (last_device_metrics_id IS NULL OR NOT EXISTS (
SELECT 1 FROM mesh_packets
JOIN device_metrics ON device_metrics.mesh_packet_id = mesh_packets.id
WHERE device_metrics.id = nodes.last_device_metrics_id AND mesh_packets.rx_time > ?))",
WHERE node_id = ?",
device_metrics_payload.battery_level,
device_metrics_payload.voltage,
device_metrics_payload.air_util_tx,
device_metrics_payload.channel_utilization,
device_metrics_payload.uptime_seconds,
result.id,
packet.from,
rx_time,
)
.execute(&mut **txn)
.await?;
Expand All @@ -363,18 +351,14 @@ async fn handle_telemetry_payload(
// Update nodes with environment metrics
let _ = sqlx::query!(
"UPDATE nodes SET temperature = ?, relative_humidity = ?, barometric_pressure = ?, gas_resistance = ?, iaq = ?, last_environment_metrics_id = ?
WHERE node_id = ? AND (last_environment_metrics_id IS NULL OR NOT EXISTS (
SELECT 1 FROM mesh_packets
JOIN environment_metrics ON environment_metrics.mesh_packet_id = mesh_packets.id
WHERE environment_metrics.id = nodes.last_environment_metrics_id AND mesh_packets.rx_time > ?))",
WHERE node_id = ?",
environment_metrics_payload.temperature,
environment_metrics_payload.relative_humidity,
environment_metrics_payload.barometric_pressure,
environment_metrics_payload.gas_resistance,
environment_metrics_payload.iaq,
result.id,
packet.from,
rx_time,
)
.execute(&mut **txn)
.await?;
Expand Down Expand Up @@ -431,16 +415,11 @@ async fn handle_nodeinfo_payload(
.fetch_one(&mut **txn)
.await?;

let rx_time_nanos = packet.rx_time as i64 * 1_000_000_000;

// Update nodes table
let _ = sqlx::query!(
"UPDATE nodes
SET user_id = ?, long_name = ?, short_name = ?, hw_model_id = ?, is_licensed = ?, role = ?, last_node_info_id = ?
WHERE node_id = ? AND (last_node_info_id IS NULL OR NOT EXISTS (
SELECT 1 FROM mesh_packets
JOIN node_info ON node_info.mesh_packet_id = mesh_packets.id
WHERE node_info.id = nodes.last_node_info_id AND mesh_packets.rx_time > ?))",
WHERE node_id = ?",
node_info_payload.id,
node_info_payload.long_name,
node_info_payload.short_name,
Expand All @@ -449,7 +428,6 @@ async fn handle_nodeinfo_payload(
node_info_payload.role,
result.id,
packet.from,
rx_time_nanos,
)
.execute(&mut **txn)
.await?;
Expand Down

0 comments on commit 30a2d8b

Please sign in to comment.