Skip to content

Commit

Permalink
Added description for skipped files
Browse files (browse the repository at this point in the history)
  • Loading branch information
mehrinkiani committed Feb 22, 2024
1 parent b8e78d5 commit 667816b
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 159 deletions.
21 changes: 14 additions & 7 deletions modelscan/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@


class ErrorCategories(Enum):
MODEL_FILE = 1
JSON_DATA = 2
DEPENDENCY = 3
MODEL_SCAN = 1
DEPENDENCY = 2
PATH = 3
NESTED_ZIP = 4
PICKLE_GENOPS = 5
MAGIC_NUMBER = 6
JSON_DECODE = 7


class Error:
scan_name: str
category: ErrorCategories
message: Optional[str]
source: Optional[str]

def __init__(self) -> None:
pass

Expand All @@ -17,17 +26,15 @@ def __str__(self) -> str:


class ModelScanError(Error):
scan_name: str
message: Optional[str]
source: Optional[str]

def __init__(
self,
scan_name: str,
category: ErrorCategories,
message: Optional[str] = None,
source: Optional[str] = None,
) -> None:
self.scan_name = scan_name
self.category = category
self.message = message or "None"
self.source = str(source)

Expand Down
121 changes: 90 additions & 31 deletions modelscan/modelscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from modelscan.settings import DEFAULT_SETTINGS

from pathlib import Path
from typing import List, Union, Optional, IO, Dict, Tuple, Any
from typing import List, Union, Optional, IO, Dict, Any
from datetime import datetime

from modelscan.error import Error, ModelScanError
from modelscan.error import ModelScanError, ErrorCategories
from modelscan.skip import ModelScanSkipped, SkipCategories
from modelscan.issues import Issues, IssueSeverity
from modelscan.scanners.scan import ScanBase
from modelscan.tools.utils import _is_zipfile
Expand All @@ -24,9 +25,9 @@ def __init__(
) -> None:
# Output
self._issues = Issues()
self._errors: List[Error] = []
self._init_errors: List[Error] = []
self._skipped: List[str] = []
self._errors: List[ModelScanError] = []
self._init_errors: List[ModelScanError] = []
self._skipped: List[ModelScanSkipped] = []
self._scanned: List[str] = []
self._input_path: str = ""

Expand Down Expand Up @@ -54,7 +55,9 @@ def _load_scanners(self) -> None:
logger.error(f"Error importing scanner {scanner_path}")
self._init_errors.append(
ModelScanError(
scanner_path, f"Error importing scanner {scanner_path}: {e}"
scanner_path,
ErrorCategories.MODEL_SCAN,
f"Error importing scanner: {e}",
)
)

Expand Down Expand Up @@ -86,13 +89,25 @@ def _scan_path(
):
self._scan_zip(path)
elif not scanned:
self._skipped.append(str(path))
# Record the path as skipped only if it is not already in the skipped list
all_skipped_files = [skipped.source for skipped in self._skipped]
if str(path) not in all_skipped_files:
self._skipped.append(
ModelScanSkipped(
"ModelScan",
SkipCategories.SCAN_NOT_SUPPORTED,
f"Model Scan did not scan file",
str(path),
)
)

else:
logger.error(f"Error: path {path} is not valid")
self._errors.append(
ModelScanError("ModelScan", f"Path {path} is not valid")
ModelScanError(
"ModelScan", ErrorCategories.PATH, "Path is not valid", str(Path)
)
)
self._skipped.append(str(path))

def _scan_directory(self, directory_path: Path) -> None:
for path in directory_path.rglob("*"):
Expand All @@ -111,12 +126,18 @@ def _scan_source(
source=source,
data=data,
)

if scan_results is not None:
logger.info(f"Scanning {source} using {scanner.full_name()} model scan")
self._scanned.append(str(source))
self._issues.add_issues(scan_results.issues)
self._errors.extend(scan_results.errors)
scanned = True
if scan_results.errors:
self._errors.extend(scan_results.errors)
elif scan_results.skipped:
self._skipped.extend(scan_results.skipped)
else:
self._scanned.append(str(source))
self._issues.add_issues(scan_results.issues)
scanned = True

return scanned

def _scan_zip(
Expand All @@ -131,18 +152,42 @@ def _scan_zip(
source=f"{source}:{file_name}",
data=file_io,
)

if not scanned:
if _is_zipfile(file_name, data=file_io):
self._errors.append(
ModelScanError(
"ModelScan",
f"{source}:{file_name} is a zip file. ModelScan does not support nested zip files.",
ErrorCategories.NESTED_ZIP,
"ModelScan does not support nested zip files.",
f"{source}:{file_name}",
)
)
self._skipped.append(f"{source}:{file_name}")

# Record this zip member as skipped only if it is not already in the skipped list
all_skipped_files = [
skipped.source for skipped in self._skipped
]
if f"{source}:{file_name}" not in all_skipped_files:
self._skipped.append(
ModelScanSkipped(
"ModelScan",
SkipCategories.SCAN_NOT_SUPPORTED,
f"Model Scan did not scan file",
f"{source}:{file_name}",
)
)

except zipfile.BadZipFile as e:
logger.debug(f"Skipping zip file {source}, due to error", e, exc_info=True)
self._skipped.append(str(source))
self._skipped.append(
ModelScanSkipped(
"ModelScan",
SkipCategories.BAD_ZIP,
f"Skipping zip file due to error: {e}",
f"{source}:{file_name}",
)
)

def _generate_results(self) -> Dict[str, Any]:
report: Dict[str, Any] = {}
Expand All @@ -168,11 +213,7 @@ def _generate_results(self) -> Dict[str, Any]:
report["summary"]["absolute_path"] = str(absolute_path)
report["summary"]["modelscan_version"] = __version__
report["summary"]["timestamp"] = datetime.now().isoformat()
report["summary"]["skipped"] = {"total_skipped": len(self._skipped)}
report["summary"]["skipped"]["skipped_files"] = [
str(Path(file_name).relative_to(Path(absolute_path)))
for file_name in self._skipped
]

report["summary"]["scanned"] = {"total_scanned": len(self._scanned)}
report["summary"]["scanned"]["scanned_files"] = [
str(Path(file_name).relative_to(Path(absolute_path)))
Expand All @@ -190,17 +231,35 @@ def _generate_results(self) -> Dict[str, Any]:

all_errors = []

for err in self._errors:
error = {}
if err.message is not None:
error["description"] = err.message
if hasattr(err, "source"):
error["source"] = str(Path(err.source).relative_to(Path(absolute_path)))
if error:
all_errors.append(error)
for error in self._errors:
error_information = {}
error_information["category"] = str(error.category.name)
if error.message is not None:
error_information["description"] = error.message
if hasattr(error, "source"):
error_information["source"] = str(
Path(str(error.source)).relative_to(Path(absolute_path))
)

all_errors.append(error_information)

report["errors"] = all_errors

report["summary"]["skipped"] = {"total_skipped": len(self._skipped)}

all_skipped_files = []

for skipped_file in self._skipped:
skipped_file_information = {}
skipped_file_information["category"] = str(skipped_file.category.name)
skipped_file_information["description"] = str(skipped_file.message)
skipped_file_information["source"] = str(
Path(skipped_file.source).relative_to(Path(absolute_path))
)
all_skipped_files.append(skipped_file_information)

report["summary"]["skipped"]["skipped_files"] = all_skipped_files

return report

def is_compatible(self, path: str) -> bool:
Expand All @@ -222,13 +281,13 @@ def issues(self) -> Issues:
return self._issues

@property
def errors(self) -> List[Error]:
def errors(self) -> List[ModelScanError]:
return self._errors

@property
def scanned(self) -> List[str]:
return self._scanned

@property
def skipped(self) -> List[str]:
def skipped(self) -> List[ModelScanSkipped]:
return self._skipped
Loading

0 comments on commit 667816b

Please sign in to comment.