Skip to content

Commit

Permalink
chore(modeling): only set last run at after task completes (#27229)
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE authored Jan 3, 2025
1 parent 5f894e8 commit b1e06a4
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ export const editorSidebarLogic = kea<editorSidebarLogicType>([
nonMaterializedViews: [
(s) => [s.dataWarehouseSavedQueries],
(views): DataWarehouseSavedQuery[] => {
return views.filter((view) => !view.status && !view.last_run_at)
return views.filter((view) => !view.status)
},
],
materializedViews: [
(s) => [s.dataWarehouseSavedQueries],
(views): DataWarehouseSavedQuery[] => {
return views.filter((view) => view.status || view.last_run_at)
return views.filter((view) => view.status)
},
],
activeListItemKey: [
Expand Down
11 changes: 6 additions & 5 deletions posthog/temporal/data_modeling/run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,6 @@ class StartRunActivityInputs:
@temporalio.activity.defn
async def start_run_activity(inputs: StartRunActivityInputs) -> None:
"""Activity that starts a run by updating statuses of associated models."""
run_at = dt.datetime.fromisoformat(inputs.run_at)

try:
async with asyncio.TaskGroup() as tg:
Expand All @@ -552,7 +551,7 @@ async def start_run_activity(inputs: StartRunActivityInputs) -> None:
continue

tg.create_task(
update_saved_query_status(label, DataWarehouseSavedQuery.Status.RUNNING, run_at, inputs.team_id)
update_saved_query_status(label, DataWarehouseSavedQuery.Status.RUNNING, None, inputs.team_id)
)
except* Exception:
await logger.aexception("Failed to update saved query status when starting run")
Expand Down Expand Up @@ -581,7 +580,7 @@ async def finish_run_activity(inputs: FinishRunActivityInputs) -> None:

for label in inputs.failed:
tg.create_task(
update_saved_query_status(label, DataWarehouseSavedQuery.Status.FAILED, run_at, inputs.team_id)
update_saved_query_status(label, DataWarehouseSavedQuery.Status.FAILED, None, inputs.team_id)
)
except* Exception:
await logger.aexception("Failed to update saved query status when finishing run")
Expand All @@ -602,7 +601,7 @@ async def create_table_activity(inputs: CreateTableActivityInputs) -> None:


async def update_saved_query_status(
label: str, status: DataWarehouseSavedQuery.Status, run_at: dt.datetime, team_id: int
label: str, status: DataWarehouseSavedQuery.Status, run_at: typing.Optional[dt.datetime], team_id: int
):
filter_params: dict[str, int | str | uuid.UUID] = {"team_id": team_id}

Expand All @@ -613,7 +612,9 @@ async def update_saved_query_status(
filter_params["name"] = label

saved_query = await database_sync_to_async(DataWarehouseSavedQuery.objects.filter(**filter_params).get)()
saved_query.last_run_at = run_at

if run_at:
saved_query.last_run_at = run_at
saved_query.status = status

await database_sync_to_async(saved_query.save)()
Expand Down

0 comments on commit b1e06a4

Please sign in to comment.