added mechanism to break if not enough time remained for another fetc… #37322

Closed
wants to merge 28 commits into from
111 changes: 98 additions & 13 deletions Packs/Akamai_SIEM/Integrations/Akamai_SIEM/Akamai_SIEM.py
@@ -35,7 +35,12 @@

VENDOR = "Akamai"
PRODUCT = "WAF"
FETCH_EVENTS_PAGE_SIZE = 50000
FETCH_EVENTS_MAX_PAGE_SIZE = 20000 # Allowed events limit per request.
TIME_TO_RUN_BUFFER = 30 # Safety-zone delta (in seconds) used when calculating the time left to run.
EXECUTION_START_TIME = datetime.now()
ALLOWED_PAGE_SIZE_DELTA_RATIO = 0.95 # Delta ratio used to tolerate differences from Akamai when calculating the latest request size.
MAX_ALLOWED_FETCH_LIMIT = 80000
SEND_EVENTS_TO_XSIAM_CHUNK_SIZE = 4 * (10 ** 6) # 4 MB

# Disable insecure warnings
urllib3.disable_warnings()
@@ -405,6 +410,45 @@ def dedup_events(hashed_events_mapping: dict[str, dict], hashed_events_from_prev
return deduped_events, hashed_events_from_current_run


def is_last_request_smaller_than_page_size(num_events_from_previous_request: int, page_size: int) -> bool:
"""Checks wether the number of events from the last API call was lower by a certain delta than the request page size.

Args:
num_events_from_previous_request (int): The number of events returned by the previous response.
page_size (int): The page size (request limit) used for the last request.

Returns:
bool: True if the number of events from the last API call was smaller, by the allowed delta, than the requested page size.
Otherwise, False.
"""
demisto.info(f"Checking whether execution should break with {num_events_from_previous_request=} and {page_size=}")
return num_events_from_previous_request < page_size * ALLOWED_PAGE_SIZE_DELTA_RATIO
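
To make the threshold concrete, here is a standalone sketch of the check (the helper name and the numbers in the asserts are illustrative; the ratio is the module constant shown above):

```python
ALLOWED_PAGE_SIZE_DELTA_RATIO = 0.95  # same ratio as the module-level constant

def last_request_was_partial(num_events_from_previous_request: int, page_size: int) -> bool:
    # A "partial" page means fewer events than 95% of what was requested.
    return num_events_from_previous_request < page_size * ALLOWED_PAGE_SIZE_DELTA_RATIO

# With the default page size of 20,000, the cut-off is 19,000 events:
assert last_request_was_partial(18_999, 20_000) is True    # below the threshold -> stop fetching
assert last_request_was_partial(19_500, 20_000) is False   # close to a full page -> keep fetching
```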


def is_interval_doesnt_have_enough_time_to_run(min_allowed_delta: int, max_time_took: float) -> tuple[bool, float]:
"""
Checks whether there's enough time for another fetch request to the Akamai API before the docker timeout.
The function calculates the time of the first request (including the send_events_to_xsiam part),
and checks whether the remaining running time (minus a safety delta) is less than or equal to the expected running time.
The remaining running time is the docker timeout limit in seconds minus the run time so far (now minus the docker execution start time).

Args:
min_allowed_delta (int): The minimum allowed delta that should remain before going on another fetch interval.
max_time_took (float): The worst-case execution time (the first execution), used as the expected duration of the remaining executions.
Returns:
bool: Return True if there's not enough time. Otherwise, return False.
"""
timeout_time_nano_seconds = demisto.callingContext.get('context', {}).get('TimeoutDuration')
demisto.info(f"Got {timeout_time_nano_seconds} non seconds for the execution.")
timeout_time_seconds = timeout_time_nano_seconds / 1_000_000_000
now = datetime.now()
time_since_interval_beginning = (now - EXECUTION_START_TIME).total_seconds()
if not max_time_took:
max_time_took = time_since_interval_beginning
demisto.info(f"Checking if execution should break with {time_since_interval_beginning=}, {max_time_took=}.")
return (timeout_time_seconds - time_since_interval_beginning - min_allowed_delta) <= max_time_took, max_time_took
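
The same arithmetic can be sketched outside the demisto runtime as follows (the helper name, the 300-second timeout, and the example numbers are assumptions for illustration; the real function reads the timeout in nanoseconds from demisto.callingContext):

```python
from datetime import datetime, timedelta

TIME_TO_RUN_BUFFER = 30  # seconds kept as a safety zone, mirroring the module constant

def not_enough_time_left(timeout_nanoseconds: int, execution_start: datetime, max_time_took: float) -> bool:
    """Illustrative re-implementation of the break condition."""
    timeout_seconds = timeout_nanoseconds / 1_000_000_000
    elapsed = (datetime.now() - execution_start).total_seconds()
    # Break when the time left (after subtracting the buffer) can no longer cover the slowest request seen so far.
    return (timeout_seconds - elapsed - TIME_TO_RUN_BUFFER) <= max_time_took

# Example: 300-second docker timeout, 250 seconds already spent, worst request so far took 40 seconds.
start = datetime.now() - timedelta(seconds=250)
print(not_enough_time_left(300 * 10**9, start, 40.0))  # True -> set nextTrigger=0 and stop this interval
```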


@logger
def fetch_events_command(
client: Client,
@@ -425,18 +469,48 @@ def fetch_events_command(
page_size: The number of events to limit for every request.

Yields:
(list[dict], str, int, str): events, new offset, total number of events fetched, and new last_run time to set.
(list[dict], str, int, set[str], bool): events, new offset, total number of events fetched,
event hashes from the current fetch, and whether to set nextTrigger=0 for the next execution.
"""
total_events_count = 0
from_epoch, _ = parse_date_range(date_range=fetch_time, date_format='%s')
offset = ctx.get("offset")
hashed_events_from_previous_run = set(ctx.get("hashed_events_from_previous_run", set()))
while total_events_count < int(fetch_limit):
auto_trigger_next_run = False
worst_case_time: float = 0
execution_counter = 0
while total_events_count < fetch_limit:
if execution_counter > 0:
demisto.info(f"The execution number is {execution_counter}, checking for breaking conditions.")
if is_last_request_smaller_than_page_size(len(events), page_size): # type: ignore[has-type] # pylint: disable=E0601
demisto.info("last request wasn't big enough, breaking.")
break
should_break, worst_case_time = is_interval_doesnt_have_enough_time_to_run(TIME_TO_RUN_BUFFER, worst_case_time)
if should_break:
demisto.info("Not enough time for another execution, breaking and triggering next run.")
auto_trigger_next_run = True
break
if (remaining_events_to_fetch := fetch_limit - total_events_count) < page_size:
demisto.info(f"{remaining_events_to_fetch=} < {page_size=}, lowering page_size to {remaining_events_to_fetch}.")
page_size = remaining_events_to_fetch
demisto.info(f"Preparing to get events with {offset=}, {page_size=}, and {fetch_limit=}")
events, offset = client.get_events_with_offset(config_ids, offset, page_size, from_epoch)
try:
events, offset = client.get_events_with_offset(config_ids, offset, page_size, from_epoch)
except DemistoException as e:
demisto.info(f"Got an error when trying to request for new events from Akamai\n{e}")
if "Requested Range Not Satisfiable" in str(e):
err_msg = f'Got Index out of range error when attempting to fetch events from Akamai.\n' \
"In order to continue fetching, please run 'akamai-siem-reset-offset' on the specific instance.\n" \
'For more information, please refer to the Troubleshooting section in the integration documentation.\n' \
f'original error: [{e}]'
raise DemistoException(err_msg)
else:
raise DemistoException(e)

if not events:
demisto.info("Didn't receive any events, breaking.")
break
demisto.info(f"got {len(events)} events, moving to processing events data.")
hashed_events_mapping = {}
for event in events:
try:
@@ -460,8 +534,9 @@ def main(): # pragma: no cover
total_events_count += len(deduped_events)
demisto.info(f"After deduplicate events, Got {len(deduped_events)} events, and {offset=}")
hashed_events_from_previous_run = hashed_events_from_current_run
yield deduped_events, offset, total_events_count, hashed_events_from_previous_run
yield [], offset, total_events_count, hashed_events_from_previous_run
execution_counter += 1
yield deduped_events, offset, total_events_count, hashed_events_from_previous_run, auto_trigger_next_run
yield [], offset, total_events_count, hashed_events_from_previous_run, auto_trigger_next_run
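
The trailing empty yield guarantees the consumer receives one final tuple carrying the latest offset and hashes even when the loop exits via break. A minimal stand-in showing how the generator is meant to be iterated (the stub fakes the API response and is not part of the integration):

```python
def fetch_events_stub(fetch_limit: int, page_size: int):
    """Stand-in for fetch_events_command's yield pattern (fake data, no API calls)."""
    total_events_count, offset, hashes = 0, "0", set()
    while total_events_count < fetch_limit:
        events = [{"_time": f"event-{i}"} for i in range(page_size)]  # pretend API response
        total_events_count += len(events)
        yield events, offset, total_events_count, hashes, False   # one tuple per page of events
    yield [], offset, total_events_count, hashes, False            # final tuple still carries offset/hashes

for events, offset, total, hashes, auto_trigger_next_run in fetch_events_stub(40_000, 20_000):
    if events:
        pass  # in the real integration: send_events_to_xsiam(...)
    # in the real integration: set_integration_context(...) runs for every yielded tuple, including the last empty one
```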


def decode_url(headers: str) -> dict:
@@ -519,25 +594,35 @@ def main(): # pragma: no cover
demisto.incidents(incidents)
demisto.setLastRun(new_last_run)
elif command == "fetch-events":
page_size = int(params.get("page_size", FETCH_EVENTS_PAGE_SIZE))
page_size = int(params.get("page_size", FETCH_EVENTS_MAX_PAGE_SIZE))
limit = int(params.get("fetchLimit", 300000))
for events, offset, total_events_count, hashed_events_from_current_run in fetch_events_command( # noqa: B007
if limit > MAX_ALLOWED_FETCH_LIMIT:
demisto.info(f"Got {limit=} larger than {MAX_ALLOWED_FETCH_LIMIT=}, setting limit to {MAX_ALLOWED_FETCH_LIMIT}.")
limit = MAX_ALLOWED_FETCH_LIMIT
if limit < page_size:
demisto.info(f"Got {limit=} lower than {page_size=}, lowering page_size to {limit}.")
page_size = limit
for events, offset, total_events_count, hashed_events_from_current_run, auto_trigger_next_run in ( # noqa: B007
fetch_events_command(
client,
"5 minutes",
fetch_limit=limit,
config_ids=params.get("configIds", ""),
ctx=get_integration_context() or {},
page_size=page_size
):
)):
if events:
demisto.info(f"Sending events to xsiam with latest event time is: {events[-1]['_time']}")
send_events_to_xsiam(events, VENDOR, PRODUCT, should_update_health_module=False)
demisto.info(f"Sending {len(events)} events to xsiam with latest event time is: {events[-1]['_time']}")
send_events_to_xsiam(events, VENDOR, PRODUCT, should_update_health_module=False,
chunk_size=SEND_EVENTS_TO_XSIAM_CHUNK_SIZE)
demisto.info(f"Done sending {len(events)} events to xsiam."
f"sent {total_events_count} events to xsiam in total during this interval.")
set_integration_context({"offset": offset,
"hashed_events_from_previous_run": list(hashed_events_from_current_run)})
demisto.updateModuleHealth({'eventsPulled': (total_events_count or 0)})
next_run = {}
if total_events_count >= limit:
demisto.info(f"got at least {limit} events this interval - will automatically trigger next run.")
if auto_trigger_next_run or total_events_count >= limit:
demisto.info(f"got {auto_trigger_next_run=} or at least {limit} events this interval - setting nextTrigger=0.")
next_run["nextTrigger"] = "0"
else:
demisto.info(f"Got less than {limit} events this interval - will not trigger next run automatically.")
6 changes: 3 additions & 3 deletions Packs/Akamai_SIEM/Integrations/Akamai_SIEM/Akamai_SIEM.yml
@@ -72,15 +72,15 @@ configuration:
type: 0
defaultvalue: '20'
required: false
additionalinfo: Limit on the number of incidents retrieved in a single fetch.
additionalinfo: Limit on the number of incidents retrieved in a single fetch. The maximum is 80k.
section: Collect
- display: Akamai Page size
name: page_size
type: 0
required: false
section: Collect
defaultvalue: '50000'
additionalinfo: The number of events to fetch per request to akamai (multiple requests are made for each fetch). If you're getting aggregated delays, increase the number. The maximum is 600,000.
defaultvalue: '20000'
additionalinfo: The number of events to fetch per request to Akamai (multiple requests are made for each fetch). If you're experiencing aggregation delays, increase this number. The maximum is 80k. Note that if your API has a higher ingestion rate, the integration will detect it and immediately start the next run in order to fetch events more quickly.
hidden:
- xsoar
advanced: true
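
For reference, a small sketch of how the fetch limit and page size interact before the fetch loop starts (the helper name is illustrative; the constants and clamping mirror the main() changes above):

```python
MAX_ALLOWED_FETCH_LIMIT = 80_000
FETCH_EVENTS_MAX_PAGE_SIZE = 20_000

def clamp_fetch_parameters(limit: int, page_size: int = FETCH_EVENTS_MAX_PAGE_SIZE) -> tuple[int, int]:
    """Illustrative helper mirroring the parameter clamping in main()."""
    limit = min(limit, MAX_ALLOWED_FETCH_LIMIT)  # the overall fetch limit is capped at 80k
    page_size = min(page_size, limit)            # a single page never exceeds the overall limit
    return limit, page_size

print(clamp_fetch_parameters(300_000))  # (80000, 20000) - the default limit of 300,000 gets capped
print(clamp_fetch_parameters(5_000))    # (5000, 5000)   - small limits shrink the page size too
```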