From a24f910dd6e0abae891705ddb38fc33ab390dc26 Mon Sep 17 00:00:00 2001 From: Christophe Vandeplas Date: Tue, 5 Nov 2024 11:38:53 +0100 Subject: [PATCH] new: [mcsettingsevents] new parser for MC Settings Events --- src/sysdiagnose/parsers/mcsettingsevents.py | 56 +++++++++++++++++++++ tests/test_parsers_mcsettingsevents.py | 24 +++++++++ 2 files changed, 80 insertions(+) create mode 100644 src/sysdiagnose/parsers/mcsettingsevents.py create mode 100644 tests/test_parsers_mcsettingsevents.py diff --git a/src/sysdiagnose/parsers/mcsettingsevents.py b/src/sysdiagnose/parsers/mcsettingsevents.py new file mode 100644 index 0000000..bbe53f2 --- /dev/null +++ b/src/sysdiagnose/parsers/mcsettingsevents.py @@ -0,0 +1,56 @@ +#! /usr/bin/env python3 + +import os +from sysdiagnose.utils.base import BaseParserInterface +import sysdiagnose.utils.misc as misc +from datetime import datetime, timezone + + +class McSettingsEventsParser(BaseParserInterface): + description = "Parsing MC Settings Events plist file" + format = "jsonl" # by default json + + def __init__(self, config: dict, case_id: str): + super().__init__(__file__, config, case_id) + + def get_log_files(self) -> list: + log_files = [ + "logs/MCState/Shared/MCSettingsEvents.plist" + ] + return [os.path.join(self.case_data_subfolder, log_files) for log_files in log_files] + + def execute(self) -> list | dict: + ''' + this is the function that will be called + ''' + result = [] + log_files = self.get_log_files() + for log_file in log_files: + json_data = misc.load_plist_file_as_json(log_file) + for entry in McSettingsEventsParser.traverse_and_collect(json_data): + result.append(entry) + + return result + + def traverse_and_collect(data, path=""): + ''' + recursively traverse json_data and search for dicts that contain the 'timestamp' key. + when found, convert the value of the 'timestamp' key to a datetime object and add it to the entry dict. + also add a field 'setting' with the path of the dict in the json_data where the 'timestamp' key was found. (each depth joined by a dot) + ''' + for key, value in data.items(): + current_path = f"{path}.{key}" if path else key + if 'timestamp' in value: + try: + timestamp = datetime.strptime(value['timestamp'], '%Y-%m-%dT%H:%M:%S.%f') + timestamp = timestamp.replace(tzinfo=timezone.utc) # ensure timezone is UTC + entry = value.copy() + entry['datetime'] = timestamp.isoformat(timespec='microseconds') + entry['timestamp'] = timestamp.timestamp() + entry['setting'] = current_path + yield entry + except ValueError: + pass + elif isinstance(value, dict): + for entry in McSettingsEventsParser.traverse_and_collect(value, current_path): + yield entry diff --git a/tests/test_parsers_mcsettingsevents.py b/tests/test_parsers_mcsettingsevents.py new file mode 100644 index 0000000..c11c8ca --- /dev/null +++ b/tests/test_parsers_mcsettingsevents.py @@ -0,0 +1,24 @@ +from sysdiagnose.parsers.mcsettingsevents import McSettingsEventsParser +from tests import SysdiagnoseTestCase +import unittest +import os + + +class TestParsersMcSettingsEvents(SysdiagnoseTestCase): + + def test_parse_mcsettingsevents(self): + for case_id, case in self.sd.cases().items(): + p = McSettingsEventsParser(self.sd.config, case_id=case_id) + files = p.get_log_files() + self.assertTrue(len(files) > 0) + + p.save_result(force=True) + self.assertTrue(os.path.isfile(p.output_file)) + + result = p.get_result() + for item in result: + self.assertTrue('timestamp' in item) + + +if __name__ == '__main__': + unittest.main()