diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 03a7f1c..89bce2d 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -4,9 +4,9 @@ on:
   push:
     branches:
       - main # Runs on push to the main branch
-  pull_request:
-    branches:
-      - main # Runs on pull requests to the main branch
+  # pull_request:
+  #   branches:
+  #     - main # Runs on pull requests to the main branch
 
 env:
diff --git a/README.md b/README.md
index fef7330..f4e1045 100644
--- a/README.md
+++ b/README.md
@@ -155,10 +155,11 @@ The default compute resources are quite small and slow. To speed things up, you
 
 Note, you may need to configure IAM permissions to allow Dataflow Runners to access images in your Artifact Registry. Read more about that [here](https://cloud.google.com/dataflow/docs/concepts/security-and-permissions).
 
-## Resources
+## References
 - [HappyWhale](https://happywhale.com/)
 - [open-oceans/happywhale](https://github.com/open-oceans/happywhale)
-- [MBARI's Pacific Ocean Sound Recordings](https://registry.opendata.aws/pacific-sound/)
 - [NOAA and Google's humpback_whale model](https://tfhub.dev/google/humpback_whale/1)
-- [Monterey Bay Hydrophone MARS](https://www.mbari.org/technology/monterey-accelerated-research-system-mars/)
 - [Google Cloud Console](https://console.cloud.google.com/)
+- [Monterey Bay Hydrophone MARS](https://www.mbari.org/technology/monterey-accelerated-research-system-mars/)
+- [MBARI's Pacific Ocean Sound Recordings](https://registry.opendata.aws/pacific-sound/)
+- J. Ryan et al., "New Passive Acoustic Monitoring in Monterey Bay National Marine Sanctuary," OCEANS 2016 MTS/IEEE Monterey, Monterey, CA, USA, 2016, pp. 1-8, doi: [10.1109/OCEANS.2016.7761363](https://ieeexplore.ieee.org/document/7761363).
diff --git a/docs/ladr/LADR_0007_utilizing_volume_mounts.md b/docs/ladr/LADR_0007_utilizing_volume_mounts.md
new file mode 100644
index 0000000..727f601
--- /dev/null
+++ b/docs/ladr/LADR_0007_utilizing_volume_mounts.md
@@ -0,0 +1,107 @@
+# Utilizing Volume Mounts
+
+During stress-testing of the pipeline, we found some bottlenecks, particularly in the I/O operations.
+To mitigate this, we can utilize volume mounts to speed up the process.
+
+We should be careful about where we choose to use volumes though, since persisting data to buckets from parallel workers could result in corrupted data, for example if workers write to the same file at the same time.
+That should only be a problem when writing to the same file, which should not happen in this pipeline.
+
+This document aims to decide where to use volume mounts, and how to implement and configure them.
+
+# 1. Where to use volume mounts
+Our pipeline consists of two main parts: the **model server** and the **pipeline** itself.
+The model server is a simple Flask server that serves the model, and the pipeline is a Dataflow job that processes the data.
+
+## Model server
+Hosted as its own Cloud Run service on GCP (or as a parallel process locally), the model server's most important job is to classify input audio.
+Ideally, this component should live independently of the pipeline, so that future models can easily be swapped in and out without affecting the pipeline.
+
+### Options
+#### A. _Receive inputs through REST API (POST requests)_
+This is the current implementation.
+
+**Pros**
+- Completely isolated from the pipeline. Could be its own service. Acts like a third-party API.
+- Easy to monitor costs, usage, and logs separately from the pipeline.
+- Could be exposed as an external service on its own. Potentially beneficial if the pipeline itself ends up being less useful.
+
+**Cons**
+- Requires large network I/O for every request. Could be slow if many requests are made.
+- Limits on request size. We are forced to manually batch and send requests, causing many more requests per run. Due to parallelism, this could mean many simultaneous requests, which the Cloud Run service will complain about and potentially block.
+
+#### B. _Load inputs through volume mounts_
+The pipeline sends POST requests with paths to audio within the volume, instead of the data itself.
+Whether the model returns the classification array, or writes to the volume and returns paths, is to be decided.
+
+**Pros**
+- Much smaller I/O sizes. Only need to send the path to the data, not the data itself.
+- No limits on request size. Can send all data at once. This would require the model server to smartly batch inputs if sizes are too large. This is not necessarily a bad thing, since it would be easier to ensure the correct order of batched model outputs.
+- Removes the chance of the model server getting DDoSed by too many requests at once from the pipeline.
+- Could potentially be faster, since the model server can read from the volume directly.
+- Might better leverage the stateless nature of the model server, by scaling up to handle new requests on different instances.
+
+**Cons**
+- Compute resources hosting the model server need to be scalable to handle large data and model weights. Currently blocked by scaling quotas in Google Cloud Run, but theoretically this should not be a problem.
+- Requires the model server to have access to the volume. This could be a security risk, since the model server could potentially read any data in the volume. Though, with the data being public, this is not a huge concern.
+- Requires all preprocessing to have been completed in the previous steps. This means implementing it now will require a rewrite of the [sift stage](../../src/stages/sift.py) (it should be changed to "preprocess audio", where sift-parsing and resampling are done in the same step).
+
+## Pipeline
+The main pipeline is a Beam workflow intended to run on a Dataflow runner, meaning it is a distributed system.
+Each stage is currently implemented to write its outputs to Google Cloud Storage, while passing the same data along to the next stage.
+The pipeline should be able to run locally as well, for development and low-resource runs.
+
+Using volumes here can be useful for persistent storage, but we don't want to sacrifice the parallelism of the pipeline.
+
+### Options
+#### A. _Write to GCS using Beam's I/O connectors_
+This is the current implementation, which does the job well.
+
+**Pros**
+- Already implemented.
+- Explicit full paths (or path templates) passed in configs make logging and debugging more understandable. Full paths are written to, logged, then passed to final output tables.
+
+**Cons**
+- Feels less organized and robust. Every write is its own operation, which requires connecting to GCS, writing, and closing the connection. This could be slow if many writes are done.
+- Could be more expensive, since every write is a separate operation. (I'm not really sure about this, but it could be worth considering.)
+
+#### B. _Mount GCS bucket as a volume_
+The bucket our outputs are written to is mounted to the container running the pipeline.
+
+**Pros**
+- Faster writes. Since the bucket is mounted, writes are done directly to the bucket. This could be faster than connecting to GCS for every write.
+- Feels like the "true MLOps" choice. Getting the most out of Docker functionality. This may be overkill for this project, but it would serve as a good example for future projects where it may be even more useful.
+
+**Cons**
+- Going to take a while to implement this for all stages. We just finished implementing separate writes per stage run.
+- Not sure how this will change between running with Dataflow and running locally. Will need to test this.
+  - We only provide the image as a `worker_harness_container_image`. Mounting requires adding the volume flag when running the image. Would need to check that Beam supports this.
+
+# 2. How to implement volume mounts
+## Model server
+- What will internal paths to data look like? How do they vary between the bucket structure and the local structure? How detailed a path can we provide for the mounted volume?
+  - Currently it looks like we can only provide the top-level bucket (`bioacoustics`). This would mean the path inside the app would need to be `whale_speech/data/audio`.
+  - Alternatively, we can rename the bucket to `whale_speech`, since it's already in the bioacoustics project. This would mean the path inside the app would be `data/audio/...`, similar to local.
+- Should we continue to support receiving data through the REST API? Most likely yes, to avoid breaking backward compatibility.
+  - If so, how do we check whether a path or raw data was provided?
+- Where do model configs live?
+  - How do we define different batch sizes?
+  - Is everything just baked into the image as before, or should we have multiple?
+
+## Pipeline
+- How do we mount the bucket to the container?
+  - How does this change between running locally and running on Dataflow?
+- How do we write to the mounted volume?
+- Is this even worth it?
+
+# Decision
+
+## Model server
+We will implement volume mounts for the model server.
+We will continue to support receiving data through the REST API, but will also support receiving paths to data.
+We will mount the bucket to the container, and write to the mounted volume.
+The paths to data will be the same as the local paths, meaning the top-level bucket will now be called `whale_speech`. A rough sketch of the request handling is included in the appendix below.
+
+## Pipeline
+We will not implement volume mounts for the pipeline.
+We will continue to write to GCS using Beam's I/O connectors.
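+
+# Appendix: request handling sketch
+A minimal sketch of how the model server could accept either raw audio (the current REST behaviour) or a path relative to the mounted bucket. The endpoint name (`/predict`), the field names (`audio`, `audio_path`), and the mount root are illustrative placeholders, not the current implementation.
+
+```python
+import os
+
+import numpy as np
+from flask import Flask, jsonify, request
+
+app = Flask(__name__)
+
+# Hypothetical mount point of the `whale_speech` bucket inside the container.
+MOUNT_ROOT = os.environ.get("AUDIO_MOUNT_ROOT", "/mnt/whale_speech")
+
+
+def load_audio(payload: dict) -> np.ndarray:
+    """Return audio as a numpy array from either inline data or a mounted path."""
+    if "audio" in payload:  # current behaviour: raw samples in the request body
+        return np.asarray(payload["audio"], dtype=np.float32)
+    if "audio_path" in payload:  # new behaviour: path relative to the mounted volume
+        full_path = os.path.join(MOUNT_ROOT, payload["audio_path"])
+        return np.load(full_path)  # assumes arrays are stored as .npy for this sketch
+    raise ValueError("Request must contain either 'audio' or 'audio_path'.")
+
+
+@app.route("/predict", methods=["POST"])
+def predict():
+    try:
+        audio = load_audio(request.get_json(force=True))
+    except (ValueError, FileNotFoundError) as e:
+        return jsonify({"error": str(e)}), 400
+
+    # Placeholder for the real model call; returns scores of the expected length.
+    scores = np.zeros(len(audio)).tolist()
+    return jsonify({"scores": scores, "num_samples": len(audio)})
+
+
+if __name__ == "__main__":
+    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
+```
+
+If something like this shape holds, the pipeline's classify stage only needs to swap the inline audio field for a path when the volume is available; the response format stays the same either way.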
+
diff --git a/examples/pacific_sound_16kHz.py b/examples/pacific_sound_16kHz.py
new file mode 100644
index 0000000..6e7e2fe
--- /dev/null
+++ b/examples/pacific_sound_16kHz.py
@@ -0,0 +1,16 @@
+import boto3
+from botocore import UNSIGNED
+from botocore.client import Config
+
+s3_client = boto3.client('s3',
+    aws_access_key_id='',
+    aws_secret_access_key='',
+    config=Config(signature_version=UNSIGNED)
+)
+
+year = "2024"
+month = "08"
+bucket = 'pacific-sound-16khz'
+
+for obj in s3_client.list_objects_v2(Bucket=bucket, Prefix=f'{year}/{month}')['Contents']:
+    print(obj['Key'])
\ No newline at end of file
diff --git a/makefile b/makefile
index ec82333..91c0786 100644
--- a/makefile
+++ b/makefile
@@ -1,4 +1,4 @@
-VERSION := 1.0.0
+VERSION := 1.0.1
 GIT_SHA := $(shell git rev-parse --short HEAD)
 TAG := $(VERSION)-$(GIT_SHA)
 PIPELINE_IMAGE_NAME := whale-speech/pipeline
@@ -105,12 +105,13 @@ run-dataflow:
 	--num_workers=8 \
 	--max_num_workers=8 \
 	--autoscaling_algorithm=THROUGHPUT_BASED \
-	--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):$(TAG) \
+	--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):latest \
 	--start "2024-07-11" \
 	--end "2024-07-11" \
 	--offset 0 \
 	--margin 1800 \
 	--batch_duration 60
+# --worker_harness_container_image=$(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_PIPELINE_WORKER_IMAGE_NAME):latest \
 
 run-direct:
 	python3 src/pipeline.py \
@@ -118,11 +119,13 @@ run-direct:
 	--filesystem gcp \
 	--inference_url $(INFERENCE_URL) \
 	--runner DirectRunner \
-	--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME) \
+	--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):latest \
 	--start "2024-07-11" \
 	--end "2024-07-11" \
 	--offset 0 \
-	--margin 600
+	--margin 1800 \
+	--batch_duration 60
+# --worker_harness_container_image=$(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_PIPELINE_WORKER_IMAGE_NAME) \
 
 rebuild-run-dataflow: build-push-pipeline-worker run-dataflow
diff --git a/src/stages/audio.py b/src/stages/audio.py
index 806b922..a76894c 100644
--- a/src/stages/audio.py
+++ b/src/stages/audio.py
@@ -159,8 +159,12 @@ def _preprocess(self, df: pd.DataFrame):
     def _build_time_frames(self, df: pd.DataFrame):
         margin = self.margin
 
-        df["startTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["startTime"].astype(str))
-        df["endTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["endTime"].astype(str))
+        # handle time formats of HH:MM:SS and HH:MM:SS.MMMMMM
+        df["startTime"] = df["startTime"].astype(str).apply(lambda x: x.split(".")[0])
+        df["endTime"] = df["endTime"].astype(str).apply(lambda x: x.split(".")[0])
+
+        df["startTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["startTime"])
+        df["endTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["endTime"])
 
         df["start_time"] = df.startTime - timedelta(seconds=margin)
         df["end_time"] = df.endTime + timedelta(seconds=margin)
diff --git a/src/stages/classify.py b/src/stages/classify.py
index 6d535b2..093155d 100644
--- a/src/stages/classify.py
+++ b/src/stages/classify.py
@@ -415,6 +415,11 @@ def process(self, element):
                 logging.info(f"Retrying in {wait*wait} seconds.")
                 wait += 1
                 time.sleep(wait*wait)
+            except requests.exceptions.HTTPError as e:
+                logging.error(f"HTTP error: {e}")
+                logging.error(f"Retrying in {wait*wait} seconds.")
+                wait += 1
+                time.sleep(wait*wait)
 
         response = requests.post(self.inference_url, json=data)
         response.raise_for_status()
diff --git a/src/stages/search.py b/src/stages/search.py
index b9dfa66..a5e89e2 100644
--- a/src/stages/search.py
+++ b/src/stages/search.py
@@ -42,7 +42,7 @@ def __init__(self, config: SimpleNamespace):
 
     def process(self, element):
         start = self._preprocess_date(element.get('start'))
-        end = self._preprocess_date(element.get('end'))
+        end = self._preprocess_date(element.get('end', start))
 
         geometry_file = self._get_geometry_file()
         export_file = self._get_file_buffer("csv")
diff --git a/src/stages/sift.py b/src/stages/sift.py
index bc318d0..2f2ed0e 100644
--- a/src/stages/sift.py
+++ b/src/stages/sift.py
@@ -5,7 +5,6 @@
 from typing import Dict, Any
 
 import apache_beam as beam
-import io
 import logging
 import json
 import math
@@ -118,10 +117,13 @@ def _postprocess(self, pcoll, min_max_detections):
         logging.debug(f"Min-max detections: {min_max_detections}")
         logging.debug(f"Key: {key}")
 
-        global_detection_range = [
-            min_max_detections[key]["min"],
-            min_max_detections[key]["max"]
-        ]
+        if min_max_detections[key]:
+            global_detection_range = [
+                min_max_detections[key]["min"],
+                min_max_detections[key]["max"]
+            ]
+        else:
+            global_detection_range = [0, len(signal)]
 
         sifted_signal = signal[global_detection_range[0]:global_detection_range[-1]]
         audio_path = "No sift audio path stored."
@@ -133,6 +135,17 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections):
         signal, start, end, encounter_ids, _ = pcoll
         key = self._build_key(start, end, encounter_ids)
 
+        # normalize and center
+        signal = signal / np.max(signal)  # normalize
+        signal = signal - np.mean(signal)  # center
+
+        # plt.figure(figsize=(20, 10))
+        plt.plot(signal)
+
+        if not min_max_detections[key]:
+            logging.info(f"No detections for {key}.")
+            return
+
         min_max_detection_samples = [
             min_max_detections[key]["min"],  # maually fix ordering
             min_max_detections[key]["max"]
@@ -151,13 +164,6 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections):
         if not beam.io.filesystems.FileSystems.exists(os.path.dirname(plot_path)):
             beam.io.filesystems.FileSystems.mkdirs(os.path.dirname(plot_path))
 
-        # normalize and center
-        signal = signal / np.max(signal)  # normalize
-        signal = signal - np.mean(signal)  # center
-
-        # plt.figure(figsize=(20, 10))
-        fig = plt.figure()
-        plt.plot(signal)
 
         # NOTE: for legend logic, plot min_max window first
         if len(min_max_detection_samples):
@@ -181,7 +187,6 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections):
             zorder=5,  # on top of signal
         )
 
-        plt.legend(['Input signal', 'detection window', 'all detections']).set_zorder(10)
         plt.xlabel(f'Samples (seconds * {self.sample_rate} Hz)')
         plt.ylabel('Amplitude (normalized and centered)')
diff --git a/tests/test_audio.py b/tests/test_audio.py
index e1d6df5..985379e 100644
--- a/tests/test_audio.py
+++ b/tests/test_audio.py
@@ -41,8 +41,8 @@ def sample_search_results_df():
     data = {
         'id': [1, 2, 3],
         'startDate': ['2024-07-08', '2024-09-10', '2024-09-10'],
-        'startTime': ['00:13:00', '01:37:00', '01:32:00'],
-        'endTime': ['00:13:00', '01:37:00', '01:42:00']
+        'startTime': ['00:13:00.123456', '01:37:00', '01:32:00'],
+        'endTime': ['00:13:00', '01:37:00', '01:42:00.133700']
     }
 
     return pd.DataFrame(data)
diff --git a/versions/whale-speech-0.0.1.zip b/versions/whale-speech-0.0.1.zip
deleted file mode 100644
index 92c1d60..0000000
Binary files a/versions/whale-speech-0.0.1.zip and /dev/null differ