From 3a43152c4bad5997b501fbdd068a0270b9d929c0 Mon Sep 17 00:00:00 2001 From: Xiaoxue Wang Date: Mon, 23 Dec 2024 15:05:02 +0800 Subject: [PATCH] fix: handler extra header lines in JSONParser (#4317) Signed-off-by: Xiaoxue Wang (cherry picked from commit f60b00ab188a4ed29a95176838ac030b14b0bcf2) (cherry picked from commit 35ff1d5a24c616ada27dae4efca63bb5f8714787) --- insights/core/__init__.py | 36 +++++++++++++------ insights/tests/parsers/test_fwupdagent.py | 19 ++++++++++ insights/tests/test_json_parser.py | 43 ++++++++++++++++++++++- 3 files changed, 87 insertions(+), 11 deletions(-) diff --git a/insights/core/__init__.py b/insights/core/__init__.py index f76a1e485d..8534ea0771 100644 --- a/insights/core/__init__.py +++ b/insights/core/__init__.py @@ -791,24 +791,40 @@ def parse_content(self, content): class JSONParser(Parser, LegacyItemAccess): """ A parser class that reads JSON files. Base your own parser on this. + + Attributes: + data (dict): The loaded json content + unparsed_lines (list): The skipped unparsed lines + + Raises: + ParseException: When any error be thrown during the json loading of `content`. + SkipComponent: When `content` is empty or the loaded data is empty. """ def parse_content(self, content): + # If content is empty then raise a skip exception instead of a parse exception. + if not content: + raise SkipComponent("Empty output.") try: if isinstance(content, list): - self.data = json.loads('\n'.join(content)) + # Find the actual json start line with '{' and '[' as identifier + # To skip any extra lines before the actual json start line + actual_start_index = 0 + for idx, _line in enumerate(content): + line = _line.strip() + if line and line.startswith('{') or line.startswith('['): + actual_start_index = idx + break + self.unparsed_lines = content[:actual_start_index] + self.data = json.loads('\n'.join(content[actual_start_index:])) else: self.data = json.loads(content) except: - # If content is empty then raise a skip exception instead of a parse exception. - if not content: - raise SkipComponent("Empty output.") - else: - tb = sys.exc_info()[2] - cls = self.__class__ - name = ".".join([cls.__module__, cls.__name__]) - msg = "%s couldn't parse json." % name - six.reraise(ParseException, ParseException(msg), tb) + tb = sys.exc_info()[2] + cls = self.__class__ + name = ".".join([cls.__module__, cls.__name__]) + msg = "%s couldn't parse json." % name + six.reraise(ParseException, ParseException(msg), tb) # Kept for backwards compatibility; # JSONParser used to raise an exception for valid "null" JSON string if self.data is None: diff --git a/insights/tests/parsers/test_fwupdagent.py b/insights/tests/parsers/test_fwupdagent.py index 984dbf6568..ebb50089a2 100644 --- a/insights/tests/parsers/test_fwupdagent.py +++ b/insights/tests/parsers/test_fwupdagent.py @@ -8,6 +8,9 @@ from insights.tests import context_wrap DEVICES = """ +INFO: The fwupdagent command is deprecated, use `fwupdmgr --json` instead +WARNING: UEFI firmware can not be updated in legacy BIOS mode + See https://github.com/fwupd/fwupd/wiki/PluginFlag:legacy-bios for more information. { "Devices" : [ { @@ -104,6 +107,9 @@ """ SECURITY = """ +INFO: The fwupdagent command is deprecated, use `fwupdmgr --json` instead +WARNING: UEFI firmware can not be updated in legacy BIOS mode + See https://github.com/fwupd/fwupd/wiki/PluginFlag:legacy-bios for more information. { "HostSecurityAttributes" : [ { @@ -150,6 +156,12 @@ This tool can be used from other tools and from shell scripts. """ +SECURITY_ERROR_3 = """ +INFO: The fwupdagent command is deprecated, use `fwupdmgr --json` instead +WARNING: UEFI firmware can not be updated in legacy BIOS mode + See https://github.com/fwupd/fwupd/wiki/PluginFlag:legacy-bios for more information. +""".strip() + def test_devices(): devices = FwupdagentDevices(context_wrap(DEVICES)) @@ -168,6 +180,10 @@ def test_security(): assert security["HostSecurityAttributes"][0]["HsiResult"] == "not-tainted" assert security["HostSecurityAttributes"][1]["Name"] == "Encrypted RAM" assert security["HostSecurityAttributes"][1]["HsiLevel"] == 4 + assert security.unparsed_lines == [ + "INFO: The fwupdagent command is deprecated, use `fwupdmgr --json` instead", + "WARNING: UEFI firmware can not be updated in legacy BIOS mode", + " See https://github.com/fwupd/fwupd/wiki/PluginFlag:legacy-bios for more information."] with pytest.raises(ParseException): FwupdagentSecurity(context_wrap(SECURITY_ERROR_1)) @@ -175,6 +191,9 @@ def test_security(): with pytest.raises(ParseException): FwupdagentSecurity(context_wrap(SECURITY_ERROR_2)) + with pytest.raises(ParseException): + FwupdagentSecurity(context_wrap(SECURITY_ERROR_3)) + def test_doc_examples(): env = { diff --git a/insights/tests/test_json_parser.py b/insights/tests/test_json_parser.py index 4f66fbc237..32685f5a1d 100644 --- a/insights/tests/test_json_parser.py +++ b/insights/tests/test_json_parser.py @@ -15,11 +15,31 @@ class MyJsonParser(JSONParser): [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}] } +JSON_CONTENT_WITH_EXTRA_HEADER_LINES_1 = """ +INFO: The fwupdagent command is deprecated, use `fwupdmgr --json` instead +WARNING: UEFI firmware can not be updated in legacy BIOS mode + See https://github.com/fwupd/fwupd/wiki/PluginFlag:legacy-bios for more information. +{"a": [{"b": "x"}, {"b": "y"}]} +""".strip() +JSON_CONTENT_WITH_EXTRA_HEADER_LINES_2 = """ +INFO: The fwupdagent command is deprecated, use `fwupdmgr --json` instead +WARNING: UEFI firmware can not be updated in legacy BIOS mode + See https://github.com/fwupd/fwupd/wiki/PluginFlag:legacy-bios for more information. +{} +""".strip() +JSON_CONTENT_WITH_EXTRA_HEADER_LINES_3 = """ +INFO: Some info line from a command spec output +WARNING: Some warning line-1 from a command spec output + Some warning line-2 from a command spec output. +""".strip() + def test_json_parser_success(): for jsonstr in json_test_strings: ctx = context_wrap(jsonstr) - assert MyJsonParser(ctx).data == json_test_strings[jsonstr] + my_json_parser = MyJsonParser(ctx) + assert my_json_parser.data == json_test_strings[jsonstr] + assert my_json_parser.unparsed_lines == [] def test_json_parser_failure(): @@ -36,3 +56,24 @@ def test_json_parser_null_value(): MyJsonParser(ctx) assert "Empty input" == ex.value.args[0] + + +def test_json_parser_with_extra_header_lines(): + ctx = context_wrap(JSON_CONTENT_WITH_EXTRA_HEADER_LINES_1) + my_json_parser = MyJsonParser(ctx) + assert my_json_parser.data == {'a': [{'b': 'x'}, {'b': 'y'}]} + assert my_json_parser.unparsed_lines == [ + "INFO: The fwupdagent command is deprecated, use `fwupdmgr --json` instead", + "WARNING: UEFI firmware can not be updated in legacy BIOS mode", + " See https://github.com/fwupd/fwupd/wiki/PluginFlag:legacy-bios for more information."] + + ctx = context_wrap(JSON_CONTENT_WITH_EXTRA_HEADER_LINES_2) + my_json_parser = MyJsonParser(ctx) + assert my_json_parser.data == {} + assert len(my_json_parser.unparsed_lines) == 3 + + ctx = context_wrap(JSON_CONTENT_WITH_EXTRA_HEADER_LINES_3) + with pytest.raises(ParseException) as ex: + MyJsonParser(ctx) + + assert "couldn't parse json" in ex.value.args[0]