Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add features as described in Issue 37 #51

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ logs/
book
src
stacks
docker/configs
docker/configs
.venv
5 changes: 5 additions & 0 deletions mantis/models/args_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,10 @@ class ArgsModel(BaseModel):
subdomain: str = Field(None)
list_: bool = False
list_orgs: bool = False
list_domains: bool = False
orgs_list: list[str] = False
asset_types_list: list[str] = False
after_datetime_filter: str = None
before_datetime_filter: str = None
in_scope: bool = False

32 changes: 28 additions & 4 deletions mantis/utils/args_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,14 @@ def args_parse() -> ArgsModel:

list_parser = subparser.add_parser("list", help="List entities present in db", usage=ArgsParse.list_msg())

list_sub_parser = list_parser.add_subparsers(title="List Subcommands", dest="list_sub_command")

list_sub_parser.add_parser("orgs", help="List orgs present in DB")
list_parser.add_argument("-l","--list-orgs", help="list all orgs from database", dest="list_sub_command_ls_orgs", action="store_true")

list_parser.add_argument("-d","--list-domains", help="list domains (tlds/subdomains) for selected orgs", dest="list_sub_command_ls_domains", action="store_true")
list_parser.add_argument("-o","--org", help="select org by name", dest="list_sub_command_orgs_list", type=list[str], action="append")
list_parser.add_argument("-t", "--tlds", help="list tlds for selected orgs", action="store_true", dest="list_sub_command_ls_subs_tlds")
list_parser.add_argument("-s", "--subs", help="list subdomains for selected orgs", action="store_true", dest="list_sub_command_ls_subs_domains")
list_parser.add_argument("-a","--after", type=str, help="Start date in YYYY-MM-DD format", dest="list_sub_command_ls_subs_after_filter")
list_parser.add_argument("-b","--before", type=str, help="End date in YYYY-MM-DD format", dest="list_sub_command_ls_subs_before_filter")

# display help, if no arguments are passed
args = parser.parse_args(args=None if argv[1:] else ['--help'])
Expand Down Expand Up @@ -311,9 +316,28 @@ def args_parse() -> ArgsModel:
if args.subcommand == "list":
parsed_args["list_"] = True

if args.list_sub_command == "orgs":
# python launch.py list -l
if args.list_sub_command_ls_orgs:
parsed_args["list_orgs"] = True

# python launch.py list -d -s -t -o <org> -a 2024-10-04 -b 2024-10-05
if args.list_sub_command_ls_domains:
asset_types = []
if args.list_sub_command_ls_subs_tlds:
asset_types.append("TLD")
if args.list_sub_command_ls_subs_domains:
asset_types.append("subdomain")

parsed_args["asset_types_list"] = asset_types
parsed_args["orgs_list"] = [ ''.join(org) for org in args.list_sub_command_orgs_list]
parsed_args["list_domains"] = True

if args.list_sub_command_ls_subs_after_filter:
parsed_args["after_datetime_filter"] = f"{args.list_sub_command_ls_subs_after_filter}T00:00:00Z"

if args.list_sub_command_ls_subs_before_filter:
parsed_args["before_datetime_filter"] = f"{args.list_sub_command_ls_subs_before_filter}T23:59:59Z"

args_pydantic_obj = ArgsModel.parse_obj(parsed_args)
logging.info(f'parsed args - {args_pydantic_obj}')
logging.info(f"Parsing Arguements - Completed")
Expand Down
57 changes: 47 additions & 10 deletions mantis/utils/list_subcommand_utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,58 @@
import logging
from mantis.db.crud_assets import read_assets


async def get_orgs():
pipeline = []

pipeline.extend([
{"$group": {
"_id" : None,
"orgs":{
"$addToSet": "$org"
}
}
}
])
pipeline.extend([{"$group": {"_id": None, "orgs": {"$addToSet": "$org"}}}])
orgs = await read_assets(pipeline)

if orgs:
return orgs[0]["orgs"]

else:
return None
return None


async def get_domains(orgs:list[str], asset_types:list[str], after_filter:str, before_filter:str):
if len(orgs) == 0:
logging.warning('No orgs selected')
return []

if len(asset_types) == 0:
logging.warning('no asset type was selected')
return []

match_filter = {
"org": {"$in": orgs},
"asset_type": {"$in": asset_types},
}

if after_filter:
match_filter.setdefault("created_timestamp", {})
match_filter["created_timestamp"]["$gte"] = after_filter

if before_filter:
match_filter.setdefault("created_timestamp", {})
match_filter["created_timestamp"]["$lte"] = before_filter

pipeline = [
{
"$match": match_filter
},
{
"$group": {
"_id": None,
"asset": {"$addToSet": "$asset"},
},
},
]

result = await read_assets(pipeline)
domains = []
if result and len(result) > 0:
domains = result[0].get('asset', [])
else:
logging.warning('No match found')
return domains
25 changes: 18 additions & 7 deletions mantis/workflows/list_workflow.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
import logging
from mantis.models.args_model import ArgsModel
from mantis.utils.list_subcommand_utils import get_orgs
from mantis.utils.list_subcommand_utils import get_orgs, get_domains


class ListWorkflow:

@staticmethod
async def executor(args: ArgsModel):

if args.list_orgs:
logging.info("Getting orgs from database")
orgs = await get_orgs()
print()
print(f'''Total Number of orgs onboarded: {len(orgs)}''')
print()
for org in orgs:
print(org)
if orgs:
print(f"\nTotal Number of orgs onboarded: {len(orgs)}\n")
print("\n".join(orgs))
else:
logging.info("No orgs found in database")

if args.list_domains:
logging.info("Getting subdomains from database")
domains = await get_domains(args.orgs_list, args.asset_types_list, args.after_datetime_filter, args.before_datetime_filter)
if domains:
print(f"\nOrgs Filter: {','.join(args.orgs_list)}")
print(f"Total Domains Found based on provided filters: {len(domains)}\n")
print('\n'.join(domains))
else:
logging.info("No domains found")