diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index 4ab90292a48dd..b04da5259a99c 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -51,6 +51,7 @@ import useDagRuns from "./useDagRuns"; import useHistoricalMetricsData from "./useHistoricalMetricsData"; import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom"; import useEventLogs from "./useEventLogs"; +import useCalendarData from "./useCalendarData"; axios.interceptors.request.use((config) => { config.paramsSerializer = { @@ -98,4 +99,5 @@ export { useTaskXcomEntry, useTaskXcomCollection, useEventLogs, + useCalendarData, }; diff --git a/airflow/www/static/js/api/useCalendarData.ts b/airflow/www/static/js/api/useCalendarData.ts new file mode 100644 index 0000000000000..e2a7171e8abb4 --- /dev/null +++ b/airflow/www/static/js/api/useCalendarData.ts @@ -0,0 +1,48 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { useQuery } from "react-query"; +import axios, { AxiosResponse } from "axios"; + +import { getMetaValue } from "src/utils"; + +const DAG_ID_PARAM = "dag_id"; + +const dagId = getMetaValue(DAG_ID_PARAM); +const calendarDataUrl = getMetaValue("calendar_data_url"); + +interface DagState { + count: number; + date: string; + state: string; +} + +interface CalendarData { + dagStates: DagState[]; +} + +const useCalendarData = () => + useQuery(["calendarData"], async () => { + const params = { + [DAG_ID_PARAM]: dagId, + }; + return axios.get(calendarDataUrl, { params }); + }); + +export default useCalendarData; diff --git a/airflow/www/static/js/dag/details/dag/Calendar.tsx b/airflow/www/static/js/dag/details/dag/Calendar.tsx new file mode 100644 index 0000000000000..105c20299500e --- /dev/null +++ b/airflow/www/static/js/dag/details/dag/Calendar.tsx @@ -0,0 +1,195 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* global moment */ + +import React from "react"; +import type { EChartsOption } from "echarts"; +import { Spinner } from "@chakra-ui/react"; + +import ReactECharts from "src/components/ReactECharts"; +import { useCalendarData } from "src/api"; +import useFilters from "src/dag/useFilters"; + +const Calendar = () => { + const { onBaseDateChange } = useFilters(); + const { data: calendarData, isLoading } = useCalendarData(); + + if (isLoading) return ; + if (!calendarData) return null; + + const { dagStates } = calendarData; + + const startDate = dagStates[0].date; + const endDate = dagStates[dagStates.length - 1].date; + // @ts-ignore + const startYear = moment(startDate).year(); + // @ts-ignore + const endYear = moment(endDate).year(); + + const calendarOption: EChartsOption["calendar"] = []; + const seriesOption: EChartsOption["series"] = []; + + const flatDates: Record = {}; + const plannedDates: Record = {}; + dagStates.forEach((ds) => { + if (ds.state !== "planned") { + flatDates[ds.date] = { + ...flatDates[ds.date], + [ds.state]: ds.count, + }; + } else { + plannedDates[ds.date] = { + [ds.state]: ds.count, + }; + } + }); + + const proportions = Object.keys(flatDates).map((key) => { + const date = key; + const states = flatDates[key]; + const total = + (states.failed || 0) + (states.success || 0) + (states.running || 0); + const percent = ((states.success || 0) + (states.running || 0)) / total; + return [date, Math.round(percent * 100)]; + }); + + // We need to split the data into multiple years of calendars + if (startYear !== endYear) { + for (let y = startYear; y <= endYear; y += 1) { + const index = y - startYear; + const yearStartDate = y === startYear ? startDate : `${y}-01-01`; + const yearEndDate = `${y}-12-31`; + calendarOption.push({ + left: 100, + top: index * 150 + 20, + range: [yearStartDate, yearEndDate], + cellSize: 15, + }); + seriesOption.push({ + calendarIndex: index, + type: "heatmap", + coordinateSystem: "calendar", + data: proportions.filter( + (p) => typeof p[0] === "string" && p[0].startsWith(y.toString()) + ), + }); + seriesOption.push({ + calendarIndex: index, + type: "scatter", + coordinateSystem: "calendar", + symbolSize: 4, + data: dagStates + .filter( + (ds) => ds.date.startsWith(y.toString()) && ds.state === "planned" + ) + .map((ds) => [ds.date, ds.count]), + }); + } + } else { + calendarOption.push({ + top: 20, + left: 100, + range: [startDate, `${endYear}-12-31`], + cellSize: 15, + }); + seriesOption.push({ + type: "heatmap", + coordinateSystem: "calendar", + data: proportions, + }); + seriesOption.push({ + type: "scatter", + coordinateSystem: "calendar", + symbolSize: () => 4, + data: dagStates + .filter((ds) => ds.state === "planned") + .map((ds) => [ds.date, ds.count]), + }); + } + + const scatterIndexes: number[] = []; + const heatmapIndexes: number[] = []; + + seriesOption.forEach((s, i) => { + if (s.type === "heatmap") heatmapIndexes.push(i); + else if (s.type === "scatter") scatterIndexes.push(i); + }); + + const option: EChartsOption = { + tooltip: { + formatter: (p: any) => { + const date = p.data[0]; + const states = flatDates[date]; + const plannedCount = + p.componentSubType === "scatter" + ? p.data[1] + : plannedDates[date]?.planned || 0; + // @ts-ignore + const formattedDate = moment(date).format("ddd YYYY-MM-DD"); + + return ` + ${formattedDate}
+ ${plannedCount ? `Planned ${plannedCount}
` : ""} + ${states?.failed ? `Failed ${states.failed}
` : ""} + ${states?.running ? `Running ${states.running}
` : ""} + ${states?.success ? `Success ${states.success}
` : ""} + `; + }, + }, + visualMap: [ + { + min: 0, + max: 100, + text: ["% Success", "Failed"], + calculable: true, + orient: "vertical", + left: "0", + top: "0", + seriesIndex: heatmapIndexes, + inRange: { + color: [ + stateColors.failed, + stateColors.up_for_retry, + stateColors.success, + ], + }, + }, + { + seriesIndex: scatterIndexes, + inRange: { + color: "gray", + opacity: 0.6, + }, + }, + ], + calendar: calendarOption, + series: seriesOption, + }; + + const events = { + click(p: any) { + onBaseDateChange(p.data[0]); + }, + }; + + return ; +}; + +export default Calendar; diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx index 0c3315e4dc922..6ef60e0120674 100644 --- a/airflow/www/static/js/dag/details/index.tsx +++ b/airflow/www/static/js/dag/details/index.tsx @@ -43,6 +43,7 @@ import { MdSyncAlt, MdHourglassBottom, MdPlagiarism, + MdEvent, } from "react-icons/md"; import { BiBracket } from "react-icons/bi"; import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper"; @@ -66,6 +67,7 @@ import XcomCollection from "./taskInstance/Xcom"; import TaskDetails from "./task"; import AuditLog from "./AuditLog"; import RunDuration from "./dag/RunDuration"; +import Calendar from "./dag/Calendar"; const dagId = getMetaValue("dag_id")!; @@ -92,6 +94,7 @@ const tabToIndex = (tab?: string) => { case "run_duration": return 5; case "xcom": + case "calendar": return 6; case "details": default: @@ -129,6 +132,7 @@ const indexToTab = ( if (!runId && !taskId) return "run_duration"; return undefined; case 6: + if (!runId && !taskId) return "calendar"; if (isTaskInstance) return "xcom"; return undefined; default: @@ -323,6 +327,14 @@ const Details = ({ )} + {isDag && ( + + + + Calendar + + + )} {isTaskInstance && ( @@ -418,6 +430,11 @@ const Details = ({ )} + {isDag && ( + + + + )} {isTaskInstance && run && ( + diff --git a/airflow/www/views.py b/airflow/www/views.py index f94e9624d3361..5cffd28aaa3b3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -116,6 +116,7 @@ from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.timetables._cron import CronMixin from airflow.timetables.base import DataInterval, TimeRestriction +from airflow.timetables.simple import ContinuousTimetable from airflow.utils import json as utils_json, timezone, yaml from airflow.utils.airflow_flask_app import get_airflow_app from airflow.utils.dag_edges import dag_edges @@ -2940,6 +2941,103 @@ def calendar(self, dag_id: str, session: Session = NEW_SESSION): dag_model=dag_model, ) + @expose("/object/calendar_data") + @auth.has_access_dag("GET", DagAccessEntity.RUN) + @gzipped + @provide_session + def calendar_data(self, session: Session = NEW_SESSION): + """Get DAG runs as calendar.""" + dag_id = request.args.get("dag_id") + dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session) + if not dag: + return {"error": f"can't find dag {dag_id}"}, 404 + + dag_states = session.execute( + select( + func.date(DagRun.execution_date).label("date"), + DagRun.state, + func.max(DagRun.data_interval_start).label("data_interval_start"), + func.max(DagRun.data_interval_end).label("data_interval_end"), + func.count("*").label("count"), + ) + .where(DagRun.dag_id == dag.dag_id) + .group_by(func.date(DagRun.execution_date), DagRun.state) + .order_by(func.date(DagRun.execution_date).asc()) + ).all() + + data_dag_states = [ + { + # DATE() in SQLite and MySQL behave differently: + # SQLite returns a string, MySQL returns a date. + "date": dr.date if isinstance(dr.date, str) else dr.date.isoformat(), + "state": dr.state, + "count": dr.count, + } + for dr in dag_states + ] + + # Interpret the schedule and show planned dag runs in calendar + if ( + dag_states + and dag_states[-1].data_interval_start + and dag_states[-1].data_interval_end + and not isinstance(dag.timetable, ContinuousTimetable) + ): + last_automated_data_interval = DataInterval( + timezone.coerce_datetime(dag_states[-1].data_interval_start), + timezone.coerce_datetime(dag_states[-1].data_interval_end), + ) + + year = last_automated_data_interval.end.year + restriction = TimeRestriction(dag.start_date, dag.end_date, False) + dates: dict[datetime.date, int] = collections.Counter() + + if isinstance(dag.timetable, CronMixin): + # Optimized calendar generation for timetables based on a cron expression. + dates_iter: Iterator[datetime.datetime | None] = croniter( + dag.timetable._expression, + start_time=last_automated_data_interval.end, + ret_type=datetime.datetime, + ) + for dt in dates_iter: + if dt is None: + break + if dt.year != year: + break + if dag.end_date and dt > dag.end_date: + break + dates[dt.date()] += 1 + else: + prev_logical_date = DateTime.min + while True: + curr_info = dag.timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=restriction, + ) + if curr_info is None: + break # Reached the end. + if curr_info.logical_date <= prev_logical_date: + break # We're not progressing. Maybe a malformed timetable? Give up. + if curr_info.logical_date.year != year: + break # Crossed the year boundary. + last_automated_data_interval = curr_info.data_interval + dates[curr_info.logical_date.date()] += 1 + prev_logical_date = curr_info.logical_date + + data_dag_states.extend( + {"date": date.isoformat(), "state": "planned", "count": count} + for (date, count) in dates.items() + ) + + data = { + "dag_states": data_dag_states, + } + + return ( + htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps), + {"Content-Type": "application/json; charset=utf-8"}, + ) + @expose("/graph") def legacy_graph(self): """Redirect from url param."""