diff --git a/contentctl/actions/detection_testing/GitHubService.py b/contentctl/actions/detection_testing/GitHubService.py index 886a3e05..2750414c 100644 --- a/contentctl/actions/detection_testing/GitHubService.py +++ b/contentctl/actions/detection_testing/GitHubService.py @@ -137,11 +137,21 @@ def get_detections_changed(self, director: DirectorOutputDto) -> list[Detection] f"Error: self.repo must be initialized before getting changed detections." ) ) - raise (Exception("not implemented")) + + differences = self.repo.git.diff("--name-status", self.config.main_branch).split("\n") + new_content = list(filter(lambda x: x.split('\t')[1].startswith("A") , differences)) + modified_content = list(filter(lambda x: x.split('\t')[1].startswith("M") , differences)) + deleted_content =list(filter(lambda x: x.split('\t')[1].startswith("D") , differences)) + + content_to_test = list(filter(lambda x: x.startswith("detections"), new_content+modified_content )) + + import code + code.interact(local=locals()) + return [] def __init__(self, config: TestConfig): - self.repo = None + self.repo = git.Repo(config.repo_path) self.requested_detections: list[pathlib.Path] = [] self.config = config @@ -174,39 +184,7 @@ def __init__(self, config: TestConfig): return elif config.mode == DetectionTestingMode.changes: - # Changes is ONLY possible if the app is version controlled - # in a github repo. Ensure that this is the case and, if not - # raise an exception - raise (Exception("Mode [changes] is not yet supported.")) - try: - repo = git.Repo(config.repo_path) - except Exception as e: - raise ( - Exception( - f"Error: detection mode [{config.mode}] REQUIRES that [{config.repo_path}] is a git repository, but it is not." - ) - ) - if config.main_branch == config.test_branch: - raise ( - Exception( - f"Error: test_branch [{config.test_branch}] is the same as the main_branch [{config.main_branch}]. When using mode [{config.mode}], these two branches MUST be different." - ) - ) - - # Ensure that the test branch is checked out - if self.repo.active_branch.name != config.test_branch: - raise ( - Exception( - f"Error: detection mode [{config.mode}] REQUIRES that the test_branch [{config.test_branch}] be checked out at the beginning of the test, but it is not." - ) - ) - - # Ensure that the base branch exists - - if Utils.validate_git_branch_name( - config.repo_path, "NO_URL", config.main_branch - ): - return + return elif config.mode == DetectionTestingMode.all: return diff --git a/contentctl/objects/test_config.py b/contentctl/objects/test_config.py index dca3f2d6..e8a7f33b 100644 --- a/contentctl/objects/test_config.py +++ b/contentctl/objects/test_config.py @@ -1,7 +1,7 @@ # Needed for a staticmethod to be able to return an instance of the class it belongs to from __future__ import annotations - +import git import validators import pathlib import yaml @@ -46,9 +46,10 @@ class TestConfig(BaseModel, extra=Extra.forbid, validate_assignment=True): default=None, title="HTTP(s) path to the repo for repo_path. If this field is blank, it will be inferred from the repo", ) - # main_branch: Union[str,None] = Field(default=None, title="Main branch of the repo, if applicable.") - # test_branch: Union[str,None] = Field(default=None, title="Branch of the repo to be tested, if applicable.") - # commit_hash: Union[str,None] = Field(default=None, title="Commit hash of the repo state to be tested, if applicable") + main_branch: Union[str,None] = Field(default=None, title="Main branch of the repo, if applicable.") + test_branch: Union[str,None] = Field(default=None, title="Branch of the repo to be tested, if applicable.") + commit_hash: Union[str,None] = Field(default=None, title="Commit hash of the repo state to be tested, if applicable") + target_infrastructure: DetectionTestingTargetInfrastructure = Field( default=DetectionTestingTargetInfrastructure.container, title=f"Control where testing should be launched. Choose one of {DetectionTestingTargetInfrastructure._member_names_}", @@ -75,7 +76,7 @@ class TestConfig(BaseModel, extra=Extra.forbid, validate_assignment=True): num_containers: int = Field( default=1, title="Number of testing containers to start in parallel." ) - # pr_number: Union[int,None] = Field(default=None, title="The number of the PR to test") + pr_number: Union[int,None] = Field(default=None, title="The number of the PR to test") splunk_app_username: Union[str, None] = Field( default="admin", title="The name of the user for testing" ) @@ -143,101 +144,112 @@ def validate_ports_overlap(cls, v): # Ensure that at least 1 of test_branch, commit_hash, and/or pr_number were passed. # Otherwise, what are we testing?? # @root_validator(pre=False) - # def ensure_there_is_something_to_test(cls, values): - # if 'test_branch' not in values and 'commit_hash' not in values and'pr_number' not in values: - # if 'mode' in values and values['mode'] == DetectionTestingMode.changes: - # raise(ValueError(f"Under mode [{DetectionTestingMode.changes}], 'test_branch', 'commit_hash', and/or 'pr_number' must be defined so that we know what to test.")) - - # return values - - # @validator('repo_path', always=True) - # def validate_repo_path(cls,v): - - # try: - # path = pathlib.Path(v) - # except Exception as e: - # raise(ValueError(f"Error, the provided path is is not a valid path: '{v}'")) - - # try: - # r = git.Repo(path) - # except Exception as e: - # raise(ValueError(f"Error, the provided path is not a valid git repo: '{path}'")) - - # try: - - # if ALWAYS_PULL_REPO: - # r.remotes.origin.pull() - # except Exception as e: - # raise ValueError(f"Error pulling git repository {v}: {str(e)}") - - # return v - - # @validator('repo_url', always=True) - # def validate_repo_url(cls, v, values): - # Utils.check_required_fields('repo_url', values, ['repo_path']) - - # #First try to get the value from the repo - # try: - # remote_url_from_repo = git.Repo(values['repo_path']).remotes.origin.url - # except Exception as e: - # raise(ValueError(f"Error reading remote_url from the repo located at {values['repo_path']}")) - - # if v is not None and remote_url_from_repo != v: - # raise(ValueError(f"The url of the remote repo supplied in the config file {v} does not "\ - # f"match the value read from the repository at {values['repo_path']}, {remote_url_from_repo}")) - - # if v is None: - # v = remote_url_from_repo - - # #Ensure that the url is the proper format - # try: - # if bool(validators.url(v)) == False: - # raise(Exception) - # except: - # raise(ValueError(f"Error validating the repo_url. The url is not valid: {v}")) - - # return v - - # @validator('main_branch', always=True) - # def valid_main_branch(cls, v, values): - # Utils.check_required_fields('main_branch', values, ['repo_path', 'repo_url']) - - # if v is None: - # print(f"main_branch is not supplied. Inferring from '{values['repo_path']}'...",end='') - - # main_branch = Utils.get_default_branch_name(values['repo_path'], values['repo_url']) - # print(f"main_branch name '{main_branch}' inferred'") - # #continue with the validation - # v = main_branch - - # try: - # Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v) - # except Exception as e: - # raise ValueError(f"Error validating main_branch: {str(e)}") - # return v - - # @validator('test_branch', always=True) - # def validate_test_branch(cls, v, values): - # Utils.check_required_fields('test_branch', values, ['repo_path', 'repo_url', 'main_branch']) - # if v is None: - # print(f"No test_branch provided, so we will default to using the main_branch '{values['main_branch']}'") - # return values['main_branch'] - # try: - # Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v) - # except Exception as e: - # raise ValueError(f"Error validating test_branch: {str(e)}") - # return v - - # @validator('commit_hash', always=True) - # def validate_commit_hash(cls, v, values): - # Utils.check_required_fields('commit_hash', values, ['repo_path', 'repo_url', 'test_branch']) - - # try: - # #We can a hash with this function too - # Utils.validate_git_hash(values['repo_path'],values['repo_url'], v, values['test_branch']) - # except Exception as e: - # raise ValueError(f"Error validating commit_hash '{v}': {str(e)}") - # return v + def ensure_there_is_something_to_test(cls, values): + if 'test_branch' not in values and 'commit_hash' not in values and'pr_number' not in values: + if 'mode' in values and values['mode'] == DetectionTestingMode.changes: + raise(ValueError(f"Under mode [{DetectionTestingMode.changes}], 'test_branch', 'commit_hash', and/or 'pr_number' must be defined so that we know what to test.")) + + return values + + @validator('repo_path', always=True) + def validate_repo_path(cls,v): + print(f"checking repo path '{v}'") + try: + path = pathlib.Path(v) + except Exception as e: + print("exception 1") + raise(ValueError(f"Error, the provided path is is not a valid path: '{v}'")) + + try: + r = git.Repo(path) + except Exception as e: + print(f"exception 2: {str(e)}") + raise(ValueError(f"Error, the provided path is not a valid git repo: '{path}'")) + + try: + + if ALWAYS_PULL_REPO: + r.remotes.origin.pull() + except Exception as e: + print("exception 3") + raise ValueError(f"Error pulling git repository {v}: {str(e)}") + print("repo path looks good") + return v + + @validator('repo_url', always=True) + def validate_repo_url(cls, v, values): + Utils.check_required_fields('repo_url', values, ['repo_path']) + + #First try to get the value from the repo + try: + remote_url_from_repo = git.Repo(values['repo_path']).remotes.origin.url + except Exception as e: + raise(ValueError(f"Error reading remote_url from the repo located at {values['repo_path']}")) + + if v is not None and remote_url_from_repo != v: + raise(ValueError(f"The url of the remote repo supplied in the config file {v} does not "\ + f"match the value read from the repository at {values['repo_path']}, {remote_url_from_repo}")) + + if v is None: + v = remote_url_from_repo + + #Ensure that the url is the proper format + try: + if bool(validators.url(v)) == False: + raise(Exception) + except: + raise(ValueError(f"Error validating the repo_url. The url is not valid: {v}")) + + return v + @validator('main_branch', always=True) + def valid_main_branch(cls, v, values): + Utils.check_required_fields('main_branch', values, ['repo_path', 'repo_url']) + print("checking the branch") + if v is None: + print(f"main_branch is not supplied. Inferring from '{values['repo_path']}'...",end='') + + main_branch = Utils.get_default_branch_name(values['repo_path'], values['repo_url']) + print(f"main_branch name '{main_branch}' inferred'") + #continue with the validation + v = main_branch + + try: + Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v) + except Exception as e: + raise ValueError(f"Error validating main_branch: {str(e)}") + return v + + @validator('test_branch', always=True) + def validate_test_branch(cls, v, values): + Utils.check_required_fields('test_branch', values, ['repo_path', 'repo_url', 'main_branch']) + if v is None: + print(f"No test_branch provided, so we will default to using the main_branch '{values['main_branch']}'") + v = values['main_branch'] + try: + Utils.validate_git_branch_name(values['repo_path'],values['repo_url'], v) + except Exception as e: + raise ValueError(f"Error validating test_branch: {str(e)}") + + r = git.Repo(values.get("repo_path")) + try: + if r.active_branch != v: + print(f"We are trying to test {v} but the current active branch is {r.active_branch}") + print(f"Checking out {v}") + r.git.checkout(v) + except Exception as e: + raise ValueError(f"Error checking out test_branch '{v}': {str(e)}") + return v + + @validator('commit_hash', always=True) + def validate_commit_hash(cls, v, values): + Utils.check_required_fields('commit_hash', values, ['repo_path', 'repo_url', 'test_branch']) + + try: + #We can a hash with this function too + Utils.validate_git_hash(values['repo_path'],values['repo_url'], v, values['test_branch']) + except Exception as e: + raise ValueError(f"Error validating commit_hash '{v}': {str(e)}") + return v @validator("full_image_path", always=True) def validate_full_image_path(cls, v, values): @@ -397,27 +409,27 @@ def validate_num_containers(cls, v): ) return v - # @validator('pr_number', always=True) - # def validate_pr_number(cls, v, values): - # Utils.check_required_fields('pr_number', values, ['repo_path', 'commit_hash']) + @validator('pr_number', always=True) + def validate_pr_number(cls, v, values): + Utils.check_required_fields('pr_number', values, ['repo_path', 'commit_hash']) - # if v == None: - # return v + if v == None: + return v - # hash = Utils.validate_git_pull_request(values['repo_path'], v) + hash = Utils.validate_git_pull_request(values['repo_path'], v) - # #Ensure that the hash is equal to the one in the config file, if it exists. - # if values['commit_hash'] is None: - # values['commit_hash'] = hash - # else: - # if values['commit_hash'] != hash: - # raise(ValueError(f"commit_hash specified in configuration was {values['commit_hash']}, but commit_hash"\ - # f" from pr_number {v} was {hash}. These must match. If you're testing"\ - # " a PR, you probably do NOT want to provide the commit_hash in the configuration file "\ - # "and always want to test the head of the PR. This will be done automatically if you do "\ - # "not provide the commit_hash.")) + #Ensure that the hash is equal to the one in the config file, if it exists. + if values['commit_hash'] is None: + values['commit_hash'] = hash + else: + if values['commit_hash'] != hash: + raise(ValueError(f"commit_hash specified in configuration was {values['commit_hash']}, but commit_hash"\ + f" from pr_number {v} was {hash}. These must match. If you're testing"\ + " a PR, you probably do NOT want to provide the commit_hash in the configuration file "\ + "and always want to test the head of the PR. This will be done automatically if you do "\ + "not provide the commit_hash.")) - # return v + return v @validator("splunk_app_password", always=True) def validate_splunk_app_password(cls, v):