Skip to content

Commit

Permalink
Merge pull request #71 from AISViz/aisdb-reworking
Browse files (browse the repository at this point in the history)
Aisdb Reworking
  • Loading branch information
gabrielspadon authored Mar 9, 2024
2 parents 90d451e + 12eea3e commit 5a450a9
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 38 deletions.
15 changes: 10 additions & 5 deletions aisdb/database/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def fast_unzip(zip_filenames, dirname):
fcn(file)


def decode_msgs(filepaths, dbconn, source, vacuum=False, skip_checksum=False, raw_insertion=False, verbose=True):
def decode_msgs(filepaths, dbconn, source, vacuum=False, skip_checksum=False, type_preference="all", raw_insertion=False, verbose=True):
"""
Decode messages from filepaths and insert them into a database.
Expand All @@ -161,6 +161,7 @@ def decode_msgs(filepaths, dbconn, source, vacuum=False, skip_checksum=False, ra
:param source: source identifier for the decoded messages
:param vacuum: whether to vacuum the database after insertion (default is False)
:param skip_checksum: whether to skip checksum validation (default is False)
:param type_preference: file type filter passed to the decoder: "all", "csv", or "other" (any non-CSV extension); default is "all"
:param raw_insertion: whether to insert messages without indexing them (default is False)
:param verbose: whether to print verbose output (default is True)
:return: None
Expand Down Expand Up @@ -277,16 +278,20 @@ def decode_msgs(filepaths, dbconn, source, vacuum=False, skip_checksum=False, ra
for idx_name in ("mmsi", "time", "longitude", "latitude"):
dbconn.execute(f"DROP INDEX idx_{month}_dynamic_{idx_name};")
dbconn.commit()
completed_files = decoder(dbpath="", psql_conn_string=dbconn.connection_string,
files=raw_files, source=source, verbose=verbose, workers=4, allow_swap=False)
completed_files = decoder(dbpath="",
psql_conn_string=dbconn.connection_string,
files=raw_files, source=source, verbose=verbose,
workers=4, type_preference=type_preference, allow_swap=False)

elif isinstance(dbconn, SQLiteDBConn):
with open(os.path.join(sqlpath, "createtable_dynamic_clustered.sql"), "r") as f:
create_table_stmt = f.read()
for month in months:
dbconn.execute(create_table_stmt.format(month))
completed_files = decoder(dbpath=dbconn.dbpath, psql_conn_string="", files=raw_files,
source=source, verbose=verbose, workers=4, allow_swap=False)
completed_files = decoder(dbpath=dbconn.dbpath,
psql_conn_string="", files=raw_files,
source=source, verbose=verbose, workers=4,
type_preference=type_preference, allow_swap=False)
else:
assert False

Expand Down
8 changes: 4 additions & 4 deletions aisdb_lib/src/csvreader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn sqlite_decodemsgs_ee_csv(
own_vessel: true,
station: Station::BaseStation,
ais_type: AisClass::Unknown,
mmsi: row.get(0).unwrap().parse::<u32>().expect("MMSI should be an integer"),
mmsi: row.get(0).unwrap().parse::<u32>().ok().expect("Requires an Integer."),
nav_status: NavigationStatus::NotDefined,
rot: row.get(25).unwrap().parse::<f64>().ok(),
rot_direction: None,
Expand Down Expand Up @@ -122,7 +122,7 @@ pub fn sqlite_decodemsgs_ee_csv(
let payload = VesselStaticData {
own_vessel: true,
ais_type: AisClass::Unknown,
mmsi: row.get(0).unwrap().parse().unwrap(),
mmsi: row.get(0).unwrap().parse::<u32>().ok().expect("Requires an Integer."),
ais_version_indicator: row.get(23).unwrap().parse().unwrap_or_default(),
imo_number: row.get(15).unwrap().parse().ok(),
call_sign: row.get(14).unwrap().parse().ok(),
Expand Down Expand Up @@ -227,7 +227,7 @@ pub fn postgres_decodemsgs_ee_csv(
own_vessel: true,
station: Station::BaseStation,
ais_type: AisClass::Unknown,
mmsi: row.get(0).unwrap().parse::<u32>().expect("MMSI should be an integer"),
mmsi: row.get(0).unwrap().parse::<u32>().ok().expect("Requires an Integer."),
nav_status: NavigationStatus::NotDefined,
rot: row.get(25).unwrap().parse::<f64>().ok(),
rot_direction: None,
Expand Down Expand Up @@ -260,7 +260,7 @@ pub fn postgres_decodemsgs_ee_csv(
let payload = VesselStaticData {
own_vessel: true,
ais_type: AisClass::Unknown,
mmsi: row.get(0).unwrap().parse().unwrap(),
mmsi: row.get(0).unwrap().parse::<u32>().ok().expect("Requires an Integer."),
ais_version_indicator: row.get(23).unwrap().parse().unwrap_or_default(),
imo_number: row.get(15).unwrap().parse().ok(),
call_sign: row.get(14).unwrap().parse().ok(),
Expand Down
69 changes: 40 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub fn decoder(
verbose: bool,
workers: u64,
allow_swap: bool,
type_preference: String,
py: Python,
) -> Vec<PathBuf> {
// tuples containing (dbpath, filepath)
Expand Down Expand Up @@ -182,44 +183,54 @@ pub fn decoder(
println!("processing {}", f.display());
}

let extension = f.extension().and_then(std::ffi::OsStr::to_str).map(String::from);
let process_file = match (extension.as_deref(), type_preference.as_str()) {
(Some(ext), "csv") if ext.eq_ignore_ascii_case("csv") => true,
(Some(ext), "other") if !ext.eq_ignore_ascii_case("csv") => true,
(_, "all") => true,
_ => false,
};

match f.extension() {
Some(ext_os_str) => match ext_os_str.to_str() {
Some("nm4") | Some("NM4") | Some("nmea") | Some("NMEA") | Some("rx")
| Some("txt") | Some("RX") | Some("TXT") => {
if !dbpath.to_str().unwrap().is_empty() {
parser = sqlite_decode_insert_msgs(
d.to_path_buf(),
f.clone(),
&source,
parser,
verbose,
)
.expect("decoding NM4");
update_done_files(&mut completed, &mut errored, Ok(f.clone()));
}
if !psql_conn_string.is_empty() {
let sender = sender.clone();
let future = async move {
let parser = NmeaParser::new();
match postgres_decode_insert_msgs(
&psql_conn_string,
if !process_file {
if !dbpath.to_str().unwrap().is_empty() {
parser = sqlite_decode_insert_msgs(
d.to_path_buf(),
f.clone(),
&source,
parser,
verbose,
) {
Err(_) => {
sender
.send(Err(f))
.expect("sending errored filepath from worker");
}
Ok(_) => sender
.send(Ok(f))
.expect("sending completed filepath from worker"),
)
.expect("decoding NM4");
update_done_files(&mut completed, &mut errored, Ok(f.clone()));
}
if !psql_conn_string.is_empty() {
let sender = sender.clone();
let future = async move {
let parser = NmeaParser::new();
match postgres_decode_insert_msgs(
&psql_conn_string,
f.clone(),
&source,
parser,
verbose,
) {
Err(_) => {
sender
.send(Err(f))
.expect("sending errored filepath from worker");
}
Ok(_) => sender
.send(Ok(f))
.expect("sending completed filepath from worker"),
};
};
};
pool.spawn_ok(future);
in_process += 1;
pool.spawn_ok(future);
in_process += 1;
}
}
}
Some("csv") | Some("CSV") => {
Expand Down

0 comments on commit 5a450a9

Please sign in to comment.