diff --git a/hubspot/utils/signature.py b/hubspot/utils/signature.py index 2982b331..cea8a724 100644 --- a/hubspot/utils/signature.py +++ b/hubspot/utils/signature.py @@ -2,13 +2,25 @@ import hmac import hashlib -from datetime import datetime +from datetime import datetime, UTC, timedelta from hubspot.exceptions import InvalidSignatureVersionError, InvalidSignatureTimestampError class Signature: - MAX_ALLOWED_TIMESTAMP = 3000 + MAX_ALLOWED_TIMESTAMP = 300 + + @staticmethod + def _is_timestamp_valid(timestamp: str) -> bool: + if timestamp is None: + return False + try: + timestamp_float = float(timestamp) + request_time = datetime.fromtimestamp(timestamp_float // 1000, tz=UTC) + current_time = datetime.now(UTC) + return current_time - request_time < timedelta(seconds=Signature.MAX_ALLOWED_TIMESTAMP) + except (ValueError, OverflowError): + return False @staticmethod def is_valid( @@ -18,12 +30,12 @@ def is_valid( http_uri: str = None, http_method: str = "POST", signature_version: str = "v2", - timestamp: float = None + timestamp: str = None ) -> bool: if signature_version == "v3": - current_time = datetime.now() - if timestamp is None or current_time.timestamp() - timestamp > Signature.MAX_ALLOWED_TIMESTAMP: + if timestamp is None or not Signature._is_timestamp_valid(timestamp): raise InvalidSignatureTimestampError(timestamp=timestamp) + hashed_signature = Signature.get_signature( client_secret, request_body, @@ -33,7 +45,7 @@ def is_valid( timestamp ) - return signature == hashed_signature + return hmac.compare_digest(hashed_signature, signature) @staticmethod def get_signature( @@ -42,7 +54,7 @@ def get_signature( signature_version: str, http_uri: str = None, http_method: str = "POST", - timestamp: float = None, + timestamp: str = None ) -> str: if signature_version == "v1": source_string = f"{client_secret}{request_body}" @@ -54,10 +66,10 @@ def get_signature( source_string = f"{http_method}{http_uri}{request_body}{timestamp}" hashed_signature = base64.b64encode( hmac.new( - client_secret.encode("utf-8"), - msg=source_string.encode("utf-8"), - digestmod=hashlib.sha256 - ).digest() + client_secret.encode("utf-8"), + msg=source_string.encode("utf-8"), + digestmod=hashlib.sha256 + ).digest() ).decode() return hashed_signature else: diff --git a/tests/spec/utils/test_signature.py b/tests/spec/utils/test_signature.py index b0fe5510..9cbdb1c0 100644 --- a/tests/spec/utils/test_signature.py +++ b/tests/spec/utils/test_signature.py @@ -1,7 +1,5 @@ import pytest - from datetime import datetime - from hubspot.exceptions import InvalidSignatureVersionError, InvalidSignatureTimestampError from hubspot.utils.signature import Signature @@ -11,7 +9,7 @@ "request_body": "{'example_field':'example_value'}", "url": "https://www.example.com/webhook_uri", "http_method": "POST", - "timestamp": 15000000, + "timestamp": str(int(datetime.now().timestamp() * 1000)), } @@ -48,10 +46,12 @@ def test_get_signature__v2(): def test_get_signature__v3(): data = { - "signature": "HPW73RUtKmcYoEDADG0s6MmGFWUzWJKAW07r8RDgcQw=", - "signature_version": "v3" - } + "signature": "K36dawei4A+QBNolUOqo7s91KQDWQ5MXZ/QufNYuk/Y=", + "signature_version": "v3", + } + data.update(TEST_DATA) + data["timestamp"] = "1693657560000" signature = Signature.get_signature( data["client_secret"], @@ -66,13 +66,11 @@ def test_get_signature__v3(): def test_get_signature__wrong_version(): - with pytest.raises(InvalidSignatureVersionError): Signature.get_signature( TEST_DATA["client_secret"], TEST_DATA["request_body"], "wrong_signature_version" - ) @@ -84,7 +82,6 @@ def test_is_valid__v1(): TEST_DATA["client_secret"], TEST_DATA["request_body"], signature_version="v1" - ) assert result @@ -120,13 +117,12 @@ def test_is_valid__v2_get_method(): def test_is_valid__v3(): - timestamp = datetime.now().timestamp() signature = Signature.get_signature( TEST_DATA["client_secret"], TEST_DATA["request_body"], signature_version="v3", http_uri=TEST_DATA["url"], - timestamp=timestamp + timestamp=TEST_DATA["timestamp"] ) result = Signature.is_valid( @@ -135,7 +131,7 @@ def test_is_valid__v3(): TEST_DATA["request_body"], signature_version="v3", http_uri=TEST_DATA["url"], - timestamp=timestamp + timestamp=TEST_DATA["timestamp"] ) assert result @@ -161,13 +157,14 @@ def test_is_valid__none_timestamp(): def test_is_valid__expired_timestamp(): - timestamp = datetime.now().timestamp() + expired_timestamp = str(int((datetime.now().timestamp() - Signature.MAX_ALLOWED_TIMESTAMP - 1) * 1000)) + signature = Signature.get_signature( TEST_DATA["client_secret"], TEST_DATA["request_body"], signature_version="v3", http_uri=TEST_DATA["url"], - timestamp=timestamp + timestamp=expired_timestamp ) with pytest.raises(InvalidSignatureTimestampError): @@ -177,5 +174,17 @@ def test_is_valid__expired_timestamp(): TEST_DATA["request_body"], signature_version="v3", http_uri=TEST_DATA["url"], - timestamp=timestamp - Signature.MAX_ALLOWED_TIMESTAMP + timestamp=expired_timestamp ) + + +def test_is_timestamp_valid__valid_timestamp(): + current_timestamp = str(int(datetime.now().timestamp() * 1000)) + + assert Signature._is_timestamp_valid(current_timestamp) is True + + +def test_is_timestamp_valid__expired_timestamp(): + expired_timestamp = str(int((datetime.now().timestamp() - Signature.MAX_ALLOWED_TIMESTAMP - 10) * 1000)) + + assert Signature._is_timestamp_valid(expired_timestamp) is False