Skip to content

Commit

Permalink
🗃️ 🔨 Add a simple script to reset a snapshot to a particular timestamp
Browse files Browse the repository at this point in the history
This is very simple pseudocode.
- delete all entries written after a particular timestamp
- reset all labels that were entered after the timestamp by finding the
  corresponding confirmed trip and removing the `user_input`

```
to_be_deleted_manual_entries = # entries whose metadata.write_ts > timestamp
for me in to_be_deleted_manual_entries:
    matching_confirmed_trip = ea...find_matching_confirmed_trip(me)
    del matching_confirmed_trip['data']['user_input']

edb.get_analysis_timeseries_db().delete_many({"metadata.write_ts": {"$gt": timestamp}})
```

This is essentially the same code as
e-mission#103 (comment)
but with the matching code changed from

```
        confirmed_trip = edb.get_analysis_timeseries_db().find_one({"user_id": t["user_id"],
                "metadata.key": "analysis/confirmed_trip",
                "data.start_ts": t["data"]["start_ts"]}) #gets confirmed trip with matching user id & timestamp
```

to

```
        confirmed_trip = esdt.get_confirmed_obj_for_user_input_obj(ts, ecwe.Entry(t))
```

to support the richer matching algorithm.

And lots of improvements to the logging

Testing done:

```
After parsing, the reset timestamp is 2023-12-20T03:27:00+00:00 -> 1703042820.0
Planning to delete 40502 records from the timeseries
Planning to delete 6477 records from the analysis timeseries
6477
number of manual records after cutoff 143
For input 2023-12-20T15:49:36.486127+07:00 of type manual/purpose_confirm, labeled at 2023-12-20T19:04:06.452000+07:00, found confirmed trip starting at 2023-12-20T14:22:50.209000+07:00 with no user input
For input 2023-12-19T06:28:58.685000+07:00 of type manual/purpose_confirm, labeled at 2023-12-20T19:05:11.080000+07:00, found confirmed trip starting at 2023-12-19T06:28:58.685000+07:00 with user input {'purpose_confirm': 'ໄປວຽກ', 'mode_confirm': 'motorcycle'}
Resetting input of type purpose_confirm
Update results = {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}
For input 2023-12-19T05:33:56.424000+07:00 of type manual/purpose_confirm, labeled at 2023-12-20T19:05:18.335000+07:00, found confirmed trip starting at 2023-12-19T05:33:56.424000+07:00 with user input {'purpose_confirm': 'ໄປວຽກ', 'mode_confirm': 'motorcycle'}
Resetting input of type purpose_confirm
Update results = {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}
For input 2023-12-18T20:12:15.920000+07:00 of type manual/purpose_confirm, labeled at 2023-12-20T19:05:28.385000+07:00, found confirmed trip starting at 2023-12-18T20:12:15.920000+07:00 with user input {'purpose_confirm': 'ໄປວຽກ', 'mode_confirm': 'motorcycle'}
Resetting input of type purpose_confirm
Update results = {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}
For input 2023-12-18T20:12:15.920000+07:00 of type manual/mode_confirm, labeled at 2023-12-20T19:06:27.949000+07:00, found confirmed trip starting at 2023-12-18T20:12:15.920000+07:00 with user input {'mode_confirm': 'motorcycle'}
Resetting input of type mode_confirm
Update results = {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}
...
For input 2023-12-20T19:31:21.577000+07:00 of type manual/purpose_confirm, labeled at 2023-12-20T20:14:33.454000+07:00, found confirmed trip starting at 2023-12-20T19:05:27.266615+07:00 with no user input
For input 2023-12-20T20:02:07.148000+07:00 of type manual/mode_confirm, labeled at 2023-12-20T20:15:57.305000+07:00, found confirmed trip starting at 2023-12-20T19:05:27.266615+07:00 with no user input
For input 2023-12-20T20:02:07.148000+07:00 of type manual/purpose_confirm, labeled at 2023-12-20T20:16:02.741000+07:00, found confirmed trip starting at 2023-12-20T19:05:27.266615+07:00 with no user input
delete all entries after timestamp 1703042820.0
deleting all timeseries entries after 1703042820.0, {'n': 40502, 'ok': 1.0}
deleting all analysis timeseries entries after 1703042820.0, {'n': 6477, 'ok': 1.0}
```
  • Loading branch information
shankari committed Jan 14, 2024
1 parent e8e1f51 commit cc2d2a4
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions viz_scripts/bin/reset_snapshot_to_ts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
## This is a standalone script that resets the database to a previously
## specified timestamp. It does so by deleting entries after the timestamp,
## and removing labels that were entered after the timestamp as well

import argparse
import arrow
import emission.core.get_database as edb
import emission.storage.decorations.user_queries as ecdu
import emission.core.wrapper.entry as ecwe
import emission.core.wrapper.pipelinestate as ecwp

import emission.storage.decorations.trip_queries as esdt
import emission.storage.timeseries.abstract_timeseries as esta

if __name__ == '__main__':
    # Script entry point. Steps, in order:
    #   1. parse the cutoff time given on the command line into an epoch timestamp
    #   2. report how many entries were written after the cutoff
    #   3. for every manual label written after the cutoff, find the matching
    #      confirmed trip and unset that label in its `data.user_input`
    #   4. delete every entry (timeseries and analysis timeseries) written
    #      after the cutoff
    # WARNING: steps 3 and 4 modify the database in place; there is no dry-run mode.
    parser = argparse.ArgumentParser(prog="reset_snapshot_to_ts")
    parser.add_argument("reset_fmt_time", help="reset timestamp in the format displayed on the metrics you are trying to match - e.g. 'YYYY-MM-DDThh:mm:ss.ms+00:00'")

    args = parser.parse_args()
    reset_ts = arrow.get(args.reset_fmt_time).timestamp()
    print(f"After parsing, the reset timestamp is {args.reset_fmt_time} -> {reset_ts}")

    # Build the filters once so that the "planning to delete" counts, the label
    # lookup and the final deletes are guaranteed to use the same criteria.
    after_cutoff_query = {"metadata.write_ts": {"$gt": reset_ts}}
    manual_input_query = {
        **after_cutoff_query,
        "metadata.key": {"$regex": "^manual/(mode_confirm|purpose_confirm|replaced_mode)$"},
    }

    timeseries_db = edb.get_timeseries_db()
    analysis_timeseries_db = edb.get_analysis_timeseries_db()

    # first, count all entries written after a cutoff time
    print(f"Planning to delete {timeseries_db.count_documents(after_cutoff_query)} records from the timeseries")
    print(f"Planning to delete {analysis_timeseries_db.count_documents(after_cutoff_query)} records from the analysis timeseries")

    # then, find manual inputs added after the cutoff time
    print(f"number of manual records after cutoff {timeseries_db.count_documents(manual_input_query)}")

    # ideally, this would use the aggregate timeseries, but retaining this code to make it easier for
    # @iantei to understand
    ts = esta.TimeSeries.get_aggregate_time_series()
    # Process the labels in the order in which they were written (ascending write_ts).
    manual_inputs = list(timeseries_db.find(manual_input_query).sort("metadata.write_ts", 1))
    for t in manual_inputs:
        confirmed_trip = esdt.get_confirmed_obj_for_user_input_obj(ts, ecwe.Entry(t))
        if confirmed_trip is None:
            print("No matching confirmed trip for %s" % t["data"]["start_fmt_time"])
            continue

        input_desc = (
            f"For input {t['data']['start_fmt_time']} of type {t['metadata']['key']}, "
            f"labeled at {t['metadata']['write_fmt_time']}, "
            f"found confirmed trip starting at {confirmed_trip['data']['start_fmt_time']}"
        )

        # A missing, None or empty `user_input` all mean there is nothing to reset;
        # using .get avoids a KeyError when the field is absent.
        existing_user_input = confirmed_trip["data"].get("user_input")
        if not existing_user_input:
            print(f"{input_desc} with no user input")
            continue

        print(f"{input_desc} with user input {existing_user_input}")
        # The metadata key looks like "manual/<input_type>"; the part after the
        # slash is the field name under data.user_input.
        input_type = t["metadata"]["key"].split("/")[1]
        print(f"Resetting input of type {input_type}")
        update_results = analysis_timeseries_db.update_one(
            {"_id": confirmed_trip["_id"], "metadata.key": "analysis/confirmed_trip"},
            # the value passed to $unset is ignored by MongoDB; only the field name matters
            {"$unset": {"data.user_input.%s" % input_type: {}}},
        )
        print(f"Update results = {update_results.raw_result}")

    # Finally, drop everything written after the cutoff. This happens after the
    # label reset above because that step reads the manual entries deleted here.
    print(f"delete all entries after timestamp {reset_ts}")
    timeseries_delete_result = timeseries_db.delete_many(after_cutoff_query)
    print(f"deleting all timeseries entries after {reset_ts}, {timeseries_delete_result.raw_result}")
    analysis_delete_result = analysis_timeseries_db.delete_many(after_cutoff_query)
    print(f"deleting all analysis timeseries entries after {reset_ts}, {analysis_delete_result.raw_result}")

0 comments on commit cc2d2a4

Please sign in to comment.