Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add json output to target-query --list #841

Merged
merged 12 commits into from
Oct 29, 2024
19 changes: 14 additions & 5 deletions dissect/target/plugins/general/loaders.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json

from dissect.target.helpers.docs import INDENT_STEP, get_docstring
from dissect.target.loader import LOADERS_BY_SCHEME
from dissect.target.plugin import Plugin, export
from dissect.target.plugin import Plugin, arg, export


class LoaderListPlugin(Plugin):
Expand All @@ -10,7 +12,8 @@ def check_compatible(self) -> None:
pass

@export(output="none")
def loaders(self):
@arg("--json", dest="output_json", action="store_true")
def loaders(self, output_json: bool = False) -> None:
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
"""List the available loaders."""

loaders_info = {}
Expand All @@ -21,6 +24,12 @@ def loaders(self):
except ImportError:
continue

print("Available loaders:")
for loader_name, loader_description in sorted(loaders_info.items()):
print(f"{INDENT_STEP}{loader_name} - {loader_description}")
loaders = sorted(loaders_info.items())

if output_json:
print(json.dumps([{"name": name, "description": desc} for name, desc in loaders]), end="")

else:
print("Available loaders:")
for loader_name, loader_description in loaders:
print(f"{INDENT_STEP}{loader_name} - {loader_description}")
56 changes: 46 additions & 10 deletions dissect/target/plugins/general/plugins.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import json
import textwrap
from typing import Dict, List, Type, Union
from typing import Iterator, Type

from dissect.target import plugin
from dissect.target.helpers.docs import INDENT_STEP, get_plugin_overview
Expand All @@ -23,7 +26,8 @@ def categorize_plugins(plugins_selection: list[dict] = None) -> dict:
return output_dict


def get_exported_plugins():
def get_exported_plugins() -> list:
"""Returns list of exported plugins."""
return [p for p in plugin.plugins() if len(p["exports"])]


Expand All @@ -50,10 +54,10 @@ def update_dict_recursive(source_dict: dict, updated_dict: dict) -> dict:


def output_plugin_description_recursive(
structure_dict: Union[Dict, Plugin],
structure_dict: dict | Plugin,
print_docs: bool,
indentation_step=0,
) -> List[str]:
indentation_step: int = 0,
) -> list[str]:
"""Create plugin overview with identations."""

if isinstance(structure_dict, type) and issubclass(structure_dict, Plugin):
Expand All @@ -78,10 +82,10 @@ def get_plugin_description(


def get_description_dict(
structure_dict: Dict,
structure_dict: dict,
print_docs: bool,
indentation_step: int,
) -> List[str]:
) -> list[str]:
"""Returns a list of indented descriptions."""

output_descriptions = []
Expand All @@ -103,8 +107,12 @@ def check_compatible(self) -> None:

@export(output="none", cache=False)
@arg("--docs", dest="print_docs", action="store_true")
def plugins(self, plugins: list[dict] = None, print_docs: bool = False) -> None:
categorized_plugins = dict(sorted(categorize_plugins(plugins).items()))
@arg("--json", dest="output_json", action="store_true")
def plugins(self, plugins: list[Plugin] = None, print_docs: bool = False, output_json: bool = False) -> None:
"""Print all available plugins."""

dict_plugins = list({p.path: p.plugin_desc for p in plugins}.values())
categorized_plugins = dict(sorted(categorize_plugins(dict_plugins).items()))
plugin_descriptions = output_plugin_description_recursive(categorized_plugins, print_docs)

plugins_list = textwrap.indent(
Expand Down Expand Up @@ -138,4 +146,32 @@ def plugins(self, plugins: list[dict] = None, print_docs: bool = False) -> None:
"Failed to load:",
failed_list,
]
print("\n".join(output_lines))

if output_json:
out = {"loaded": list(generate_plugins_json(plugins))}

if failed_plugins := plugin.failed():
out["failed"] = [
{"module": p["module"], "stacktrace": "".join(p["stacktrace"])} for p in failed_plugins
]

print(json.dumps(out), end="")

else:
print("\n".join(output_lines))


def generate_plugins_json(plugins: list[Plugin]) -> Iterator[dict]:
"""Generates JSON output of a list of :class:`Plugin`s."""

for p in plugins:
func = getattr(p.class_object, p.method_name)
description = getattr(func, "__doc__", None)
summary = description.split("\n\n", 1)[0].strip() if description else None

yield {
"name": p.name,
"output": p.output_type,
"description": summary,
"path": p.path,
}
21 changes: 15 additions & 6 deletions dissect/target/tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,31 +169,40 @@ def main():
# Show the list of available plugins for the given optional target and optional
# search pattern, only display plugins that can be applied to ANY targets
if args.list:
collected_plugins = {}
collected_plugins = []

if targets:
for plugin_target in Target.open_all(targets, args.children):
funcs, _ = find_plugin_functions(plugin_target, args.list, compatibility=True, show_hidden=True)
for func in funcs:
collected_plugins[func.path] = func.plugin_desc
collected_plugins.append(func)
else:
funcs, _ = find_plugin_functions(Target(), args.list, compatibility=False, show_hidden=True)
for func in funcs:
collected_plugins[func.path] = func.plugin_desc
collected_plugins.append(func)

# Display in a user friendly manner
target = Target()
fparser = generate_argparse_for_bound_method(target.plugins, usage_tmpl=USAGE_FORMAT_TMPL)
fargs, rest = fparser.parse_known_args(rest)

# Display in a user friendly manner
if collected_plugins:
target.plugins(list(collected_plugins.values()))
if args.json:
print('{"plugins": ', end="")
target.plugins(collected_plugins, output_json=args.json)

# No real targets specified, show the available loaders
if not targets:
fparser = generate_argparse_for_bound_method(target.loaders, usage_tmpl=USAGE_FORMAT_TMPL)
fargs, rest = fparser.parse_known_args(rest)
target.loaders(**vars(fargs))
del fargs.output_json
if args.json:
print(', "loaders": ', end="")
target.loaders(**vars(fargs), output_json=args.json)

if args.json:
print("}")

parser.exit()

if not targets:
Expand Down
8 changes: 4 additions & 4 deletions tests/plugins/general/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_update_dict():

def test_plugin_description():
description = [x for x in output_plugin_description_recursive(PluginListPlugin, False)]
assert description == ["plugins - No documentation (output: no output)"]
assert description == ["plugins - Print all available plugins. (output: no output)"]


def test_plugin_description_compacting():
Expand All @@ -39,7 +39,7 @@ def test_plugin_description_compacting():
assert description == [
"hello:",
" world:",
" plugins - No documentation (output: no output)",
" plugins - Print all available plugins. (output: no output)",
]


Expand All @@ -54,9 +54,9 @@ def test_plugin_description_in_dict_multiple():
"hello:",
" world:",
" data:",
" plugins - No documentation (output: no output)",
" plugins - Print all available plugins. (output: no output)",
" data2:",
" plugins - No documentation (output: no output)",
" plugins - Print all available plugins. (output: no output)",
]


Expand Down
54 changes: 54 additions & 0 deletions tests/tools/test_query.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
import re
from typing import Any, Optional
Expand Down Expand Up @@ -244,3 +245,56 @@ def test_target_query_dry_run(capsys: pytest.CaptureFixture, monkeypatch: pytest
" execute: network.interfaces (general.network.interfaces)\n"
" execute: osinfo (general.osinfo.osinfo)\n"
)


def test_target_query_list_json(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None:
"""test if target-query --list --json output is formatted as we expect it to be."""

with monkeypatch.context() as m:
m.setattr("sys.argv", ["target-query", "-l", "-j"])
with pytest.raises((SystemExit, IndexError, ImportError)):
target_query()
out, _ = capsys.readouterr()

try:
output = json.loads(out)
except json.JSONDecodeError:
pass

# test the generic structure of the returned dictionary.
assert isinstance(output, dict), "Could not load JSON output of 'target-query --list --json'"
assert output["plugins"], "Expected a dictionary of plugins"
assert output["loaders"], "Expected a dictionary of loaders"
assert len(output["plugins"]["loaded"]) > 200, "Expected more loaded plugins"
assert not output["plugins"].get("failed"), "Some plugin(s) failed to initialize"

def get_plugin(plugins: list[dict], needle: str) -> dict:
match = [p for p in output["plugins"]["loaded"] if p["name"] == needle]
return match[0] if match else False

# general plugin
users_plugin = get_plugin(output, "users")
assert users_plugin == {
"name": "users",
"description": "Return the users available in the target.",
"output": "record",
"path": "general.default.users",
}

# namespaced plugin
plocate_plugin = get_plugin(output, "plocate.locate")
assert plocate_plugin == {
"name": "plocate.locate",
"description": "Yield file and directory names from the plocate.db.",
"output": "record",
"path": "os.unix.locate.plocate.locate",
}

# regular plugin
sam_plugin = get_plugin(output, "sam")
assert sam_plugin == {
"name": "sam",
"description": "Dump SAM entries",
"output": "record",
"path": "os.windows.sam.sam",
}