Fix alerts being raised again despite the presence of an alert silencing (#3971)

## Linked issues

- Resolve #3965
louptheron authored Dec 18, 2024
2 parents 95db4d7 + 841e865 commit 95302c9
Showing 15 changed files with 305 additions and 138 deletions.
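In short: silenced alerts used to be matched on vessel identity alone, and only while the silencing was still active (NOW() < silenced_before_date), so as soon as a silencing expired, the same behaviour — still inside the flow's analysis window — raised the alert again. This commit stamps every alert with a triggering_behaviour_datetime_utc, extracts silencings over a lookback window matching each flow's analysis period, and drops an alert only when its triggering behaviour predates the end of a silencing. A minimal sketch of the new suppression rule, using the names introduced in the diff below:

def keep_alert(triggering_behaviour_datetime_utc, silenced_before_date):
    # No silencing found in the lookback window, or the behaviour is more
    # recent than the end of the silencing: the alert goes through.
    return (
        silenced_before_date is None
        or triggering_behaviour_datetime_utc > silenced_before_date
    )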
3 changes: 3 additions & 0 deletions datascience/config.py
@@ -167,6 +167,9 @@
 RISK_FACTOR_VERIFICATION_THRESHOLD = 2.3
 FLAG_STATES_WITHOUT_SYSTEMATIC_VERIFICATION = ["FRA"]

+# Missing DEP alerts configuration
+MISSING_DEP_TRACK_ANALYSIS_HOURS = 48
+
 # App URL
 MONITORFISH_URL = os.getenv("MONITORFISH_URL")  # http://monitor.fish/
 BACKOFFICE_REGULATION_URL = MONITORFISH_URL + "backoffice/regulation"
16 changes: 11 additions & 5 deletions datascience/src/pipeline/flows/missing_dep_alerts.py
@@ -1,8 +1,9 @@
 from pathlib import Path

-from prefect import Flow, case, task
+from prefect import Flow, Parameter, case, task
 from prefect.executors import LocalDaskExecutor

+from config import MISSING_DEP_TRACK_ANALYSIS_HOURS
 from src.pipeline.entities.alerts import AlertType
 from src.pipeline.generic_tasks import extract
 from src.pipeline.shared_tasks.alerts import (
@@ -16,23 +17,28 @@


 @task(checkpoint=False)
-def extract_missing_deps():
+def extract_missing_deps(hours_from_now: int):
     return extract(
-        db_name="monitorfish_remote", query_filepath="monitorfish/missing_deps.sql"
+        db_name="monitorfish_remote",
+        query_filepath="monitorfish/missing_deps.sql",
+        params={"hours_from_now": hours_from_now},
     )


 with Flow("Missing DEP alerts", executor=LocalDaskExecutor()) as flow:
     flow_not_running = check_flow_not_running()
     with case(flow_not_running, True):
-        vessels_with_missing_deps = extract_missing_deps()
+        hours_from_now = Parameter("hours_from_now", MISSING_DEP_TRACK_ANALYSIS_HOURS)
+        vessels_with_missing_deps = extract_missing_deps(hours_from_now)

         alerts = make_alerts(
             vessels_with_missing_deps,
             AlertType.MISSING_DEP_ALERT.value,
             AlertType.MISSING_DEP_ALERT.value,
         )
-        silenced_alerts = extract_silenced_alerts(AlertType.MISSING_DEP_ALERT.value)
+        silenced_alerts = extract_silenced_alerts(
+            AlertType.MISSING_DEP_ALERT.value, number_of_hours=hours_from_now
+        )
         active_reportings = extract_active_reportings(AlertType.MISSING_DEP_ALERT.value)
         filtered_alerts = filter_alerts(alerts, silenced_alerts, active_reportings)
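The new hours_from_now Parameter defaults to MISSING_DEP_TRACK_ANALYSIS_HOURS (48) and is shared between the track analysis and the silenced-alerts lookback. A quick sketch of overriding it at run time with the Prefect 1.x API this flow is built on (the value 72 is purely illustrative):

from src.pipeline.flows.missing_dep_alerts import flow

flow.run()                                   # default 48 h window
flow.run(parameters={"hours_from_now": 72})  # one-off wider window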
26 changes: 21 additions & 5 deletions datascience/src/pipeline/flows/missing_far_alerts.py
@@ -27,14 +27,17 @@


 @task(checkpoint=False)
-def get_dates(days_without_far: int) -> Tuple[datetime, datetime, datetime, datetime]:
+def get_dates(
+    days_without_far: int,
+) -> Tuple[datetime, datetime, datetime, datetime, float]:
     """
-    Returns the dates used in the flow as a 4-tuple :
+    Returns the dates used in the flow as a 5-tuple :

-      - `days_without_far` days ago at 00:00 (beginning of the day) in UTC
+      - `days_without_far` days ago at 00:00 (beginning of the day) in UTC (1)
       - Yesterday at 8pm in UTC
       - Today at 00:00 (beginning of the day) in UTC
-      - Current datetime in UTC
+      - Current datetime in UTC (2)
+      - The number of hours that separate 1 and 2

     Returns:
         Tuple[datetime, datetime, datetime]
@@ -43,12 +46,16 @@ def get_dates(days_without_far: int) -> Tuple[datetime, datetime, datetime, date
     today_at_zero_hours = utcnow.replace(hour=0, minute=0, second=0, microsecond=0)
     period_start_at_zero_hours = today_at_zero_hours - timedelta(days=days_without_far)
     yesterday_at_eight_pm = today_at_zero_hours - timedelta(hours=4)
+    period_start_hours_from_now = (
+        utcnow - period_start_at_zero_hours
+    ).total_seconds() / 3600

     return (
         period_start_at_zero_hours,
         yesterday_at_eight_pm,
         today_at_zero_hours,
         utcnow,
+        period_start_hours_from_now,
     )


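A quick worked example of the new fifth element, assuming a run on 2024-12-18 at 15:30 UTC with days_without_far=2 (the dates are illustrative): the period starts on 2024-12-16 at 00:00 UTC, so period_start_hours_from_now is 63.5. Note that it is a float; PostgreSQL accepts fractional hours in an interval literal such as '63.5 HOURS'.

from datetime import datetime, timedelta

utcnow = datetime(2024, 12, 18, 15, 30)
today_at_zero_hours = utcnow.replace(hour=0, minute=0, second=0, microsecond=0)
period_start_at_zero_hours = today_at_zero_hours - timedelta(days=2)
print((utcnow - period_start_at_zero_hours).total_seconds() / 3600)  # 63.5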
@@ -301,10 +308,16 @@ def get_vessels_at_sea(positions_at_sea: pd.DataFrame, min_days: int) -> pd.Data
"vessel_name",
"flag_state",
"facade",
"date_time",
"latitude",
"longitude",
]
]
.rename(
columns={
"date_time": "triggering_behaviour_datetime_utc",
}
)
.reset_index(drop=True)
)
return vessels_at_sea
@@ -426,6 +439,7 @@ def merge_risk_factor(
         yesterday_at_eight_pm,
         today_at_zero_hours,
         utcnow,
+        period_start_hours_from_now,
     ) = get_dates(days_without_far)

     positions_at_sea_yesterday_everywhere_query = make_positions_at_sea_query(
@@ -497,7 +511,9 @@ def merge_risk_factor(
         districts_columns_to_add=["dml"],
     )
     alerts = make_alerts(vessels_with_missing_fars, alert_type, alert_config_name)
-    silenced_alerts = extract_silenced_alerts(alert_type)
+    silenced_alerts = extract_silenced_alerts(
+        alert_type, number_of_hours=period_start_hours_from_now
+    )
     alert_without_silenced = filter_alerts(alerts, silenced_alerts)

     # Load
6 changes: 4 additions & 2 deletions datascience/src/pipeline/flows/position_alerts.py
@@ -439,7 +439,7 @@ def get_vessels_in_alert(positions_in_alert: pd.DataFrame) -> pd.DataFrame:
             ]
             .rename(
                 columns={
-                    "date_time": "creation_date",
+                    "date_time": "triggering_behaviour_datetime_utc",
                 }
             )
             .reset_index(drop=True)
@@ -526,7 +526,9 @@ def get_vessels_in_alert(positions_in_alert: pd.DataFrame) -> pd.DataFrame:
         districts_columns_to_add=["dml"],
     )
     alerts = make_alerts(vessels_in_alert, alert_type, alert_config_name)
-    silenced_alerts = extract_silenced_alerts(alert_type)
+    silenced_alerts = extract_silenced_alerts(
+        alert_type, number_of_hours=hours_from_now
+    )
     alert_without_silenced = filter_alerts(alerts, silenced_alerts)
     load_alerts(alert_without_silenced, alert_config_name)

@@ -36,7 +36,10 @@ def extract_suspicions_of_under_declaration():
         AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value,
     )
     silenced_alerts = extract_silenced_alerts(
-        AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value
+        AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value,
+        # 8 days, to cover the date range analyzed in
+        # `extract_suspicions_of_under_declaration`
+        number_of_hours=192,
     )
     active_reportings = extract_active_reportings(
         AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value
@@ -17,7 +17,7 @@ WITH detected_recent_deps AS (
     LEFT JOIN districts d
     ON d.district_code = v.district_code
     WHERE
-        p.date_time >= CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '48 hours'
+        p.date_time >= CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':hours_from_now hours'
         AND p.date_time < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '2 hours'
         AND p.is_at_port = false
         AND time_emitting_at_sea = INTERVAL '0'
@@ -37,6 +37,7 @@ SELECT
     d.dml,
     d.flag_state,
     lp.risk_factor,
+    d.date_time AS triggering_behaviour_datetime_utc,
     d.latitude,
     d.longitude
 FROM detected_recent_deps d
@@ -1,8 +1,11 @@
-SELECT DISTINCT
+SELECT
     internal_reference_number,
     external_reference_number,
-    ircs
+    ircs,
+    MAX(silenced_before_date) AT TIME ZONE 'UTC' AS silenced_before_date
 FROM silenced_alerts
 WHERE
-    NOW() < silenced_before_date
+    silenced_before_date > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_hours HOURS'
     AND value->>'type' = :alert_type
+GROUP BY 1, 2, 3
+ORDER BY 1, 2, 3
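Two things worth noting about the rewritten query: it now returns at most one row per vessel, keeping the latest silencing end date (MAX(silenced_before_date)) among all silencings that end within the lookback window — including silencings that have already expired, which is precisely what enables the triggering-time comparison downstream. Also, the :number_of_hours placeholder sits inside a quoted interval literal; this presumably works because parameters are substituted into the SQL text client-side before it reaches PostgreSQL — with strictly server-side binding, make_interval(hours => :number_of_hours) would be the equivalent construct.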
@@ -51,6 +51,7 @@ SELECT
     fe.dml,
     fe.flag_state,
     lp.risk_factor,
+    DATE_TRUNC('day', CURRENT_TIMESTAMP AT TIME ZONE 'UTC') - INTERVAL '7 days' AS triggering_behaviour_datetime_utc,
     lp.latitude,
     lp.longitude
 FROM fishing_efforts fe
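Since this alert is computed over a multi-day range rather than a single position, its triggering time is pinned to the start of the analyzed window (7 days ago at 00:00 UTC): any silencing whose end date falls after that point therefore suppresses the alert.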
55 changes: 35 additions & 20 deletions datascience/src/pipeline/shared_tasks/alerts.py
@@ -15,22 +15,31 @@
 from src.pipeline.generic_tasks import extract, load
 from src.pipeline.processing import (
     df_to_dict_series,
+    join_on_multiple_keys,
     left_isin_right_by_decreasing_priority,
 )
 from src.pipeline.utils import delete_rows, get_table


 @task(checkpoint=False)
-def extract_silenced_alerts(alert_type: str) -> pd.DataFrame:
+def extract_silenced_alerts(alert_type: str, number_of_hours: int = 0) -> pd.DataFrame:
     """
     Return DataFrame of vessels with active silenced alerts of the given type.

     Args:
         alert_type (str): Type of alert for which to extract silenced alerts
+        number_of_hours (int, optional): Number of hours from current time to extract.
+            Defaults to 0.

     Returns:
         pd.DataFrame: Silenced alerts with columns
     """
     alert_type = AlertType(alert_type)
     return extract(
         db_name="monitorfish_remote",
         query_filepath="monitorfish/silenced_alerts.sql",
-        params={"alert_type": alert_type.value},
+        params={"alert_type": alert_type.value, "number_of_hours": number_of_hours},
     )


@@ -119,10 +128,8 @@ def make_alerts(
         - `dml`
         - `flag_state`
         - `risk_factor`
-        - and optionally, `creation_date`, `latitude` and `longitude`
+        - `triggering_behaviour_datetime_utc`
+        - and optionally, `latitude` and `longitude`

-        If `creation_date` is not one of the columns, it will be added and filled with
-        `datetime.utcnow`.

         If `latitude` and `longitude` are not columns of the input, they are added and
         filled with null values in the result.
@@ -132,7 +139,6 @@
             create an alert.
         alert_type (str): `type` to specify in the built alerts.
         alert_config_name (str): `alert_config_name` to specify in the built alerts.
-        creation_date (datetime): `creation_date` to specify in the built alerts.

     Returns:
         pd.DataFrame: `DataFrame` of alerts.
@@ -145,8 +151,7 @@
         }
     )

-    if "creation_date" not in alerts:
-        alerts["creation_date"] = datetime.utcnow()
+    alerts["creation_date"] = datetime.utcnow()

     if "latitude" not in alerts:
         alerts["latitude"] = None
@@ -175,6 +180,7 @@
"flag_state",
"vessel_id",
"vessel_identifier",
"triggering_behaviour_datetime_utc",
"creation_date",
"latitude",
"longitude",
@@ -209,12 +215,16 @@ def filter_alerts(
           - vessel_identifier
           - flag_state
           - facade
+          - triggering_behaviour_datetime_utc
           - creation_date
           - latitude
           - longitude
           - value
           - alert_config_name

+        and the `silenced_alerts` DataFrame must have a `silenced_before_date`
+        column.

     Args:
         alerts (pd.DataFrame): positions alerts.
         vessels_with_silenced_alerts (pd.DataFrame): vessels with silenced alerts.
@@ -224,18 +234,14 @@
"""
vessel_id_cols = ["internal_reference_number", "external_reference_number", "ircs"]

if isinstance(vessels_with_active_reportings, pd.DataFrame):
vessels_to_remove = (
pd.concat([vessels_with_silenced_alerts, vessels_with_active_reportings])
.drop_duplicates()
.reset_index(drop=True)
)
else:
vessels_to_remove = vessels_with_silenced_alerts
alerts = join_on_multiple_keys(
alerts, vessels_with_silenced_alerts, or_join_keys=vessel_id_cols, how="left"
)

alerts = alerts.loc[
~left_isin_right_by_decreasing_priority(
alerts[vessel_id_cols], vessels_to_remove[vessel_id_cols]
(
(alerts.silenced_before_date.isna())
| (alerts.triggering_behaviour_datetime_utc > alerts.silenced_before_date)
),
[
"vessel_name",
@@ -251,7 +257,16 @@
"value",
"alert_config_name",
],
].reset_index(drop=True)
]

if isinstance(vessels_with_active_reportings, pd.DataFrame):
alerts = alerts.loc[
~left_isin_right_by_decreasing_priority(
alerts[vessel_id_cols], vessels_with_active_reportings[vessel_id_cols]
)
]

alerts = alerts.sort_values("internal_reference_number").reset_index(drop=True)

return alerts

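To make the new filtering semantics concrete, here is a self-contained sketch — simplified to a single identifier column, whereas the real code joins on the three vessel identifiers via join_on_multiple_keys. The vessel whose behaviour predates the end of its silencing is dropped; the one with no silencing (or newer behaviour) passes through:

import pandas as pd

alerts = pd.DataFrame(
    {
        "internal_reference_number": ["VES1", "VES2"],
        "triggering_behaviour_datetime_utc": pd.to_datetime(
            ["2024-12-10 08:00", "2024-12-18 09:00"]
        ),
    }
)
silenced = pd.DataFrame(
    {
        "internal_reference_number": ["VES1"],
        "silenced_before_date": pd.to_datetime(["2024-12-15 00:00"]),
    }
)

merged = alerts.merge(silenced, on="internal_reference_number", how="left")
kept = merged.loc[
    merged.silenced_before_date.isna()
    | (merged.triggering_behaviour_datetime_utc > merged.silenced_before_date)
]
print(kept.internal_reference_number.tolist())  # ['VES2']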
@@ -13,4 +13,14 @@ INSERT INTO silenced_alerts (
     'DEVINER FIGURE CONSCIENCE', 'ABC000542519', 'RO237719', 'FQ7058', 'INTERNAL_REFERENCE_NUMBER',
     NOW() + ('15 DAYS')::interval, 'FR',
     '{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}'
+),
+(
+    'DEVINER FIGURE CONSCIENCE', 'ABC000542519', 'RO237719', 'FQ7058', 'INTERNAL_REFERENCE_NUMBER',
+    NOW() + ('7 DAYS')::interval, 'FR',
+    '{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}'
+),
+(
+    'AUTRE NAVIRE', 'ABC000123456', NULL, NULL, 'INTERNAL_REFERENCE_NUMBER',
+    NOW() - ('5 HOURS')::interval, 'FR',
+    '{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}'
 );
@@ -1,4 +1,4 @@
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone

 import pandas as pd
 import pytest
@@ -26,6 +26,9 @@ def expected_missing_deps() -> pd.DataFrame:
"dml": ["DML 29"],
"flag_state": ["FR"],
"risk_factor": [2.58],
"triggering_behaviour_datetime_utc": [
datetime.utcnow() - timedelta(hours=2)
],
"latitude": [49.606],
"longitude": [-0.736],
}
@@ -83,8 +86,23 @@
 def test_extract_missing_deps(
     reset_test_data_missing_dep_alerts, expected_missing_deps
 ):
-    res = extract_missing_deps.run()
-    pd.testing.assert_frame_equal(res, expected_missing_deps)
+    res = extract_missing_deps.run(hours_from_now=48)
+    pd.testing.assert_frame_equal(
+        res.drop(columns=["triggering_behaviour_datetime_utc"]),
+        expected_missing_deps.drop(columns=["triggering_behaviour_datetime_utc"]),
+    )
+
+    assert (
+        (
+            (
+                res.triggering_behaviour_datetime_utc
+                - expected_missing_deps.triggering_behaviour_datetime_utc
+            )
+            .map(lambda td: td.total_seconds())
+            .abs()
+        )
+        < 10
+    ).all()


def test_flow(reset_test_data_missing_dep_alerts):