From d3b2394486ad98685adeaac8e258ae9e0ad464eb Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Sun, 29 Sep 2024 22:51:40 +0200 Subject: [PATCH 01/14] start postprocess labels stage --- data/output.txt | 3 ++ src/pipeline.py | 36 +++++++++++------------- src/stages/postprocess.py | 59 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 19 deletions(-) create mode 100644 data/output.txt create mode 100644 src/stages/postprocess.py diff --git a/data/output.txt b/data/output.txt new file mode 100644 index 0000000..de54fc9 --- /dev/null +++ b/data/output.txt @@ -0,0 +1,3 @@ +{'encounter_id': '11486', 'latitude': 36.91, 'longitude': -122.02, 'displayImgUrl': 'https://au-hw-media-m.happywhale.com/c5522187-058e-4a1a-83d7-893560ba6b2c.jpg', 'audio': array([], dtype=float32), 'start': Timestamp('2016-12-21 00:20:30'), 'end': Timestamp('2016-12-21 00:21:30'), 'classifications': []} +{'encounter_id': '9182', 'latitude': 36.91, 'longitude': -122.02, 'displayImgUrl': 'https://au-hw-media-m.happywhale.com/d40b9e6e-07cf-4f20-8cb4-4042ba22a00b.jpg', 'audio': array([-0.00352275, -0.00346267, -0.00334585, ..., -0.00339496, + -0.00333035, -0.00329852], dtype=float32), 'start': Timestamp('2016-12-21 00:49:30'), 'end': Timestamp('2016-12-21 00:50:30'), 'classifications': [[0.8753612041473389], [0.746759295463562], [0.26265254616737366], [0.45787951350212097], [0.35406064987182617], [0.42348742485046387], [0.4947870969772339], [0.7287474274635315], [0.7099379897117615], [0.2122703194618225], [0.044488538056612015], [0.00849922839552164], [0.024390267208218575], [0.33750119805336], [0.6530888080596924], [0.3057247996330261], [0.1243574470281601], [0.027093390002846718], [0.011367958970367908], [0.004032353404909372], [0.026372192427515984], [0.021978065371513367], [0.006407670211046934], [0.5405446887016296], [0.34207114577293396], [0.6080849766731262], [0.5394770503044128], [0.3662146031856537], [0.16772609949111938], [0.3641503155231476], [0.060217034071683884], [0.008764371275901794], [0.012523961253464222], [0.009186000563204288], [0.022050702944397926], [0.3908870816230774], [0.15179167687892914], [0.3454047441482544], [0.4770602285861969], [0.07589100301265717], [0.5439115166664124], [0.8634722232818604], [0.985602617263794], [0.3311924636363983], [0.8832067847251892], [0.6166273951530457], [0.42301759123802185], [0.03573732450604439], [0.09752023965120316], [0.01426385436207056], [0.022987568750977516], [0.012294118292629719], [0.010207954794168472], [0.00296270614489913]]} diff --git a/src/pipeline.py b/src/pipeline.py index d93aa78..51b8f2b 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -5,6 +5,8 @@ from stages.audio import RetrieveAudio, WriteAudio, WriteSiftedAudio from stages.sift import Butterworth from stages.classify import WhaleClassifier, WriteClassifications +from stages.postprocess import PostprocessLabels + from config import load_pipeline_config config = load_pipeline_config() @@ -19,29 +21,25 @@ def run(): } with beam.Pipeline(options=pipeline_options) as p: - input_data = p | "Create Input" >> beam.Create([args]) - search_output = input_data | "Run Geometry Search" >> beam.ParDo(GeometrySearch()) - - audio_output = search_output | "Retrieve Audio" >> beam.ParDo(RetrieveAudio()) - audio_output | "Store Audio (temp)" >> beam.ParDo(WriteAudio()) - - sifted_audio = audio_output | "Sift Audio" >> Butterworth() - sifted_audio | "Store Sifted Audio" >> beam.ParDo(WriteSiftedAudio("butterworth")) - - classifications = sifted_audio | "Classify Audio" >> WhaleClassifier(config) - classifications | "Store Classifications" >> beam.ParDo(WriteClassifications(config)) - - - # # Post-process the labels - # postprocessed_labels = classified_audio | "Postprocess Labels" >> PostprocessLabels() + input_data = p | "Create Input" >> beam.Create([args]) + search_output = input_data | "Run Geometry Search" >> beam.ParDo(GeometrySearch()) + audio_output = search_output | "Retrieve Audio" >> beam.ParDo(RetrieveAudio()) + sifted_audio = audio_output | "Sift Audio" >> Butterworth() + classifications = sifted_audio | "Classify Audio" >> WhaleClassifier(config) + postprocess_labels = classifications | "Postprocess Labels" >> beam.ParDo( + PostprocessLabels(config), + search_output=beam.pvalue.AsSingleton(search_output), + ) + + # Store results + audio_output | "Store Audio (temp)" >> beam.ParDo(WriteAudio()) + sifted_audio | "Store Sifted Audio" >> beam.ParDo(WriteSiftedAudio("butterworth")) + classifications | "Store Classifications" >> beam.ParDo(WriteClassifications(config)) + postprocess_labels | "Write Results" >> beam.io.WriteToText("data/output.txt", shard_name_template="") # Output results # postprocessed_labels | "Write Results" >> beam.io.WriteToText("output.txt") - # For debugging, you can write the output to a text file - # audio_files | "Write Audio Output" >> beam.io.WriteToText('audio_files.txt') - # search_results | "Write Search Output" >> beam.io.WriteToText('search_results.txt') - if __name__ == "__main__": run() diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py new file mode 100644 index 0000000..7a1bb63 --- /dev/null +++ b/src/stages/postprocess.py @@ -0,0 +1,59 @@ +import apache_beam as beam + +from datetime import datetime +from typing import Dict, Any, Tuple +from types import SimpleNamespace +from matplotlib import gridspec + +import librosa +import logging +import numpy as np +import os +import time +import pandas as pd + +import requests +import math +import matplotlib.pyplot as plt +import scipy.signal + + +class PostprocessLabels(beam.DoFn): + def __init__(self, config: SimpleNamespace): + self.config = config + + self.search_output_path_template = config.search.export_template + self.sifted_audio_path_template = config.sift.output_path_template + self.classification_path = config.classify.classification_path + + + def process(self, element: Dict[str, Any], search_output: Dict[str, Any]): + logging.info(f"element \n{element}") + logging.info(f"search_output \n{search_output}") + breakpoint() + + classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) + classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) + classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) + + # TODO pool classifications + + + search_output = search_output.rename(columns={"id": "encounter_id"}) + search_output["encounter_id"] = search_output["encounter_id"].astype(str) # TODO do in one line + search_output = search_output[[ + # TODO refactor to confing + "encounter_id", + "latitude", + "longitude", + "displayImgUrl", + # "species", # TODO add in geo search stage (require rm local file) + ]] + + # join dataframes + joined_df = pd.merge(search_output, classifications_df, how="inner", on="encounter_id") + + logging.info(f"Final output: \n{joined_df.head()}") + + + return joined_df.to_dict(orient="records") From ebba9290b32802cc811496da62e2e8def9370a95 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Mon, 30 Sep 2024 23:05:48 +0200 Subject: [PATCH 02/14] begin discussion on saving output --- .../LADR_0005_persist_intermediate_outputs.md | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 docs/ladr/LADR_0005_persist_intermediate_outputs.md diff --git a/docs/ladr/LADR_0005_persist_intermediate_outputs.md b/docs/ladr/LADR_0005_persist_intermediate_outputs.md new file mode 100644 index 0000000..96bdb4a --- /dev/null +++ b/docs/ladr/LADR_0005_persist_intermediate_outputs.md @@ -0,0 +1,66 @@ +# Persisting intermediate stage outputs + +This doc discusses storing outputs from each stage or keeping stateless until the end, and where to do each in our pipeline. + +Some stages (like geo-search) require storing outputs, since the (unaltered) Happywhale API currently writes found encounters to file, and does not return a df. + +Other stages like audio retrival may make sense to keep stateless to avoid storage costs on our end. +For debugging purposes, storing these data makes sense, since it speeds up iteration time. +But a productionized pipeline might only run once per geofile-date, and not need to store intermediate outputs. + +Another point to consider is that data-usage agreements with the audio proivders. +The audio from MBARI is ok to download, but what happens when we scale to include other hydrophone sources? + + +Some questions to consider: +- Do we want to store outputs from any intermediate stage, or just the last one? Ex. start/stop times of sifted audio, full classification arrays or pooled results. + +- How to handle overwrites or parallel writes? Parallel writes should never occur, since we find overlapping encounter ids, and group them together. Overwrites could occur if stage does not check if data exists for stage before writing. + +- Do we have a true key throughout our entire dataflow? Do we need one? After geo-search, we could consider a concatenation of start, end, and encounter_id as key, though this might be misleading, if sifted audio changes the start and end times. + + + + +Interesting blog post discussing some of these issues, comparing stateful DoFns to CombineFn: https://beam.apache.org/blog/stateful-processing/ + +## Stages +For every stage, I'll discussion the necessity of storing outputs, and if so, what outputs to store. + +### 1. Geo-search +We are forced to store the outputs for this stage. +Maybe outputs should be written to a database to avoid duplicates, but is this overengineering? + +What was the problem with proivding a ByteIO object as the export file? That could be used to convert the data to a df, and then distribute how we see fit, instead of loading the data from a file and passing onward into the pipeline. + + +### 2. Audio retrieval +We should likely not store the outputs for this stage. +The data is open source and can be retrieved at any time, only costs to download. +The main argument for storing here would be if download costs were significantly higher than storage. +For now, we assume that this pipeline will be run on new servers often, mostly once per date, meaning storing audio does is not worth it. + + +### 3. Audio sifting +How much audio sift data should be persisted? Full audio arrays with start, stop times and encounter ids? +Or just the start, stop times and encounter ids, assuming the audio will be downloaded and passed from the previous stage? + +The main argument for storing the full audio arrays is that it speeds up iteration time, and allows for easier debugging. +We likely also want this data easily accessible, if our final outputs are going to contain the audio snippets with classifications. +That's kinda the whole point of this pipeline, so _some_ audio will eventually need to be be stored. +And its likely at this stage we will want to store it, since this audio is truncated. + + +### 4. Classification +After the audio has been fed through the model, we'll get an array shorter than the length of the audio array, but linearally scaled to the audio. So larger context windows will eventually start producing very large classification arrays. Are all of these data necessary to save, or would a pooled score be best? It depends on the use-case ;) + +We could alternatively cut the audio to only the parts where a min and max classification above a threshold is found. +This would eliminate any real dependency on audio sifting (in case that stage turns out to not be needed later). +And This would serve as the best waste-reduction strategy, since we would only store the audio that we are confident contains a whale call. + + +### 5. Postprocessings (pooling and labelling) +The final stage definitely needs to be store, but the main discussion here becomes, what to store? +If we already have stored intermediate stages like sifted audio or truncated classified audio, we could avoid saving them again, and rather load from those tables when presenting aggregated results. + +Though, I like the idea of the last stage of the pipeline containing all the data necessary found through the pipeline. This makes data sharing easier, with a concrete final product, instead of a bunch of fragmentated tables that need to be joined to have any true value. \ No newline at end of file From 37535f1413179453c4d70b682312c97816a1092d Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Tue, 1 Oct 2024 20:16:31 +0200 Subject: [PATCH 03/14] only run todo to issue on merge to main --- .github/workflows/todo.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/todo.yml b/.github/workflows/todo.yml index af970b4..af2e465 100644 --- a/.github/workflows/todo.yml +++ b/.github/workflows/todo.yml @@ -1,6 +1,8 @@ name: "Run TODO to Issue" on: push: + branches: + - "master" workflow_dispatch: inputs: MANUAL_COMMIT_REF: From 7e02333784926a70a4ae2cae3e924ad3f795bd46 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Tue, 1 Oct 2024 23:07:27 +0200 Subject: [PATCH 04/14] start integration w/ bigquery (broken on non picklable client) --- makefile | 4 ++ src/config/common.yaml | 5 +++ src/create_table.py | 74 +++++++++++++++++++++++++++++++++++ src/stages/postprocess.py | 81 +++++++++++++++++++++++++++++++-------- 4 files changed, 148 insertions(+), 16 deletions(-) create mode 100644 src/create_table.py diff --git a/makefile b/makefile index beca38d..d7de1a3 100644 --- a/makefile +++ b/makefile @@ -1,5 +1,6 @@ local-run: bash scripts/kill_model_server.sh + python3 src/create_table.py python3 src/model_server.py & python3 src/pipeline.py bash scripts/kill_model_server.sh @@ -11,3 +12,6 @@ model-server: kill-model-server: bash scripts/kill_model_server.sh + +create-table: + python3 src/create_table.py \ No newline at end of file diff --git a/src/config/common.yaml b/src/config/common.yaml index 309cb01..94408d9 100644 --- a/src/config/common.yaml +++ b/src/config/common.yaml @@ -4,6 +4,10 @@ pipeline: debug: true show_plots: false + # gcp + project: "bioacoustic-2024" + dataset_id: "whale_speech" + input: start: "2016-12-21T00:30:00" end: "2016-12-21T00:45:00" @@ -66,6 +70,7 @@ pipeline: pooling: "average" confidence_threshold: 0.5 output_path_template: "data/labels/{year}/{month:02}/{day:02}.csv" + postprocess_table_id: "mapped_audio" diff --git a/src/create_table.py b/src/create_table.py new file mode 100644 index 0000000..d814415 --- /dev/null +++ b/src/create_table.py @@ -0,0 +1,74 @@ +from google.cloud import bigquery +from google.api_core.exceptions import Conflict + +from config import load_pipeline_config + +config = load_pipeline_config() + +client = bigquery.Client() + + +# Define the table schema +schema = [ + bigquery.SchemaField("key", "STRING"), + bigquery.SchemaField("audio", "FLOAT64", mode="REPEATED"), # 'REPEATED' for arrays + bigquery.SchemaField("pooled_score", "FLOAT64"), + bigquery.SchemaField("encounter_ids", "STRING", mode="REPEATED"), + bigquery.SchemaField("encounter_img_urls", "STRING", mode="REPEATED"), + bigquery.SchemaField("longitude", "FLOAT64"), + bigquery.SchemaField("latitude", "FLOAT64"), + bigquery.SchemaField("start", "TIMESTAMP"), + bigquery.SchemaField("end", "TIMESTAMP"), +] + + +# Create a dataset +def create_dataset(dataset_id): + try: + dataset_path = f"{client.project}.{dataset_id}" + dataset = bigquery.Dataset(dataset_path) + dataset.location = "US" + dataset = client.create_dataset(dataset, timeout=30) + print(f"Created dataset {client.project}.{dataset.dataset_id}") + except Conflict as e: + if "Already Exists" in str(e): + dataset = client.get_dataset(dataset_id) + print(f"Dataset {client.project}.{dataset.dataset_id} already exists. Continuing.") + else: + raise e + + return dataset + + +# Create a table +def create_table(dataset_id, table_id, schema=schema): + try: + table_path = f"{client.project}.{dataset_id}.{table_id}" + table = bigquery.Table(table_path, schema=schema) + table = client.create_table(table) + print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}") + except Conflict as e: + if "Already Exists" in str(e): + table = client.get_table(table_path) + print(f"Table {table.project}.{table.dataset_id}.{table.table_id} already exists. Continuing.") + else: + raise e + + +def run(args): + dataset = create_dataset(args.dataset_id) + table = create_table(dataset.dataset_id, args.table_id) + return table + + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--dataset_id", type=str, help="BigQuery dataset ID", default=config.general.dataset_id) + parser.add_argument("--table_id", type=str, help="BigQuery table ID", default=config.postprocess.postprocess_table_id) + + args = parser.parse_args() + + run(args) diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index 885e7bb..f10047d 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -1,21 +1,13 @@ import apache_beam as beam - -from datetime import datetime -from typing import Dict, Any, Tuple -from types import SimpleNamespace -from matplotlib import gridspec - -import librosa import logging import numpy as np -import os -import time import pandas as pd -import requests -import math -import matplotlib.pyplot as plt -import scipy.signal +# from google.cloud import bigquery +from apache_beam.io.gcp.internal.clients import bigquery + +from typing import Dict, Any, Tuple +from types import SimpleNamespace class PostprocessLabels(beam.DoFn): @@ -26,19 +18,28 @@ def __init__(self, config: SimpleNamespace): self.sifted_audio_path_template = config.sift.output_path_template self.classification_path = config.classify.classification_path + self.pooling = config.postprocess.pooling + self.project = config.general.project + self.dataset_id = config.general.dataset_id + self.table_id = config.postprocess.postprocess_table_id + + self.client = bigquery.Client() # requires creatials set up in env + self.table_ref = f"{self.project}.{self.dataset_id}.{self.table_id}" + def process(self, element: Dict[str, Any], search_output: Dict[str, Any]): logging.info(f"element \n{element}") logging.info(f"search_output \n{search_output}") - breakpoint() + # convert element to dataframe classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) - # TODO pool classifications in postprocessing - + # pool classifications in postprocessing + classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) + # convert search_output to dataframe search_output = search_output.rename(columns={"id": "encounter_id"}) search_output["encounter_id"] = search_output["encounter_id"].astype(str) # TODO do in one line search_output = search_output[[ @@ -55,5 +56,53 @@ def process(self, element: Dict[str, Any], search_output: Dict[str, Any]): logging.info(f"Final output: \n{joined_df.head()}") + # write to BigQuery + # self._write_to_bigquery(joined_df) return joined_df.to_dict(orient="records") + + def _build_classification_df(self, element: Tuple) -> pd.DataFrame: + # convert element to dataframe + classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) + classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) + classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) + + # convert audio arrays to list(floats) + classifications_df["audio"] = classifications_df["audio"].apply(lambda x: x.tolist()) + + + # pool classifications in postprocessing + # TODO check that shape (n,1) is handled correctly + classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) + logging.info(f"Classifications: \n{classifications_df.head()}") + logging.info(f"Classifications shape: {classifications_df.shape}") + + + return classifications_df + + + def _pool_classifications(self, classifications: np.array) -> Dict[str, Any]: + if self.pooling == "mean" or self.pooling == "avg" or self.pooling == "average": + pooled_score = np.mean(classifications) + elif self.pooling == "max": + pooled_score = np.max(classifications) + elif self.pooling == "min": + pooled_score = np.min(classifications) + else: + raise ValueError(f"Pooling method {self.pooling} not supported.") + + return pooled_score + + + def _write_to_bigquery(self, df: pd.DataFrame): + + for row in df.to_dict(orient="records"): + self._insert_row(row) + logging.debug(f"Inserted row {row} to BigQuery table {self.table_ref}") + + + def _insert_row(self, row: Dict[str, Any]): + # Insert data into BigQuery + errors = self.client.insert_rows_json(self.table_ref, [row]) + if errors: + raise Exception(f"Error inserting rows: {errors}") From 2866d30d035f01210097ddf8b29cf61cb507e12d Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Thu, 3 Oct 2024 21:59:54 +0200 Subject: [PATCH 05/14] example write to bigquery from beam --- examples/write_to_bigquery.py | 40 +++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 examples/write_to_bigquery.py diff --git a/examples/write_to_bigquery.py b/examples/write_to_bigquery.py new file mode 100644 index 0000000..4e152eb --- /dev/null +++ b/examples/write_to_bigquery.py @@ -0,0 +1,40 @@ + +from apache_beam.io.gcp.internal.clients import bigquery +import apache_beam as beam + + +# project-id:dataset_id.table_id +table_spec = 'bioacoustics-2024.whale_speech.sample_quotes' + + +table_schema = { + 'fields': [{ + 'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE' + }, { + 'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED' + }] +} + +# Create a pipeline +temp_location = "gs://bioacoustics/whale-speech/temp" + +with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions()) as pipeline: + + quotes = pipeline | beam.Create([ + { + 'source': 'Mahatma Gandhi', 'quote': 'My life is my message.' + }, + { + 'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'." + }, + ]) + quotes | beam.io.WriteToBigQuery( + table_spec, + schema=table_schema, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + custom_gcs_temp_location=temp_location, + method=beam.io.WriteToBigQuery.Method.FILE_LOADS + ) + +print("Completed writing to BigQuery") \ No newline at end of file From d31efb9e3196886d9eba16a5838b09420b16f64d Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Thu, 3 Oct 2024 22:01:52 +0200 Subject: [PATCH 06/14] messy working impl of write to big query --- data/output.txt | 3 - src/config/common.yaml | 2 +- src/model_server.py | 4 +- src/pipeline.py | 51 +++++++++++- src/stages/postprocess.py | 162 ++++++++++++++++++++++++++++---------- 5 files changed, 172 insertions(+), 50 deletions(-) delete mode 100644 data/output.txt diff --git a/data/output.txt b/data/output.txt deleted file mode 100644 index de54fc9..0000000 --- a/data/output.txt +++ /dev/null @@ -1,3 +0,0 @@ -{'encounter_id': '11486', 'latitude': 36.91, 'longitude': -122.02, 'displayImgUrl': 'https://au-hw-media-m.happywhale.com/c5522187-058e-4a1a-83d7-893560ba6b2c.jpg', 'audio': array([], dtype=float32), 'start': Timestamp('2016-12-21 00:20:30'), 'end': Timestamp('2016-12-21 00:21:30'), 'classifications': []} -{'encounter_id': '9182', 'latitude': 36.91, 'longitude': -122.02, 'displayImgUrl': 'https://au-hw-media-m.happywhale.com/d40b9e6e-07cf-4f20-8cb4-4042ba22a00b.jpg', 'audio': array([-0.00352275, -0.00346267, -0.00334585, ..., -0.00339496, - -0.00333035, -0.00329852], dtype=float32), 'start': Timestamp('2016-12-21 00:49:30'), 'end': Timestamp('2016-12-21 00:50:30'), 'classifications': [[0.8753612041473389], [0.746759295463562], [0.26265254616737366], [0.45787951350212097], [0.35406064987182617], [0.42348742485046387], [0.4947870969772339], [0.7287474274635315], [0.7099379897117615], [0.2122703194618225], [0.044488538056612015], [0.00849922839552164], [0.024390267208218575], [0.33750119805336], [0.6530888080596924], [0.3057247996330261], [0.1243574470281601], [0.027093390002846718], [0.011367958970367908], [0.004032353404909372], [0.026372192427515984], [0.021978065371513367], [0.006407670211046934], [0.5405446887016296], [0.34207114577293396], [0.6080849766731262], [0.5394770503044128], [0.3662146031856537], [0.16772609949111938], [0.3641503155231476], [0.060217034071683884], [0.008764371275901794], [0.012523961253464222], [0.009186000563204288], [0.022050702944397926], [0.3908870816230774], [0.15179167687892914], [0.3454047441482544], [0.4770602285861969], [0.07589100301265717], [0.5439115166664124], [0.8634722232818604], [0.985602617263794], [0.3311924636363983], [0.8832067847251892], [0.6166273951530457], [0.42301759123802185], [0.03573732450604439], [0.09752023965120316], [0.01426385436207056], [0.022987568750977516], [0.012294118292629719], [0.010207954794168472], [0.00296270614489913]]} diff --git a/src/config/common.yaml b/src/config/common.yaml index 94408d9..07ae562 100644 --- a/src/config/common.yaml +++ b/src/config/common.yaml @@ -5,7 +5,7 @@ pipeline: show_plots: false # gcp - project: "bioacoustic-2024" + project: "bioacoustics-2024" dataset_id: "whale_speech" input: diff --git a/src/model_server.py b/src/model_server.py index efd06fb..23cd6e6 100644 --- a/src/model_server.py +++ b/src/model_server.py @@ -12,8 +12,8 @@ # Load the TensorFlow model logging.info("Loading model...") -# model = hub.load("https://www.kaggle.com/models/google/humpback-whale/TensorFlow2/humpback-whale/1") -model = hub.load("https://tfhub.dev/google/humpback_whale/1") +model = hub.load("https://www.kaggle.com/models/google/humpback-whale/TensorFlow2/humpback-whale/1") +# model = hub.load("https://tfhub.dev/google/humpback_whale/1") score_fn = model.signatures["score"] logging.info("Model loaded.") diff --git a/src/pipeline.py b/src/pipeline.py index 51b8f2b..e73c736 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -7,18 +7,55 @@ from stages.classify import WhaleClassifier, WriteClassifications from stages.postprocess import PostprocessLabels +from apache_beam.io.gcp.internal.clients import bigquery + from config import load_pipeline_config config = load_pipeline_config() def run(): # Initialize pipeline options - pipeline_options = PipelineOptions() + pipeline_options = PipelineOptions( + # runner="DataflowRunner", + project="bioacoustics-2024", + temp_location="gs://bioacoustics/whale-speech/temp", + # region=config.general.region, + # job_name=config.general.job_name, + # temp_location=config.general.temp_location, + # setup_file="./setup.py" + ) pipeline_options.view_as(SetupOptions).save_main_session = True args = { "start": config.input.start, "end": config.input.end } + schema = { + "fields" : [ + # {'name': 'key', 'type': 'STRING', 'mode': 'REQUIRED'}, + {'name': 'classifications', 'type': 'FLOAT64', 'mode': 'REPEATED'}, + {'name': 'pooled_score', 'type': 'FLOAT64', 'mode': 'REQUIRED'}, + {'name': 'encounter_id', 'type': 'STRING', 'mode': 'REQUIRED'}, + {'name': 'displayImgUrl', 'type': 'STRING', 'mode': 'REQUIRED'}, + {'name': 'longitude', 'type': 'FLOAT64', 'mode': 'REQUIRED'}, + {'name': 'latitude', 'type': 'FLOAT64', 'mode': 'REQUIRED'}, + {'name': 'start', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'}, + {'name': 'end', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'} + ] + } + # table_ref = "{project_id}:{dataset_id}.{table_id}".format( + # project_id=config.general.project, + # dataset_id=config.general.dataset_id, + # table_id=config.postprocess.postprocess_table_id + # ) + # table_spec = bigquery.TableReference( + # projectId=config.general.project, + # datasetId=config.general.dataset_id, + # tableId=config.postprocess.postprocess_table_id + # ) + table_spec = f"bioacoustics-2024:whale_sppech.mapped_audio" + + print(f"Writing to table: {table_spec}") + print(f"PipelineOptions: {pipeline_options}") with beam.Pipeline(options=pipeline_options) as p: input_data = p | "Create Input" >> beam.Create([args]) @@ -26,16 +63,26 @@ def run(): audio_output = search_output | "Retrieve Audio" >> beam.ParDo(RetrieveAudio()) sifted_audio = audio_output | "Sift Audio" >> Butterworth() classifications = sifted_audio | "Classify Audio" >> WhaleClassifier(config) + # postprocess_labels = classifications | "Postprocess Labels" >> PostprocessLabels(config, search_output) postprocess_labels = classifications | "Postprocess Labels" >> beam.ParDo( PostprocessLabels(config), search_output=beam.pvalue.AsSingleton(search_output), ) + postprocess_labels | "Write to BigQuery" >> beam.io.WriteToBigQuery( + "mapped_audio", + dataset=config.general.dataset_id, + project=config.general.project, + schema=schema, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + method=beam.io.WriteToBigQuery.Method.FILE_LOADS + ) # Store results audio_output | "Store Audio (temp)" >> beam.ParDo(WriteAudio()) sifted_audio | "Store Sifted Audio" >> beam.ParDo(WriteSiftedAudio("butterworth")) classifications | "Store Classifications" >> beam.ParDo(WriteClassifications(config)) - postprocess_labels | "Write Results" >> beam.io.WriteToText("data/output.txt", shard_name_template="") + # postprocess_labels | "Write Results" >> beam.io.WriteToText("data/output.txt", shard_name_template="") # Output results # postprocessed_labels | "Write Results" >> beam.io.WriteToText("output.txt") diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index f10047d..7b8839b 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -10,6 +10,7 @@ from types import SimpleNamespace +# class PostprocessLabels(beam.PTransform): class PostprocessLabels(beam.DoFn): def __init__(self, config: SimpleNamespace): self.config = config @@ -23,43 +24,106 @@ def __init__(self, config: SimpleNamespace): self.dataset_id = config.general.dataset_id self.table_id = config.postprocess.postprocess_table_id - self.client = bigquery.Client() # requires creatials set up in env - self.table_ref = f"{self.project}.{self.dataset_id}.{self.table_id}" + self._init_big_query_writer(config) - def process(self, element: Dict[str, Any], search_output: Dict[str, Any]): - logging.info(f"element \n{element}") - logging.info(f"search_output \n{search_output}") + def _init_big_query_writer(self, config: SimpleNamespace): + self.table_spec = bigquery.TableReference( + projectId=self.project, + datasetId=self.dataset_id, + tableId=self.table_id + ) - # convert element to dataframe - classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) - classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) - classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) + + + def process(self, element, search_output): + joined_df = self._build_dfs(element, search_output) + + for row in joined_df.to_dict(orient="records"): + if row["classifications"] == []: + logging.info(f"Skipping row with no classification: {row.keys()}") + logging.info(f"Row: {row}") + continue + + logging.info(f"Writing row to BigQuery: {row.keys()} \n{row}") + yield row + + + # def expand(self, pcoll, search_output): + # return ( + # pcoll + # | "Process" >> beam.ParDo(self._build_dfs, search_output) + # | "Write to BigQuery" >> beam.io.WriteToBigQuery( + # self.table_spec, + # schema=self.schema, + # write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + # create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED + # ) + # ) + + + # for row in self._build_dfs(element, search_output): + # row | beam.io.WriteToBigQuery( + # self.table_spec, + # schema=self.schema, + # write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + # create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED + # ) + + + # def _process(self, element: Dict[str, Any], search_output: Dict[str, Any]): + # # TODO remove _process (replaced by expand) + # logging.info(f"element \n{element}") + # logging.info(f"search_output \n{search_output}") + + # # convert element to dataframe + # classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) + # classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) + # classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) - # pool classifications in postprocessing - classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) + # # pool classifications in postprocessing + # classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) + + # # convert search_output to dataframe + # search_output = search_output.rename(columns={"id": "encounter_id"}) + # search_output["encounter_id"] = search_output["encounter_id"].astype(str) # TODO do in one line + # search_output = search_output[[ + # # TODO refactor to confing + # "encounter_id", + # "latitude", + # "longitude", + # "displayImgUrl", + # # "species", # TODO add in geo search stage (require rm local file) + # ]] + + # # join dataframes + # joined_df = pd.merge(search_output, classifications_df, how="inner", on="encounter_id") + + # logging.info(f"Final output: \n{joined_df.head()}") + + # # write to BigQuery + # # self._write_to_bigquery(joined_df) + + # return joined_df.to_dict(orient="records") + + def _build_dfs(self, element, search_output): + # logging.info(f"element \n{element}") + # logging.info(f"search_output \n{search_output}") + + # convert element to dataframe + classifications_df = self._build_classification_df(element) # convert search_output to dataframe - search_output = search_output.rename(columns={"id": "encounter_id"}) - search_output["encounter_id"] = search_output["encounter_id"].astype(str) # TODO do in one line - search_output = search_output[[ - # TODO refactor to confing - "encounter_id", - "latitude", - "longitude", - "displayImgUrl", - # "species", # TODO add in geo search stage (require rm local file) - ]] + search_output_df = self._build_search_output_df(search_output) # join dataframes - joined_df = pd.merge(search_output, classifications_df, how="inner", on="encounter_id") + joined_df = pd.merge(search_output_df, classifications_df, how="inner", on="encounter_id") logging.info(f"Final output: \n{joined_df.head()}") + logging.info(f"Final output columns: {joined_df.columns}") - # write to BigQuery - # self._write_to_bigquery(joined_df) + return joined_df - return joined_df.to_dict(orient="records") def _build_classification_df(self, element: Tuple) -> pd.DataFrame: # convert element to dataframe @@ -68,19 +132,46 @@ def _build_classification_df(self, element: Tuple) -> pd.DataFrame: classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) # convert audio arrays to list(floats) - classifications_df["audio"] = classifications_df["audio"].apply(lambda x: x.tolist()) + # classifications_df["audio"] = classifications_df["audio"].apply(lambda x: x.tolist()) + # drop audio column + classifications_df = classifications_df.drop(columns=["audio"]) + # extract classifications from shape (n, 1) to (n,) + # classifications_df["classifications"] = classifications_df["classifications"].apply(lambda x: x.flatten()) + classifications_df["classifications"] = classifications_df["classifications"].apply(lambda x:[e[0] for e in x]) # pool classifications in postprocessing - # TODO check that shape (n,1) is handled correctly + logging.info(f"Classifications: \n{classifications_df['classifications']}") classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) + + # convert start adn end to isoformat + classifications_df["start"] = classifications_df["start"].apply(lambda x: x.isoformat()) + classifications_df["end"] = classifications_df["end"].apply(lambda x: x.isoformat()) + logging.info(f"Classifications: \n{classifications_df.head()}") logging.info(f"Classifications shape: {classifications_df.shape}") - return classifications_df + def _build_search_output_df(self, search_output: Dict[str, Any]) -> pd.DataFrame: + # convert search_output to dataframe + search_output = search_output.rename(columns={"id": "encounter_id"}) + search_output["encounter_id"] = search_output["encounter_id"].astype(str) + search_output = search_output[[ + # TODO refactor to confing + "encounter_id", + "latitude", + "longitude", + "displayImgUrl", + # "species", # TODO add in geo search stage (require rm local file) + ]] + logging.info(f"Search output: \n{search_output.head()}") + logging.info(f"Search output shape: {search_output.shape}") + + return search_output + + def _pool_classifications(self, classifications: np.array) -> Dict[str, Any]: if self.pooling == "mean" or self.pooling == "avg" or self.pooling == "average": pooled_score = np.mean(classifications) @@ -92,17 +183,4 @@ def _pool_classifications(self, classifications: np.array) -> Dict[str, Any]: raise ValueError(f"Pooling method {self.pooling} not supported.") return pooled_score - - - def _write_to_bigquery(self, df: pd.DataFrame): - - for row in df.to_dict(orient="records"): - self._insert_row(row) - logging.debug(f"Inserted row {row} to BigQuery table {self.table_ref}") - - - def _insert_row(self, row: Dict[str, Any]): - # Insert data into BigQuery - errors = self.client.insert_rows_json(self.table_ref, [row]) - if errors: - raise Exception(f"Error inserting rows: {errors}") + \ No newline at end of file From 3454531773e15191fe49fd3ee704f91229f61145 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Thu, 3 Oct 2024 22:02:19 +0200 Subject: [PATCH 07/14] hack: adjust plotting to show detection in final result plot (todo revert) --- src/stages/classify.py | 69 +++++++++++++++++++++++++++++++++++------- src/stages/sift.py | 11 ++++++- 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/src/stages/classify.py b/src/stages/classify.py index 1fa5802..918d966 100644 --- a/src/stages/classify.py +++ b/src/stages/classify.py @@ -11,6 +11,7 @@ import os import time import pandas as pd +import pickle import requests import math @@ -103,6 +104,7 @@ def _plot_scores(self, scores, t=None): plt.ylabel('Model Score') plt.xlabel('Seconds') plt.xlim(0, len(t)) if t is not None else None + plt.title('Model Scores') if self.med_filter_size is not None: scores_int = [int(s[0]*1000) for s in scores] @@ -147,12 +149,58 @@ def _plot_spectrogram_scipy( plt.title(f'Calibrated spectrum levels, 16 {self.source_sample_rate / 1000.0} kHz data') return t, f, psd - def _plot_audio(self, audio, key): - plt.plot(audio) - plt.xlabel('Samples') - plt.xlim(0, len(audio)) - plt.ylabel('Energy') - plt.title(f'Raw audio signal for {key}') + def _plot_audio(self, audio, start, key): + # plt.plot(audio) + # plt.xlabel('Samples') + # plt.xlim(0, len(audio)) + # plt.ylabel('Energy') + # plt.title(f'Raw audio signal for {key}') + with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_min_max.pkl", "rb") as f: + min_max_samples = pickle.load(f) + with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_all.pkl", "rb") as f: + all_samples = pickle.load(f) + # plt.plot(audio) # TODO remove this if does not work properly + + def _plot_signal_detections(signal, min_max_detection_samples, all_samples): + # TODO refactor plot_signal_detections in classify + logging.info(f"Plotting signal detections: {min_max_detection_samples}") + + plt.plot(signal) + + # NOTE: for legend logic, plot min_max window first + if len(min_max_detection_samples): + # shade window that resulted in detection + plt.axvspan( + min_max_detection_samples[0], + min_max_detection_samples[-1], + alpha=0.3, + color='y', + zorder=7, # on top of all detections + ) + + if len(all_samples): + # shade window that resulted in detection + for detection in all_samples: + plt.axvspan( + detection - 512/2, # TODO replace w/ window size from config + detection + 512/2, + alpha=0.5, + color='r', + zorder=5, # on top of signal + ) + + plt.legend(['Input signal', 'detection window', 'all detections']).set_zorder(10) + plt.xlabel(f'Samples (seconds * {16000} Hz)') # TODO replace with sample rate from config + plt.ylabel('Amplitude (normalized and centered)') + + title = f"Signal detections: {start.strftime('%Y-%m-%d %H:%M:%S')}" + plt.title(title) + + + _plot_signal_detections(audio, min_max_samples, all_samples) + + + def _plot(self, output): audio, start, end, encounter_ids, scores = output @@ -170,7 +218,8 @@ def _plot(self, output): # Plot spectrogram: plt.subplot(gs[0]) - self._plot_audio(audio, key) + # self._plot_audio(audio, key) + self._plot_audio(audio, start, key) # Plot spectrogram: plt.subplot(gs[1]) @@ -260,7 +309,7 @@ def process(self, element): logging.info(f"Received response:\n key: {key} predictions:{len(predictions)}") - yield (key, predictions) + yield (key, predictions) # TODO fix mixing yield and return in DoFn class ListCombine(beam.CombineFn): @@ -316,10 +365,8 @@ def process(self, element): classification_df = pd.concat([classification_df, row], axis=0, ignore_index=True) # drop duplicates - logging.info(f"Dropping duplicates from {len(classification_df)} rows") - logging.info(f"before: \n {classification_df}") + logging.debug(f"Dropping duplicates from {len(classification_df)} rows") classification_df = classification_df.drop_duplicates(subset=["start", "end"], keep="last") # , "encounter_ids" - logging.info(f"resulting df: \n {classification_df}") # write to file classification_df.to_csv(self.classification_path, sep='\t', index=False) diff --git a/src/stages/sift.py b/src/stages/sift.py index fdf21f9..58096d4 100644 --- a/src/stages/sift.py +++ b/src/stages/sift.py @@ -9,6 +9,7 @@ import matplotlib.pyplot as plt import numpy as np import os +import pickle from config import load_pipeline_config @@ -111,7 +112,8 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections, par signal = signal / np.max(signal) # normalize signal = signal - np.mean(signal) # center - plt.figure(figsize=(20, 10)) + # plt.figure(figsize=(20, 10)) + fig = plt.figure() plt.plot(signal) # NOTE: for legend logic, plot min_max window first @@ -146,6 +148,13 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections, par title += f"Encounters: {encounter_ids}" plt.title(title) plt.savefig(plot_path) + + # TODO remove hack to reuse sift figure later + with open(f"{plot_path.split(key)[0]}/data/{key}_min_max.pkl", 'wb') as handle: + pickle.dump(min_max_detection_samples, handle) + with open(f"{plot_path.split(key)[0]}/data/{key}_all.pkl", 'wb') as handle: + pickle.dump(all_detections[key], handle) + plt.show() if self.show_plots else plt.close() From d6afeb58ffad86f2d5bf90751a368f6774c7c1b6 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Fri, 4 Oct 2024 21:52:49 +0200 Subject: [PATCH 08/14] finalize decision on output storage (mainly for new PR) --- .../LADR_0005_persist_intermediate_outputs.md | 116 ++++++++++++++---- 1 file changed, 93 insertions(+), 23 deletions(-) diff --git a/docs/ladr/LADR_0005_persist_intermediate_outputs.md b/docs/ladr/LADR_0005_persist_intermediate_outputs.md index 96bdb4a..849416c 100644 --- a/docs/ladr/LADR_0005_persist_intermediate_outputs.md +++ b/docs/ladr/LADR_0005_persist_intermediate_outputs.md @@ -1,66 +1,136 @@ -# Persisting intermediate stage outputs +# Intermediate stage outputs -This doc discusses storing outputs from each stage or keeping stateless until the end, and where to do each in our pipeline. +This doc discusses handling the intermediate outputs between stages. +Whether or not to store the output should be confirgurable for the end user, either via command line params or in [config.yaml](../../src/config/common.yaml) +We want to enable these storage options to support local adn cloud storage. +This means we need to consider costs and effeciency when designing output schemas. + +Some stages (like geo-search) require locally storing outputs, since the (unaltered) Happywhale API currently writes found encounters to file, and does not return a df. -Some stages (like geo-search) require storing outputs, since the (unaltered) Happywhale API currently writes found encounters to file, and does not return a df. - Other stages like audio retrival may make sense to keep stateless to avoid storage costs on our end. -For debugging purposes, storing these data makes sense, since it speeds up iteration time. -But a productionized pipeline might only run once per geofile-date, and not need to store intermediate outputs. +Or storing only start and stop values for the audio, with a link to that day's data. + + +For during local development and debugging, having all intermediate data stored helps speed up iteration time. +Additionally, if data already exists for run on a particular date, the pipeline should skip these stages and load the outputs from the previous run. +While a productionized pipeline might only run once per geofile-date, and not need to store intermediate outputs, this decision should be left up to the end user. Another point to consider is that data-usage agreements with the audio proivders. -The audio from MBARI is ok to download, but what happens when we scale to include other hydrophone sources? +Make sure to read any agreements to ensure that storing intermediate outputs is allowed. + Some questions to consider: -- Do we want to store outputs from any intermediate stage, or just the last one? Ex. start/stop times of sifted audio, full classification arrays or pooled results. +- Exactly what data should we preserve from each stage? Will this different from the output of the stage? +Ex. start/stop times of sifted audio, full classification arrays or pooled results. - How to handle overwrites or parallel writes? Parallel writes should never occur, since we find overlapping encounter ids, and group them together. Overwrites could occur if stage does not check if data exists for stage before writing. -- Do we have a true key throughout our entire dataflow? Do we need one? After geo-search, we could consider a concatenation of start, end, and encounter_id as key, though this might be misleading, if sifted audio changes the start and end times. - - - -Interesting blog post discussing some of these issues, comparing stateful DoFns to CombineFn: https://beam.apache.org/blog/stateful-processing/ +- Do we have a true key throughout our entire dataflow? Do we need one? After geo-search, we could consider a concatenation of start, end, and encounter_id as key, though this might be misleading, if sifted audio changes the start and end times. ## Stages -For every stage, I'll discussion the necessity of storing outputs, and if so, what outputs to store. +For every stage, I'll discuss what outputs to store, and how they should be written locally and in the cloud. ### 1. Geo-search -We are forced to store the outputs for this stage. -Maybe outputs should be written to a database to avoid duplicates, but is this overengineering? +#### Local storage +We are forced to store the outputs for this stage, since the API requires a file path to write the pandas df to. +This means, there is a chance of overwites when running locally, or on a persistant server. +Can however be solved by providing a temporary file location to the API, loading in the old data and the temporary outputs, then write to the final location. -What was the problem with proivding a ByteIO object as the export file? That could be used to convert the data to a df, and then distribute how we see fit, instead of loading the data from a file and passing onward into the pipeline. +#### Cloud storage +This data is very structured, and could be stored in a database. +We should init and create a table in our project.dataset_id. +This can be the alternative to storing the temporary file to a more persistant final location. ### 2. Audio retrieval -We should likely not store the outputs for this stage. +We should likely not store the full outputs for this stage. The data is open source and can be retrieved at any time, only costs to download. -The main argument for storing here would be if download costs were significantly higher than storage. -For now, we assume that this pipeline will be run on new servers often, mostly once per date, meaning storing audio does is not worth it. +The main argument for storing here would be if download costs were significantly higher than storage, i.e. on a persistant server. +Development and debugging are still easier with the data stored, so we also need to smartly design these outputs. + +#### Local storage +Writing to my local machine has been easy enough with np.save. +This makes the data easily accessible for listening to, which is extremely helpful when analysing plots. +For now, I'll assume this is good enough, and rather rely on the built-in teardown of the DataflowRunner to clean this data up if wronglly configured during cloud runs. +#### Cloud storage +We could store the start, stop times of (and maybe url link to) the audio retrived for the found encounter ids in a table in our project.dataset_id. +This will can be beneficial if a user decides not to use any audio sifting. +Maybe add config option to allow storing full audio? +If stored, should be identified by a key (start, stop, encounter_id) to avoid overwrites, and stored in a bucket, not a table. ### 3. Audio sifting -How much audio sift data should be persisted? Full audio arrays with start, stop times and encounter ids? +How much audio sift data should be persisted? +Full audio arrays with start, stop times and encounter ids? Or just the start, stop times and encounter ids, assuming the audio will be downloaded and passed from the previous stage? +There is really no need to double storage here, but option should still be available. The main argument for storing the full audio arrays is that it speeds up iteration time, and allows for easier debugging. We likely also want this data easily accessible, if our final outputs are going to contain the audio snippets with classifications. That's kinda the whole point of this pipeline, so _some_ audio will eventually need to be be stored. And its likely at this stage we will want to store it, since this audio is truncated. +Will need to think about the key or unique identifier here, since there are a lot of parameters that can affect how much audio was truncated. essentially, all of these can be in the path, but that will make for extremely long paths. + +#### Local storage +Again, np.save is a good option for storing the audio arrays. + +#### Cloud storage +Similar as before, needs to be stored in a bucket. +Can maybe inherit same write method from previous stage, if we fingure out how to pass classes between stage local stages-files. ### 4. Classification -After the audio has been fed through the model, we'll get an array shorter than the length of the audio array, but linearally scaled to the audio. So larger context windows will eventually start producing very large classification arrays. Are all of these data necessary to save, or would a pooled score be best? It depends on the use-case ;) +After the audio has been fed through the model, we'll get an array shorter than the length of the audio array, but still arbitrry lengths. +Large context windows will produce large classification arrays, meaning high storage costs. +Are all of these data necessary to save, or would a pooled score be best? It depends on the use-case ;) We could alternatively cut the audio to only the parts where a min and max classification above a threshold is found. This would eliminate any real dependency on audio sifting (in case that stage turns out to not be needed later). And This would serve as the best waste-reduction strategy, since we would only store the audio that we are confident contains a whale call. +#### Local storage +For now, let' stick to storing the entire clasasification array for this stage, using np.save, with similar paths to audio storage. + +#### Cloud storage +Since we are dealing with arbitrary lengths, I'd say stick to bucket with parameters as path variables. + ### 5. Postprocessings (pooling and labelling) The final stage definitely needs to be store, but the main discussion here becomes, what to store? If we already have stored intermediate stages like sifted audio or truncated classified audio, we could avoid saving them again, and rather load from those tables when presenting aggregated results. -Though, I like the idea of the last stage of the pipeline containing all the data necessary found through the pipeline. This makes data sharing easier, with a concrete final product, instead of a bunch of fragmentated tables that need to be joined to have any true value. \ No newline at end of file +Though, I like the idea of the last stage of the pipeline containing all the data necessary found through the pipeline. +This makes data sharing easier, with a concrete final product, instead of a bunch of fragmentated tables that need to be joined to have any true value. +Maybe storing the easily queryable data in a table, then include a link to the audio storage location (whether that be a file by me or MBARI or other hydrophone provider). + +#### Local storage +I'll assume a similar structure to the expected tobale when saving local. This means arrays data like audio and classifications are excluded. +This frees me up to store one entry per encounter id. +Paths to the audio and classification arrays can be stored in the table. + +#### Cloud storage +I feel like this table shuold always be written to (i.e. no config option to disable). +Outputs will include: +``` +encounter_id +longitude +latitude +start_time +stop_time +pooled_score +img_path +audio_path +classification_path +``` + +## Conclusion +- All stages (except last) will be configurable to save or not. +- File exists sensors should allow skipping a stage. +- Local storage or arrays w/ variable lengths with np.save, and paramter values in path. +- Structured data will be stored in tables with relevant links to array data + + +## Resources +- Interesting blog post comparing stateful DoFns to CombineFn: https://beam.apache.org/blog/stateful-processing/ From 8eee93b53e6950ae830ba3b53efadb204ba6b0e4 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Sat, 5 Oct 2024 00:33:04 +0200 Subject: [PATCH 09/14] clean output writing for postprocessing (local & cloud) --- data/classifications.tsv | 2 - data/pipeline-diagram.json | 1 - src/config/common.yaml | 37 +++++-- src/config/local.yaml | 1 + src/create_table.py | 15 ++- src/model_server.py | 2 - src/pipeline.py | 48 +-------- src/stages/classify.py | 12 ++- src/stages/postprocess.py | 206 +++++++++++++++++++++---------------- 9 files changed, 163 insertions(+), 161 deletions(-) delete mode 100644 data/classifications.tsv delete mode 100644 data/pipeline-diagram.json diff --git a/data/classifications.tsv b/data/classifications.tsv deleted file mode 100644 index a58acb1..0000000 --- a/data/classifications.tsv +++ /dev/null @@ -1,2 +0,0 @@ -start end encounter_ids classifications -2016-12-21T00:49:30 2016-12-21T00:50:30 ['9182'] [[0.8753612041473389], [0.746759295463562], [0.26265254616737366], [0.45787951350212097], [0.35406064987182617], [0.42348742485046387], [0.4947870969772339], [0.7287474274635315], [0.7099379897117615], [0.2122703194618225], [0.044488538056612015], [0.00849922839552164], [0.024390267208218575], [0.33750119805336], [0.6530888080596924], [0.3057247996330261], [0.1243574470281601], [0.027093390002846718], [0.011367958970367908], [0.004032353404909372], [0.026372192427515984], [0.021978065371513367], [0.006407670211046934], [0.5405446887016296], [0.34207114577293396], [0.6080849766731262], [0.5394770503044128], [0.3662146031856537], [0.16772609949111938], [0.3641503155231476], [0.060217034071683884], [0.008764371275901794], [0.012523961253464222], [0.009186000563204288], [0.022050702944397926], [0.3908870816230774], [0.15179167687892914], [0.3454047441482544], [0.4770602285861969], [0.07589100301265717], [0.5439115166664124], [0.8634722232818604], [0.985602617263794], [0.3311924636363983], [0.8832067847251892], [0.6166273951530457], [0.42301759123802185], [0.03573732450604439], [0.09752023965120316], [0.01426385436207056], [0.022987568750977516], [0.012294118292629719], [0.010207954794168472], [0.00296270614489913]] diff --git a/data/pipeline-diagram.json b/data/pipeline-diagram.json deleted file mode 100644 index 247a514..0000000 --- a/data/pipeline-diagram.json +++ /dev/null @@ -1 +0,0 @@ -[{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-filter Filter Frequency)\n E --> F(fa:fa-bullseye Classify Audio)\n F --> G(fa:fa-tag Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"default\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314834573,"type":"auto","id":"80a09c2c-1234-44b0-9d16-aaaae58962ff","name":"yellow-nigeria"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-filter Filter Frequency)\n E --> F(fa:fa-tag Classify Audio)\n F --> G(fa:fa-map Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"default\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314774573,"type":"auto","id":"f17806f0-86f7-49d8-9cf7-a60e91cdaf16","name":"puny-lunch"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-filter Filter Frequency)\n E --> F(fa:fa-tag Classify Audio)\n F --> G(Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314714578,"type":"auto","id":"abe11adf-0ebf-40f3-a97c-b82d77455145","name":"rich-apple"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(fa:fa-globe Geometry Search)\n A --> C(fa:fa-music Retrive Audio)\n B --> C\n C --> E(fa:fa-gate Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H \n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487,"editorMode":"code"},"time":1726314654572,"type":"auto","id":"6ac55d30-8244-4a7d-9928-16699958b045","name":"obnoxious-woman"},{"state":{"code":"flowchart TD\n A[fa:fa-calendar Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174988485,"type":"auto","id":"f501c97c-00e0-4f34-8be6-758b3bf36e97","name":"quaint-cpu"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Labels)\n G --> H[fa:fa-fish Output]\n B --> H\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174928166,"type":"auto","id":"9afa49db-e080-493d-b3e8-80d0442cf901","name":"embarrassed-evening"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Clips)\n B --> H[Output]\n G --> H\n C -->|Three| F[fa:fa-fish Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174868110,"type":"auto","id":"c5406a00-e37a-4988-9ac3-2cae3ad529d0","name":"moldy-branch"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n F --> G(Postprocess Clips)\n B --> H[Output]\n G --> H\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174808113,"type":"auto","id":"94befebd-1740-4eea-9add-b3a4d456683a","name":"strong-father"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Retrive Audio)\n B --> C\n C --> E(Filter Frequency)\n E --> F(Classify Audio)\n B --> H[Output]\n G --> H\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726174748112,"type":"auto","id":"8ddd6b83-8967-428d-a920-b07da7525483","name":"sour-zettabyte"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":false,"pan":{"x":302.18218659942363,"y":0},"zoom":0.9999999725257487},"time":1726172906224,"type":"auto","id":"fa11762a-401e-49e3-a584-268598479e52","name":"loose-knife"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":true,"pan":{"x":289.8924659991275,"y":57.030104204754096},"zoom":0.8936022236523117},"time":1726172846221,"type":"auto","id":"a9e1445d-a5cc-41f6-8e8e-9eeb7f0d02a2","name":"old-optician"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false,"panZoom":true},"time":1726172786221,"type":"auto","id":"edf676d2-a9d6-428b-b974-1a7cf755574d","name":"abandoned-midnight"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> | windown aroudn sighting| C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172726225,"type":"auto","id":"adb79b5b-498b-4c87-8d42-6fa81df6b784","name":"rancid-nest"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n B --> D(Sightings)\n A --> C(Audio Retrival)\n D --> C\n C --> E(Frequency Detection)\n D --> E\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172666223,"type":"auto","id":"1e8e08bc-7a3b-4e73-856f-f719bd152473","name":"whining-gold"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n D --> E\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172606223,"type":"auto","id":"0c53c3f4-7124-4b76-94ad-4b6453505ccb","name":"blue-china"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n B --> H[Output]\n G --> H\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172546223,"type":"auto","id":"638c3efa-6445-47ec-bf72-ede14fb952f2","name":"happy-electrician"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n F --> G(Audio Clips)\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172486225,"type":"auto","id":"f46c484a-2050-4e3a-b935-96ace611778b","name":"cuddly-analyst"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detection)\n E --> F(Audio Classification)\n \n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172426228,"type":"auto","id":"434e7429-ecda-4555-aefa-9cb608cd2e00","name":"spicy-football"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n C --> E(Frequency Detec)\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172366227,"type":"auto","id":"0a8f6c61-b028-4016-974d-043bb8a5e7ae","name":"hallowed-angle"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> C\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172306227,"type":"auto","id":"5afb754e-d9d3-45cc-9832-e3537ce6ba27","name":"bulky-librarian"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> D(Sightings)\n D --> E()\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172246229,"type":"auto","id":"3cd4968d-2edb-4df5-8971-2547148df82c","name":"jealous-furniture"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n B --> \n C --> D[Laptop]\n %% C -->|Two| E[iPhone]\n %% C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172186227,"type":"auto","id":"0003b272-dcea-4339-bbbc-8de48cd1edcf","name":"kind-candle"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n C -->|One| D[Laptop]\n C -->|Two| E[iPhone]\n C -->|Three| F[fa:fa-image Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172126231,"type":"auto","id":"5078c8e0-2ba7-44fe-b6ab-86710238bdf3","name":"bashful-kitchen"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Geometry Search)\n A --> C(Audio Retrival)\n C -->|One| D[Laptop]\n C -->|Two| E[iPhone]\n C -->|Three| F[fa:fa-car Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172066232,"type":"auto","id":"915bf687-f51e-4aed-a8a4-814aeffcd641","name":"hissing-eve"},{"state":{"code":"flowchart TD\n A[Inputs] --> B(Go shopping)\n B --> C{Let me think}\n C -->|One| D[Laptop]\n C -->|Two| E[iPhone]\n C -->|Three| F[fa:fa-car Car]\n ","mermaid":"{\n \"theme\": \"dark\"\n}","autoSync":true,"rough":false,"updateDiagram":false},"time":1726172006253,"type":"auto","id":"610fb017-4f91-4c57-8906-e0dcdcfe30ac","name":"drab-child"}] \ No newline at end of file diff --git a/src/config/common.yaml b/src/config/common.yaml index 07ae562..eea945c 100644 --- a/src/config/common.yaml +++ b/src/config/common.yaml @@ -3,6 +3,7 @@ pipeline: verbose: true debug: true show_plots: false + is_local: false # gcp project: "bioacoustics-2024" @@ -66,12 +67,36 @@ pipeline: med_filter_size: 3 postprocess: + confidence_threshold: 0.5 min_gap: 60 # 1 minute + output_path: "data/postprocess/output.json" pooling: "average" - confidence_threshold: 0.5 - output_path_template: "data/labels/{year}/{month:02}/{day:02}.csv" postprocess_table_id: "mapped_audio" - - - - + postprocess_table_schema: + encounter_id: + type: 'STRING' + mode: 'REQUIRED' + latitude: + type: 'FLOAT64' + mode: 'REQUIRED' + longitude: + type: 'FLOAT64' + mode: 'REQUIRED' + start: + type: 'TIMESTAMP' + mode: 'REQUIRED' + end: + type: 'TIMESTAMP' + mode: 'REQUIRED' + pooled_score: + type: 'FLOAT64' + mode: 'REQUIRED' + img_path: + type: 'STRING' + mode: 'NULLABLE' + audio_path: + type: 'STRING' + mode: 'NULLABLE' + classification_path: + type: 'STRING' + mode: 'NULLABLE' diff --git a/src/config/local.yaml b/src/config/local.yaml index 050a51b..94dd9e7 100644 --- a/src/config/local.yaml +++ b/src/config/local.yaml @@ -3,6 +3,7 @@ pipeline: verbose: true debug: true show_plots: false + # is_local: true search: export_template: "data/encounters/{filename}-{timeframe}.csv" diff --git a/src/create_table.py b/src/create_table.py index d814415..0dea21f 100644 --- a/src/create_table.py +++ b/src/create_table.py @@ -10,15 +10,12 @@ # Define the table schema schema = [ - bigquery.SchemaField("key", "STRING"), - bigquery.SchemaField("audio", "FLOAT64", mode="REPEATED"), # 'REPEATED' for arrays - bigquery.SchemaField("pooled_score", "FLOAT64"), - bigquery.SchemaField("encounter_ids", "STRING", mode="REPEATED"), - bigquery.SchemaField("encounter_img_urls", "STRING", mode="REPEATED"), - bigquery.SchemaField("longitude", "FLOAT64"), - bigquery.SchemaField("latitude", "FLOAT64"), - bigquery.SchemaField("start", "TIMESTAMP"), - bigquery.SchemaField("end", "TIMESTAMP"), + { + "name": name, + "type": getattr(config.postprocess.postprocess_table_schema, name).type, + "mode": getattr(config.postprocess.postprocess_table_schema, name).mode + } + for name in vars(config.postprocess.postprocess_table_schema) ] diff --git a/src/model_server.py b/src/model_server.py index 23cd6e6..9e2997d 100644 --- a/src/model_server.py +++ b/src/model_server.py @@ -8,8 +8,6 @@ from config import load_pipeline_config config = load_pipeline_config() - - # Load the TensorFlow model logging.info("Loading model...") model = hub.load("https://www.kaggle.com/models/google/humpback-whale/TensorFlow2/humpback-whale/1") diff --git a/src/pipeline.py b/src/pipeline.py index e73c736..db25baa 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -5,7 +5,7 @@ from stages.audio import RetrieveAudio, WriteAudio, WriteSiftedAudio from stages.sift import Butterworth from stages.classify import WhaleClassifier, WriteClassifications -from stages.postprocess import PostprocessLabels +from stages.postprocess import PostprocessLabels, WritePostprocess from apache_beam.io.gcp.internal.clients import bigquery @@ -19,43 +19,12 @@ def run(): # runner="DataflowRunner", project="bioacoustics-2024", temp_location="gs://bioacoustics/whale-speech/temp", - # region=config.general.region, - # job_name=config.general.job_name, - # temp_location=config.general.temp_location, - # setup_file="./setup.py" ) pipeline_options.view_as(SetupOptions).save_main_session = True args = { "start": config.input.start, "end": config.input.end } - schema = { - "fields" : [ - # {'name': 'key', 'type': 'STRING', 'mode': 'REQUIRED'}, - {'name': 'classifications', 'type': 'FLOAT64', 'mode': 'REPEATED'}, - {'name': 'pooled_score', 'type': 'FLOAT64', 'mode': 'REQUIRED'}, - {'name': 'encounter_id', 'type': 'STRING', 'mode': 'REQUIRED'}, - {'name': 'displayImgUrl', 'type': 'STRING', 'mode': 'REQUIRED'}, - {'name': 'longitude', 'type': 'FLOAT64', 'mode': 'REQUIRED'}, - {'name': 'latitude', 'type': 'FLOAT64', 'mode': 'REQUIRED'}, - {'name': 'start', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'}, - {'name': 'end', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'} - ] - } - # table_ref = "{project_id}:{dataset_id}.{table_id}".format( - # project_id=config.general.project, - # dataset_id=config.general.dataset_id, - # table_id=config.postprocess.postprocess_table_id - # ) - # table_spec = bigquery.TableReference( - # projectId=config.general.project, - # datasetId=config.general.dataset_id, - # tableId=config.postprocess.postprocess_table_id - # ) - table_spec = f"bioacoustics-2024:whale_sppech.mapped_audio" - - print(f"Writing to table: {table_spec}") - print(f"PipelineOptions: {pipeline_options}") with beam.Pipeline(options=pipeline_options) as p: input_data = p | "Create Input" >> beam.Create([args]) @@ -63,29 +32,16 @@ def run(): audio_output = search_output | "Retrieve Audio" >> beam.ParDo(RetrieveAudio()) sifted_audio = audio_output | "Sift Audio" >> Butterworth() classifications = sifted_audio | "Classify Audio" >> WhaleClassifier(config) - # postprocess_labels = classifications | "Postprocess Labels" >> PostprocessLabels(config, search_output) postprocess_labels = classifications | "Postprocess Labels" >> beam.ParDo( PostprocessLabels(config), search_output=beam.pvalue.AsSingleton(search_output), ) - postprocess_labels | "Write to BigQuery" >> beam.io.WriteToBigQuery( - "mapped_audio", - dataset=config.general.dataset_id, - project=config.general.project, - schema=schema, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - method=beam.io.WriteToBigQuery.Method.FILE_LOADS - ) # Store results audio_output | "Store Audio (temp)" >> beam.ParDo(WriteAudio()) sifted_audio | "Store Sifted Audio" >> beam.ParDo(WriteSiftedAudio("butterworth")) classifications | "Store Classifications" >> beam.ParDo(WriteClassifications(config)) - # postprocess_labels | "Write Results" >> beam.io.WriteToText("data/output.txt", shard_name_template="") - - # Output results - # postprocessed_labels | "Write Results" >> beam.io.WriteToText("output.txt") + postprocess_labels | "Write to BigQuery" >> beam.ParDo(WritePostprocess(config)) if __name__ == "__main__": diff --git a/src/stages/classify.py b/src/stages/classify.py index 918d966..aeb0b45 100644 --- a/src/stages/classify.py +++ b/src/stages/classify.py @@ -199,9 +199,6 @@ def _plot_signal_detections(signal, min_max_detection_samples, all_samples): _plot_signal_detections(audio, min_max_samples, all_samples) - - - def _plot(self, output): audio, start, end, encounter_ids, scores = output key = self._build_key(start, end, encounter_ids) @@ -256,6 +253,8 @@ def expand(self, pcoll): if self.plot_scores: outputs | "Plot scores" >> beam.Map(self._plot) + + logging.info(f"Finished {self.name} stage: {outputs}") return outputs @@ -307,7 +306,7 @@ def process(self, element): predictions = response.json().get("predictions", []) - logging.info(f"Received response:\n key: {key} predictions:{len(predictions)}") + logging.info(f"Inference response:\n key: {key} predictions:{len(predictions)}") yield (key, predictions) # TODO fix mixing yield and return in DoFn @@ -349,6 +348,7 @@ def __init__(self, config: SimpleNamespace): def process(self, element): logging.info(f"Writing classifications to {self.classification_path}") + logging.debug(f"Received element: {element}") # skip if empty if self._is_empty(element): @@ -375,8 +375,10 @@ def process(self, element): def _is_empty(self, element): + if len(element) == 0: + return True array, start, end, encounter_ids, classifications = element - logging.debug(f"Checking if classifications are empty for start {start.strftime('%Y-%m-%dT%H:%M:%S')}: {len(classifications)}") + logging.info(f"Checking if classifications are empty for start {start.strftime('%Y-%m-%dT%H:%M:%S')}: {len(classifications)}") return len(classifications) == 0 diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index 7b8839b..c599533 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -2,6 +2,7 @@ import logging import numpy as np import pandas as pd +import os # from google.cloud import bigquery from apache_beam.io.gcp.internal.clients import bigquery @@ -27,6 +28,7 @@ def __init__(self, config: SimpleNamespace): self._init_big_query_writer(config) + def _init_big_query_writer(self, config: SimpleNamespace): self.table_spec = bigquery.TableReference( projectId=self.project, @@ -35,81 +37,7 @@ def _init_big_query_writer(self, config: SimpleNamespace): ) - def process(self, element, search_output): - joined_df = self._build_dfs(element, search_output) - - for row in joined_df.to_dict(orient="records"): - if row["classifications"] == []: - logging.info(f"Skipping row with no classification: {row.keys()}") - logging.info(f"Row: {row}") - continue - - logging.info(f"Writing row to BigQuery: {row.keys()} \n{row}") - yield row - - - # def expand(self, pcoll, search_output): - # return ( - # pcoll - # | "Process" >> beam.ParDo(self._build_dfs, search_output) - # | "Write to BigQuery" >> beam.io.WriteToBigQuery( - # self.table_spec, - # schema=self.schema, - # write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, - # create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED - # ) - # ) - - - # for row in self._build_dfs(element, search_output): - # row | beam.io.WriteToBigQuery( - # self.table_spec, - # schema=self.schema, - # write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, - # create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED - # ) - - - # def _process(self, element: Dict[str, Any], search_output: Dict[str, Any]): - # # TODO remove _process (replaced by expand) - # logging.info(f"element \n{element}") - # logging.info(f"search_output \n{search_output}") - - # # convert element to dataframe - # classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) - # classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) - # classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) - - # # pool classifications in postprocessing - # classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) - - # # convert search_output to dataframe - # search_output = search_output.rename(columns={"id": "encounter_id"}) - # search_output["encounter_id"] = search_output["encounter_id"].astype(str) # TODO do in one line - # search_output = search_output[[ - # # TODO refactor to confing - # "encounter_id", - # "latitude", - # "longitude", - # "displayImgUrl", - # # "species", # TODO add in geo search stage (require rm local file) - # ]] - - # # join dataframes - # joined_df = pd.merge(search_output, classifications_df, how="inner", on="encounter_id") - - # logging.info(f"Final output: \n{joined_df.head()}") - - # # write to BigQuery - # # self._write_to_bigquery(joined_df) - - # return joined_df.to_dict(orient="records") - - def _build_dfs(self, element, search_output): - # logging.info(f"element \n{element}") - # logging.info(f"search_output \n{search_output}") - # convert element to dataframe classifications_df = self._build_classification_df(element) @@ -119,38 +47,39 @@ def _build_dfs(self, element, search_output): # join dataframes joined_df = pd.merge(search_output_df, classifications_df, how="inner", on="encounter_id") - logging.info(f"Final output: \n{joined_df.head()}") - logging.info(f"Final output columns: {joined_df.columns}") + # add paths + final_df = self._add_paths(joined_df) + + logging.info(f"Final output: \n{final_df.head()}") + logging.info(f"Final output columns: {final_df.columns}") - return joined_df + yield final_df.to_dict(orient="records") def _build_classification_df(self, element: Tuple) -> pd.DataFrame: # convert element to dataframe classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) + + # explode encounter_ids classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) - # convert audio arrays to list(floats) - # classifications_df["audio"] = classifications_df["audio"].apply(lambda x: x.tolist()) - # drop audio column - classifications_df = classifications_df.drop(columns=["audio"]) - - # extract classifications from shape (n, 1) to (n,) - # classifications_df["classifications"] = classifications_df["classifications"].apply(lambda x: x.flatten()) - classifications_df["classifications"] = classifications_df["classifications"].apply(lambda x:[e[0] for e in x]) - + # TODO replace classifications check w/ pooled_score check + classifications_df = classifications_df[classifications_df["classifications"].apply(lambda x: len(x) > 0)] # pool classifications in postprocessing - logging.info(f"Classifications: \n{classifications_df['classifications']}") classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) - # convert start adn end to isoformat + # convert start and end to isoformat classifications_df["start"] = classifications_df["start"].apply(lambda x: x.isoformat()) classifications_df["end"] = classifications_df["end"].apply(lambda x: x.isoformat()) + # drop audio and classification columns + classifications_df = classifications_df.drop(columns=["audio"]) + classifications_df = classifications_df.drop(columns=["classifications"]) + + logging.info(f"Classifications: \n{classifications_df.head()}") logging.info(f"Classifications shape: {classifications_df.shape}") - return classifications_df @@ -183,4 +112,101 @@ def _pool_classifications(self, classifications: np.array) -> Dict[str, Any]: raise ValueError(f"Pooling method {self.pooling} not supported.") return pooled_score - \ No newline at end of file + + + def _add_paths(self, df: pd.DataFrame) -> pd.DataFrame: + df["audio_path"] = "NotImplemented" + df["classification_path"] = "NotImplemented" + df["img_path"] = df["displayImgUrl"] + df = df.drop(columns=["displayImgUrl"]) + return df + + +class WritePostprocess(beam.DoFn): + def __init__(self, config: SimpleNamespace): + self.config = config + + self.is_local = config.general.is_local + self.output_path = config.postprocess.output_path + self.project = config.general.project + self.dataset_id = config.general.dataset_id + self.table_id = config.postprocess.postprocess_table_id + self.columns = list(vars(config.postprocess.postprocess_table_schema)) + self.schema = self._schema_to_dict(config.postprocess.postprocess_table_schema) + + + def process(self, element): + if len(element) == 0: + return + + if self.is_local: + return self._write_local(element) + else: + return self._write_gcp(element) + + def _get_output_path(self, start, key): + return self.output_path_template.format( + year=start.year, + month=start.month, + day=start.day, + key=key + ) + + def _schema_to_dict(self, schema): + return { + "fields": [ + { + "name": name, + "type": getattr(schema, name).type, + "mode": getattr(schema, name).mode + } + for name in vars(schema) + ] + } + + def _write_gcp(self, element): + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED + method=beam.io.WriteToBigQuery.Method.FILE_LOADS + custom_gcs_temp_location="gs://bioacoustics/whale-speech/temp" + + logging.info(f"Writing to BigQuery") + logging.info(f"Table: {self.table_id}") + logging.info(f"Dataset: {self.dataset_id}") + logging.info(f"Project: {self.project}") + logging.info(f"Schema: {self.schema}") + logging.info(f"Len element: {len(element)}") + logging.info(f"Element keys: {element[0].keys()}") + + element | "Write to BigQuery" >> beam.io.WriteToBigQuery( + # self.table_id, + # dataset=self.dataset_id, + # project=self.project, + "bioacoustics-2024.whale_speech.mapped_audio", + schema=self.schema, + write_disposition=write_disposition, + create_disposition=create_disposition, + method=method, + custom_gcs_temp_location=custom_gcs_temp_location + ) + + yield element + + def _write_local(self, element): + if os.path.exists(self.output_path): + stored_df = pd.read_json(self.output_path, orient="records") + + # convert encounter_id to str + stored_df["encounter_id"] = stored_df["encounter_id"].astype(str) + + else: + os.makedirs(os.path.dirname(self.output_path), exist_ok=True) + stored_df = pd.DataFrame([], columns=self.columns) + + element_df = pd.DataFrame(element, columns=self.columns) + final_df = pd.concat([stored_df, element_df], ignore_index=True) + final_df = final_df.drop_duplicates() + + # store as json (hack: to remove \/\/ escapes) + final_df_json = final_df.to_json(orient="records").replace("\\/", "/") + print(final_df_json, file=open(self.output_path, "w")) From 467208515afdff2a71e1ab2d5f275c6fc3f928da Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Sat, 5 Oct 2024 00:48:08 +0200 Subject: [PATCH 10/14] clean table spec def --- src/stages/postprocess.py | 32 ++++---------------------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index c599533..5a06aff 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -26,17 +26,6 @@ def __init__(self, config: SimpleNamespace): self.table_id = config.postprocess.postprocess_table_id - self._init_big_query_writer(config) - - - def _init_big_query_writer(self, config: SimpleNamespace): - self.table_spec = bigquery.TableReference( - projectId=self.project, - datasetId=self.dataset_id, - tableId=self.table_id - ) - - def process(self, element, search_output): # convert element to dataframe classifications_df = self._build_classification_df(element) @@ -55,7 +44,6 @@ def process(self, element, search_output): yield final_df.to_dict(orient="records") - def _build_classification_df(self, element: Tuple) -> pd.DataFrame: # convert element to dataframe classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) @@ -82,7 +70,6 @@ def _build_classification_df(self, element: Tuple) -> pd.DataFrame: logging.info(f"Classifications shape: {classifications_df.shape}") return classifications_df - def _build_search_output_df(self, search_output: Dict[str, Any]) -> pd.DataFrame: # convert search_output to dataframe search_output = search_output.rename(columns={"id": "encounter_id"}) @@ -100,7 +87,6 @@ def _build_search_output_df(self, search_output: Dict[str, Any]) -> pd.DataFrame return search_output - def _pool_classifications(self, classifications: np.array) -> Dict[str, Any]: if self.pooling == "mean" or self.pooling == "avg" or self.pooling == "average": pooled_score = np.mean(classifications) @@ -112,7 +98,6 @@ def _pool_classifications(self, classifications: np.array) -> Dict[str, Any]: raise ValueError(f"Pooling method {self.pooling} not supported.") return pooled_score - def _add_paths(self, df: pd.DataFrame) -> pd.DataFrame: df["audio_path"] = "NotImplemented" @@ -134,7 +119,6 @@ def __init__(self, config: SimpleNamespace): self.columns = list(vars(config.postprocess.postprocess_table_schema)) self.schema = self._schema_to_dict(config.postprocess.postprocess_table_schema) - def process(self, element): if len(element) == 0: return @@ -143,14 +127,6 @@ def process(self, element): return self._write_local(element) else: return self._write_gcp(element) - - def _get_output_path(self, start, key): - return self.output_path_template.format( - year=start.year, - month=start.month, - day=start.day, - key=key - ) def _schema_to_dict(self, schema): return { @@ -179,10 +155,10 @@ def _write_gcp(self, element): logging.info(f"Element keys: {element[0].keys()}") element | "Write to BigQuery" >> beam.io.WriteToBigQuery( - # self.table_id, - # dataset=self.dataset_id, - # project=self.project, - "bioacoustics-2024.whale_speech.mapped_audio", + self.table_id, + dataset=self.dataset_id, + project=self.project, + # "bioacoustics-2024.whale_speech.mapped_audio", schema=self.schema, write_disposition=write_disposition, create_disposition=create_disposition, From 2721fb0e3e4f20dbf9b96452f4a0a416d3d99df4 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Sat, 5 Oct 2024 01:15:45 +0200 Subject: [PATCH 11/14] add unit tests --- src/stages/postprocess.py | 2 +- tests/test_postprocess.py | 116 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 tests/test_postprocess.py diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index 5a06aff..7b085b4 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -68,7 +68,7 @@ def _build_classification_df(self, element: Tuple) -> pd.DataFrame: logging.info(f"Classifications: \n{classifications_df.head()}") logging.info(f"Classifications shape: {classifications_df.shape}") - return classifications_df + return classifications_df.reset_index(drop=True) def _build_search_output_df(self, search_output: Dict[str, Any]) -> pd.DataFrame: # convert search_output to dataframe diff --git a/tests/test_postprocess.py b/tests/test_postprocess.py new file mode 100644 index 0000000..6829f78 --- /dev/null +++ b/tests/test_postprocess.py @@ -0,0 +1,116 @@ +import pytest +import pandas as pd + +from datetime import datetime +from types import SimpleNamespace +from src.stages.postprocess import PostprocessLabels + +@pytest.fixture +def config(): + return SimpleNamespace( + search=SimpleNamespace(export_template="template"), + sift=SimpleNamespace(output_path_template="template"), + classify=SimpleNamespace(classification_path="path"), + postprocess=SimpleNamespace(pooling="mean", postprocess_table_id="table_id"), + general=SimpleNamespace(project="project", dataset_id="dataset_id") + ) + +@pytest.fixture +def element(): + return { + "audio": "audio", + "start": datetime(2024, 9, 10, 11, 12, 13), + "end": datetime(2024, 9, 10, 12, 13, 14), + "encounter_ids": ["a123", "b456"], + "classifications": [1, 2, 3] + } + +@pytest.fixture +def search_output(): + return pd.DataFrame({ + "id": ["a123", "b456", "c789"], + "latitude": [1.0, 2.0, 3.0], + "longitude": [1.1, 2.2, 3.3], + "displayImgUrl": ["example.com/a123", "example.com/b456", "example.com/c789"], + "extra_column": ["extra1", "extra2", "extra3"] + }) + + +def test_build_classification_df(config, element): + # Arrange + postprocess_labels = PostprocessLabels(config) + + expected = pd.DataFrame([ + { + "start": "2024-09-10T11:12:13", + "end": "2024-09-10T12:13:14", + "encounter_id": "a123", + "pooled_score": 2.0 + }, + { + "start": "2024-09-10T11:12:13", + "end": "2024-09-10T12:13:14", + "encounter_id": "b456", + "pooled_score": 2.0 + } + ]) + + # Act + actual = postprocess_labels._build_classification_df(element) + + # Assert + assert expected.equals(actual) + + +def test_build_search_output_df(config, search_output): + # Arrange + postprocess_labels = PostprocessLabels(config) + + expected = pd.DataFrame({ + "encounter_id": ["a123", "b456", "c789"], + "latitude": [1.0, 2.0, 3.0], + "longitude": [1.1, 2.2, 3.3], + "displayImgUrl": ["example.com/a123", "example.com/b456", "example.com/c789"], + }) + + # Act + actual = postprocess_labels._build_search_output_df(search_output) + + # Assert + assert expected.equals(actual) + + +def test_pool_classifications(config): + # Arrange + postprocess_labels = PostprocessLabels(config) + classifications = [1, 2, 3, 4] + + # Act + actual = postprocess_labels._pool_classifications(classifications) + + # Assert + assert actual == 2.5 # note only checks mean, update for more + + +def test_add_paths(config, search_output): + # Arrange + postprocess_labels = PostprocessLabels(config) + + expected = pd.DataFrame({ + # reusing same data as above for simplicity + "id": ["a123", "b456", "c789"], + "latitude": [1.0, 2.0, 3.0], + "longitude": [1.1, 2.2, 3.3], + "extra_column": ["extra1", "extra2", "extra3"], + + # added path columns + "audio_path": ["NotImplemented", "NotImplemented", "NotImplemented"], + "classification_path": ["NotImplemented", "NotImplemented", "NotImplemented"], + "img_path": ["example.com/a123", "example.com/b456", "example.com/c789"], + }) + + # Act + actual = postprocess_labels._add_paths(search_output) + + # Assert + assert expected.equals(actual) From a2eeb9548f7f4bb661e1a4bf4594d21d4c8cae49 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Sat, 5 Oct 2024 01:15:54 +0200 Subject: [PATCH 12/14] add resource --- docs/ladr/LADR_0005_persist_intermediate_outputs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/ladr/LADR_0005_persist_intermediate_outputs.md b/docs/ladr/LADR_0005_persist_intermediate_outputs.md index 849416c..5e6a267 100644 --- a/docs/ladr/LADR_0005_persist_intermediate_outputs.md +++ b/docs/ladr/LADR_0005_persist_intermediate_outputs.md @@ -134,3 +134,4 @@ classification_path ## Resources - Interesting blog post comparing stateful DoFns to CombineFn: https://beam.apache.org/blog/stateful-processing/ +- Beam BigQuery walkthrough: https://beam.apache.org/documentation/io/built-in/google-bigquery/ From 50109f2f02bec288c8f96dd259eda1f2ce10cbb1 Mon Sep 17 00:00:00 2001 From: Per Halvorsen Date: Sat, 5 Oct 2024 09:46:01 +0200 Subject: [PATCH 13/14] updating naming for model uri and inference url --- src/config/common.yaml | 4 ++-- src/stages/classify.py | 10 +++++----- src/stages/postprocess.py | 26 ++++++++++++-------------- tests/test_classify.py | 2 +- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/config/common.yaml b/src/config/common.yaml index eea945c..50810a3 100644 --- a/src/config/common.yaml +++ b/src/config/common.yaml @@ -62,8 +62,8 @@ pipeline: plot_scores: true plot_path_template: "data/plots/results/{year}/{month:02}/{plot_name}.png" classification_path: "data/classifications.tsv" - url: https://tfhub.dev/google/humpback_whale/1 - model_url: "http://127.0.0.1:5000/predict" + model_uri: https://tfhub.dev/google/humpback_whale/1 + inference_url: "http://127.0.0.1:5000/predict" med_filter_size: 3 postprocess: diff --git a/src/stages/classify.py b/src/stages/classify.py index aeb0b45..e340349 100644 --- a/src/stages/classify.py +++ b/src/stages/classify.py @@ -32,7 +32,7 @@ def __init__(self, config: SimpleNamespace): self.batch_duration = config.classify.batch_duration self.model_sample_rate = config.classify.model_sample_rate - self.model_url = config.classify.model_url + self.inference_url = config.classify.inference_url # plotting parameters self.hydrophone_sensitivity = config.classify.hydrophone_sensitivity @@ -271,7 +271,7 @@ def _postprocess(self, pcoll, grouped_outputs): class InferenceClient(beam.DoFn): def __init__(self, config: SimpleNamespace): - self.model_url = config.classify.model_url + self.inference_url = config.classify.inference_url self.retries = config.classify.inference_retries def process(self, element): @@ -292,7 +292,7 @@ def process(self, element): wait = 0 while wait < 5: try: - response = requests.post(self.model_url, json=data) + response = requests.post(self.inference_url, json=data) response.raise_for_status() break except requests.exceptions.ConnectionError as e: @@ -301,7 +301,7 @@ def process(self, element): wait += 1 time.sleep(wait*wait) - response = requests.post(self.model_url, json=data) + response = requests.post(self.inference_url, json=data) response.raise_for_status() predictions = response.json().get("predictions", []) @@ -424,7 +424,7 @@ def sample_run(): batch_duration=30, # seconds hydrophone_sensitivity=-168.8, model_sample_rate=10_000, - model_url="http://127.0.0.1:5000/predict", + inference_url="http://127.0.0.1:5000/predict", plot_scores=True, plot_path_template="data/plots/results/{year}/{month:02}/{plot_name}.png", med_filter_size=3, diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index 7b085b4..76c0eb8 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -46,29 +46,27 @@ def process(self, element, search_output): def _build_classification_df(self, element: Tuple) -> pd.DataFrame: # convert element to dataframe - classifications_df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) + df = pd.DataFrame([element], columns=["audio", "start", "end", "encounter_ids", "classifications"]) + df = df[df["classifications"].apply(lambda x: len(x) > 0)] # rm empty rows # explode encounter_ids - classifications_df = classifications_df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) - classifications_df["encounter_id"] = classifications_df["encounter_id"].astype(str) + df = df.explode("encounter_ids").rename(columns={"encounter_ids": "encounter_id"}) + df["encounter_id"] = df["encounter_id"].astype(str) - # TODO replace classifications check w/ pooled_score check - classifications_df = classifications_df[classifications_df["classifications"].apply(lambda x: len(x) > 0)] # pool classifications in postprocessing - classifications_df["pooled_score"] = classifications_df["classifications"].apply(self._pool_classifications) + df["pooled_score"] = df["classifications"].apply(self._pool_classifications) # convert start and end to isoformat - classifications_df["start"] = classifications_df["start"].apply(lambda x: x.isoformat()) - classifications_df["end"] = classifications_df["end"].apply(lambda x: x.isoformat()) + df["start"] = df["start"].apply(lambda x: x.isoformat()) + df["end"] = df["end"].apply(lambda x: x.isoformat()) # drop audio and classification columns - classifications_df = classifications_df.drop(columns=["audio"]) - classifications_df = classifications_df.drop(columns=["classifications"]) + df = df.drop(columns=["audio"]) + df = df.drop(columns=["classifications"]) - - logging.info(f"Classifications: \n{classifications_df.head()}") - logging.info(f"Classifications shape: {classifications_df.shape}") - return classifications_df.reset_index(drop=True) + logging.info(f"Classifications: \n{df.head()}") + logging.info(f"Classifications shape: {df.shape}") + return df.reset_index(drop=True) def _build_search_output_df(self, search_output: Dict[str, Any]) -> pd.DataFrame: # convert search_output to dataframe diff --git a/tests/test_classify.py b/tests/test_classify.py index 0ce0c27..d6908ff 100644 --- a/tests/test_classify.py +++ b/tests/test_classify.py @@ -20,7 +20,7 @@ def example_config(): batch_duration=30, # seconds hydrophone_sensitivity=-168.8, model_sample_rate=10_000, - model_url="http://127.0.0.1:5000/predict", + inference_url="http://127.0.0.1:5000/predict", plot_scores=True, plot_path_template="data/plots/results/{year}/{month:02}/{plot_name}.png", med_filter_size=3, From b4c6a23028f3127a54c76d7c6d3c03b17b27a09a Mon Sep 17 00:00:00 2001 From: perhalvorsen <31341520+pmhalvor@users.noreply.github.com> Date: Sat, 5 Oct 2024 10:08:33 +0200 Subject: [PATCH 14/14] Apply inline suggestions from code review --- src/config/common.yaml | 2 +- src/config/local.yaml | 2 +- src/model_server.py | 3 +-- src/stages/classify.py | 2 +- src/stages/postprocess.py | 6 ++---- 5 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/config/common.yaml b/src/config/common.yaml index 50810a3..4abb209 100644 --- a/src/config/common.yaml +++ b/src/config/common.yaml @@ -5,7 +5,7 @@ pipeline: show_plots: false is_local: false - # gcp + # gcp - bigquery project: "bioacoustics-2024" dataset_id: "whale_speech" diff --git a/src/config/local.yaml b/src/config/local.yaml index 94dd9e7..fe7ca78 100644 --- a/src/config/local.yaml +++ b/src/config/local.yaml @@ -3,7 +3,7 @@ pipeline: verbose: true debug: true show_plots: false - # is_local: true + is_local: true search: export_template: "data/encounters/{filename}-{timeframe}.csv" diff --git a/src/model_server.py b/src/model_server.py index 9e2997d..d4b613d 100644 --- a/src/model_server.py +++ b/src/model_server.py @@ -10,8 +10,7 @@ # Load the TensorFlow model logging.info("Loading model...") -model = hub.load("https://www.kaggle.com/models/google/humpback-whale/TensorFlow2/humpback-whale/1") -# model = hub.load("https://tfhub.dev/google/humpback_whale/1") +model = hub.load(config.classify.model_uri) score_fn = model.signatures["score"] logging.info("Model loaded.") diff --git a/src/stages/classify.py b/src/stages/classify.py index e340349..429a39d 100644 --- a/src/stages/classify.py +++ b/src/stages/classify.py @@ -32,7 +32,7 @@ def __init__(self, config: SimpleNamespace): self.batch_duration = config.classify.batch_duration self.model_sample_rate = config.classify.model_sample_rate - self.inference_url = config.classify.inference_url + self.inference_url = config.classify.inference_url # plotting parameters self.hydrophone_sensitivity = config.classify.hydrophone_sensitivity diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index 76c0eb8..5d863f7 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -4,14 +4,12 @@ import pandas as pd import os -# from google.cloud import bigquery from apache_beam.io.gcp.internal.clients import bigquery from typing import Dict, Any, Tuple from types import SimpleNamespace -# class PostprocessLabels(beam.PTransform): class PostprocessLabels(beam.DoFn): def __init__(self, config: SimpleNamespace): self.config = config @@ -30,7 +28,7 @@ def process(self, element, search_output): # convert element to dataframe classifications_df = self._build_classification_df(element) - # convert search_output to dataframe + # clean up search_output dataframe search_output_df = self._build_search_output_df(search_output) # join dataframes @@ -73,7 +71,6 @@ def _build_search_output_df(self, search_output: Dict[str, Any]) -> pd.DataFrame search_output = search_output.rename(columns={"id": "encounter_id"}) search_output["encounter_id"] = search_output["encounter_id"].astype(str) search_output = search_output[[ - # TODO refactor to confing "encounter_id", "latitude", "longitude", @@ -180,6 +177,7 @@ def _write_local(self, element): element_df = pd.DataFrame(element, columns=self.columns) final_df = pd.concat([stored_df, element_df], ignore_index=True) final_df = final_df.drop_duplicates() + logging.debug(f"Appending df to {self.output_path} \n{final_df}") # store as json (hack: to remove \/\/ escapes) final_df_json = final_df.to_json(orient="records").replace("\\/", "/")