diff --git a/logprep/util/schema_and_rule_checker.py b/logprep/util/schema_and_rule_checker.py index 05a3ca2d4..50d1015b0 100644 --- a/logprep/util/schema_and_rule_checker.py +++ b/logprep/util/schema_and_rule_checker.py @@ -2,27 +2,28 @@ """Runner for testing schemas and rules""" -from typing import Optional, List -from collections.abc import Iterable - from argparse import ArgumentParser +from collections.abc import Iterable +from json.decoder import JSONDecodeError from logging import Logger from os import walk from os.path import join -from json.decoder import JSONDecodeError +from typing import List, Optional from colorama import Fore -from logprep.util.configuration import Configuration - +from logprep.abc.processor import Processor +from logprep.filter.lucene_filter import LuceneFilterError from logprep.processor.base.exceptions import ( InvalidRuleDefinitionError, MismatchedRuleDefinitionError, ) from logprep.processor.base.rule import Rule -from logprep.abc.processor import Processor -from logprep.processor.labeler.labeling_schema import LabelingSchema, InvalidLabelingSchemaFileError -from logprep.filter.lucene_filter import LuceneFilterError +from logprep.processor.labeler.labeling_schema import ( + InvalidLabelingSchemaFileError, + LabelingSchema, +) +from logprep.util.configuration import Configuration class SchemaAndRuleChecker: @@ -138,14 +139,17 @@ def _validate_rules_in_path( path_schema: str = None, ): number_of_checked_rules = 0 - for root, _, files in walk(path_rules): - for file in files: - number_of_checked_rules += 1 - rule_path = join(root, file) - - multi_rule = self.check_rule_creation_errors(rule_class, rule_path) - self._validate_schema(multi_rule, path_schema, rule_path) - self._print_schema_check_results(path_schema) + if isinstance(path_rules, dict): + self.check_rule_creation_errors(rule_class, path_rules) + else: + for root, _, files in walk(path_rules): + for file in files: + number_of_checked_rules += 1 + rule_path = join(root, file) + + multi_rule = self.check_rule_creation_errors(rule_class, rule_path) + self._validate_schema(multi_rule, path_schema, rule_path) + self._print_schema_check_results(path_schema) if not self.errors: self._print_valid( f"Valid {processor_type} rules in {path_rules} " @@ -198,7 +202,9 @@ def check_rule_creation_errors(self, rule_class: Rule, rule_path: str) -> Option """ rule = None try: - if rule_path.endswith(".json") or rule_path.endswith(".yml"): + if isinstance(rule_path, dict): + rule = rule_class.create_rules_from_target(rule_path) + elif rule_path.endswith(".json") or rule_path.endswith(".yml"): if not rule_path.endswith("_test.json"): rule = rule_class.create_rules_from_target(rule_path) except InvalidRuleDefinitionError as error: