diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..03a7f1c --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,55 @@ +name: Build & Push Images to Google Artifact Registry + +on: + push: + branches: + - main # Runs on push to the main branch + pull_request: + branches: + - main # Runs on pull requests to the main branch + + +env: + PROJECT_ID: ${{ secrets.PROJECT_ID }} + SERVICE_ACCOUNT: ${{ secrets.SERVICE_ACCOUNT }} + WORKLOAD_IDENTITY_PROVIDER: ${{ secrets.WORKLOAD_IDENTITY_PROVIDER }} + MODEL_REGISTERY: ${{ secrets.MODEL_REGISTERY }} + +jobs: + build-push-images: + name: Build and Push Images + permissions: + contents: 'read' + id-token: 'write' + + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Google Auth + id: auth + uses: 'google-github-actions/auth@v2' + with: + token_format: 'access_token' + project_id: ${{ env.PROJECT_ID }} + service_account: ${{ env.SERVICE_ACCOUNT }} + workload_identity_provider: ${{ env.WORKLOAD_IDENTITY_PROVIDER }} + + - name: Docker Auth + id: docker-auth + uses: 'docker/login-action@v1' + with: + username: 'oauth2accesstoken' + password: '${{ steps.auth.outputs.access_token }}' + registry: '${{ env.MODEL_REGISTERY }}' + + - name: Build and Push Model Server + run: make build-push-model-server + env: + MODEL_REGISTERY: ${{ env.MODEL_REGISTERY }} + + - name: Build and Push Pipeline Worker + run: make build-push-pipeline-worker + env: + MODEL_REGISTERY: ${{ env.MODEL_REGISTERY }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..dd87d1e --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,39 @@ +name: Publish Latest Images to Docker Hub + +on: + push: + branches: + - main # Runs on push to the main branch + + +env: + MODEL_REGISTERY: ${{ secrets.MODEL_REGISTERY }} + DOCKERHUB_USERNAME: ${{ vars.DOCKERHUB_USERNAME }} + DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} + +jobs: + publish-model-server: + name: Publish Latest Images + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Docker Auth (Docker Hub) + id: docker-auth-dockerhub + uses: docker/login-action@v3 + with: + username: ${{env.DOCKERHUB_USERNAME}} + password: ${{env.DOCKERHUB_TOKEN}} + + - name: Publish Latest Model Server Image + run: make publish-latest-model-server + env: + MODEL_REGISTERY: ${{ env.MODEL_REGISTERY }} + PUBLIC_MODEL_REGISTERY: docker.io/${{ env.DOCKERHUB_USERNAME }} + + - name: Publish Latest Pipeline Worker Image + run: make publish-latest-pipeline-worker + env: + MODEL_REGISTERY: ${{ env.MODEL_REGISTERY }} + PUBLIC_MODEL_REGISTERY: docker.io/${{ env.DOCKERHUB_USERNAME }} diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml index 9ec5ea1..b19c5b5 100644 --- a/.github/workflows/python-test.yml +++ b/.github/workflows/python-test.yml @@ -28,8 +28,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + if [ -f requirements/requirements.txt ]; then pip install -r requirements/requirements.txt; fi - name: Run tests with pytest run: | - python -m pytest \ No newline at end of file + python -m pytest tests/ \ No newline at end of file diff --git a/.gitignore b/.gitignore index f2f108c..0e99a06 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ plots/ model/ hide/ table/ +env.sh # Python basic ignores # Byte-compiled / optimized 
/ DLL files diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..73a5855 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM apache/beam_python3.11_sdk + +COPY src/ /home/src/ +COPY requirements/requirements.txt /home/requirements.txt +COPY data/geo /home/data/geo + +WORKDIR /home + +# Install HDF5 using apt +RUN apt-get update && apt-get install -y \ + libhdf5-dev \ + libsndfile1 \ + gcc +RUN pip install -r requirements.txt + +ENTRYPOINT ["python3", "src/pipeline.py"] diff --git a/Dockerfile.model-server b/Dockerfile.model-server new file mode 100644 index 0000000..6a531aa --- /dev/null +++ b/Dockerfile.model-server @@ -0,0 +1,12 @@ +FROM python:3.11-slim-bullseye + +COPY src/model_server.py /home/src/model_server.py +COPY src/config.py /home/src/config.py +COPY src/config/ /home/src/config/ +COPY requirements/model-requirements.txt /home/requirements.txt + +WORKDIR /home + +RUN pip install -r requirements.txt + +CMD [ "python", "src/model_server.py" ] diff --git a/Dockerfile.pipeline-worker b/Dockerfile.pipeline-worker new file mode 100644 index 0000000..6f0cdfe --- /dev/null +++ b/Dockerfile.pipeline-worker @@ -0,0 +1,14 @@ +FROM apache/beam_python3.11_sdk + +COPY src/ /home +COPY requirements/requirements.txt /home/requirements.txt +COPY data/geo /home/data/geo + +WORKDIR /home + +# Install HDF5 using apt +RUN apt-get update && apt-get install -y \ + libhdf5-dev \ + libsndfile1 \ + gcc +RUN pip install -r requirements.txt \ No newline at end of file diff --git a/README.md b/README.md index fc3778e..fef7330 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,57 @@ Derived from MARS). + + +
+
 ## Getting started
 ### Install
+
+Create a virtual environment and install the required packages.
+We'll use conda for this, but you can use any package manager you prefer.
+
+Since we're developing on an M1 machine, we'll need to set `CONDA_SUBDIR` to `osx-arm64`.
+This step should be adapted based on the virtual environment you're using.
+
 #### M1:
 ```bash
 CONDA_SUBDIR=osx-arm64 conda create -n whale-speech python=3.11
@@ -24,49 +72,88 @@ conda activate whale-speech
 pip install -r requirements.txt
 ```
+### Google Cloud SDK
+To run the pipeline on Google Cloud Dataflow, you'll need to install the Google Cloud SDK.
+You can find the installation instructions [here](https://cloud.google.com/sdk/docs/install).
+
+Make sure you authenticate the account you're using and initialize the project you're working in.
+```bash
+gcloud auth login
+gcloud init
+```
+
+For newly created projects, each of the services used will need to be enabled.
+This can easily be done in the console, or via the command line.
+For example:
+```bash
+gcloud services enable bigquery.googleapis.com
+gcloud services enable dataflow.googleapis.com
+gcloud services enable storage-api.googleapis.com
+gcloud services enable run.googleapis.com
+```
+
 ### Run locally
+To run the pipeline and model server locally, you can use the `make` target `local-run`.
+
 ```bash
 make local-run
 ```

-## Pipeline description
+This target starts by killing any model server that might still be running (needed when a previous pipeline run failed without tearing down its server, which would cause the next call to hang).
+Then it starts the model server in the background and runs the pipeline.

-Stages:
-1. **Input**: When (and where*) to look for whale encounters on [HappyWhale](https://happywhale.com/).
-2. **Geometry Search**: Query [open-oceans/happywhale](https://github.com/open-oceans/happywhale) to find potential whale encounters.
-  → Expected outputs: encounter ids, start and end times, and longitude and latitude.
+### Build and push the model server
+To build and push the model server to your model registry (stored as an environment variable), you can use the following `make` target.
-3. **Retrive Audio**: Download audio from MBARI's [Pacific Ocean Sound Recordings](https://registry.opendata.aws/pacific-sound/) around the time of the encounter.
-
-  → Expected outputs: audio array, start and end times, and encounter ids.
-
-4. **Filter Frequency**: Break audio into non-overlaping segments with flagged frequency detections.
-
-  → Expected outputs: cut audio array, detection intervals, and encounter ids.
+```bash
+make build-push-model-server
+```
+This target builds the model server image and pushes it to the registry specified in the `env.sh` file.
+The tag is a combination of the version set in the makefile and the last git commit hash.
+This helps keep track of what is included in the image, and allows for easy rollback if needed.
+The target fails if there are any uncommitted changes in the git repository.
-5. **Classify Audio**: Use a NOAA and Google's [humpback_whale model](https://tfhub.dev/google/humpback_whale/1) to classify the flagged segments.
+The `latest` tag is only added to images deployed via GHA.
-  → Expected outputs: resampled audio, classification score array, and encounter ids.
+### Run pipeline with Dataflow
+To run the pipeline on Google Cloud Dataflow, you can use the following `make` target.
-6. **Postprocess Labels**: Build clip-intervals for each encounter for playback snippets.
+```bash +make run-dataflow +``` +Logging in the terminal will tell you the status of the pipeline, and you can follow the progress in the [Dataflow console](https://console.cloud.google.com/dataflow/jobs). - → Expected outputs: encounter ids, cut/resampled audio array, and aggregated classification score. +In addition to providing the inference url and filesystem to store outputs on, the definition of the above target also provides an example on how a user can pass additional arguments to and request different resources for the pipeline run. -7. **Output**: Map the whale encounter ids to the playback snippets. +**Pipeline specific parameters** +You can configure all the paramters set in the config files directly when running the pipeline. +The most important here is probably the start and end time for the initial search. - -[![](https://mermaid.ink/img/pako:eNpVkl1PwjAUhv_KybnSZBAYIrAYE0VBE41GvNJxUbqzrUm3Yj_USfjvlq0aPTfte_qcz3SHXGWECeZSffCSaQvPV2kN3i5ec5bkrMeZpDpjGm7rrbNmDb3eOVwedY-FVBuCJamKrG5gRUzz8jgkaMl5ICtnBIcnj4l3gguXCRW4y47rxLwV1yEoF9KShkU4NL05qnkTAq9bdhHYjZPSUEMwl8wYkTf_iizgrN_39G2gtdooC_d-eOm71u-kf8FD0mXALCvgURm71YqTMXDHNiRNQJctevP606sp4cFZv6T138Fu4KAwwop0xUTml707eFK0JVWUYuKvGeXMSZtiWu89ypxVq6bmmFjtKEKtXFGir-InjNBtM2bpSrBCs-rXu2X1i1L_NCY7_MRkNIj7w2k8nJ6OZ7OTeHQ6irDBZLCP8KuNGPRnnU3icTyenEwnEVImrNL33e_gqs5Fgftv_9OrOw?type=png)](https://mermaid.live/edit#pako:eNpVkl1PwjAUhv_KybnSZBAYIrAYE0VBE41GvNJxUbqzrUm3Yj_USfjvlq0aPTfte_qcz3SHXGWECeZSffCSaQvPV2kN3i5ec5bkrMeZpDpjGm7rrbNmDb3eOVwedY-FVBuCJamKrG5gRUzz8jgkaMl5ICtnBIcnj4l3gguXCRW4y47rxLwV1yEoF9KShkU4NL05qnkTAq9bdhHYjZPSUEMwl8wYkTf_iizgrN_39G2gtdooC_d-eOm71u-kf8FD0mXALCvgURm71YqTMXDHNiRNQJctevP606sp4cFZv6T138Fu4KAwwop0xUTml707eFK0JVWUYuKvGeXMSZtiWu89ypxVq6bmmFjtKEKtXFGir-InjNBtM2bpSrBCs-rXu2X1i1L_NCY7_MRkNIj7w2k8nJ6OZ7OTeHQ6irDBZLCP8KuNGPRnnU3icTyenEwnEVImrNL33e_gqs5Fgftv_9OrOw) +```bash + --start "2024-07-11" \ + --end "2024-07-11" \ + --offset 0 \ + --margin 1800 \ + --batch_duration 60 +``` - - +Note that any parameters with the same name under different sections will only be updated if its the last section in the list. +Also, since these argparse-parameters are added automatically, behavior of boolean flags might be unexpected (always true is added). + +**Compute resources** +The default compute resources are quite small and slow. To speed things up, you can request more workers and a larger machine type. For more on Dataflow resources, check out [the docs](https://cloud.google.com/dataflow/docs/reference/pipeline-options#worker-level_options). +``` + --worker_machine_type=n1-highmem-8 \ + --disk_size_gb=100 \ + --num_workers=8 \ + --max_num_workers=8 \ +``` +Note, you may need to configure IAM permissions to allow Dataflow Runners to access images in your Artifact Registry. Read more about that [here](https://cloud.google.com/dataflow/docs/concepts/security-and-permissions). - -*Currently only support encounters around the Monterey Bay Hydrophone (MARS). 
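+Regarding the note on boolean flags above: here is a minimal illustration (using a hypothetical invocation, not the project's actual parser code) of why automatically added argparse booleans end up truthy regardless of the value passed:
+
+```python
+import argparse
+
+# A config flag exposed automatically as type=bool (rather than action="store_true").
+parser = argparse.ArgumentParser()
+parser.add_argument("--plot_scores", type=bool, default=False)
+
+args = parser.parse_args(["--plot_scores", "false"])
+print(args.plot_scores)  # True, since bool("false") is truthy (any non-empty string is)
+```
+
+Because of this, it is safest to leave boolean options to the config files and only override scalar parameters (dates, offsets, durations) on the command line.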
-
 ## Resources
 - [HappyWhale](https://happywhale.com/)
@@ -74,3 +161,4 @@ Stages:
 - [open-oceans/happywhale](https://github.com/open-oceans/happywhale)
 - [MBARI's Pacific Ocean Sound Recordings](https://registry.opendata.aws/pacific-sound/)
 - [NOAA and Google's humpback_whale model](https://tfhub.dev/google/humpback_whale/1)
 - [Monterey Bay Hydrophone MARS](https://www.mbari.org/technology/monterey-accelerated-research-system-mars/)
+- [Google Cloud Console](https://console.cloud.google.com/)
diff --git a/docs/howto/model-server-as-cloud-run.md b/docs/howto/model-server-as-cloud-run.md
new file mode 100644
index 0000000..dce1ada
--- /dev/null
+++ b/docs/howto/model-server-as-cloud-run.md
@@ -0,0 +1,76 @@
+# Model server as Cloud Run
+In this guide, we will deploy the model server as a [Cloud Run](https://cloud.google.com/run/) service.
+
+Cloud Run is a serverless compute platform that allows you to run prebuilt containers triggered via HTTP requests.
+Our model server component is a perfect example of a service that can be deployed on Cloud Run, since it is a REST API listening for POST requests on a specified port and endpoint.
+
+## Prerequisites
+- A Google Cloud Platform (GCP) account and [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects) with [billing enabled](https://cloud.google.com/billing/docs/how-to/modify-project).
+- [Docker](https://github.com/docker/docker-install?tab=readme-ov-file#usage) installed on your local machine.
+- This code locally cloned (`git clone https://github.com/pmhalvor/whale-speech`).
+
+## Steps
+
+### 0. (Optional) Set up Artifact Registry
+If you want to store your Docker images in Google Cloud, you can use [Artifact Registry](https://cloud.google.com/artifact-registry/docs/overview).
+
+You'll likely need to enable the service, create a repository, and then grant your local environment permission to push to that repository.
+See more on this authentication process [here](https://cloud.google.com/artifact-registry/docs/docker/pushing-and-pulling#auth).
+
+### 1. Build the Docker image and push to Google Artifact Registry
+Navigate to the project directory in a terminal, build and tag your model-server image, and push it to your model registry.
+
+If you are using the Google Artifact Registry, you'll need to tag your image with the registry URL and a region, something like `us-central1-docker.pkg.dev/your_project/whale-speech/model-server:x.y.z`.
+If you prefer to use the free Docker Hub registry, you can use your public Docker ID as a prefix to your image tag, something like `your_docker_id/whale-speech:model-server-x.y.z`.
+
+This guide will only document the Google Artifact Registry method. The Docker Hub method is similar, though naming might be different.
+
+```bash
+cd whale-speech
+docker build -f Dockerfile.model-server -t us-central1-docker.pkg.dev/your_project/whale-speech/model-server:x.y.z .
+docker push us-central1-docker.pkg.dev/your_project/whale-speech/model-server:x.y.z
+```
+
+The `Dockerfile.model-server` file is a Dockerfile written for hosting the model server.
+You can find this file in the `whale-speech` directory.
+Note there is no need to expose a port in the Dockerfile, as this will be done in the Cloud Run deployment.
+
+
+### 2. Deploy image as Cloud Run service
+Navigate to the [Cloud Run](https://console.cloud.google.com/run) page in the GCP console.
+
+- Select **Deploy container** and then **Service**, since we'll want the container to be served behind an endpoint.
+- Add your container image URL, i.e. the image you pushed in the step above with `docker push us-central1-docker.pkg.dev/your_project/whale-speech/model-server:x.y.z`.
+- Name your service (e.g. `whale-speech-model-server`) and select a region (`us-central1` is a good default).
+- Open the section for **Container(s), Volumes, Networking, Security**.
+  - Add the port your model server is listening on (default is `5000`) as the container port. This will be added as an environment variable when running the container.
+  - Update memory and CPU count as needed. I noticed that 4 GiB and 2 vCPUs worked fine with batch durations of 60 seconds. These values can be adjusted through a new revision later.
+  - I'd recommend lowering the max number of requests per container to 1-5, since the inputs will be larger for each request.
+  - You may need to adjust the min and max number of instances, depending on your expected traffic and quotas.
+
+- Click **Create**.
+
+### 3. Test the service
+Once the service is deployed, you can test it by sending a POST request to the service's endpoint.
+The URL should be available at the top of the service details page. It'll look something like `https://whale-speech-model-server-xxxx.a.run.app`.
+
+In the `whale-speech` directory, you can run the following command to test the service:
+```bash
+export INFERENCE_URL="https://whale-speech-model-server-xxxx.a.run.app"
+python3 examples/test_model_server.py
+```
+
+The expected response should be a JSON object with a `predictions` key and a list of floats as the value.
+
+I'd recommend saving the `export INFERENCE_URL="https://whale-speech-model-server-xxxx.a.run.app"` command to an `env.sh` file in the `whale-speech` directory, so you can easily run the test script in the future. This filename is in the `.gitignore`, so it won't be pushed to the repository.
+
+In the same file, I export a `MODEL_REGISTERY` variable, which is the URL of the model registry in Google Artifact Registry. This is used in the `make` targets, like `build-push-model-server`, which builds the image and pushes it to the registry.
+
+
+### Troubleshooting
+If you are having trouble with the deployment, you can check the logs in the Cloud Run console.
+
+Some common issues I've run into are:
+- Not exposing the correct port in the container settings.
+- Too little memory or too low a CPU count.
+- IAM permissions between Artifact Registry and Cloud Run. Read more [here](https://cloud.google.com/artifact-registry/docs/access-control#iam).
\ No newline at end of file
diff --git a/docs/howto/spin-up-model-server.md b/docs/howto/spin-up-model-server.md
new file mode 100644
index 0000000..2351dd2
--- /dev/null
+++ b/docs/howto/spin-up-model-server.md
@@ -0,0 +1,123 @@
+# How to spin up a model server VM
+
+NOTE: A VM for model serving is no longer used for this project. See [Model server as Cloud Run](model-server-as-cloud-run.md) for the current model serving method. This doc on VMs is kept for reference.
+
+Our pipeline requires a publicly accessible model server for classifications.
+While this code is included in this repo, users will still need to spin up their own server.
+This doc is a walk-through of how I achieved this in GCP.
+
+## Prerequisites
+- A [GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects) with [billing enabled](https://cloud.google.com/billing/docs/how-to/modify-project).
+- [`gcloud`](https://cloud.google.com/sdk/gcloud) installed and authenticated with your project (`gcloud init`).
+- (Optional) This code locally cloned (`git clone https://github.com/pmhalvor/whale-speech`).
+- (Optional) [Docker](https://github.com/docker/docker-install?tab=readme-ov-file#usage) installed on your local machine, and authenticated with your [Docker ID](https://docs.docker.com/accounts/create-account/).
+
+
+## Steps
+
+### 0. (Optional) Build the Docker image and push to Docker Hub
+_This step only needs to be done if you are changing model server code or ports._
+_You can skip this step, and rather use the pre-built image on Docker Hub:_
+[whale-speech:model-server-X.Y.Z](https://hub.docker.com/r/permortenhalvorsen024/whale-speech/tags).
+
+Navigate to the project directory, build and tag your model-server Docker image, and push it to Docker Hub.
+
+When building the image, you'll need to use your public Docker ID as a prefix to your image tag.
+This tells Docker where to push the image now, and where to pull it from later.
+Image hosting is free, as long as you make your images public.
+Check your team's requirements on whether you are allowed to make images public.
+
+
+```bash
+cd whale-speech
+docker build -f Dockerfile.model-server -t your_docker_id/whale-speech:model-server-x.y.z .
+docker push your_docker_id/whale-speech:model-server-x.y.z
+```
+
+### 1. Create Compute Engine instance
+- Navigate to the [Compute Engine](https://console.cloud.google.com/compute/instances) page in the GCP console.
+- Ensure you are in the correct project.
+- Select **Create Instance**.
+- Name your instance, then select a region, zone, and machine type. If you are using the default image from Step 0, you'll need to make sure to use an `arm64` machine type and update your selected **Boot disk** to an `arm64` based image.
+- Under **Firewall**, check the box for **Allow HTTP traffic**.
+- Under **Advanced options** > **Networking**, add a new **Network tag**. This will be used to create a firewall rule in the next step. I'll call this tag `whale-speech-inference`, but you can call it whatever you like.
+- Click **Create**.
+
+If all goes well, your instance should be up and running in a few minutes.
+
+### 2. Create Firewall Rule
+_This section follows the instructions originally posted by Carlos Rojas on [StackOverflow](https://stackoverflow.com/a/21068402/11260232)_.
+
+While waiting for the instance to spin up, we'll update the firewall to allow traffic through our specified port.
+
+- Navigate to [Networking > VPC Network > Firewalls](https://console.cloud.google.com/networking/firewalls) in the GCP console.
+- Click **Create Firewall Rule**.
+- Name your rule something descriptive that you'll recognize later.
+- Under **Targets**, select **Specified target tags** and enter the tag you created in Step 1 (`whale-speech-inference`).
+- Under **Source filter**, select **IPv4 ranges** and enter your desired host IP (the one the app uses, default is `0.0.0.0`).
+- Under **Protocols and ports**, select **Specified protocols and ports**, **TCP**, and enter a range of ports you want to open. Here, I used `5000-5005`, but really only one is needed.
+- Click **Create**.
+
+It may take a few minutes to update, but the firewall rule should be live shortly.
+Since we haven't yet done anything on our instance, I'd recommend just restarting it to make sure the firewall rule is applied.
+
+### 3. Pull and run the Docker image
+- SSH into your instance. This can be done in two different ways:
+  - Directly in the GCP console by clicking the SSH button on your instance. A window should pop up with a terminal.
+  - Via the command line with `gcloud compute ssh your_gcp_username@your_instance_name --zone your_zone --project your_project`.
+- Install Docker on your instance. This can be done with the commands below. Alternatively, you can follow the steps linked above to install Docker on your local machine.
+```bash
+sudo apt-get update
+sudo apt-get install docker.io
+```
+- Pull the Docker image from Docker Hub. I needed root access when running Docker commands, hence the `sudo` prefix. If you skipped Step 0, you can use the pre-built image here, as shown below:
+```bash
+sudo docker pull permortenhalvorsen024/whale-speech:model-server-x.y.z
+```
+- Run the Docker image. This will start the model server on the specified port.
+```bash
+sudo docker run -p 5000:5000 --name model-server --platform linux/amd64 -it permortenhalvorsen024/whale-speech:model-server-v0.0.2
+```
+
+With the container created and its port exposed, you should be able to log out of your SSH session without disrupting the model server.
+
+
+### 4. Test model server
+
+If you are using the default image (where logging is enabled), you should see that your server starts up and is listening on the specified port.
+The IP address stated there is only the _internal_ IP address. You'll need to find the _external_ IP address of your instance to access the server from outside the instance.
+This can be found in the GCP console, or directly in your SSH terminal with the command:
+```bash
+curl ifconfig.me
+```
+
+Using the returned external IP address, you can test that the model running on your Google Compute Engine instance is accessible to the outside world by running the below command on your local machine:
+```bash
+python examples/test_model_server.py --inference_url your_external_ip --port 5000
+```
+
+You should get a result similar to the below:
+```bash
+{
+    'key': 'encouter1',
+    'predictions': [[0.006112830247730017], ...]
+}
+```
+
+If you get an error, I suggest going through each step again to see if anything deviates from the workflow above.
+If you are still experiencing issues, feel free to open an issue in this repo.
+
+## Conclusion
+Your server should now be up and running, and ready to be used in the pipeline.
+You can use this external IP address as the model URI when running the pipeline directly.
+
+Note that what we set up here will incur costs on your GCP account,
+so don't forget to shut down your instance when you are no longer using it.
+
+## Next steps
+Some next steps on where to take your project from here:
+- Set up a domain name for your model server.
+- Set up HTTPS.
+- Wrap these steps into a Terraform script for easy reproducibility and portability. For more on IaC, check out the [GCP docs](https://cloud.google.com/deployment-manager/docs/quickstart).
+
+
+
diff --git a/docs/ladr/LADR_0006_model_server_deployment.md b/docs/ladr/LADR_0006_model_server_deployment.md
new file mode 100644
index 0000000..64aed67
--- /dev/null
+++ b/docs/ladr/LADR_0006_model_server_deployment.md
@@ -0,0 +1,71 @@
+# Model Server Deployment
+
+Need to decide how to host the model server.
+
+When running locally, I've been starting a Flask server in a separate thread.
+For my production environment, I will likely need a more robust solution.
+
+The desired solution should:
+- Be fast, scalable, and stable
+- Be cost-effective
+- Be easy to deploy and maintain
+- Have some version control
+
+## Options
+
+### 1. Compute Engine VM
+**Pros**:
+- Full control over the environment
+  - Easy to debug by SSH-ing into the VM
+  - Manually install and update needed dependencies
+- Very similar to local development
+- Can host multiple services on the same VM
+  - E.g. if the inference server and pipeline triggers were on the same VM
+
+**Cons**:
+- Requires more setup and maintenance
+  - Networking and firewall rules in GCP
+  - Monitoring and logging not built-in
+- Not as scalable as other options
+- Persistent servers would likely be more expensive than serverless options
+
+### 2. Cloud Run
+**Pros**:
+- Serverless
+  - Only pay for what you use
+  - Scales automatically
+- Easy to deploy
+  - Can deploy and revise directly from `gcloud` or in the GCP console
+- Built-in monitoring and logging
+- Built-in version control (using image registry and/or tags)
+- Exposes a public endpoint that can be triggered by HTTP requests
+
+**Cons**:
+- Can only serve one container per service. Other services would need to be deployed separately.
+- Haven't figured out how to scale up (to receive large input requests)
+
+### 3. Kubernetes Engine
+**Pros**:
+- Full control over the environment
+- Scalable
+- Can host multiple services on the same cluster
+
+**Cons**:
+- Takes a (relatively) long time to start and scale up
+- Requires more setup and maintenance
+- Not as cost-effective as serverless options
+- Probably overkill for this project
+
+
+## Decision
+For this project, I'll use Cloud Run.
+I tried a VM first, but realized it costs too much over time, and I missed the ability to easily scale.
+
+Cloud Run worked pretty much out of the box, and I was able to deploy the model server in a few minutes.
+Figuring out the correct PORT configuration was a bit cumbersome, though.
+
+I think the stateless nature will be the cheapest option for the end goal of this project.
+During times of high activity, we can keep the minimum instance count at 1, to ensure faster response times.
+Otherwise, we can scale down to 0 instances, and only pay for the storage of the container image (if using Artifact Registry).
+
+I just need to figure out how to scale up the instances to handle larger requests.
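+
+For reference, a minimal sketch of the PORT handling now used in `src/model_server.py`: Cloud Run injects the serving port through the `PORT` environment variable, and the server falls back to the configured port (`5000` in `common.yaml`) when running locally. The handler body below is a placeholder, not the real inference code.
+
+```python
+import os
+from flask import Flask
+
+app = Flask("model_server")
+
+@app.route("/predict", methods=["POST"])
+def predict():
+    return {"predictions": []}  # placeholder; the real handler runs the whale model
+
+if __name__ == "__main__":
+    # Cloud Run sets PORT; default to the locally configured port otherwise.
+    port = int(os.environ.get("PORT", 5000))
+    app.run(host="0.0.0.0", port=port)
+```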
diff --git a/examples/test.py b/examples/test.py new file mode 100644 index 0000000..c92c712 --- /dev/null +++ b/examples/test.py @@ -0,0 +1,14 @@ +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +# write logging to stdout +handler = logging.StreamHandler() +handler.setLevel(logging.INFO) +formatter = logging.Formatter('%(asctime)s: [%(levelname)s] %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + +logger.info("Logger successfully loaded") +print("Test file executed successfully") \ No newline at end of file diff --git a/examples/test_server.py b/examples/test_server.py new file mode 100644 index 0000000..de070e7 --- /dev/null +++ b/examples/test_server.py @@ -0,0 +1,18 @@ +import requests +import os + + +inference_url = os.environ.get("INFERENCE_URL", "0.0.0.0:5000/predict") + +data = { + "key": "encouter1", + "batch": [ + [0.3745401203632355], [0.9507142901420593], [0.7319939136505127], [0.5986585021018982], [0.15601864044243652], [0.15599452033620265] + ]*10_000*10, # (6 samples * 10_000 = 6 seconds )* 10 = 60 seconds +} + +response = requests.post(inference_url, json=data) +if response.status_code == 200: + print(response.json()) +else: + print(response.text) \ No newline at end of file diff --git a/makefile b/makefile index 9175998..ec82333 100644 --- a/makefile +++ b/makefile @@ -1,8 +1,17 @@ +VERSION := 1.0.0 +GIT_SHA := $(shell git rev-parse --short HEAD) +TAG := $(VERSION)-$(GIT_SHA) +PIPELINE_IMAGE_NAME := whale-speech/pipeline +MODEL_SERVER_IMAGE_NAME := whale-speech/model-server +PIPELINE_WORKER_IMAGE_NAME := whale-speech/pipeline-worker + +PUBLIC_MODEL_SERVER_IMAGE_NAME := $(shell echo $(MODEL_SERVER_IMAGE_NAME) | sed 's/\//-/g') +PUBLIC_PIPELINE_WORKER_IMAGE_NAME := $(shell echo $(PIPELINE_WORKER_IMAGE_NAME) | sed 's/\//-/g') + local-run: bash scripts/kill_model_server.sh python3 src/model_server.py & python3 src/pipeline.py bash scripts/kill_model_server.sh - python3 src/gcp.py --deduplicate run-pipeline: python3 src/pipeline.py @@ -17,4 +26,108 @@ gcp-init: python3 src/gcp.py --init gcp-deduplicate: - python3 src/gcp.py --deduplicate \ No newline at end of file + python3 src/gcp.py --deduplicate + +setup: + conda create -n whale-speech python=3.11 + conda activate whale-speech + sudo apt-get update + sudo apt-get install libhdf5-dev libsndfile1 gcc + python3 -m pip install -r requirements/requirements.txt + python3 -m pip install -r requirements/model-requirements.txt + + +# Docker related +check-uncommited: + git diff-index --quiet HEAD + +build: check-uncommited + docker build -t $(PIPELINE_IMAGE_NAME):$(TAG) --platform linux/amd64 . + +push: check-uncommited + docker tag $(PIPELINE_IMAGE_NAME):$(TAG) $(MODEL_REGISTERY)/$(PIPELINE_IMAGE_NAME):$(TAG) + docker push $(MODEL_REGISTERY)/$(PIPELINE_IMAGE_NAME):$(TAG) + +build-push: build push + + +# Model server related +build-model-server: check-uncommited + docker build -t $(MODEL_SERVER_IMAGE_NAME):$(TAG) --platform linux/amd64 -f Dockerfile.model-server . 
+ +push-model-server: check-uncommited + docker tag $(MODEL_SERVER_IMAGE_NAME):$(TAG) $(MODEL_REGISTERY)/$(MODEL_SERVER_IMAGE_NAME):$(TAG) + docker push $(MODEL_REGISTERY)/$(MODEL_SERVER_IMAGE_NAME):$(TAG) + +push-model-server-latest: check-uncommited + docker tag $(MODEL_SERVER_IMAGE_NAME):$(TAG) $(MODEL_REGISTERY)/$(MODEL_SERVER_IMAGE_NAME):latest + docker push $(MODEL_REGISTERY)/$(MODEL_SERVER_IMAGE_NAME):latest + +build-push-model-server: build-model-server push-model-server push-model-server-latest + +publish-latest-model-server: build-model-server + docker tag $(MODEL_SERVER_IMAGE_NAME):$(TAG) $(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_MODEL_SERVER_IMAGE_NAME):latest + docker push $(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_MODEL_SERVER_IMAGE_NAME):latest + +test-server: + python3 examples/test_server.py + + +# Pipeline worker related +build-pipeline-worker: check-uncommited + docker build -t $(PIPELINE_WORKER_IMAGE_NAME):$(TAG) --platform linux/amd64 -f Dockerfile.pipeline-worker . + +push-pipeline-worker: check-uncommited + docker tag $(PIPELINE_WORKER_IMAGE_NAME):$(TAG) $(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):$(TAG) + docker push $(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):$(TAG) + +push-pipeline-worker-latest: check-uncommited + docker tag $(PIPELINE_WORKER_IMAGE_NAME):$(TAG) $(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):latest + docker push $(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):latest + +build-push-pipeline-worker: build-pipeline-worker push-pipeline-worker push-pipeline-worker-latest + +publish-latest-pipeline-worker: build-pipeline-worker + docker tag $(PIPELINE_WORKER_IMAGE_NAME):$(TAG) $(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_PIPELINE_WORKER_IMAGE_NAME):latest + docker push $(PUBLIC_MODEL_REGISTERY)/$(PUBLIC_PIPELINE_WORKER_IMAGE_NAME):latest + + +# Pipeline run related +run-dataflow: + python3 src/pipeline.py \ + --job_name "whale-speech-$(GIT_SHA)" \ + --filesystem gcp \ + --inference_url $(INFERENCE_URL) \ + --runner DataflowRunner \ + --region us-central1 \ + --worker_machine_type=n1-highmem-8 \ + --disk_size_gb=100 \ + --num_workers=8 \ + --max_num_workers=8 \ + --autoscaling_algorithm=THROUGHPUT_BASED \ + --worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME):$(TAG) \ + --start "2024-07-11" \ + --end "2024-07-11" \ + --offset 0 \ + --margin 1800 \ + --batch_duration 60 + +run-direct: + python3 src/pipeline.py \ + --job_name "whale-speech-$(GIT_SHA)" \ + --filesystem gcp \ + --inference_url $(INFERENCE_URL) \ + --runner DirectRunner \ + --worker_harness_container_image=$(MODEL_REGISTERY)/$(PIPELINE_WORKER_IMAGE_NAME) \ + --start "2024-07-11" \ + --end "2024-07-11" \ + --offset 0 \ + --margin 600 + + +rebuild-run-dataflow: build-push-pipeline-worker run-dataflow + +rebuild-run-direct: build-push-pipeline-worker run-direct + +show-url: + echo $(INFERENCE_URL) \ No newline at end of file diff --git a/requirements/model-requirements.txt b/requirements/model-requirements.txt new file mode 100644 index 0000000..2fa10f9 --- /dev/null +++ b/requirements/model-requirements.txt @@ -0,0 +1,5 @@ +flask==3.0.3 +numpy==1.26.4 +pyyaml==6.0.2 +tensorflow==2.17.0 +tensorflow_hub==0.16.1 \ No newline at end of file diff --git a/requirements.txt b/requirements/requirements.txt similarity index 88% rename from requirements.txt rename to requirements/requirements.txt index 85e0385..3b24daf 100644 --- a/requirements.txt +++ b/requirements/requirements.txt @@ -10,8 +10,6 @@ six tensorflow tensorflow_hub -tensorflow_text -tflite librosa soundfile diff --git 
a/setup.py b/setup.py new file mode 100644 index 0000000..9256858 --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +import setuptools + +setuptools.setup( + name="whale-speech", + version="0.0.1", + packages=setuptools.find_packages(), + intall_requires=[ + "tensorflow", + "tensorflow_hub", + "numpy", + "scipy", + "matplotlib", + "apache_beam" + "git+https://github.com/open-oceans/happywhale.git", + "pytest", + "pyyaml", + "six", + "librosa", + "soundfile", + "apache-beam[gcp]", + ] +) \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/config.py b/src/config.py index f616280..ef9b1f6 100644 --- a/src/config.py +++ b/src/config.py @@ -1,7 +1,5 @@ - from types import SimpleNamespace -import apache_beam as beam import argparse import os import yaml @@ -27,9 +25,9 @@ def add_write_params(config): config["bigquery"] = { - "method": beam.io.WriteToBigQuery.Method.FILE_LOADS, - "create_disposition": beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - "write_disposition": beam.io.BigQueryDisposition.WRITE_APPEND # WRITE_APPEND or WRITE_TRUNCATE + "method": "FILE_LOADS", # beam.io.WriteToBigQuery.Method.FILE_LOADS, + "create_disposition": "CREATE_IF_NEEDED", # beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + "write_disposition": "WRITE_APPEND", # beam.io.BigQueryDisposition.WRITE_APPEND # or WRITE_TRUNCATE } return config diff --git a/src/config/common.yaml b/src/config/common.yaml index 73d891c..da75fa6 100644 --- a/src/config/common.yaml +++ b/src/config/common.yaml @@ -3,7 +3,9 @@ pipeline: verbose: true debug: true show_plots: false - is_local: false + filesystem: "local" + host: "0.0.0.0" + port: 5000 # gcp - bigquery project: "bioacoustics-2024" @@ -36,7 +38,7 @@ pipeline: search: output_path_template: "data/table/{table_id}/geofile={geofile}/encounters.csv" - filename: "monterey_bay_50km" + filename: "monterey_bay_20km" geometry_file_path_template: "data/geo/{filename}.geojson" species: "humpback_whale" search_columns: @@ -70,11 +72,11 @@ pipeline: url_template: "https://pacific-sound-16khz.s3.amazonaws.com/{year}/{month:02}/{filename}" filename_template: "MARS-{year}{month:02}{day:02}T000000Z-16kHz.wav" source_sample_rate: 16000 - margin: 30 # TODO set to 900 # seconds - offset: 13 # TODO set to 0 # hours + margin: 1800 # seconds + offset: 0 # hours - only used for cherry picking during development output_array_path_template: "data/audio/raw/key={key}/{filename}" output_table_path_template: "data/table/{table_id}/metadata.json" - skip_existing: false # if true, skip downstream processing of existing audio files (false during development) + skip_existing: true # if true, skip downstream processing of existing audio files (false during development) audio_table_id: "raw_audio" store_audio: true audio_table_schema: @@ -125,7 +127,7 @@ pipeline: sift_threshold: 0.015 classify: - batch_duration: 600 # seconds + batch_duration: 60 # seconds model_sample_rate: 10000 model_uri: https://www.kaggle.com/models/google/humpback-whale/TensorFlow2/humpback-whale/1 @@ -133,7 +135,7 @@ pipeline: inference_retries: 3 med_filter_size: 3 - plot_scores: true + plot_scores: false hydrophone_sensitivity: -168.8 plot_path_template: "data/plots/results/{params}/{plot_name}.png" output_array_path_template: "data/classifications/{params}/{key}.npy" diff --git a/src/config/gcp.yaml b/src/config/gcp.yaml index b2c3ff8..50d1074 100644 --- a/src/config/gcp.yaml +++ b/src/config/gcp.yaml @@ -3,8 +3,7 @@ pipeline: verbose: true debug: true 
show_plots: false - is_local: false - + filesystem: "gcp" audio: skip_existing: false # if true, skip downstream processing of existing audio files (false during development) diff --git a/src/config/local.yaml b/src/config/local.yaml index e1dbbe8..9d1cd33 100644 --- a/src/config/local.yaml +++ b/src/config/local.yaml @@ -3,7 +3,7 @@ pipeline: verbose: true debug: true show_plots: false - is_local: true + filesystem: "local" audio: skip_existing: false # if true, skip downstream processing of existing audio files (false during development) @@ -17,7 +17,7 @@ pipeline: classify: batch_duration: 600 # seconds inference_retries: 3 - plot_scores: true + plot_scores: false postprocess: min_gap: 60 # 1 minute diff --git a/src/gcp.py b/src/gcp.py index fb67b80..21c899f 100644 --- a/src/gcp.py +++ b/src/gcp.py @@ -87,7 +87,7 @@ def deduplicate(): parser.add_argument("--deduplicate", action="store_true", help="Dedupliacte BigQuery tables (config.general.tables)") args = parser.parse_args() - if config.general.is_local: + if config.general.filesystem == "local": logging.info("Running in local mode. Exiting.") exit() diff --git a/src/model_server.py b/src/model_server.py index d4b613d..5d76751 100644 --- a/src/model_server.py +++ b/src/model_server.py @@ -1,5 +1,6 @@ from flask import Flask, request, jsonify import tensorflow_hub as hub +import os import numpy as np import tensorflow as tf @@ -8,16 +9,20 @@ from config import load_pipeline_config config = load_pipeline_config() +# Enable verbose logging +logger = logging.getLogger("model_server") +logger.setLevel(logging.INFO) + # Load the TensorFlow model -logging.info("Loading model...") +logger.info("Loading model...") model = hub.load(config.classify.model_uri) score_fn = model.signatures["score"] -logging.info("Model loaded.") +logger.info("Model loaded.") # Initialize Flask app -app = Flask(__name__) +app = Flask("model_server") -# Define the predict endpoint +# Define inference endpoint @app.route('/predict', methods=['POST']) def predict(): try: @@ -25,19 +30,19 @@ def predict(): data = request.json batch = np.array(data['batch'], dtype=np.float32) # Assuming batch is passed as a list key = data['key'] - logging.info(f"batch.shape = {batch.shape}") + logger.info(f"batch.shape = {batch.shape}") # Prepare the input for the model waveform_exp = tf.expand_dims(batch, 0) # Expanding dimensions to fit model input shape - logging.debug(f"waveform_exp.shape = {waveform_exp.shape}") + logger.debug(f"waveform_exp.shape = {waveform_exp.shape}") # Run inference results = score_fn( waveform=waveform_exp, # waveform_exp, context_step_samples=config.classify.model_sample_rate )["scores"][0] # NOTE currently only support batch size 1 - logging.info(f"results.shape = {results.shape}") - logging.debug("results = ", results) + logger.info(f"results.shape = {results.shape}") + logger.debug("results = ", results) # Return the predictions and key as JSON return jsonify({ @@ -46,9 +51,12 @@ def predict(): }) except Exception as e: - logging.error(f"An error occurred: {str(e)}") + logger.error(f"An error occurred: {str(e)}") return jsonify({'error': str(e)}), 500 # Main entry point if __name__ == "__main__": - app.run(host='0.0.0.0', port=5000, debug=True) + port = os.environ.get('PORT', config.general.port) + + logger.info(f"Host: {config.general.host} port: {port}") + app.run(host=config.general.host, port=port, debug=config.general.debug) diff --git a/src/pipeline.py b/src/pipeline.py index 5017812..b3882a2 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ 
-1,6 +1,9 @@ import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions + +from config import load_pipeline_config + from stages.search import GeometrySearch from stages.audio import RetrieveAudio from stages.sift import Butterworth @@ -8,13 +11,13 @@ from stages.postprocess import PostprocessLabels -from config import load_pipeline_config config = load_pipeline_config() def run(): # Initialize pipeline options pipeline_options = PipelineOptions( # runner="DataflowRunner", + # region="us-central1", project=config.general.project, temp_location=config.general.temp_location, ) @@ -38,5 +41,16 @@ def run(): ) +def setup(): + import gcp + gcp.initialize() + +def teardown(): + import gcp + gcp.deduplicate() + + if __name__ == "__main__": + setup() run() + teardown() diff --git a/src/stages/audio.py b/src/stages/audio.py index 49db069..806b922 100644 --- a/src/stages/audio.py +++ b/src/stages/audio.py @@ -22,7 +22,7 @@ class AudioTask(beam.DoFn): def __init__(self, config): self.debug = config.general.debug - self.is_local = config.general.is_local + self.filesystem = config.general.filesystem.lower() self.margin = config.audio.margin self.offset = config.audio.offset @@ -90,7 +90,7 @@ def _get_export_path( filename=filename ) - if not self.is_local: + if self.filesystem == "gcp": file_path = os.path.join( self.workbucket, file_path @@ -293,17 +293,17 @@ def _store( logging.info(f"Writing audio to {file_path}") logging.info(f"Audio shape: {audio.shape}") - if self.is_local: + if self.filesystem == "local": logging.info(f"Updating table {self.table_id} locally") self._store_local(key, file_path, start, end) - else: + elif self.filesystem == "gcp": logging.info(f"Updating table {self.table_id} in BigQuery") - self._store_bigquery(key, file_path, start, end) - + else: + raise ValueError(f"Invalid filesystem: {self.filesystem}") with filesystems.FileSystems.create(file_path) as f: - # same for local and gsc storage + # same for local and gcs storage np.save(f, audio) diff --git a/src/stages/classify.py b/src/stages/classify.py index 132ec4e..6d535b2 100644 --- a/src/stages/classify.py +++ b/src/stages/classify.py @@ -28,7 +28,7 @@ class BaseClassifier(beam.PTransform): def __init__(self, config: SimpleNamespace): self.config = config - self.is_local = config.general.is_local + self.filesystem = config.general.filesystem.lower() self.source_sample_rate = config.audio.source_sample_rate self.batch_duration = config.classify.batch_duration @@ -188,16 +188,18 @@ def _plot_spectrogram_scipy( return t, f, psd def _plot_audio(self, audio, start, key): - with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_min_max.pkl", "rb") as f: - min_max_samples = pickle.load(f) - with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_all.pkl", "rb") as f: - all_samples = pickle.load(f) - - def _plot_signal_detections(signal, min_max_detection_samples, all_samples): + try: + with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_min_max.pkl", "rb") as f: + min_max_samples = pickle.load(f) + with open(f"data/plots/Butterworth/{start.year}/{start.month}/{start.day}/data/{key}_all.pkl", "rb") as f: + all_samples = pickle.load(f) + except FileNotFoundError: + min_max_samples = [] + all_samples = [] + + def _plot_signal_detections(min_max_detection_samples, all_samples): # TODO refactor plot_signal_detections in classify logging.info(f"Plotting signal detections: 
{min_max_detection_samples}") - - plt.plot(signal) # NOTE: for legend logic, plot min_max window first if len(min_max_detection_samples): @@ -228,8 +230,10 @@ def _plot_signal_detections(signal, min_max_detection_samples, all_samples): title = f"Signal detections: {start.strftime('%Y-%m-%d %H:%M:%S')}" plt.title(title) - - _plot_signal_detections(audio, min_max_samples, all_samples) + plt.plot(audio) + plt.xlabel(f'Samples (seconds * {self.config.audio.source_sample_rate} Hz)') + plt.ylabel('Amplitude (normalized and centered)') + _plot_signal_detections(min_max_samples, all_samples) def _plot(self, output): audio, start, end, encounter_ids, scores, _ = output @@ -279,7 +283,7 @@ def _get_export_path(self, key): key=key ) - if not self.is_local: + if self.filesystem == "gcp": export_path = os.path.join(self.workbucket, export_path) return export_path @@ -295,10 +299,12 @@ def _store(self, outputs): classifications_path = self._get_export_path(key) # update metadata table - if self.is_local: + if self.filesystem == "local": self._store_local(key, classifications_path) - else: + elif self.filesystem == "gcp": self._store_bigquery(key, classifications_path) + else: + raise ValueError(f"Unsupported filesystem: {self.filesystem}") # store classifications with filesystems.FileSystems.create(classifications_path) as f: @@ -384,7 +390,7 @@ def __init__(self, config: SimpleNamespace): def process(self, element): key, batch = element - logging.info(f"Sending batch to inference: {key} with {len(batch)} samples") + logging.info(f"Sending batch {key} with {len(batch)} samples to inference at: {self.inference_url}") # skip empty batches if len(batch) == 0: diff --git a/src/stages/postprocess.py b/src/stages/postprocess.py index f949eb3..b9bb028 100644 --- a/src/stages/postprocess.py +++ b/src/stages/postprocess.py @@ -24,7 +24,7 @@ def __init__(self, config: SimpleNamespace): self.table_id = config.postprocess.postprocess_table_id # storage params - self.is_local = config.general.is_local + self.filesystem = config.general.filesystem.lower() self.output_path = config.postprocess.output_path self.project = config.general.project self.dataset_id = config.general.dataset_id @@ -140,10 +140,12 @@ def _store(self, df: pd.DataFrame): if len(df) == 0: return - if self.is_local: + if self.filesystem == "local": self._store_local(df) - else: + elif self.filesystem == "gcp": self._store_bigquery(df) + else: + raise ValueError(f"Filesystem {self.filesystem} not supported.") def _store_bigquery(self, df: pd.DataFrame): write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE diff --git a/src/stages/search.py b/src/stages/search.py index 953439c..b9dfa66 100644 --- a/src/stages/search.py +++ b/src/stages/search.py @@ -3,17 +3,23 @@ from types import SimpleNamespace import apache_beam as beam +from apache_beam.utils.timestamp import Timestamp +from datetime import timedelta, timezone import io import logging import os import pandas as pd +import pytz + +logger = logging.getLogger(__name__) + class GeometrySearch(beam.DoFn): def __init__(self, config: SimpleNamespace): self.config = config - self.is_local = config.general.is_local + self.filesystem = config.general.filesystem.lower() self.species = config.search.species @@ -41,6 +47,8 @@ def process(self, element): geometry_file = self._get_geometry_file() export_file = self._get_file_buffer("csv") + + logger.info(f"Searching for {self.species} between {start} and {end} in {geometry_file}.") geometry_search(geometry_file, start,end, export_file, self.species) 
search_results = self._postprocess(export_file) @@ -52,7 +60,7 @@ def process(self, element): @staticmethod def _preprocess_date(date_str): - return date_str.split("T")[0] + return date_str.split("T")[0] if "T" in date_str else date_str @staticmethod @@ -93,6 +101,7 @@ def _postprocess(self, export_file) -> pd.DataFrame: export_file.seek(0) results = pd.read_csv(export_file) results = results[self.search_columns] + results = results.dropna() # NOTE: can be removed if encounter_time column type updated logging.info(f"Search results: \n{results.head()}") return results @@ -100,7 +109,7 @@ def _postprocess(self, export_file) -> pd.DataFrame: def _store(self, search_results): rows = self._convert_to_table_rows(search_results) - if self.is_local: + if self.filesystem == "local": # convert back to dataframe w7 correct columns search_results = pd.DataFrame(rows) if not filesystems.FileSystems.exists(self.output_path): @@ -114,7 +123,7 @@ def _store(self, search_results): search_results.drop_duplicates(inplace=True) search_results.to_csv(self.output_path, index=False) - else: + elif self.filesystem == "gcp": logging.info(f"search_results.columns: {search_results.columns}") rows | f"Update {self.table_id}" >> beam.io.WriteToBigQuery( @@ -125,6 +134,8 @@ def _store(self, search_results): custom_gcs_temp_location=self.temp_location, **self.write_params ) + else: + raise ValueError(f"Filesystem {self.filesystem} not supported.") logging.info(f"Stored search results in {self.output_path}") @@ -137,10 +148,28 @@ def _convert_to_table_rows(self, df): df["longitude"] = df["longitude"].astype(float) df["latitude"] = df["latitude"].astype(float) - df["encounter_time"] = df[["startDate", "startTime"]].apply( - lambda x: f"{x.startDate}T{x.startTime}", axis=1 - ) + df["encounter_time"] = df.apply(self._apply_timezone, axis=1) df = df[[*table_colums]] return df.to_dict(orient="records") + + @staticmethod + def _apply_timezone(row): + dt_str = f"{row['startDate']} {row['startTime']}" + dt = pd.to_datetime(dt_str) + try: + timezone_str = row['timezone'] + if timezone_str.startswith(('-', '+')): + # NOTE: store in UTC for global consistency + hours_offset = int(timezone_str[:3]) + minutes_offset = int(timezone_str[4:]) if len(timezone_str) > 3 else 0 + tz_offset = timezone(timedelta(hours=hours_offset, minutes=minutes_offset)) + dt = dt.replace(tzinfo=tz_offset) + + else: + dt = dt.tz_localize(pytz.timezone(timezone_str)) + except Exception as e: + raise e + + return dt.astimezone(pytz.UTC).isoformat() \ No newline at end of file diff --git a/src/stages/sift.py b/src/stages/sift.py index 162a2f0..bc318d0 100644 --- a/src/stages/sift.py +++ b/src/stages/sift.py @@ -31,7 +31,7 @@ class BaseSift(beam.PTransform): def __init__(self, config: SimpleNamespace): # general params self.debug = config.general.debug - self.is_local = config.general.is_local + self.filesystem = config.general.filesystem.lower() self.sample_rate = config.audio.source_sample_rate self.store = config.sift.store_sift_audio @@ -339,7 +339,7 @@ def _get_export_paths(self, key): filename="detections.npy" ) - if not self.is_local: + if self.filesystem == "gcp": audio_path = os.path.join(self.workbucket,audio_path) detections_path = os.path.join(self.workbucket,detections_path) @@ -364,20 +364,22 @@ def _store(self, sifted_output, detections): audio_path, detections_path = self._get_export_paths(key) # upload metadata to table - if self.is_local: + if self.filesystem == "local": self._store_local( key, audio_path, detections_path, 
self._get_path_params(), ) - else: + elif self.filesystem == "gcp": self._store_bigquery( key, audio_path, detections_path, self._get_path_params(), ) + else: + raise ValueError(f"Unsupported filesystem: {self.filesystem}") # store sifted audio with filesystems.FileSystems.create(audio_path) as f: diff --git a/tests/test_audio.py b/tests/test_audio.py index 4052e1d..e1d6df5 100644 --- a/tests/test_audio.py +++ b/tests/test_audio.py @@ -27,7 +27,7 @@ def config(): ), general=SimpleNamespace( debug = True, - is_local = True, + filesystem = "local", project="project", dataset_id="dataset_id", workbucket = "workbucket", diff --git a/tests/test_classify.py b/tests/test_classify.py index f74d593..ddec3a5 100644 --- a/tests/test_classify.py +++ b/tests/test_classify.py @@ -2,7 +2,6 @@ from types import SimpleNamespace import numpy as np -import pandas as pd import pytest from stages.classify import BaseClassifier, WhaleClassifier, InferenceClient @@ -13,7 +12,7 @@ def example_config(): return SimpleNamespace( general = SimpleNamespace( - is_local="is_local", + filesystem="local", show_plots=True, project="project", dataset_id="dataset_id", diff --git a/tests/test_postprocess.py b/tests/test_postprocess.py index 7660466..bb6f8cd 100644 --- a/tests/test_postprocess.py +++ b/tests/test_postprocess.py @@ -3,7 +3,7 @@ from datetime import datetime from types import SimpleNamespace -from src.stages.postprocess import PostprocessLabels +from stages.postprocess import PostprocessLabels @pytest.fixture def config(): @@ -22,7 +22,7 @@ def config(): pooled_score=SimpleNamespace(type="FLOAT", mode="REQUIRED"), ), ), - general=SimpleNamespace(project="project", dataset_id="dataset_id", is_local=True), + general=SimpleNamespace(project="project", dataset_id="dataset_id", filesystem="local"), ) @pytest.fixture diff --git a/tests/test_search.py b/tests/test_search.py index 43986dc..03aae71 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -1,8 +1,5 @@ -from datetime import datetime - import pytest import pandas as pd -import numpy as np from stages.search import GeometrySearch from types import SimpleNamespace @@ -36,7 +33,7 @@ def config(): ) ), general=SimpleNamespace( - is_local=True, + filesystem="local", project="project", dataset_id="dataset_id", temp_location="temp_location" diff --git a/tests/test_sift.py b/tests/test_sift.py index 6b40b7a..40f4146 100644 --- a/tests/test_sift.py +++ b/tests/test_sift.py @@ -5,7 +5,6 @@ import pytest from stages.sift import BaseSift, Butterworth -from unittest.mock import patch from types import SimpleNamespace @@ -46,7 +45,7 @@ def config(): ), general=SimpleNamespace( debug = True, - is_local = True, + filesystem = "local", project="project", dataset_id="dataset_id", workbucket = "workbucket", diff --git a/versions/whale-speech-0.0.1.zip b/versions/whale-speech-0.0.1.zip new file mode 100644 index 0000000..92c1d60 Binary files /dev/null and b/versions/whale-speech-0.0.1.zip differ
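The timezone handling added to `src/stages/search.py` above converts each encounter's local start time to UTC before storage. Below is a minimal sketch of that conversion for an offset-style timezone string (the timestamp and offset are made-up illustration values):

```python
from datetime import timedelta, timezone

import pandas as pd
import pytz

# "-07:00"-style offset string -> fixed tzinfo, mirroring GeometrySearch._apply_timezone
timezone_str = "-07:00"
hours_offset = int(timezone_str[:3])                                    # -7
minutes_offset = int(timezone_str[4:]) if len(timezone_str) > 3 else 0  # 0
tz_offset = timezone(timedelta(hours=hours_offset, minutes=minutes_offset))

# Localize the naive encounter time, then store it in UTC for global consistency.
dt = pd.to_datetime("2024-07-11 14:30:00").replace(tzinfo=tz_offset)
print(dt.astimezone(pytz.UTC).isoformat())  # 2024-07-11T21:30:00+00:00
```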