Skip to content

Commit

Permalink
Add Slack signature checks to /slack/events route
Browse files Browse the repository at this point in the history
  • Loading branch information
ThorntonMatthewD committed Dec 10, 2023
1 parent 6039e5f commit e1803bf
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 2 deletions.
74 changes: 73 additions & 1 deletion src/auth.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
"""
Logic for restricting the use of Slack commands to specific parties
and validating incoming requests.
"""

import hashlib
import hmac
import logging
import os
import time
from functools import wraps

from fastapi import HTTPException

from config import SLACK_APP


Expand Down Expand Up @@ -42,3 +49,68 @@ async def auth_wrapper(*args, **kwargs):
)

return auth_wrapper


async def generate_expected_hash(req_timestamp: str, req_body: bytes) -> hmac.HMAC:
"""
Creates an HMAC object by piecing together our signing secret and
the following information provided by the request to our endpoint:
- The X-Slack-Request-Timestamp header
- The request's body
The hex digest will be hashed using the SHA256 algo.
This hash can be used to compare with the X-Slack-Signature of the request
to determine if the request originated from Slack.
"""
singing_secret_as_byte_key = os.getenv("SIGNING_SECRET", "").encode("UTF-8")

return hmac.new(
singing_secret_as_byte_key,
f"v0:{req_timestamp}:".encode() + req_body,
hashlib.sha256,
)


def validate_slack_command_source(request_invocation):
"""
Validates that incoming requests to execute Slack commands have
indeed originated from Slack and not elsewhere.
Raises a generic server error if anything seems out of order.
"""

@wraps(request_invocation)
async def slack_validation_wrapper(*args, **kwargs):
request = kwargs["req"]

# Check for possible replay attacks
if (
abs(time.time() - int(request.headers["X-Slack-Request-Timestamp"]))
> 60 * 5
):
logging.warning("Possible replay attack has been logged.")
raise HTTPException(
status_code=400, detail="There was an issue with your request."
)

expected_hash = await generate_expected_hash(
request.headers["X-Slack-Request-Timestamp"], await request.body()
)
expected_signature = f"v0={expected_hash.hexdigest()}"

# If signatures do not match then either there's a software bug or
# the request wasn't signed by Slack.
if not hmac.compare_digest(
expected_signature, request.headers["X-Slack-Signature"]
):
logging.warning(
"A request to invoke a Slack command failed the signature check."
)
raise HTTPException(
status_code=400, detail="There was an issue with your request."
)

return await request_invocation(*args, **kwargs)

return slack_validation_wrapper
3 changes: 3 additions & 0 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from starlette.types import Message

import database
from auth import validate_slack_command_source
from bot import periodically_check_api, periodically_delete_old_messages
from config import API, SLACK_APP_HANDLER

Expand Down Expand Up @@ -133,8 +134,10 @@ async def rate_limit_check_api(


@API.post("/slack/events")
@validate_slack_command_source
async def slack_endpoint(req: Request):
"""The front door for all Slack requests"""

return await SLACK_APP_HANDLER.handle(req)


Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import bot
import config
import server
import database
import server


@pytest.fixture
Expand Down
12 changes: 12 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Tests functions contained in src/auth.py
"""
import os

import pytest

Expand All @@ -24,3 +25,14 @@ async def test_is_admin_when_user_is_admin(self, mock_slack_bolt_async_app):
result = await auth.is_admin("admin_user")

assert result is True

@pytest.mark.asyncio
async def test_generation_of_expected_hash(self, mock_slack_bolt_async_app):
os.environ["SIGNING_SECRET"] = "super_secret"

result = await auth.generate_expected_hash("946702800", b"I am a test body")

assert (
result.hexdigest()
== "a02c228c8010f0725da1a2a2524fb0f1dced42c5d56ed1ea11cdb603cf72a434"
)
105 changes: 105 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""
Tests for the server.py file.
"""
import hashlib
import hmac
import os
import threading
import time

import helpers
import pytest
Expand Down Expand Up @@ -48,6 +52,10 @@ def test_check_api_whenever_someone_executes_it_for_first_time(
content=helpers.create_slack_request_payload(
command="/check_api", team_domain=TEAM_DOMAIN
),
headers={
"X-Slack-Request-Timestamp": str(int(time.time())),
"X-Slack-Signature": "placeholder",
},
)

# Until the Slack Bolt client is properly mocked this is about as specific as we
Expand Down Expand Up @@ -75,6 +83,10 @@ async def test_check_api_whenever_someone_executes_it_after_expiry(
content=helpers.create_slack_request_payload(
command="/check_api", team_domain=TEAM_DOMAIN
),
headers={
"X-Slack-Request-Timestamp": str(int(time.time())),
"X-Slack-Signature": "placeholder",
},
)

# See:
Expand All @@ -100,3 +112,96 @@ async def test_check_api_whenever_someone_executes_it_before_expiry(
)
assert response.status_code == 200
assert response.content.decode("utf-8") == RATE_LIMIT_COPY


def test_possible_replay_attack_mitigation(test_client, caplog):
"""
If the timestamp provided in the headers is beyond 5 minutes of the
current time then the replay mitigation should be triggered.
"""
test_client.post(
"/slack/events",
content=helpers.create_slack_request_payload(
command="/add_channel", team_domain=TEAM_DOMAIN
),
# Jan 1st, 2000 (way out of the 5 minute threshold)
headers={"X-Slack-Request-Timestamp": "946702800"},
)

assert "Possible replay attack has been logged." in caplog.text


def test_replay_attack_mitigation_skipped(test_client, caplog):
"""
If the timestamp provided in the headers is within 5 minutes of the
current time then the replay mitigation should not be triggered.
"""
test_client.post(
"/slack/events",
content=helpers.create_slack_request_payload(
command="/add_channel", team_domain=TEAM_DOMAIN
),
headers={
"X-Slack-Request-Timestamp": str(int(time.time())),
"X-Slack-Signature": "placeholder",
},
)

assert "Possible replay attack has been logged." not in caplog.text


def test_valid_slack_signature_allows_request_to_be_processed(test_client, caplog):
"""
Whenever the expected Slack signature matches what was provided
in the request headers then the bot will proceed with normal
operations and execute the desired command.
"""
test_signing_secret = "super_secret"
os.environ["SIGNING_SECRET"] = test_signing_secret
timestamp = str(int(time.time()))
body = helpers.create_slack_request_payload(
command="/add_channel", team_domain=TEAM_DOMAIN
)
test_hash = hmac.new(
test_signing_secret.encode("UTF-8"),
f"v0:{timestamp}:".encode() + body,
hashlib.sha256,
)
test_signature = f"v0={test_hash.hexdigest()}"

test_client.post(
"/slack/events",
content=body,
headers={
"X-Slack-Request-Timestamp": timestamp,
"X-Slack-Signature": test_signature,
},
)

assert (
"A request to invoke a Slack command failed the signature check."
not in caplog.text
)


def test_invalid_slack_signature_raises_error(test_client, caplog):
"""
Whenever the expected Slack signature does NOT match what was provided
in the request headers then the bot will raise an exception.
"""
os.environ["SIGNING_SECRET"] = "super_secret"

test_client.post(
"/slack/events",
content=helpers.create_slack_request_payload(
command="/add_channel", team_domain=TEAM_DOMAIN
),
headers={
"X-Slack-Request-Timestamp": str(int(time.time())),
"X-Slack-Signature": "not_correct",
},
)

assert (
"A request to invoke a Slack command failed the signature check." in caplog.text
)

0 comments on commit e1803bf

Please sign in to comment.