Skip to content

Commit

Permalink
Simulated measurements now match the original version
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherRabotin committed Nov 22, 2024
1 parent 6aca68f commit e536886
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ pub enum InputOutputError {
ParseDhall { data: String, err: String },
#[snafu(display("error serializing {what} to Dhall: {err}"))]
SerializeDhall { what: String, err: String },
#[snafu(display("empty dataset error when (de)serializing for {action}"))]
EmptyDataset { action: &'static str },
}

impl PartialEq for InputOutputError {
Expand Down
32 changes: 32 additions & 0 deletions src/od/ground_station.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use anise::astro::{Aberration, AzElRange, PhysicsResult};
use anise::errors::AlmanacResult;
use anise::prelude::{Almanac, Frame, Orbit};

use super::msr::measurement::Measurement;
use super::msr::RangeDoppler;
use super::noise::StochasticNoise;
use super::{ODAlmanacSnafu, ODError, ODTrajSnafu, TrackingDeviceSim};
Expand Down Expand Up @@ -374,6 +375,37 @@ impl TrackingDeviceSim<Spacecraft, RangeDoppler> for GroundStation {

Ok(msr_noises)
}

fn measurement_covar_new(
&self,
msr_type: super::prelude::MeasurementType,
epoch: Epoch,
) -> Result<f64, ODError> {
Ok(match msr_type {
super::msr::MeasurementType::Range => self
.range_noise_km
.ok_or(ODError::NoiseNotConfigured { kind: "Range" })?
.covariance(epoch),
super::msr::MeasurementType::Doppler => self
.doppler_noise_km_s
.ok_or(ODError::NoiseNotConfigured { kind: "Doppler" })?
.covariance(epoch),
})
}

fn measure_new(
&mut self,
epoch: Epoch,
traj: &Traj<Spacecraft>,
rng: Option<&mut Pcg64Mcg>,
almanac: Arc<Almanac>,
) -> Result<Option<super::prelude::measurement::Measurement>, ODError> {
Ok(self.measure(epoch, traj, rng, almanac)?.map(|msr| {
Measurement::new(self.name.clone(), epoch)
.with(super::msr::MeasurementType::Range, msr.obs[0])
.with(super::msr::MeasurementType::Doppler, msr.obs[1])
}))
}
}

impl fmt::Display for GroundStation {
Expand Down
1 change: 1 addition & 0 deletions src/od/msr/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::md::trajectory::Interpolatable;
use crate::od::prelude::TrkConfig;
use crate::od::{Measurement, TrackingDeviceSim};
use crate::State;

use arrow::array::{Array, Float64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
Expand Down
161 changes: 151 additions & 10 deletions src/od/msr/data_arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,36 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use super::{measurement::Measurement, MeasurementType};
use crate::io::watermark::pq_writer;
use crate::io::{ArrowSnafu, InputOutputError, MissingDataSnafu, ParquetSnafu, StdIOSnafu};
use crate::io::{EmptyDatasetSnafu, ExportCfg};
use arrow::array::{Array, Float64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::{
array::{Float64Array, PrimitiveArray, StringArray},
datatypes,
record_batch::RecordBatchReader,
};
use core::fmt;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::path::Path;

use hifitime::{Duration, Epoch};
use hifitime::prelude::{Duration, Epoch};
use hifitime::TimeScale;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use snafu::{ensure, ResultExt};

use crate::io::{ArrowSnafu, InputOutputError, MissingDataSnafu, ParquetSnafu, StdIOSnafu};

use super::{measurement::Measurement, MeasurementType};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::error::Error;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;

/// Tracking data storing all of measurements as a B-Tree.
#[derive(Clone)]
pub struct TrackingDataArc {
/// All measurements in this data arc
pub measurements: BTreeMap<Epoch, Measurement>,
/// Source file if loaded from a file
/// Source file if loaded from a file or saved to a file.
pub source: Option<String>,
}

Expand Down Expand Up @@ -233,6 +240,128 @@ impl TrackingDataArc {
Some(min_sep)
}
}

/// Store this tracking arc to a parquet file.
pub fn to_parquet_simple<P: AsRef<Path> + fmt::Display>(
&self,
path: P,
) -> Result<PathBuf, Box<dyn Error>> {
self.to_parquet(path, ExportCfg::default())
}

/// Store this tracking arc to a parquet file, with optional metadata and a timestamp appended to the filename.
pub fn to_parquet<P: AsRef<Path> + fmt::Display>(
&self,
path: P,
cfg: ExportCfg,
) -> Result<PathBuf, Box<dyn Error>> {
ensure!(
!self.is_empty(),
EmptyDatasetSnafu {
action: "exporting tracking data arc"
}
);

let path_buf = cfg.actual_path(path);

if cfg.step.is_some() {
warn!("The `step` parameter in the export is not supported for tracking arcs.");
}

if cfg.fields.is_some() {
warn!("The `fields` parameter in the export is not supported for tracking arcs.");
}

// Build the schema
let mut hdrs = vec![
Field::new("Epoch (UTC)", DataType::Utf8, false),
Field::new("Tracking device", DataType::Utf8, false),
];

let msr_types = self.unique_types();
let mut msr_fields = msr_types
.iter()
.map(|msr_type| msr_type.to_field())
.collect::<Vec<Field>>();

hdrs.append(&mut msr_fields);

// Build the schema
let schema = Arc::new(Schema::new(hdrs));
let mut record: Vec<Arc<dyn Array>> = Vec::new();

// Build the measurement iterator

let measurements =
if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
let start = cfg
.start_epoch
.unwrap_or_else(|| self.start_epoch().unwrap());
let end = cfg.end_epoch.unwrap_or_else(|| self.end_epoch().unwrap());

info!("Exporting measurements from {start} to {end}.");

self.measurements
.range(start..end)
.map(|(k, v)| (*k, v.clone()))
.collect()
} else {
self.measurements.clone()
};

// Build all of the records

// Epochs
let mut utc_epoch = StringBuilder::new();
for epoch in measurements.keys() {
utc_epoch.append_value(epoch.to_time_scale(TimeScale::UTC).to_isoformat());
}
record.push(Arc::new(utc_epoch.finish()));

// Device names
let mut device_names = StringBuilder::new();
for m in measurements.values() {
device_names.append_value(m.tracker.clone());
}
record.push(Arc::new(device_names.finish()));

// Measurement data, column by column
for msr_type in msr_types {
let mut data_builder = Float64Builder::new();

for m in measurements.values() {
match m.data.get(&msr_type) {
Some(value) => data_builder.append_value(*value),
None => data_builder.append_null(),
};
}
record.push(Arc::new(data_builder.finish()));
}

// Serialize all of the devices and add that to the parquet file too.
let mut metadata = HashMap::new();
metadata.insert("Purpose".to_string(), "Tracking Arc Data".to_string());
if let Some(add_meta) = cfg.metadata {
for (k, v) in add_meta {
metadata.insert(k, v);
}
}

let props = pq_writer(Some(metadata));

let file = File::create(&path_buf)?;

let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();

let batch = RecordBatch::try_new(schema, record)?;
writer.write(&batch)?;
writer.close()?;

info!("Serialized {self} to {}", path_buf.display());

// Return the path this was written to
Ok(path_buf)
}
}

impl fmt::Display for TrackingDataArc {
Expand All @@ -258,6 +387,18 @@ impl fmt::Display for TrackingDataArc {
}
}

impl fmt::Debug for TrackingDataArc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self} @ {self:p}")
}
}

impl PartialEq for TrackingDataArc {
fn eq(&self, other: &Self) -> bool {
self.measurements == other.measurements
}
}

#[cfg(test)]
mod ut_tracker {
use super::TrackingDataArc;
Expand Down
19 changes: 18 additions & 1 deletion src/od/msr/measurement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use nalgebra::{allocator::Allocator, DefaultAllocator, DimName, OVector};
use std::collections::{HashMap, HashSet};

/// A type-agnostic simultaneous measurement storage structure. Allows storing any number of simultaneous measurement of a given taker.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct Measurement {
/// Tracker alias which made this measurement
pub tracker: String,
Expand All @@ -33,6 +33,23 @@ pub struct Measurement {
}

impl Measurement {
pub fn new(tracker: String, epoch: Epoch) -> Self {
Self {
tracker,
epoch,
data: HashMap::new(),
}
}

pub fn push(&mut self, msr_type: MeasurementType, msr_value: f64) {
self.data.insert(msr_type, msr_value);
}

pub fn with(mut self, msr_type: MeasurementType, msr_value: f64) -> Self {
self.push(msr_type, msr_value);
self
}

/// Builds an observation vector for this measurement provided a set of measurement types.
/// If the requested measurement type is not available, then that specific row is set to zero.
/// The caller must set the appropriate sensitivity matrix rows to zero.
Expand Down
1 change: 1 addition & 0 deletions src/od/msr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod sensitivity;
mod types;

pub use arc::TrackingArc;
pub use data_arc::TrackingDataArc;
pub use range::RangeMsr;
pub use range_doppler::RangeDoppler;
pub use rangerate::RangeRate;
Expand Down
18 changes: 18 additions & 0 deletions src/od/msr/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum MeasurementType {
Range,
Expand All @@ -30,4 +34,18 @@ impl MeasurementType {
MeasurementType::Doppler => "km/s",
}
}

/// Returns the fields for this kind of measurement. The metadata includes a `unit` field with the unit.
/// Column is nullable in case there is no such measurement at a given epoch.
pub fn to_field(&self) -> Field {
let mut meta = HashMap::new();
meta.insert("unit".to_string(), self.unit().to_string());

Field::new(
format!("{self:?} ({})", self.unit()),
DataType::Float64,
true,
)
.with_metadata(meta)
}
}
Loading

0 comments on commit e536886

Please sign in to comment.