-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
107 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -514,6 +514,60 @@ def test_update_for_superuser(client_auth, db_session): | |
assert json_response == json.loads(new_statique.model_dump_json()) | ||
|
||
|
||
def test_update_audits(client_auth, db_session): | ||
"""Test the /statique/{id_pdc_itinerance} update endpoint audits (superuser case).""" | ||
id_pdc_itinerance = "FR911E1111ER1" | ||
db_statique = save_statique( | ||
db_session, | ||
StatiqueFactory.build( | ||
id_pdc_itinerance=id_pdc_itinerance, | ||
nom_amenageur="ACME Inc.", | ||
nom_operateur="ACME Inc.", | ||
nom_enseigne="ACME Inc.", | ||
coordonneesXY=Coordinate(-1.0, 1.0), | ||
station_deux_roues=False, | ||
cable_t2_attache=False, | ||
), | ||
) | ||
station = db_session.exec( | ||
select(Station).where( | ||
Station.id_station_itinerance == db_statique.id_station_itinerance | ||
) | ||
).one() | ||
|
||
assert len(station.audits) == 0 | ||
|
||
new_statique = db_statique.model_copy( | ||
update={ | ||
"contact_operateur": "[email protected]", | ||
"nom_amenageur": "Magma Corp.", | ||
"nom_operateur": "Magma Corp.", | ||
"nom_enseigne": "Magma Corp.", | ||
"coordonneesXY": Coordinate(1.0, 2.0), | ||
"station_deux_roues": True, | ||
"cable_t2_attache": True, | ||
}, | ||
deep=True, | ||
) | ||
|
||
response = client_auth.put( | ||
f"/statique/{id_pdc_itinerance}", | ||
json=json.loads(new_statique.model_dump_json()), | ||
) | ||
assert response.status_code == status.HTTP_200_OK | ||
db_session.refresh(station) | ||
|
||
# Get user requesting the server | ||
user = db_session.exec(select(User).where(User.email == "[email protected]")).one() | ||
|
||
# We expect two audits as FKs are updated in a second request (once all other models | ||
# have been updated). | ||
expected_audits = 2 | ||
assert len(station.audits) == expected_audits | ||
assert station.audits[0].author_id == user.id | ||
assert station.audits[1].author_id == user.id | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"client_auth", | ||
( | ||
|
@@ -833,3 +887,56 @@ def test_bulk_update(client_auth, db_session): | |
.one() | ||
.paiement_cb | ||
) | ||
|
||
|
||
def test_bulk_update_audits(client_auth, db_session): | ||
"""Test that bulk endpoint updates audits.""" | ||
size = 10 | ||
statiques = StatiqueFactory.batch( | ||
size=size, | ||
paiement_cb=False, | ||
) | ||
save_statiques(db_session, statiques) | ||
|
||
# Update paiement_cb field | ||
statiques[3].paiement_cb = statiques[7].paiement_cb = True | ||
|
||
payload = [json.loads(s.model_dump_json()) for s in statiques] | ||
response = client_auth.post("/statique/bulk", json=payload) | ||
assert response.status_code == status.HTTP_201_CREATED | ||
|
||
# Get user requesting the server | ||
user = db_session.exec(select(User).where(User.email == "[email protected]")).one() | ||
|
||
# Check changes and audit | ||
pdc3 = db_session.exec( | ||
select(PointDeCharge).where( | ||
PointDeCharge.id_pdc_itinerance == statiques[3].id_pdc_itinerance | ||
) | ||
).one() | ||
assert pdc3.paiement_cb | ||
assert len(pdc3.audits) == 1 | ||
assert pdc3.audits[0].author_id == user.id | ||
|
||
pdc7 = db_session.exec( | ||
select(PointDeCharge).where( | ||
PointDeCharge.id_pdc_itinerance == statiques[7].id_pdc_itinerance | ||
) | ||
).one() | ||
assert pdc7.paiement_cb | ||
assert len(pdc7.audits) == 1 | ||
assert pdc7.audits[0].author_id == user.id | ||
|
||
# Check no other changes exist | ||
not_updated = statiques.copy() | ||
not_updated.pop(3) | ||
not_updated.pop(7) | ||
pdcs = db_session.exec( | ||
select(PointDeCharge).where( | ||
PointDeCharge.id_pdc_itinerance.in_( | ||
s.id_pdc_itinerance for s in not_updated | ||
) | ||
) | ||
).all() | ||
for pdc in pdcs: | ||
assert len(pdc.audits) == 0 |