Skip to content

Commit

Permalink
Migration hook methods for loading dagruns and TIs
Browse files Browse the repository at this point in the history
  • Loading branch information
astro-anand committed Apr 1, 2024
1 parent f96c355 commit 1e84ca8
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions astronomer_starship/compat/starship_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def get_task_instances(
self,
dag_id: str,
limit: int = 5,
) -> requests.Response:
) -> dict:
task_instances = urljoin(self.webserver_url, StarshipAPIHook.TASK_INSTANCES)
resp = _request(
type="get",
Expand All @@ -119,7 +119,20 @@ def get_task_instances(
headers=self.headers,
params={"dag_id": dag_id, "limit": limit},
)
return resp
return resp.json()

def set_task_instances(self, task_instances: list[dict]):
task_instance_endpoint = urljoin(
self.webserver_url, StarshipAPIHook.TASK_INSTANCES
)
resp = _request(
type="post",
endpoint=task_instance_endpoint,
auth=self.auth,
headers=self.headers,
json={"task_instances": task_instances},
)
return resp.json()

def set_dag_state(
self,
Expand Down Expand Up @@ -178,7 +191,15 @@ def load_dagruns_to_target(
dag_id=dag_id,
action="pause",
)
dag_runs = self.source_api_hook.get_dagruns(
dag_id=dag_id,
)
self.target_api_hook.set_dagruns(dag_runs=dag_runs["dag_runs"])
self.get_and_set_dagruns(dag_id)
self.get_and_set_task_instances(dag_id)

def get_and_set_dagruns(self, dag_id):
dag_runs = self.source_api_hook.get_dagruns(
dag_id=dag_id,
)
self.target_api_hook.set_dagruns(dag_runs=dag_runs["dag_runs"])

def get_and_set_task_instances(self, dag_id):
task_instances = self.source_api_hook.get_task_instances(dag_id=dag_id)
self.target_api_hook.set_task_instances(task_instances=task_instances)

0 comments on commit 1e84ca8

Please sign in to comment.