-
Notifications
You must be signed in to change notification settings - Fork 50
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create base class for the network plugin (#810)
- Loading branch information
Showing
4 changed files
with
166 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any, Iterator, Union | ||
|
||
from flow.record.fieldtypes.net import IPAddress, IPNetwork | ||
|
||
from dissect.target.helpers.record import ( | ||
MacInterfaceRecord, | ||
UnixInterfaceRecord, | ||
WindowsInterfaceRecord, | ||
) | ||
from dissect.target.plugin import Plugin, export, internal | ||
from dissect.target.target import Target | ||
|
||
InterfaceRecord = Union[UnixInterfaceRecord, WindowsInterfaceRecord, MacInterfaceRecord] | ||
|
||
|
||
class NetworkPlugin(Plugin): | ||
__namespace__ = "network" | ||
|
||
def __init__(self, target: Target): | ||
super().__init__(target) | ||
self._interface_list: list[InterfaceRecord] | None = None | ||
|
||
def check_compatible(self) -> None: | ||
pass | ||
|
||
def _interfaces(self) -> Iterator[InterfaceRecord]: | ||
yield from () | ||
|
||
def _get_record_type(self, field_name: str) -> Iterator[Any]: | ||
for record in self.interfaces(): | ||
if (output := getattr(record, field_name, None)) is None: | ||
continue | ||
|
||
if isinstance(output, list): | ||
yield from output | ||
else: | ||
yield output | ||
|
||
@export(record=InterfaceRecord) | ||
def interfaces(self) -> Iterator[InterfaceRecord]: | ||
# Only search for the interfaces once | ||
if self._interface_list is None: | ||
self._interface_list = list(self._interfaces()) | ||
|
||
yield from self._interface_list | ||
|
||
@export | ||
def ips(self) -> list[IPAddress]: | ||
return list(self._get_record_type("ip")) | ||
|
||
@export | ||
def gateways(self) -> list[IPAddress]: | ||
return list(self._get_record_type("gateway")) | ||
|
||
@export | ||
def macs(self) -> list[str]: | ||
return list(self._get_record_type("mac")) | ||
|
||
@export | ||
def dns(self) -> list[str]: | ||
return list(self._get_record_type("dns")) | ||
|
||
@internal | ||
def with_ip(self, ip_addr: str) -> Iterator[InterfaceRecord]: | ||
for interface in self.interfaces(): | ||
if ip_addr in interface.ip: | ||
yield interface | ||
|
||
@internal | ||
def with_mac(self, mac: str) -> Iterator[InterfaceRecord]: | ||
for interface in self.interfaces(): | ||
if interface.mac == mac: | ||
yield interface | ||
|
||
@internal | ||
def in_cidr(self, cidr: str) -> Iterator[InterfaceRecord]: | ||
cidr = IPNetwork(cidr) | ||
for interface in self.interfaces(): | ||
if any(ip_addr in cidr for ip_addr in interface.ip): | ||
yield interface |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
|
||
from dissect.target.helpers.record import ( | ||
MacInterfaceRecord, | ||
UnixInterfaceRecord, | ||
WindowsInterfaceRecord, | ||
) | ||
from dissect.target.plugins.general.network import InterfaceRecord, NetworkPlugin | ||
from dissect.target.target import Target | ||
|
||
|
||
@pytest.fixture(params=[MacInterfaceRecord, WindowsInterfaceRecord, UnixInterfaceRecord]) | ||
def network_record(request: pytest.FixtureRequest) -> InterfaceRecord: | ||
return request.param( | ||
name="interface_name", | ||
type="physical", | ||
enabled=True, | ||
mac="DE:AD:BE:EF:00:00", | ||
ip=["10.42.42.10"], | ||
gateway=["10.42.42.1"], | ||
dns=["8.8.8.8", "1.1.1.1"], | ||
source="some_file", | ||
) | ||
|
||
|
||
def test_base_network_plugin(target_bare: Target, network_record: InterfaceRecord) -> None: | ||
with patch.object(NetworkPlugin, "_interfaces", return_value=[network_record]): | ||
network = NetworkPlugin(target_bare) | ||
interfaces = list(network.interfaces()) | ||
assert len(interfaces) == 1 | ||
|
||
assert network.ips() == ["10.42.42.10"] | ||
assert network.gateways() == ["10.42.42.1"] | ||
assert network.macs() == ["DE:AD:BE:EF:00:00"] | ||
assert network.dns() == ["8.8.8.8", "1.1.1.1"] | ||
|
||
assert len(list(network.in_cidr("10.42.42.0/24"))) == 1 | ||
assert len(list(network.in_cidr("10.43.42.0/24"))) == 0 | ||
|
||
assert len(list(network.with_mac("DE:AD:BE:EF:00:00"))) == 1 | ||
assert len(list(network.with_mac("DE:AD:BE:EF:00:01"))) == 0 | ||
|
||
assert len(list(network.with_ip("10.42.42.10"))) == 1 | ||
assert len(list(network.with_ip("10.42.42.42"))) == 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters