Skip to content

Commit

Permalink
Refactor target-shell (#812)
Browse files Browse the repository at this point in the history
Co-authored-by: Erik Schamper <[email protected]>
  • Loading branch information
JSCU-CNI and Schamper authored Aug 14, 2024
1 parent 2ca4b77 commit cf080f6
Show file tree
Hide file tree
Showing 15 changed files with 967 additions and 517 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,19 @@ Opening a shell on a target is straight-forward. You can do so by specifying a p

```bash
target-shell targets/EXAMPLE.vmx
EXAMPLE /> help
WIN-EXAMPLE:/$ help

Documented commands (type help <topic>):
========================================
cat disks filesystems help less python save
cd exit find hexdump ls readlink stat
clear file hash info pwd registry volumes
attr cls enter find info man registry volumes
cat cyber exit hash less pwd save zcat
cd debug file help ll python stat zless
clear disks filesystems hexdump ls readlink tree

EXAMPLE /> ls
WIN-EXAMPLE:/$ ls
$fs$
c:
efi
sysvol
```

Expand Down
4 changes: 4 additions & 0 deletions dissect/target/filesystems/extfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def lstat(self) -> fsutil.stat_result:
st_info.st_mtime_ns = self.entry.mtime_ns
st_info.st_ctime_ns = self.entry.ctime_ns

# Set blocks
st_info.st_blocks = self.entry.inode.i_blocks_lo
st_info.st_blksize = self.entry.extfs.block_size

return st_info

def attr(self) -> Any:
Expand Down
12 changes: 8 additions & 4 deletions dissect/target/loaders/tar.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import logging
import re
import tarfile
from pathlib import Path
from typing import Union

from dissect.target import filesystem, target
from dissect.target.filesystems.tar import (
Expand All @@ -21,22 +22,25 @@
class TarLoader(Loader):
"""Load tar files."""

def __init__(self, path: Union[Path, str], **kwargs):
def __init__(self, path: Path | str, **kwargs):
super().__init__(path)

if isinstance(path, str):
path = Path(path)

if self.is_compressed(path):
log.warning(
f"Tar file {path!r} is compressed, which will affect performance. "
"Consider uncompressing the archive before passing the tar file to Dissect."
)

self.tar = tarfile.open(path)
self.tar = tarfile.open(fileobj=path.open("rb"))

@staticmethod
def detect(path: Path) -> bool:
return path.name.lower().endswith((".tar", ".tar.gz", ".tgz"))

def is_compressed(self, path: Union[Path, str]) -> bool:
def is_compressed(self, path: Path | str) -> bool:
return str(path).lower().endswith((".tar.gz", ".tgz"))

def map(self, target: target.Target) -> None:
Expand Down
12 changes: 6 additions & 6 deletions dissect/target/loaders/velociraptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING

from dissect.target.loaders.dir import DirLoader, find_dirs, map_dirs
from dissect.target.plugin import OperatingSystem
Expand All @@ -18,7 +18,7 @@
WINDOWS_ACCESSORS = ["mft", "ntfs", "lazy_ntfs", "ntfs_vss", "auto"]


def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional[list[Path]]]:
def find_fs_directories(path: Path) -> tuple[OperatingSystem | None, list[Path] | None]:
fs_root = path.joinpath(FILESYSTEMS_ROOT)

# Unix
Expand Down Expand Up @@ -56,7 +56,7 @@ def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional
return None, None


def extract_drive_letter(name: str) -> Optional[str]:
def extract_drive_letter(name: str) -> str | None:
# \\.\X: in URL encoding
if len(name) == 14 and name.startswith("%5C%5C.%5C") and name.endswith("%3A"):
return name[10].lower()
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(self, path: Path, **kwargs):
f"Velociraptor target {path!r} is compressed, which will slightly affect performance. "
"Consider uncompressing the archive and passing the uncompressed folder to Dissect."
)
self.root = zipfile.Path(path)
self.root = zipfile.Path(path.open("rb"))
else:
self.root = path

Expand All @@ -105,8 +105,8 @@ def detect(path: Path) -> bool:
# results/
# uploads.json
# [...] other files related to the collection
if path.suffix == ".zip": # novermin
path = zipfile.Path(path)
if path.exists() and path.suffix == ".zip": # novermin
path = zipfile.Path(path.open("rb"))

if path.joinpath(FILESYSTEMS_ROOT).exists() and path.joinpath("uploads.json").exists():
_, dirs = find_fs_directories(path)
Expand Down
50 changes: 50 additions & 0 deletions dissect/target/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
See dissect/target/plugins/general/example.py for an example plugin.
"""

from __future__ import annotations

import fnmatch
import functools
import importlib
import importlib.util
import inspect
Expand Down Expand Up @@ -196,6 +198,8 @@ class attribute. Namespacing results in your plugin needing to be prefixed
The :func:`internal` decorator and :class:`InternalPlugin` set the ``__internal__`` attribute.
Finally. :func:`args` decorator sets the ``__args__`` attribute.
The :func:`alias` decorator populates the ``__aliases__`` private attribute of :class:`Plugin` methods.
Args:
target: The :class:`~dissect.target.target.Target` object to load the plugin for.
"""
Expand Down Expand Up @@ -448,6 +452,11 @@ def register(plugincls: Type[Plugin]) -> None:
exports = []
functions = []

# First pass to resolve aliases
for attr in get_nonprivate_attributes(plugincls):
for alias in getattr(attr, "__aliases__", []):
clone_alias(plugincls, attr, alias)

for attr in get_nonprivate_attributes(plugincls):
if isinstance(attr, property):
attr = attr.fget
Expand Down Expand Up @@ -542,6 +551,47 @@ def decorator(obj):
return decorator


def alias(*args, **kwargs: dict[str, Any]) -> Callable:
"""Decorator to be used on :class:`Plugin` functions to register an alias of that function."""

if not kwargs.get("name") and not args:
raise ValueError("Missing argument 'name'")

def decorator(obj: Callable) -> Callable:
if not hasattr(obj, "__aliases__"):
obj.__aliases__ = []

if name := (kwargs.get("name") or args[0]):
obj.__aliases__.append(name)

return obj

return decorator


def clone_alias(cls: type, attr: Callable, alias: str) -> None:
"""Clone the given attribute to an alias in the provided class."""

# Clone the function object
clone = type(attr)(attr.__code__, attr.__globals__, alias, attr.__defaults__, attr.__closure__)
clone.__kwdefaults__ = attr.__kwdefaults__

# Copy some attributes
functools.update_wrapper(clone, attr)
if wrapped := getattr(attr, "__wrapped__", None):
# update_wrapper sets a new wrapper, we want the original
clone.__wrapped__ = wrapped

# Update module path so we can fool inspect.getmodule with subclassed Plugin classes
clone.__module__ = cls.__module__

# Update the names
clone.__name__ = alias
clone.__qualname__ = f"{cls.__name__}.{alias}"

setattr(cls, alias, clone)


def plugins(
osfilter: Optional[type[OSPlugin]] = None,
special_keys: set[str] = set(),
Expand Down
10 changes: 3 additions & 7 deletions dissect/target/plugins/os/unix/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dissect.target.helpers.descriptor_extensions import UserRecordDescriptorExtension
from dissect.target.helpers.fsutil import TargetPath
from dissect.target.helpers.record import UnixUserRecord, create_extended_descriptor
from dissect.target.plugin import Plugin, export, internal
from dissect.target.plugin import Plugin, alias, export, internal

CommandHistoryRecord = create_extended_descriptor([UserRecordDescriptorExtension])(
"unix/history",
Expand Down Expand Up @@ -36,6 +36,7 @@ class CommandHistoryPlugin(Plugin):
("sqlite", ".sqlite_history"),
("zsh", ".zsh_history"),
("ash", ".ash_history"),
("dissect", ".dissect_history"), # wow so meta
)

def __init__(self, target: Target):
Expand All @@ -56,12 +57,7 @@ def _find_history_files(self) -> List[Tuple[str, TargetPath, UnixUserRecord]]:
history_files.append((shell, history_path, user_details.user))
return history_files

@export(record=CommandHistoryRecord)
def bashhistory(self):
"""Deprecated, use commandhistory function."""
self.target.log.warn("Function 'bashhistory' is deprecated, use the 'commandhistory' function instead.")
return self.commandhistory()

@alias("bashhistory")
@export(record=CommandHistoryRecord)
def commandhistory(self):
"""Return shell history for all users.
Expand Down
2 changes: 1 addition & 1 deletion dissect/target/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, path: Union[str, Path] = None):
self._applied = False

try:
self._config = config.load([self.path, os.getcwd()])
self._config = config.load([self.path, Path.cwd(), Path.home()])
except Exception as e:
self.log.warning("Error loading config file: %s", self.path)
self.log.debug("", exc_info=e)
Expand Down
90 changes: 25 additions & 65 deletions dissect/target/tools/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
# -*- coding: utf-8 -*-

import argparse
import datetime
import logging
import operator
import os
import pathlib
import shutil
Expand All @@ -13,7 +11,7 @@
from dissect.target import Target
from dissect.target.exceptions import TargetError
from dissect.target.helpers.fsutil import TargetPath
from dissect.target.tools.shell import stat_modestr
from dissect.target.tools.fsutils import print_ls, print_stat
from dissect.target.tools.utils import (
catch_sigpipe,
configure_generic_arguments,
Expand All @@ -25,75 +23,27 @@
logging.raiseExceptions = False


def human_size(bytes: int, units: list[str] = ["", "K", "M", "G", "T", "P", "E"]) -> str:
"""Helper function to return the human readable string representation of bytes."""
return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:])


def ls(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
if args.use_ctime and args.use_atime:
log.error("Can't specify -c and -u at the same time")
return
if not path or not path.exists():
return

_print_ls(args, path, 0)


def _print_ls(args: argparse.Namespace, path: TargetPath, depth: int) -> None:
subdirs = []

if path.is_dir():
contents = sorted(path.iterdir(), key=operator.attrgetter("name"))
elif path.is_file():
contents = [path]

if depth > 0:
print(f"\n{str(path)}:")

if not args.l:
for entry in contents:
print(entry.name)

if entry.is_dir():
subdirs.append(entry)
else:
if len(contents) > 1:
print(f"total {len(contents)}")

for entry in contents:
_print_extensive_file_stat(args, entry, entry.name)

if entry.is_dir():
subdirs.append(entry)

if args.recursive and subdirs:
for subdir in subdirs:
_print_ls(args, subdir, depth + 1)


def _print_extensive_file_stat(args: argparse.Namespace, path: TargetPath, name: str) -> None:
try:
entry = path.get()
stat = entry.lstat()
symlink = f" -> {entry.readlink()}" if entry.is_symlink() else ""
show_time = stat.st_mtime

if args.use_ctime:
show_time = stat.st_ctime
elif args.use_atime:
show_time = stat.st_atime

utc_time = datetime.datetime.utcfromtimestamp(show_time).isoformat()

if args.human_readable:
size = human_size(stat.st_size)
else:
size = stat.st_size

print(f"{stat_modestr(stat)} {stat.st_uid:4d} {stat.st_gid:4d} {size:>6s} {utc_time} {name}{symlink}")
except FileNotFoundError:
print(f"?????????? ? ? ? ????-??-??T??:??:??.?????? {name}")
# Only output with colors if stdout is a tty
use_colors = sys.stdout.buffer.isatty()

print_ls(
path,
0,
sys.stdout,
args.l,
args.human_readable,
args.recursive,
args.use_ctime,
args.use_atime,
use_colors,
)


def cat(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
Expand All @@ -120,6 +70,12 @@ def cp(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
print("[!] Failed, unsuported file type: %s" % path)


def stat(t: Target, path: TargetPath, args: argparse.Namespace) -> None:
if not path or not path.exists():
return
print_stat(path, sys.stdout, args.dereference)


def _extract_path(path: TargetPath, output_path: str) -> None:
print("%s -> %s" % (path, output_path))

Expand Down Expand Up @@ -172,6 +128,10 @@ def main() -> None:
parser_cat = subparsers.add_parser("cat", help="dump file contents", parents=[baseparser])
parser_cat.set_defaults(handler=cat)

parser_stat = subparsers.add_parser("stat", help="display file status", parents=[baseparser])
parser_stat.add_argument("-L", "--dereference", action="store_true")
parser_stat.set_defaults(handler=stat)

parser_find = subparsers.add_parser("walk", help="perform a walk", parents=[baseparser])
parser_find.set_defaults(handler=walk)

Expand Down
Loading

0 comments on commit cf080f6

Please sign in to comment.