Skip to content

Commit

Permalink
Merge pull request #37 from Ostorlab/fixit/ASNParserError-fix
Browse files Browse the repository at this point in the history
Handle ASNParseError during WHOIS lookup
  • Loading branch information
3asm authored Dec 2, 2024
2 parents 835d8f3 + 8c8b7ac commit 310d128
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
2 changes: 2 additions & 0 deletions agent/whois_ip_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ def _process_ip(self, message: m.Message, host: str) -> None:
"Some data not found when agent_whois_ip_asset try to process IP %s",
address,
)
except ipwhois.exceptions.ASNParseError:
logger.error("ASN parse error for IP %s", address)
except exceptions.HTTPRateLimitError:
logger.warning("Rate limit error for IP %s", address)
else:
Expand Down
33 changes: 25 additions & 8 deletions tests/whois_ip_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,12 @@ def testWhoisIP_withIPv4AndMaskButNoVersion_shouldHandleVersionCorrectly(
data=message_data,
)

with mock.patch.object(
test_agent, "_redis_client"
) as mock_redis_client, mock.patch.object(
test_agent, "add_ip_network"
) as mock_add_ip_network, mock.patch.object(
test_agent, "start", mock.MagicMock()
), mock.patch.object(test_agent, "run", mock.MagicMock()), mock.patch(
"agent.whois_ip_agent.WhoisIPAgent.main", mock.MagicMock()
with (
mock.patch.object(test_agent, "_redis_client") as mock_redis_client,
mock.patch.object(test_agent, "add_ip_network") as mock_add_ip_network,
mock.patch.object(test_agent, "start", mock.MagicMock()),
mock.patch.object(test_agent, "run", mock.MagicMock()),
mock.patch("agent.whois_ip_agent.WhoisIPAgent.main", mock.MagicMock()),
):
mock_redis_client.sismember.return_value = False

Expand All @@ -347,3 +345,22 @@ def testWhoisIP_whenInvalidIPAddressIsProvided_raisesValueError(

with pytest.raises(ValueError, match="Invalid IP address: invalid_ip"):
test_agent.process(ip_msg)


def testWhoisIp_whenASNParseErrorOccure_logWithoutCrash(
test_agent: whois_ip_agent.WhoisIPAgent,
scan_message_ipv4: message.Message,
agent_persist_mock: dict[str | bytes, str | bytes],
mocker: plugin.MockerFixture,
agent_mock: List[message.Message],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that ASNParseError is caught and logged."""
mocker.patch(
"ipwhois.IPWhois.lookup_rdap", side_effect=ipwhois.exceptions.ASNParseError
)

test_agent.process(scan_message_ipv4)

assert len(agent_mock) == 0
assert "ASN parse error for IP" in caplog.text

0 comments on commit 310d128

Please sign in to comment.