Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance signature #322

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions hubspot/utils/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@
import hmac
import hashlib

from datetime import datetime
from datetime import datetime, UTC, timedelta

from hubspot.exceptions import InvalidSignatureVersionError, InvalidSignatureTimestampError


class Signature:
MAX_ALLOWED_TIMESTAMP = 3000
MAX_ALLOWED_TIMESTAMP = 300

@staticmethod
def _is_timestamp_valid(timestamp: str) -> bool:
if timestamp is None:
return False
try:
timestamp_float = float(timestamp)
request_time = datetime.fromtimestamp(timestamp_float // 1000, tz=UTC)
current_time = datetime.now(UTC)
return current_time - request_time < timedelta(seconds=Signature.MAX_ALLOWED_TIMESTAMP)
except (ValueError, OverflowError):
return False

@staticmethod
def is_valid(
Expand All @@ -18,12 +30,12 @@ def is_valid(
http_uri: str = None,
http_method: str = "POST",
signature_version: str = "v2",
timestamp: float = None
timestamp: str = None
) -> bool:
if signature_version == "v3":
current_time = datetime.now()
if timestamp is None or current_time.timestamp() - timestamp > Signature.MAX_ALLOWED_TIMESTAMP:
if timestamp is None or not Signature._is_timestamp_valid(timestamp):
raise InvalidSignatureTimestampError(timestamp=timestamp)

hashed_signature = Signature.get_signature(
client_secret,
request_body,
Expand All @@ -33,7 +45,7 @@ def is_valid(
timestamp
)

return signature == hashed_signature
return hmac.compare_digest(hashed_signature, signature)

@staticmethod
def get_signature(
Expand All @@ -42,7 +54,7 @@ def get_signature(
signature_version: str,
http_uri: str = None,
http_method: str = "POST",
timestamp: float = None,
timestamp: str = None
) -> str:
if signature_version == "v1":
source_string = f"{client_secret}{request_body}"
Expand All @@ -54,10 +66,10 @@ def get_signature(
source_string = f"{http_method}{http_uri}{request_body}{timestamp}"
hashed_signature = base64.b64encode(
hmac.new(
client_secret.encode("utf-8"),
msg=source_string.encode("utf-8"),
digestmod=hashlib.sha256
).digest()
client_secret.encode("utf-8"),
msg=source_string.encode("utf-8"),
digestmod=hashlib.sha256
).digest()
).decode()
return hashed_signature
else:
Expand Down
39 changes: 24 additions & 15 deletions tests/spec/utils/test_signature.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import pytest

from datetime import datetime

from hubspot.exceptions import InvalidSignatureVersionError, InvalidSignatureTimestampError
from hubspot.utils.signature import Signature

Expand All @@ -11,7 +9,7 @@
"request_body": "{'example_field':'example_value'}",
"url": "https://www.example.com/webhook_uri",
"http_method": "POST",
"timestamp": 15000000,
"timestamp": str(int(datetime.now().timestamp() * 1000)),
}


Expand Down Expand Up @@ -48,10 +46,12 @@ def test_get_signature__v2():

def test_get_signature__v3():
data = {
"signature": "HPW73RUtKmcYoEDADG0s6MmGFWUzWJKAW07r8RDgcQw=",
"signature_version": "v3"
}
"signature": "K36dawei4A+QBNolUOqo7s91KQDWQ5MXZ/QufNYuk/Y=",
"signature_version": "v3",
}

data.update(TEST_DATA)
data["timestamp"] = "1693657560000"

signature = Signature.get_signature(
data["client_secret"],
Expand All @@ -66,13 +66,11 @@ def test_get_signature__v3():


def test_get_signature__wrong_version():

with pytest.raises(InvalidSignatureVersionError):
Signature.get_signature(
TEST_DATA["client_secret"],
TEST_DATA["request_body"],
"wrong_signature_version"

)


Expand All @@ -84,7 +82,6 @@ def test_is_valid__v1():
TEST_DATA["client_secret"],
TEST_DATA["request_body"],
signature_version="v1"

)

assert result
Expand Down Expand Up @@ -120,13 +117,12 @@ def test_is_valid__v2_get_method():


def test_is_valid__v3():
timestamp = datetime.now().timestamp()
signature = Signature.get_signature(
TEST_DATA["client_secret"],
TEST_DATA["request_body"],
signature_version="v3",
http_uri=TEST_DATA["url"],
timestamp=timestamp
timestamp=TEST_DATA["timestamp"]
)

result = Signature.is_valid(
Expand All @@ -135,7 +131,7 @@ def test_is_valid__v3():
TEST_DATA["request_body"],
signature_version="v3",
http_uri=TEST_DATA["url"],
timestamp=timestamp
timestamp=TEST_DATA["timestamp"]
)

assert result
Expand All @@ -161,13 +157,14 @@ def test_is_valid__none_timestamp():


def test_is_valid__expired_timestamp():
timestamp = datetime.now().timestamp()
expired_timestamp = str(int((datetime.now().timestamp() - Signature.MAX_ALLOWED_TIMESTAMP - 1) * 1000))

signature = Signature.get_signature(
TEST_DATA["client_secret"],
TEST_DATA["request_body"],
signature_version="v3",
http_uri=TEST_DATA["url"],
timestamp=timestamp
timestamp=expired_timestamp
)

with pytest.raises(InvalidSignatureTimestampError):
Expand All @@ -177,5 +174,17 @@ def test_is_valid__expired_timestamp():
TEST_DATA["request_body"],
signature_version="v3",
http_uri=TEST_DATA["url"],
timestamp=timestamp - Signature.MAX_ALLOWED_TIMESTAMP
timestamp=expired_timestamp
)


def test_is_timestamp_valid__valid_timestamp():
current_timestamp = str(int(datetime.now().timestamp() * 1000))

assert Signature._is_timestamp_valid(current_timestamp) is True


def test_is_timestamp_valid__expired_timestamp():
expired_timestamp = str(int((datetime.now().timestamp() - Signature.MAX_ALLOWED_TIMESTAMP - 10) * 1000))

assert Signature._is_timestamp_valid(expired_timestamp) is False
Loading