Skip to content

Commit

Permalink
Creating a map of workflows and their statuses.
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinpalis committed Sep 25, 2024
1 parent 955c161 commit 5e60f62
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 86 deletions.
51 changes: 10 additions & 41 deletions .github/workflows/test_illumina_genotyping_array.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,56 +94,25 @@ jobs:
echo "Submission ID: $SUBMISSION_ID"
# 2. Poll submission status and capture all Workflow IDs from the response
# 2. Poll submission status and get workflow IDs and statuses
RESPONSE=$(firecloud_action poll_status --submission_id "$SUBMISSION_ID")
#print the response
echo "Response: $RESPONSE"
#Parse the response to get the workflow IDs
WORKFLOW_IDS=$(echo "$RESPONSE" | grep -oP 'workflow_id:\s*\K\S+')
#WORKFLOW_IDS=$(firecloud_action poll_status --submission_id "$SUBMISSION_ID" | grep -oP 'workflow_id:\s*\K\S+')
# Check if Workflow IDs were retrieved
if [ -z "$WORKFLOW_IDS" ]; then
# Parse the JSON response to get the workflow ID and statuses
echo "Workflows and their statuses:"
echo "$RESPONSE" | jq
# Check if RESPONSE is empty
if [ -z "$RESPONSE" ]; then
echo "Failed to retrieve Workflow IDs."
exit 1
fi
echo "Workflow IDs: $WORKFLOW_IDS"
# 3. Iterate over all Workflow IDs to get outputs
PIPELINE_NAME="IlluminaGenotypingArray" # Replace with your actual Pipeline name
# 3. Iterate over the Workflow IDs to get outputs
PIPELINE_NAME="IlluminaGenotypingArray"
for WORKFLOW_ID in $WORKFLOW_IDS; do
for WORKFLOW_ID in $(echo "$RESPONSE" | jq -r 'keys[]'); do
firecloud_action get_outputs --submission_id "$SUBMISSION_ID" --workflow_id "$WORKFLOW_ID" --pipeline_name "$PIPELINE_NAME"
done
echo "Workflow outputs retrieved successfully."
# - name: Check the status of a Terra submission
# run: |
# submissionId="44d890f7-f6dd-4a2c-8841-a10faeab3a07"
# namespace="warp-pipelines"
# name="Illumina-Genotyping-Array_np_copy"
# # Assign the access token to a variable
# ACCESS_TOKEN="${{ steps.auth.outputs.access_token }}"

# printf "\nFetching status for submission ID '%s':" "${submissionId}"
# submissionDetails=$(curl \
# -X GET \
# --header 'Accept: application/json' \
# --header "Authorization: Bearer ${ACCESS_TOKEN}" \
# "https://api.firecloud.org/api/workspaces/$namespace/$name/submissions/$submissionId")

# printf "\nFull JSON Response '%s':" "${submissionDetails}"
# submissionStatus=$(jq -r '.status' <<< "${submissionDetails}")
# workflowsStatus=$(jq -r '.workflows[] | .status' <<< "${submissionDetails}")
# printf "\nSubmissionStatus '%s':" "${submissionStatus}"
# printf "\nWorkflowsStatus '%s':" "${workflowsStatus}"

# submissionList=$(curl -X 'GET' \
# "https://api.firecloud.org/api/workspaces/$namespace/$name/submissions" \
# -H 'accept: */*' \
# -H "Authorization: Bearer ${ACCESS_TOKEN}")

# printf "\nSubmissionList '%s':" "${submissionList}"
74 changes: 29 additions & 45 deletions scripts/firecloud_api/firecloud_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import requests
import time
import json
import sys

class FirecloudAPI:
def __init__(self, token, namespace, workspace_name):
Expand Down Expand Up @@ -82,73 +83,50 @@ def create_submission(self, submission_data):

def poll_submission_status(self, submission_id):
"""
Polls the status of a submission until it is complete and returns all workflow IDs.
Polls the status of a submission until it is complete and returns a dictionary of workflow IDs and their statuses.
:param submission_id: The ID of the submission to poll
:return: List of workflow IDs if successful, None otherwise
:return: Dictionary with workflow IDs as keys and their statuses as values
"""
# Construct the API endpoint URL for polling submission status
status_url = f"{self.base_url}/workspaces/{self.namespace}/{self.workspace_name}/submissions/{submission_id}"
previous_workflow_status = []
workflow_ids = []
#print("Polling submission status...") # Added for debugging
workflow_status_map = {}

# Continuously poll the status of the submission until completion
while True:

status_response = requests.get(status_url, headers=self.headers)

# Check if the response status code is successful (200)
if status_response.status_code != 200:
print(f"Error: Received status code {status_response.status_code}")
print(f"Response content: {status_response.text}")
break
print(f"Response content: {status_response.text}", file=sys.stderr)
return {}

try:
# Parse the response as JSON
status_data = status_response.json()
except json.JSONDecodeError as e:
print("Error decoding JSON response:", e)
print(f"Response content: {status_response.text}")
break
print("Error decoding JSON response:", file=sys.stderr)
print(f"Response content: {status_response.text}", file=sys.stderr)
return {}

# Retrieve the overall submission status
submission_status = status_data.get("status", "")
# Retrieve workflows and their statuses
workflows = status_data.get("workflows", [])

# Iterate over all workflows and extract relevant information
for workflow in workflows:
workflow_id = workflow.get("workflowId")
workflow_status = workflow.get("status")
input_resolutions = workflow.get("inputResolutions", [])

# Print the workflow ID, status, and input details
print(f"Workflow ID: {workflow_id}, Status: {workflow_status}")
for input_res in input_resolutions:
input_name = input_res.get("inputName")
input_value = input_res.get("value", "N/A")
print(f" Input: {input_name}, Value: {input_value}")

# Store the workflow ID
if workflow_id:
workflow_ids.append(workflow_id)

# Print the workflow statuses
workflows_status = [workflow.get("status") for workflow in workflows]
if workflows_status != previous_workflow_status:
print(f"Workflows Status: {workflows_status}")
previous_workflow_status = workflows_status

# Check if the submission is complete and if any workflow has failed
if submission_status == "Done" and "Failed" in workflows_status:
print("At least one workflow has failed.")
break
elif submission_status == "Done":
if workflow_id and workflow_status:
workflow_status_map[workflow_id] = workflow_status

# Check if the submission is complete
submission_status = status_data.get("status", "")
if submission_status == "Done":
break

# Wait for 60 seconds before polling again
time.sleep(60)

return workflow_ids if workflow_ids else None
return workflow_status_map


# Bash Script Interaction
Expand Down Expand Up @@ -190,8 +168,14 @@ def poll_submission_status(self, submission_id):
print(submission_id)

elif args.action == 'poll_status':
if not args.submission_id:
print("For 'poll_status', --submission_id is required.")
else:
workflow_ids = firecloud_api.poll_submission_status(args.submission_id)
print(f"Workflow IDs: {workflow_ids}")
if not args.submission_id:
print("For 'poll_status', --submission_id is required.")
else:
workflow_status_map = firecloud_api.poll_submission_status(args.submission_id)

# Convert the dictionary to a JSON string and print it
if workflow_status_map:
import json
print(json.dumps(workflow_status_map)) # This will output the dictionary as a JSON string
else:
print("No workflows found or an error occurred.")

0 comments on commit 5e60f62

Please sign in to comment.