
Stress test #28

Merged: 9 commits, Nov 7, 2024. Changes shown from 6 commits.
7 changes: 4 additions & 3 deletions README.md
@@ -155,10 +155,11 @@ The default compute resources are quite small and slow. To speed things up, you
Note, you may need to configure IAM permissions to allow Dataflow Runners to access images in your Artifact Registry. Read more about that [here](https://cloud.google.com/dataflow/docs/concepts/security-and-permissions).


## References
- [HappyWhale](https://happywhale.com/)
- [open-oceans/happywhale](https://github.com/open-oceans/happywhale)
- [NOAA and Google's humpback_whale model](https://tfhub.dev/google/humpback_whale/1)
- [Google Cloud Console](https://console.cloud.google.com/)
- [Monterey Bay Hydrophone MARS](https://www.mbari.org/technology/monterey-accelerated-research-system-mars/)
- [MBARI's Pacific Ocean Sound Recordings](https://registry.opendata.aws/pacific-sound/)
- J. Ryan et al., "New Passive Acoustic Monitoring in Monterey Bay National Marine Sanctuary," OCEANS 2016 MTS/IEEE Monterey, Monterey, CA, USA, 2016, pp. 1-8, doi: [10.1109/OCEANS.2016.7761363](https://ieeexplore.ieee.org/document/7761363).
107 changes: 107 additions & 0 deletions docs/ladr/LADR_0007_utilizing_volume_mounts.md
@@ -0,0 +1,107 @@
# Utilizing Volume Mounts

During stress-testing of the pipeline, we found some bottlenecks, particularly in I/O operations.
To mitigate these, we can utilize volume mounts to speed up the process.

We should be careful about where we use volumes, though: persisting data to buckets from parallel workers can corrupt data, for example if two workers write to the same file at the same time.
That should only be a problem when writing to the same file, which should not happen in this pipeline.

This document aims to decide where to use volume mounts, and how to implement and configure them.

# 1. Where to use volume mounts
Our pipeline consists of two main parts: the **model server** and the **pipeline** itself.
The model server is a simple Flask server that serves the model, and the pipeline is a Dataflow job that processes the data.

## Model server
Hosted as its own Cloud Run service on GCP (or a parallel process locally), the model server's core responsibility is classifying input audio.
Ideally, this component should live independently of the pipeline, so that future models can easily be swapped in and out without affecting the pipeline.

### Options
#### A. _Receive inputs through REST API (POST requests)_
This is the current implementation.

**Pros**
- Completely isolated from the pipeline. Could be its own service. Acts like a third-party API.
- Easy to monitor costs, usage, and logs separately from the pipeline.
- Could be exposed as external service on its own. Potentially beneficial if pipeline ends up being not so useful.

**Cons**
- Requires large network I/O for every request. Could be slow when many requests are made.
- Requests are size-limited, forcing manual batching and many more requests per run. Due to parallelism, many requests may arrive simultaneously, which the Cloud Run service will complain about and potentially block.
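As a sketch of what the manual batching in Option A forces (the endpoint URL, field names, and batch size here are assumptions, not taken from the pipeline code):

```python
import requests

INFERENCE_URL = "http://localhost:5000/predict"  # assumed local model server
BATCH_SIZE = 1024  # assumed; the real cap depends on Cloud Run request limits

def batches(samples, size=BATCH_SIZE):
    """Split a sample list into fixed-size chunks."""
    for i in range(0, len(samples), size):
        yield samples[i:i + size]

def classify_in_batches(samples):
    """POST each chunk separately; one run fans out into many requests."""
    predictions = []
    for batch in batches(samples):
        resp = requests.post(INFERENCE_URL, json={"audio": batch})
        resp.raise_for_status()
        predictions.extend(resp.json()["predictions"])
    return predictions
```

Every chunk becomes its own request, which is where the simultaneous-request pressure on Cloud Run comes from.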

#### B. _Load inputs through volume mounts_
> Review comment (owner): What authorization is needed to connect to a GCS bucket storage?

Pipeline sends POST requests with paths to audio within the volume, instead of the data itself.
Whether the model returns the classification array, or writes to volume and returns paths is to be decided.
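For illustration, the two request shapes might compare as follows (field names are assumptions):

```python
import json

# Option A-style payload: raw samples inline (grows with audio length)
data_request = {"key": "20240808T000000", "audio": [0.0] * 16000}

# Option B-style payload: only a path into the mounted volume (constant size)
path_request = {
    "key": "20240808T000000",
    "audio_path": "data/audio/2024/08/08/encounter_1.wav",
}

print(len(json.dumps(data_request)), len(json.dumps(path_request)))
```

The path-based payload stays tiny regardless of audio duration, which is the core of the network I/O argument below.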

**Pros**
- Much smaller I/O sizes. Only need to send the path to the data, not the data itself.
- No limits on size of requests. Can send all data at once. This would require the model server to smartly batch inputs if sizes are too large. This is not necessarily a bad thing, since it would be easier to ensure the correct order of batched model outputs.
- Removes chances of model server getting DDOSed by too many requests at once from the pipeline.
- Could potentially be faster, since the model server can read from the volume directly.
- Might better leverage the stateless nature of the model server, by scaling up to handle new requests on different instances.

**Cons**
- Compute resources hosting the model server need to be scalable to handle large data and model weights. Currently blocked by scaling quotas in Google Cloud Run, but theoretically, this should not be a problem.
- Requires the model server to have access to the volume. This could be a security risk, since the model server could potentially read any data in the volume. Though, with the data being public, this is not a huge concern.
- Requires all preprocessing to have been completed in earlier steps. Implementing this now would require a rewrite of the [sift stage](../../src/stages/sift.py) (which should be renamed to "preprocess audio", with sift-parsing and resampling done in the same step).

## Pipeline
The main pipeline is a Beam workflow aimed to run on a Dataflow runner, meaning it is a distributed system.
Each stage is currently implemented to write its outputs to Google Cloud Storage, while passing the same data along to the next stage.
The pipeline should be able to run locally as well, for development and low-resource runs.

Using volumes here can be useful for persistent storage, but we don't want to sacrifice the parallelism of the pipeline.

### Options
#### A. _Write to GCS using Beam's I/O connectors_
This is the current implementation, which does the job well.

**Pros**
- Already implemented.
- Explicit full paths (or path templates) passed in configs make logging and debugging more understandable. Full paths are written to, logged, then passed to final output tables.

**Cons**
- Feels less organized and less robust. Every write is its own operation, requiring a connection to GCS, the write itself, and closing the connection. This could be slow when many writes are done.
- Could be more expensive, since every write is a separate operation. (Unverified, but worth considering.)

#### B. _Mount GCS bucket as a volume_
The bucket where our outputs are written to is mounted to the container running the pipeline.

**Pros**
- Faster writes. Since the bucket is mounted, writes are done directly to the bucket. This could be faster than connecting to GCS for every write.
- Feels like the "true MLOps" choice. Getting the most out of Docker functionality. This may be overkill for this project, but would serve as a good example for future projects where it may be even more useful.

**Cons**
- Going to take a while to implement this for all stages. We just finished implementing separate writes per stage run.
- Not sure how this will change between running with Dataflow and running locally. Will need to test this.
- We only provide the image as a `worker_harness_container_image`. Mounting requires adding the volume tag when running the image. Would need to check that Beam supports this.
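For reference, a bucket can be mounted into a locally run container with gcsfuse; a configuration sketch (image name, bucket, and mount point are assumptions, and FUSE privileges plus GCP credentials are required):

```shell
docker run --rm \
  --device /dev/fuse --cap-add SYS_ADMIN \
  -v "$HOME/.config/gcloud:/root/.config/gcloud:ro" \
  whale-speech/pipeline:latest \
  bash -c "mkdir -p /mnt/whale_speech \
    && gcsfuse whale_speech /mnt/whale_speech \
    && ls /mnt/whale_speech/data/audio"
```

No equivalent mount flag is exposed through `worker_harness_container_image` on Dataflow, which is the concern raised above.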

# 2. How to implement volume mounts
## Model server
- What will internal paths to data look like? How do they vary between the bucket structure and the local structure? How detailed a path can we provide for the mounted volume?
- Currently it looks like we can only mount the top-level bucket (`bioacoustics`). The path inside the app would then need to be `whale_speech/data/audio`.
- Alternatively, we can rename the bucket to `whale_speech`, since it's already in the bioacoustics project. The path inside the app would then be `data/audio/...`, similar to local.
- Should we continue to support receiving data through the REST API? Most likely yes, to avoid breaking backward compatibility.
- If so, how do we detect whether a path or raw data was provided?
- Where do model configs lie?
- How to define different batch sizes?
- Is everything just baked into the image from before, or should we have multiple?
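A minimal dispatch sketch for supporting both request styles (the field names and one-of contract are assumptions, not the server's actual API):

```python
def parse_request(payload: dict):
    """Decide whether a request carries raw audio or a path to audio.

    Assumed contract: exactly one of "audio" (a list of samples) or
    "audio_path" (a path inside the mounted volume) is present.
    """
    if "audio" in payload:
        return "inline", payload["audio"]
    if "audio_path" in payload:
        return "path", payload["audio_path"]
    raise ValueError("Request must contain 'audio' or 'audio_path'.")
```

Keeping the inline branch preserves backward compatibility while the path branch enables the volume-mount flow.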

## Pipeline
- How to mount the bucket to the container?
- How does this change between running locally and running on Dataflow?
- How to write to the mounted volume?
- Is this even worth it?

# Decision

## Model server
We will implement the volume mounts for the model server.
We will continue to support receiving data through the REST API, but will also support receiving paths to data.
We will mount the bucket to the container, and write to the mounted volume.
The paths to data will be the same as the local paths, meaning the top-level bucket will now be called `whale_speech`.

## Pipeline
We will not implement volume mounts for the pipeline.
We will continue to write to GCS using Beam's I/O connectors.

16 changes: 16 additions & 0 deletions examples/pacific_sound_16kHz.py
@@ -0,0 +1,16 @@
import boto3
from botocore import UNSIGNED
from botocore.client import Config

s3_client = boto3.client(
    's3',
    config=Config(signature_version=UNSIGNED)  # public bucket, no credentials needed
)

year = "2024"
month = "08"
bucket = 'pacific-sound-16khz'

for obj in s3_client.list_objects_v2(Bucket=bucket, Prefix=f'{year}/{month}').get('Contents', []):
print(obj['Key'])
19 changes: 11 additions & 8 deletions makefile
@@ -1,4 +1,4 @@
VERSION := 1.0.0
VERSION := 1.0.1
GIT_SHA := $(shell git rev-parse --short HEAD)
TAG := $(VERSION)-$(GIT_SHA)
PIPELINE_IMAGE_NAME := whale-speech/pipeline
@@ -105,24 +105,27 @@ run-dataflow:
--num_workers=8 \
--max_num_workers=8 \
--autoscaling_algorithm=THROUGHPUT_BASED \
--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):$(TAG) \
--start "2024-07-11" \
--end "2024-07-11" \
--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):latest \
--start "2024-08-08" \
--end "2024-08-10" \
--offset 0 \
--margin 1800 \
--batch_duration 60
# --worker_harness_container_image=$(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_PIPELINE_WORKER_IMAGE_NAME):latest \

run-direct:
python3 src/pipeline.py \
--job_name "whale-speech-$(GIT_SHA)" \
--filesystem gcp \
--inference_url $(INFERENCE_URL) \
--runner DirectRunner \
--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME) \
--start "2024-07-11" \
--end "2024-07-11" \
--worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):latest \
--start "2024-08-08" \
--end "2024-08-08" \
--offset 0 \
--margin 600
--margin 1800 \
--batch_duration 60
# --worker_harness_container_image=$(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_PIPELINE_WORKER_IMAGE_NAME) \


rebuild-run-dataflow: build-push-pipeline-worker run-dataflow
8 changes: 6 additions & 2 deletions src/stages/audio.py
@@ -159,8 +159,12 @@ def _preprocess(self, df: pd.DataFrame):
def _build_time_frames(self, df: pd.DataFrame):
margin = self.margin

df["startTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["startTime"].astype(str))
df["endTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["endTime"].astype(str))
# handle time formats of HH:MM:SS and HH:MM:SS.MMMMMM
df["startTime"] = df["startTime"].astype(str).apply(lambda x: x.split(".")[0])
df["endTime"] = df["endTime"].astype(str).apply(lambda x: x.split(".")[0])

df["startTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["startTime"])
df["endTime"] = pd.to_datetime(df["startDate"].astype(str) + ' ' + df["endTime"])

df["start_time"] = df.startTime - timedelta(seconds=margin)
df["end_time"] = df.endTime + timedelta(seconds=margin)
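The fractional-second handling above can be exercised in isolation (a minimal reproduction of the same pandas steps):

```python
import pandas as pd

df = pd.DataFrame({
    "startDate": ["2024-07-08"],
    "startTime": ["00:13:00.123456"],  # HH:MM:SS.MMMMMM variant
    "endTime": ["00:13:00"],           # plain HH:MM:SS variant
})

# strip fractional seconds so both formats parse uniformly
df["startTime"] = df["startTime"].astype(str).apply(lambda x: x.split(".")[0])
df["endTime"] = df["endTime"].astype(str).apply(lambda x: x.split(".")[0])

df["startTime"] = pd.to_datetime(df["startDate"].astype(str) + " " + df["startTime"])
df["endTime"] = pd.to_datetime(df["startDate"].astype(str) + " " + df["endTime"])
print(df["startTime"].iloc[0])  # 2024-07-08 00:13:00
```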
5 changes: 5 additions & 0 deletions src/stages/classify.py
@@ -415,6 +415,11 @@ def process(self, element):
logging.info(f"Retrying in {wait*wait} seconds.")
wait += 1
time.sleep(wait*wait)
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP error: {e}")
logging.error(f"Retrying in {wait*wait} seconds.")
wait += 1
time.sleep(wait*wait)

response = requests.post(self.inference_url, json=data)
response.raise_for_status()
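The retry logic above sleeps `wait * wait` seconds and increments `wait` after each failure; isolated, the backoff schedule looks like this (a sketch of the pattern, not the exact pipeline code):

```python
def backoff_schedule(max_retries=3, wait=1):
    """Quadratic backoff: each retry sleeps wait**2 seconds, then wait grows by 1."""
    delays = []
    for _ in range(max_retries):
        delays.append(wait * wait)
        wait += 1
    return delays

print(backoff_schedule())  # [1, 4, 9]
```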
2 changes: 1 addition & 1 deletion src/stages/search.py
@@ -42,7 +42,7 @@ def __init__(self, config: SimpleNamespace):

def process(self, element):
start = self._preprocess_date(element.get('start'))
end = self._preprocess_date(element.get('end'))
end = self._preprocess_date(element.get('end', start))

geometry_file = self._get_geometry_file()
export_file = self._get_file_buffer("csv")
31 changes: 18 additions & 13 deletions src/stages/sift.py
@@ -5,7 +5,6 @@
from typing import Dict, Any

import apache_beam as beam
import io
import logging
import json
import math
@@ -118,10 +117,13 @@ def _postprocess(self, pcoll, min_max_detections):
logging.debug(f"Min-max detections: {min_max_detections}")
logging.debug(f"Key: {key}")

global_detection_range = [
min_max_detections[key]["min"],
min_max_detections[key]["max"]
]
if min_max_detections[key]:
global_detection_range = [
min_max_detections[key]["min"],
min_max_detections[key]["max"]
]
else:
global_detection_range = [0, len(signal)]

sifted_signal = signal[global_detection_range[0]:global_detection_range[-1]]
audio_path = "No sift audio path stored."
@@ -133,6 +135,17 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections):
signal, start, end, encounter_ids, _ = pcoll
key = self._build_key(start, end, encounter_ids)

# normalize and center
signal = signal / np.max(signal) # normalize
signal = signal - np.mean(signal) # center

# plt.figure(figsize=(20, 10))
plt.plot(signal)

if not min_max_detections[key]:
logging.info(f"No detections for {key}.")
return

min_max_detection_samples = [
min_max_detections[key]["min"], # manually fix ordering
min_max_detections[key]["max"]
@@ -151,13 +164,6 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections):
if not beam.io.filesystems.FileSystems.exists(os.path.dirname(plot_path)):
beam.io.filesystems.FileSystems.mkdirs(os.path.dirname(plot_path))

# normalize and center
signal = signal / np.max(signal) # normalize
signal = signal - np.mean(signal) # center

# plt.figure(figsize=(20, 10))
fig = plt.figure()
plt.plot(signal)

# NOTE: for legend logic, plot min_max window first
if len(min_max_detection_samples):
@@ -181,7 +187,6 @@ def _plot_signal_detections(self, pcoll, min_max_detections, all_detections):
zorder=5, # on top of signal
)


plt.legend(['Input signal', 'detection window', 'all detections']).set_zorder(10)
plt.xlabel(f'Samples (seconds * {self.sample_rate} Hz)')
plt.ylabel('Amplitude (normalized and centered)')
4 changes: 2 additions & 2 deletions tests/test_audio.py
@@ -41,8 +41,8 @@ def sample_search_results_df():
data = {
'id': [1, 2, 3],
'startDate': ['2024-07-08', '2024-09-10', '2024-09-10'],
'startTime': ['00:13:00', '01:37:00', '01:32:00'],
'endTime': ['00:13:00', '01:37:00', '01:42:00']
'startTime': ['00:13:00.123456', '01:37:00', '01:32:00'],
'endTime': ['00:13:00', '01:37:00', '01:42:00.133700']
}
return pd.DataFrame(data)
