
Commit

code fixes according to the review comments
klevitskiy-cyberint committed Nov 22, 2024
1 parent 74656de commit c884ffb
Showing 3 changed files with 130 additions and 98 deletions.
188 changes: 113 additions & 75 deletions Packs/Cyberint/Integrations/FeedCyberint/FeedCyberint.py
@@ -12,8 +12,9 @@

 DATE_FORMAT = "%Y-%m-%d"
 DEFAULT_INTERVAL = 240
-PAGE_SIZE = 20000
+PAGE_SIZE = 50000
 EXECUTION_TIMEOUT_SECONDS = 1200  # 20 minutes
+MAX_LIMIT_SIZE = 500000


 class Client(BaseClient):
@@ -41,13 +42,14 @@ def __init__(


     @logger
-    def request_daily_feed(self, date_time: str = None, limit: int = 1000, execution_start_time: datetime = datetime.now(), test: bool = False) -> List[Dict[str, Any]]:
+    def request_daily_feed(self, date_time: str = None, limit: int = 1000, execution_start_time: datetime = datetime.now(),
+                           test: bool = False) -> List[Dict[str, Any]]:
         """
         Retrieves all entries from the feed with pagination support.
         Args:
             date_time (str): The date-time value to use in the URL. Defaults to None.
-            limit (int): The maximum number of entries to retrieve per request. Defaults to 100.
+            limit (int): The maximum number of entries to retrieve per request. Defaults to 1000.
             execution_start_time (datetime): The start time of the execution. Defaults to now.
             test (bool): If true, only return one page for testing connection.
@@ -59,40 +61,29 @@ def request_daily_feed(self, date_time: str = None, limit: int = 1000, execution_start_time: datetime = datetime.now(),
         has_more = True

         while has_more:
+            if (offset >= MAX_LIMIT_SIZE):
+                has_more = False
+                continue
+
             demisto.debug(f'Fetching feed offset {offset}')

             # if the execution exceeded the timeout we will break
             if not test:
                 if is_execution_time_exceeded(start_time=execution_start_time):
                     demisto.debug(f'Execution time exceeded: {EXECUTION_TIMEOUT_SECONDS} seconds from: {execution_start_time}')
-                    return result
+                    has_more = False
+                    continue

             start_time = time.time()
-            response = self.retrieve_indicators_from_api(date_time, limit, offset)

-            try:  # if json invalid (for example, non 200), end the loop
-                feeds = response.strip().split("\n")
-                ioc_feeds = [json.loads(feed) for feed in feeds]
-            except JSONDecodeError as e:
-                demisto.error(f'Failed to decode JSON: {e}')
-                has_more = False
-                continue
+            # Using the method to fetch and process the feed response
+            indicators = self.process_feed_response(date_time, limit, offset)

-            if not ioc_feeds:  # if no data is returned, end the loop
-                demisto.debug('No more indicators found')
+            if not indicators:
+                # No more data or an error occurred
                 has_more = False
             else:
-                for indicator in ioc_feeds:
-                    indicator_value = indicator["ioc_value"]
-                    if indicator_type := auto_detect_indicator_type(indicator_value):
-                        result.append(
-                            {
-                                "value": indicator_value,
-                                "type": indicator_type,
-                                "FeedURL": self._base_url,
-                                "rawJSON": indicator,
-                            }
-                        )
+                result.extend(indicators)  # Append valid indicators to the result

             end_time = time.time()
             demisto.debug(f'Duration of offset processing {offset}: {end_time - start_time} seconds')
@@ -109,6 +100,42 @@ def request_daily_feed(self, date_time: str = None, limit: int = 1000, execution_start_time: datetime = datetime.now(),
         return result


+    def process_feed_response(self, date_time: str, limit: int, offset: int) -> List[Dict[str, Any]]:
+        """
+        Makes the API request to retrieve the indicators, handles JSON decoding, and processes the feed.
+        Args:
+            date_time (str): The date-time value to use in the URL.
+            limit (int): The maximum number of entries to retrieve per request.
+            offset (int): The offset to retrieve the correct page of results.
+        Returns:
+            A list of indicator dictionaries, or an empty list if no valid indicators are found.
+        """
+        result = []
+        response = self.retrieve_indicators_from_api(date_time, limit, offset)
+
+        try:
+            # If json is invalid, return empty list (non 200 or invalid JSON)
+            feeds = response.strip().split("\n")
+            ioc_feeds = [json.loads(feed) for feed in feeds]
+        except JSONDecodeError as e:
+            demisto.error(f'Failed to decode JSON: {e}')
+            return result  # Return empty result on failure
+
+        if not ioc_feeds:  # No data found
+            demisto.debug('No more indicators found')
+            return result
+
+        # Process valid feeds
+        for indicator in ioc_feeds:
+            ioc_value = indicator.get('ioc_value')
+            if auto_detect_indicator_type(ioc_value):
+                result.append(indicator)
+
+        return result
+

     @logger
     def retrieve_indicators_from_api(self, date_time, limit, offset):
         url_suffix = f"{date_time or get_today_time()}?limit={limit}&offset={offset}"
@@ -180,34 +207,36 @@ def fetch_indicators(
     indicators = []

     for item in iterator:
-        ioc_value = item.get("value")
-        ioc_type = item.get("type")
-        raw_data = item.get("rawJSON")
         if (
-            ("All" in indicator_types or ioc_type in indicator_types)
-            and ("All" in feed_names or raw_data.get("detected_activity") in feed_names)
-            and (raw_data.get("confidence") >= confidence_from)
-            and (raw_data.get("severity_score") >= severity_from)
+            ("All" in indicator_types or item.get("ioc_type") in indicator_types)
+            and ("All" in feed_names or item.get("detected_activity") in feed_names)
+            and (item.get("confidence") >= confidence_from)
+            and (item.get("severity_score") >= severity_from)
         ):
-            indicator_obj = {
-                "value": ioc_value,
-                "type": ioc_type,
-                "service": "Cyberint",
-                "rawJSON": raw_data,
-                "fields": {
-                    "reportedby": "Cyberint",
-                    "description": raw_data.get("description"),
-                    "firstseenbysource": raw_data.get("observation_date"),
-                },
-            }
-
-            if feed_tags:
-                indicator_obj["fields"]["tags"] = feed_tags
-
-            if tlp_color:
-                indicator_obj["fields"]["trafficlightprotocol"] = tlp_color
-
-            indicators.append(indicator_obj)
+            indicator_value = item["ioc_value"]
+            if indicator_type := auto_detect_indicator_type(indicator_value):
+                indicator_obj = {
+                    "type": indicator_type,
+                    "value": indicator_value,
+                    "service": "Cyberint",
+                    "rawJSON": item,
+                    "fields": {
+                        "reportedby": "Cyberint",
+                        "firstseenbysource": item.get("observation_date"),
+                        "detected_activity": item.get("detected_activity"),
+                        "severity_score": item.get("severity_score"),
+                        "confidence": item.get("confidence"),
+                        "description": item.get("description")
+                    },
+                }
+
+                if feed_tags:
+                    indicator_obj["fields"]["tags"] = feed_tags
+
+                if tlp_color:
+                    indicator_obj["fields"]["trafficlightprotocol"] = tlp_color
+
+                indicators.append(indicator_obj)

         if limit > 0 and len(indicators) >= limit:
             demisto.debug(f'Indicators limit reached (total): {len(indicators)}')
@@ -218,45 +247,29 @@

 def get_indicators_command(
     client: Client,
-    params: dict[str, Any],
     args: dict[str, Any],
 ) -> CommandResults:
     """
     Wrapper for retrieving indicators from the feed to the war-room.
     Args:
         client: Cyberint API Client.
-        params: Integration parameters.
         args: Command arguments.
     Returns:
-        Outputs.
+        Outputs indicators.
     """

-    limit = arg_to_number(args.get("limit")) or 50
-    tlp_color = params.get("tlp_color", "")
-    severity_from = arg_to_number(params.get("severity_from")) or 0
-    confidence_from = arg_to_number(params.get("confidence_from")) or 0
-    feed_tags = argToList(params.get("feedTags"))
-    feed_names = argToList(params.get("feed_name"))
-    indicator_types = argToList(params.get("indicator_type"))
+    date = args.get("date") or datetime.now().strftime("%Y-%m-%d")
+    limit = int(args.get("limit")) or 50
+    offset = int(args.get("offset")) or 0

-    indicators = fetch_indicators(
-        client=client,
-        tlp_color=tlp_color,
-        feed_tags=feed_tags,
-        limit=limit,
-        feed_names=feed_names,
-        indicator_types=indicator_types,
-        severity_from=severity_from,
-        confidence_from=confidence_from,
-    )
+    indicators = client.process_feed_response(date, limit, offset)

     human_readable = tableToMarkdown(
         "Indicators from Cyberint Feed:",
         indicators,
-        headers=["value", "type"],
-        headerTransform=string_to_table_header,
+        headers=["detected_activity", "ioc_type", "ioc_value", "observation_date", "severity_score", "confidence", "description"],
+        headerTransform=header_transformer,
         removeNull=True,
     )

@@ -382,7 +395,7 @@ def main():
             return_results(test_module(client))

         elif command == "cyberint-get-indicators":
-            return_results(get_indicators_command(client, params, args))
+            return_results(get_indicators_command(client, args))

         elif command == "fetch-indicators":
             indicators = fetch_indicators_command(client, params)
@@ -417,5 +430,30 @@ def is_execution_time_exceeded(start_time: datetime) -> bool:
     return secs_from_beginning > EXECUTION_TIMEOUT_SECONDS


+def header_transformer(header: str) -> str:
+    """
+    Returns a correct header.
+    Args:
+        header (Str): header.
+    Returns:
+        header (Str).
+    """
+    if header == 'detected_activity':
+        return 'Detected activity'
+    if header == 'ioc_type':
+        return 'IoC type'
+    if header == 'ioc_value':
+        return 'IoC value'
+    if header == 'observation_date':
+        return 'Observation date'
+    if header == 'severity_score':
+        return 'Severity score'
+    if header == 'confidence':
+        return 'Confidence'
+    if header == 'description':
+        return 'Description'
+    return string_to_table_header(header)
+
+
 if __name__ in ["__main__", "builtin", "builtins"]:
     main()
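
Side note, not part of the commit: the new header_transformer above is a chain of if statements, and the same mapping can also be expressed as a single dict lookup. The sketch below is only illustrative; HEADER_DISPLAY_NAMES is a hypothetical name, and string_to_table_header is assumed to come from CommonServerPython as elsewhere in the integration.

```python
HEADER_DISPLAY_NAMES = {
    "detected_activity": "Detected activity",
    "ioc_type": "IoC type",
    "ioc_value": "IoC value",
    "observation_date": "Observation date",
    "severity_score": "Severity score",
    "confidence": "Confidence",
    "description": "Description",
}


def header_transformer(header: str) -> str:
    # Look up the human-readable column name; fall back to the generic transform.
    return HEADER_DISPLAY_NAMES.get(header, string_to_table_header(header))
```
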
30 changes: 16 additions & 14 deletions Packs/Cyberint/Integrations/FeedCyberint/FeedCyberint.yml
@@ -146,33 +146,35 @@ script:
   commands:
   - name: cyberint-get-indicators
     arguments:
+    - name: date
+      description: Date of data feed for retrieval (e.g. 2024-12-31). The default value is today.
     - name: limit
       description: The maximum number of results to return. The default value is 50.
       defaultValue: "50"
+    - name: offset
+      description: Specifies the starting point or position from which data retrieval or processing should begin.
+      defaultValue: "0"
     outputs:
-    - contextPath: Cyberint.value
-      description: The indicator value.
+    - contextPath: Cyberint.detected_activity
+      description: Detected activity.
       type: String
-    - contextPath: Cyberint.type
+    - contextPath: Cyberint.ioc_type
      description: The indicator type.
       type: String
-    - contextPath: Cyberint.Tags
-      description: Tags that are associated with the indicator.
-      type: String
-    - contextPath: Cyberint.description
-      description: The feed description.
-      type: String
-    - contextPath: Cyberint.detected_activity
-      description: The feed detected activity.
+    - contextPath: Cyberint.ioc_value
+      description: The indicator value.
       type: String
     - contextPath: Cyberint.observation_date
-      description: The feed observation date.
+      description: Observation date.
       type: String
     - contextPath: Cyberint.severity_score
-      description: The feed severity score.
+      description: Severity score.
       type: String
     - contextPath: Cyberint.confidence
-      description: The feed confidence.
+      description: Confidence.
       type: String
+    - contextPath: Cyberint.description
+      description: Description.
+      type: String
     description: Gets indicators from the feed.
   dockerimage: demisto/python3:3.10.13.90168
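
To make the new context outputs above concrete, a single record returned by process_feed_response is a dict whose keys match these field names. The keys below are taken from the diff; the values are invented purely for illustration.

```python
example_indicator = {
    "ioc_value": "1.1.1.1",                # invented value
    "ioc_type": "ip",                      # invented value
    "detected_activity": "phishing",       # invented value
    "observation_date": "2024-11-20",      # invented value
    "severity_score": 80,                  # invented value
    "confidence": 90,                      # invented value
    "description": "Example description",  # invented value
}
```
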
10 changes: 1 addition & 9 deletions Packs/Cyberint/Integrations/FeedCyberint/FeedCyberint_test.py
@@ -122,17 +122,9 @@ def test_get_indicators_command(
     expected_url = "http://www.tal1.com/"
     expected_ip = "1.1.1.1"

-    params = {
-        "tlp_color": "GREEN",
-        "severity_from": "0",
-        "confidence_from": "0",
-        "feed_name": ["All"],
-        "indicator_type": ["All"],
-        "limit": 10,
-    }
     args = {"limit": 20}

-    result = FeedCyberint.get_indicators_command(mock_client, params, args)
+    result = FeedCyberint.get_indicators_command(mock_client, args)

     url_indicators = {indicator["value"] for indicator in result.raw_response if indicator["type"] == "URL"}
     ip_indicators = {indicator["value"] for indicator in result.raw_response if indicator["type"] == "IP"}
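
The test change above only adapts the existing command test to the new signature. A dedicated test for the new process_feed_response helper could look roughly like the sketch below; it is an illustration rather than part of the commit. The Client constructor arguments are not visible in this diff, so the sketch bypasses __init__, and the mocked API payload is invented.

```python
import json
from unittest.mock import MagicMock

import FeedCyberint


def test_process_feed_response_keeps_only_detectable_values():
    # Bypass __init__ because the constructor signature is not shown in this diff.
    client = FeedCyberint.Client.__new__(FeedCyberint.Client)

    # process_feed_response expects one JSON object per line in the response body.
    raw_lines = [
        json.dumps({"ioc_value": "1.1.1.1", "ioc_type": "ip"}),
        json.dumps({"ioc_value": "not an indicator", "ioc_type": "other"}),
    ]
    client.retrieve_indicators_from_api = MagicMock(return_value="\n".join(raw_lines))

    result = client.process_feed_response("2024-12-31", limit=10, offset=0)

    # Only the value that auto_detect_indicator_type recognizes is kept.
    assert [item["ioc_value"] for item in result] == ["1.1.1.1"]
```

Like the existing test above, this imports FeedCyberint directly and asserts on the returned structure.
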
