Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Webhook tasks using FlyteAgents #3058

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open

Webhook tasks using FlyteAgents #3058

wants to merge 12 commits into from

Conversation

kumare3
Copy link
Contributor

@kumare3 kumare3 commented Jan 14, 2025

There have been numerous requests from folks to support invoking arbitrary apis or using webhooks to notify.

This agent supports calling webhooks like slack, github etc directly from a flyte workflow

Possible to write workflows like,

from flytekit.extras.webhook import WebhookTask


@fk.task
def hello(s: str) -> str:
    return "Hello " + s

simple_get = WebhookTask(
    name="simple-get",
    url="http://localhost:8000/",
    method=http.HTTPMethod.GET,
    headers={"Content-Type": "application/json"},
)

get_with_params = WebhookTask(
    name="get-with-params",
    url="http://localhost:8000/items/{inputs.item_id}",
    method=http.HTTPMethod.GET,
    headers={"Content-Type": "application/json"},
    dynamic_inputs={"s": str, "item_id": int},
    show_data=True,
    show_url=True,
    description="Test Webhook Task",
    data={"q": "{inputs.s}"},
)


@fk.workflow
def wf(s: str) -> (dict, dict, dict):
    v = hello(s=s)
    w = WebhookTask(
        name="invoke-slack",
        url="https://hooks.slack.com/services/asdasdasd/asdasdasd",
        headers={"Content-Type": "application/json"},
        data={"text": "{inputs.s}"},
        show_data=True,
        show_url=True,
        description="Test Webhook Task",
        dynamic_inputs={"s": str},
    )
    return simple_get(), get_with_params(s=v, item_id=10), w(s=v)

Summary by Bito

This PR implements WebhookTask and WebhookAgent components in Flytekit, transitioning from aiohttp to httpx for HTTP operations. The implementation supports GET/POST methods with configurable timeouts and dynamic input parameters. The changes include comprehensive documentation and enhanced validation for pip arguments. The system is made more robust through proper error handling, validation, and graceful failure recovery mechanisms.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

@flyte-bot
Copy link
Contributor

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].

Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Copy link

codecov bot commented Jan 14, 2025

Codecov Report

Attention: Patch coverage is 0% with 120 lines in your changes missing coverage. Please review.

Project coverage is 76.45%. Comparing base (a2bcf95) to head (5c1537b).
Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/extras/webhook/agent.py 0.00% 50 Missing ⚠️
flytekit/utils/dict_formatter.py 0.00% 37 Missing ⚠️
flytekit/extras/webhook/task.py 0.00% 23 Missing ⚠️
flytekit/extras/webhook/constants.py 0.00% 7 Missing ⚠️
flytekit/extras/webhook/__init__.py 0.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3058      +/-   ##
==========================================
+ Coverage   72.87%   76.45%   +3.58%     
==========================================
  Files         205      210       +5     
  Lines       21553    21673     +120     
  Branches     2746     2764      +18     
==========================================
+ Hits        15707    16571     +864     
+ Misses       5062     4340     -722     
+ Partials      784      762      -22     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@flyte-bot
Copy link
Contributor

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].

Signed-off-by: Ketan Umare <[email protected]>
@kumare3 kumare3 changed the title [WIP] Webhook tasks using FlyteAgents Webhook tasks using FlyteAgents Jan 14, 2025
@kumare3 kumare3 marked this pull request as ready for review January 14, 2025 23:22
@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 15, 2025

Code Review Agent Run #8c3d4c

Actionable Suggestions - 6
  • flytekit/extras/webhook/constants.py - 1
    • Consider adding type hints to constants · Line 1-8
  • flytekit/extras/webhook/agent.py - 2
  • tests/flytekit/unit/extras/webhook/test_agent.py - 1
    • Consider async mock pattern improvement · Line 35-35
  • flytekit/utils/dict_formatter.py - 1
    • Consider simplifying token truncation logic · Line 31-31
  • tests/flytekit/unit/extras/webhook/test_task.py - 1
    • Consider adding error case test coverage · Line 40-40
Additional Suggestions - 1
  • tests/flytekit/unit/extras/webhook/test_agent.py - 1
    • Add async context simulation to session mock · Line 26-28
Review Details
  • Files reviewed - 8 · Commit Range: 1ccea6f..ef2fd14
    • flytekit/extras/webhook/__init__.py
    • flytekit/extras/webhook/agent.py
    • flytekit/extras/webhook/constants.py
    • flytekit/extras/webhook/task.py
    • flytekit/utils/dict_formatter.py
    • plugins/flytekit-aws-sagemaker/flytekitplugins/awssagemaker_inference/boto3_mixin.py
    • tests/flytekit/unit/extras/webhook/test_agent.py
    • tests/flytekit/unit/extras/webhook/test_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 15, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
New Feature - Webhook Integration Framework

__init__.py - Initializes webhook module with WebhookAgent and WebhookTask exports

agent.py - Implements WebhookAgent for handling HTTP requests with async support

constants.py - Defines webhook-related constants and configuration keys

task.py - Implements WebhookTask for configurable webhook execution

Feature Improvement - Dictionary Formatting Utility Enhancement

dict_formatter.py - Adds utility for formatting nested dictionaries with placeholders

boto3_mixin.py - Refactors dictionary formatting logic to use common utility

Testing - Webhook Feature Test Coverage

test_agent.py - Tests WebhookAgent functionality and HTTP request handling

test_end_to_end.py - End-to-end tests for webhook integration in workflows

test_task.py - Unit tests for WebhookTask configuration and behavior

Comment on lines 1 to 8
TASK_TYPE = "webhook"

URL_KEY = "url"
METHOD_KEY = "method"
HEADERS_KEY = "headers"
BODY_KEY = "body"
SHOW_BODY_KEY = "show_body"
SHOW_URL_KEY = "show_url"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding type hints to constants

Consider adding type hints to the constant declarations to improve code maintainability and IDE support. For example: TASK_TYPE: str = "webhook"

Code suggestion
Check the AI-generated fix before applying
Suggested change
TASK_TYPE = "webhook"
URL_KEY = "url"
METHOD_KEY = "method"
HEADERS_KEY = "headers"
BODY_KEY = "body"
SHOW_BODY_KEY = "show_body"
SHOW_URL_KEY = "show_url"
TASK_TYPE: str = "webhook"
URL_KEY: str = "url"
METHOD_KEY: str = "method"
HEADERS_KEY: str = "headers"
BODY_KEY: str = "body"
SHOW_BODY_KEY: str = "show_body"
SHOW_URL_KEY: str = "show_url"

Code Review Run #8c3d4c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines 41 to 78
url = final_dict.get(URL_KEY)
body = final_dict.get(BODY_KEY)
headers = final_dict.get(HEADERS_KEY)
method = final_dict.get(METHOD_KEY)
method = http.HTTPMethod(method)
show_body = final_dict.get(SHOW_BODY_KEY, False)
show_url = final_dict.get(SHOW_URL_KEY, False)

session = await self._get_session()

text = None
if method == http.HTTPMethod.GET:
response = await session.get(url, headers=headers)
text = await response.text()
else:
response = await session.post(url, json=body, headers=headers)
text = await response.text()
if response.status != 200:
return Resource(
phase=TaskExecution.FAILED,
message=f"Webhook failed with status code {response.status}, response: {text}",
)
final_response = {
"status_code": response.status,
"body": text,
}
if show_body:
final_response["input_body"] = body
if show_url:
final_response["url"] = url

return Resource(
phase=TaskExecution.SUCCEEDED,
outputs={"info": final_response},
message="Webhook was successfully invoked!",
)
except Exception as e:
return Resource(phase=TaskExecution.FAILED, message=str(e))
Copy link
Contributor

@flyte-bot flyte-bot Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider breaking down long method

The do method is quite long and handles multiple responsibilities including HTTP request handling, response processing, and error handling. Consider breaking it down into smaller, focused methods for better maintainability.

Code suggestion
Check the AI-generated fix before applying
Suggested change
url = final_dict.get(URL_KEY)
body = final_dict.get(BODY_KEY)
headers = final_dict.get(HEADERS_KEY)
method = final_dict.get(METHOD_KEY)
method = http.HTTPMethod(method)
show_body = final_dict.get(SHOW_BODY_KEY, False)
show_url = final_dict.get(SHOW_URL_KEY, False)
session = await self._get_session()
text = None
if method == http.HTTPMethod.GET:
response = await session.get(url, headers=headers)
text = await response.text()
else:
response = await session.post(url, json=body, headers=headers)
text = await response.text()
if response.status != 200:
return Resource(
phase=TaskExecution.FAILED,
message=f"Webhook failed with status code {response.status}, response: {text}",
)
final_response = {
"status_code": response.status,
"body": text,
}
if show_body:
final_response["input_body"] = body
if show_url:
final_response["url"] = url
return Resource(
phase=TaskExecution.SUCCEEDED,
outputs={"info": final_response},
message="Webhook was successfully invoked!",
)
except Exception as e:
return Resource(phase=TaskExecution.FAILED, message=str(e))
return await self._process_webhook(final_dict)
except Exception as e:
return Resource(phase=TaskExecution.FAILED, message=str(e))
async def _make_http_request(self, method: http.HTTPMethod, url: str, headers: dict, body: dict = None) -> tuple:
session = await self._get_session()
if method == http.HTTPMethod.GET:
response = await session.get(url, headers=headers)
else:
response = await session.post(url, json=body, headers=headers)
text = await response.text()
return response, text
def _build_response(self, response: aiohttp.ClientResponse, text: str, body: dict = None, url: str = None,
show_body: bool = False, show_url: bool = False) -> dict:
final_response = {
"status_code": response.status,
"body": text,
}
if show_body:
final_response["input_body"] = body
if show_url:
final_response["url"] = url
return final_response
async def _process_webhook(self, final_dict: dict) -> Resource:
url = final_dict.get(URL_KEY)
body = final_dict.get(BODY_KEY)
headers = final_dict.get(HEADERS_KEY)
method = http.HTTPMethod(final_dict.get(METHOD_KEY))
show_body = final_dict.get(SHOW_BODY_KEY, False)
show_url = final_dict.get(SHOW_URL_KEY, False)
response, text = await self._make_http_request(method, url, headers, body)
if response.status != 200:
return Resource(
phase=TaskExecution.FAILED,
message=f"Webhook failed with status code {response.status}, response: {text}",
)
final_response = self._build_response(response, text, body, url, show_body, show_url)
return Resource(
phase=TaskExecution.SUCCEEDED,
outputs={"info": final_response},
message="Webhook was successfully invoked!",
)

Code Review Run #8c3d4c

Consider validating timeout parameter value

Consider adding a timeout type check to ensure timeout is a positive integer before making the HTTP request. The current implementation assumes timeout is always valid.

Code suggestion
Check the AI-generated fix before applying
 @@ -53,8 +53,11 @@ class WebhookAgent(SyncAgentBase):
      async def _make_http_request(
          self, method: http.HTTPMethod, url: str, headers: dict, data: dict, timeout: int
      ) -> tuple:
 +        if not isinstance(timeout, int) or timeout <= 0:
 +            raise ValueError(f'timeout must be a positive integer, got {timeout}')
 +
          if method == http.HTTPMethod.GET:
              response = await self._client.get(url, headers=headers, params=data, timeout=timeout)

Code Review Run #49f39f


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

outputs={"info": final_response},
message="Webhook was successfully invoked!",
)
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too broad exception handling

Catching generic 'Exception' may hide bugs. Consider catching specific exceptions like 'aiohttp.ClientError'.

Code suggestion
Check the AI-generated fix before applying
Suggested change
except Exception as e:
except (aiohttp.ClientError, ValueError) as e:

Code Review Run #8c3d4c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

async def test_do_post_success(mock_task_template, mock_aiohttp_session):
mock_response = AsyncMock()
mock_response.status = 200
mock_response.text = "Success"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider async mock pattern improvement

Consider using AsyncMock(return_value='Success') directly for text instead of assigning it as a property. This would better match the async nature of the response.

Code suggestion
Check the AI-generated fix before applying
Suggested change
mock_response.text = "Success"
mock_response.text = AsyncMock(return_value="Success")

Code Review Run #8c3d4c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

show_url=True
)

settings = SerializationSettings(image_config=ImageConfig.auto_default_image())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error case test coverage

Consider adding test cases to verify error handling when invalid inputs are provided to WebhookTask. For example, testing with invalid URLs, unsupported HTTP methods, or malformed headers/body.

Code suggestion
Check the AI-generated fix before applying
 @@ -48,0 +49,25 @@
 + def test_webhook_task_invalid_inputs():
 +     # Test invalid URL
 +     with pytest.raises(ValueError):
 +         WebhookTask(
 +             name="test_task",
 +             url="invalid-url",
 +             method=http.HTTPMethod.POST
 +         )
 +     
 +     # Test invalid method
 +     with pytest.raises(ValueError):
 +         WebhookTask(
 +             name="test_task", 
 +             url="http://example.com",
 +             method="INVALID"
 +         )
 +         
 +     # Test invalid headers
 +     with pytest.raises(ValueError):
 +         WebhookTask(
 +             name="test_task",
 +             url="http://example.com",
 +             method=http.HTTPMethod.POST,
 +             headers="invalid-headers"
 +         )

Code Review Run #8c3d4c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 16, 2025

Code Review Agent Run #cec794

Actionable Suggestions - 5
  • flytekit/extras/webhook/agent.py - 2
    • Consider broader exception handling for webhook · Line 78-78
    • Missing error handling in HTTP request · Line 36-47
  • flytekit/extras/webhook/task.py - 2
    • Consider validating data parameter JSON serialization · Line 86-86
    • Consider backward compatibility for parameter rename · Line 95-96
  • tests/flytekit/unit/extras/webhook/test_agent.py - 1
    • Consider error handling for text mock · Line 36-36
Additional Suggestions - 1
  • tests/flytekit/unit/extras/webhook/test_agent.py - 1
    • Consider consolidating info dictionary assertions · Line 63-64
Review Details
  • Files reviewed - 6 · Commit Range: ef2fd14..5c1537b
    • flytekit/extras/webhook/agent.py
    • flytekit/extras/webhook/constants.py
    • flytekit/extras/webhook/task.py
    • flytekit/utils/dict_formatter.py
    • tests/flytekit/unit/extras/webhook/test_agent.py
    • tests/flytekit/unit/extras/webhook/test_end_to_end.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

status, text = await self._make_http_request(method, url, headers, body)
if status != 200:
return Resource(
phase=TaskExecution.FAILED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider broader exception handling for webhook

Consider adding error handling for the _process_webhook call. The method could raise other exceptions besides aiohttp.ClientError that should be caught.

Code suggestion
Check the AI-generated fix before applying
Suggested change
phase=TaskExecution.FAILED,
return Resource(phase=TaskExecution.FAILED, message=f"HTTP client error: {str(e)}")
except Exception as e:
return Resource(phase=TaskExecution.FAILED, message=f"Webhook processing error: {str(e)}")

Code Review Run #cec794


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines 36 to 47
async def _make_http_request(self, method: http.HTTPMethod, url: str, headers: dict, data: dict = None) -> tuple:
# TODO This is a potential performance bottleneck. Consider using a connection pool. To do this, we need to
# create a session object and reuse it for multiple requests. This will reduce the overhead of creating a new
# connection for each request. The problem for not doing so is local execution, does not have a common event
# loop and agent executor creates a new event loop for each request (in the mixin).
async with aiohttp.ClientSession() as session:
if method == http.HTTPMethod.GET:
response = await session.get(url, headers=headers, params=data)
else:
response = await session.post(url, json=data, headers=headers)
text = await response.text()
return response.status, text
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing error handling in HTTP request

Consider adding error handling for HTTP request failures in _make_http_request. The method should handle connection errors, timeouts and other potential aiohttp exceptions to provide better error reporting.

Code suggestion
Check the AI-generated fix before applying
 -            if method == http.HTTPMethod.GET:
 -                response = await session.get(url, headers=headers, params=data)
 -            else:
 -                response = await session.post(url, json=data, headers=headers)
 -            text = await response.text()
 -            return response.status, text
 +            try:
 +                if method == http.HTTPMethod.GET:
 +                    response = await session.get(url, headers=headers, params=data)
 +                else:
 +                    response = await session.post(url, json=data, headers=headers)
 +                text = await response.text()
 +                return response.status, text
 +            except aiohttp.ClientError as e:
 +                raise RuntimeError(f"HTTP request failed: {str(e)}")
 +            except asyncio.TimeoutError:
 +                raise RuntimeError("HTTP request timed out")
 +            except Exception as e:
 +                raise RuntimeError(f"Unexpected error during HTTP request: {str(e)}")

Code Review Run #cec794


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

self._url = url
self._method = method
self._headers = headers
self._data = data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating data parameter JSON serialization

Consider adding validation for the data parameter to ensure it is JSON serializable before sending the request. This could help catch serialization issues early.

Code suggestion
Check the AI-generated fix before applying
 @@ -85,2 +85,7 @@
          self._headers = headers
 +        if data is not None:
 +            import json
 +            try:
 +                json.dumps(data)
 +            except (TypeError, ValueError) as e:
 +                raise ValueError(f"The data parameter must be JSON serializable. Error: {str(e)}")
          self._data = data

Code Review Run #cec794


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +95 to +96
DATA_KEY: self._data or {},
SHOW_DATA_KEY: self._show_data,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider backward compatibility for parameter rename

Consider if renaming from body to data maintains backward compatibility. This change could potentially break existing code that relies on the body parameter.

Code suggestion
Check the AI-generated fix before applying
Suggested change
DATA_KEY: self._data or {},
SHOW_DATA_KEY: self._show_data,
DATA_KEY: self._data or self._body or {},
SHOW_DATA_KEY: self._show_data if self._show_data is not None else self._show_body,

Code Review Run #cec794


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

async def test_do_post_success(mock_task_template, mock_aiohttp_session):
mock_response = AsyncMock()
mock_response.status = 200
mock_response.text = AsyncMock(return_value="Success")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider error handling for text mock

Consider adding error handling for the text property mock. The test assumes the text will always be successfully retrieved, but in real scenarios this could fail.

Code suggestion
Check the AI-generated fix before applying
Suggested change
mock_response.text = AsyncMock(return_value="Success")
# Test success case
mock_response.text = AsyncMock(return_value="Success")
# Test error case
mock_response_error = AsyncMock()
mock_response_error.status = 200
mock_response_error.text = AsyncMock(side_effect=Exception("Failed to get response text"))

Code Review Run #cec794


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 16, 2025

Code Review Agent Run #882444

Actionable Suggestions - 5
  • flytekit/extras/webhook/agent.py - 4
    • Consider more specific return type annotation · Line 40-40
    • Consider more specific return type annotation · Line 40-40
    • Consider adding error handling for dict formatting · Line 26-26
    • Consider validating timeout_sec parameter value · Line 78-80
  • tests/flytekit/unit/extras/webhook/test_task.py - 1
    • Consider using timedelta for timeout parameter · Line 21-21
Additional Suggestions - 1
  • tests/flytekit/unit/extras/webhook/test_agent.py - 1
    • Consider more flexible response data assertions · Line 68-68
Review Details
  • Files reviewed - 5 · Commit Range: 5c1537b..4feef3f
    • flytekit/extras/webhook/agent.py
    • flytekit/extras/webhook/constants.py
    • flytekit/extras/webhook/task.py
    • tests/flytekit/unit/extras/webhook/test_agent.py
    • tests/flytekit/unit/extras/webhook/test_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo


async def _make_http_request(
self, method: http.HTTPMethod, url: str, headers: dict, data: dict, timeout: int
) -> tuple:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider more specific return type annotation

The _make_http_request method's return type annotation tuple is too generic. Consider specifying the exact types being returned (int for status and str for text).

Code suggestion
Check the AI-generated fix before applying
Suggested change
) -> tuple:
) -> tuple[int, str]:

Code Review Run #882444


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

self, task_template: TaskTemplate, output_prefix: str, inputs: Optional[LiteralMap] = None, **kwargs
) -> Resource:
try:
final_dict = self._get_final_dict(task_template, inputs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for dict formatting

Consider adding error handling for self._get_final_dict() call to handle potential exceptions from format_dict() or literal_map_string_repr()

Code suggestion
Check the AI-generated fix before applying
Suggested change
final_dict = self._get_final_dict(task_template, inputs)
try:
final_dict = self._get_final_dict(task_template, inputs)
except ValueError as e:
return Resource(phase=TaskExecution.FAILED, message=f"Failed to format webhook data: {str(e)}")

Code Review Run #882444


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +78 to +80
timeout_sec = final_dict.get(TIMEOUT_SEC, 10)

status, text = await self._make_http_request(method, url, headers, body, timeout_sec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating timeout_sec parameter value

Consider validating the timeout_sec value to ensure it's a positive integer. A negative or zero timeout could cause unexpected behavior.

Code suggestion
Check the AI-generated fix before applying
Suggested change
timeout_sec = final_dict.get(TIMEOUT_SEC, 10)
status, text = await self._make_http_request(method, url, headers, body, timeout_sec)
timeout_sec = final_dict.get(TIMEOUT_SEC, 10)
if not isinstance(timeout_sec, int) or timeout_sec <= 0:
timeout_sec = 10
status, text = await self._make_http_request(method, url, headers, body, timeout_sec)

Code Review Run #882444


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

show_data=True,
show_url=True,
description="Test Webhook Task",
timeout=60,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using timedelta for timeout parameter

Consider using timedelta for timeout parameter instead of raw integer value for better type safety and clarity. The WebhookTask already handles converting timedelta to seconds internally.

Code suggestion
Check the AI-generated fix before applying
Suggested change
timeout=60,
timeout=timedelta(seconds=60),

Code Review Run #882444


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@marrrcin
Copy link

What's the purpose and the use case for it? If it's to be used like a standard @task, it looks like an over-engineered version of requests/httpx wrapped in a class.

If the intent was to somehow address the flyteorg/flyte#2317
then with the restriction of the on_cleanup:

The input of clean_up should be the exact same as the input of the workflow.
Source: https://docs.flyte.org/en/v1.14.1/user_guide/development_lifecycle/failure_node.html

then I don't believe it will be useful directly...

@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 31, 2025

Code Review Agent Run #a6a0b4

Actionable Suggestions - 0
Additional Suggestions - 10
  • tests/flytekit/integration/remote/test_remote.py - 1
  • flytekit/clis/sdk_in_container/serve.py - 1
    • Add parameter validation for server config · Line 64-64
  • flytekit/models/security.py - 1
    • Consider adding env_var validation check · Line 45-45
  • flytekit/image_spec/default_builder.py - 1
    • Consider extracting conda string concatenation logic · Line 256-291
  • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_kube_config.py - 1
    • Consider more flexible warning message assertion · Line 21-23
  • plugins/flytekit-k8sdataservice/setup.py - 1
    • Consider more specific kubernetes version range · Line 7-7
  • plugins/flytekit-ray/tests/test_ray.py - 2
    • Consider extracting pod template creation logic · Line 67-73
    • Consider moving pod_template inside test · Line 23-27
  • tests/flytekit/unit/extras/pydantic_transformer/test_pydantic_basemodel_transformer.py - 1
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py - 1
    • Consider extracting env var configuration · Line 66-82
Review Details
  • Files reviewed - 79 · Commit Range: 4feef3f..3c9a800
    • .pre-commit-config.yaml
    • Dockerfile.agent
    • docs/source/plugins/k8sstatefuldataservice.rst
    • flytekit/bin/entrypoint.py
    • flytekit/clis/sdk_in_container/run.py
    • flytekit/clis/sdk_in_container/serve.py
    • flytekit/core/array_node_map_task.py
    • flytekit/core/context_manager.py
    • flytekit/core/data_persistence.py
    • flytekit/core/node.py
    • flytekit/core/promise.py
    • flytekit/core/python_function_task.py
    • flytekit/core/resources.py
    • flytekit/core/type_engine.py
    • flytekit/core/worker_queue.py
    • flytekit/exceptions/user.py
    • flytekit/image_spec/default_builder.py
    • flytekit/image_spec/image_spec.py
    • flytekit/models/security.py
    • flytekit/models/task.py
    • flytekit/remote/remote.py
    • flytekit/remote/remote_fs.py
    • flytekit/tools/translator.py
    • flytekit/types/structured/structured_dataset.py
    • plugins/flytekit-envd/flytekitplugins/envd/image_builder.py
    • plugins/flytekit-envd/tests/test_image_spec.py
    • plugins/flytekit-k8sdataservice/dev-requirements.txt
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/__init__.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/agent.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/kube_config.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/sensor.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/task.py
    • plugins/flytekit-k8sdataservice/setup.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_kube_config.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_manager.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_sensor.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_task.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/utils/test_resources.py
    • plugins/flytekit-k8sdataservice/utils/infra.py
    • plugins/flytekit-k8sdataservice/utils/resources.py
    • plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
    • plugins/flytekit-kf-pytorch/tests/test_elastic_task.py
    • plugins/flytekit-omegaconf/flytekitplugins/omegaconf/dictconfig_transformer.py
    • plugins/flytekit-omegaconf/tests/test_dictconfig_transformer.py
    • plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
    • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
    • plugins/flytekit-optuna/setup.py
    • plugins/flytekit-optuna/tests/test_callback.py
    • plugins/flytekit-optuna/tests/test_decorator.py
    • plugins/flytekit-optuna/tests/test_imperative.py
    • plugins/flytekit-optuna/tests/test_optimizer.py
    • plugins/flytekit-optuna/tests/test_validation.py
    • plugins/flytekit-ray/flytekitplugins/ray/task.py
    • plugins/flytekit-ray/setup.py
    • plugins/flytekit-ray/tests/test_ray.py
    • plugins/setup.py
    • pydoclint-errors-baseline.txt
    • pyproject.toml
    • tests/flytekit/clis/sdk_in_container/test_serve.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/get_secret.py
    • tests/flytekit/integration/remote/workflows/basic/sd_attr.py
    • tests/flytekit/integration/remote/workflows/basic/signal_test.py
    • tests/flytekit/unit/bin/test_python_entrypoint.py
    • tests/flytekit/unit/core/image_spec/test_default_builder.py
    • tests/flytekit/unit/core/test_array_node_map_task.py
    • tests/flytekit/unit/core/test_data_persistence.py
    • tests/flytekit/unit/core/test_dataclass.py
    • tests/flytekit/unit/core/test_generice_idl_type_engine.py
    • tests/flytekit/unit/core/test_list.py
    • tests/flytekit/unit/core/test_resources.py
    • tests/flytekit/unit/core/test_type_engine.py
    • tests/flytekit/unit/core/test_type_hints.py
    • tests/flytekit/unit/core/test_worker_queue.py
    • tests/flytekit/unit/exceptions/test_user.py
    • tests/flytekit/unit/extras/pydantic_transformer/test_pydantic_basemodel_transformer.py
    • tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py
  • Files skipped - 3
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
    • plugins/flytekit-k8sdataservice/README.md - Reason: Filter setting
    • plugins/flytekit-optuna/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

Signed-off-by: Ketan Umare <[email protected]>
@kumare3
Copy link
Contributor Author

kumare3 commented Feb 1, 2025

What's the purpose and the use case for it? If it's to be used like a standard @task, it looks like an over-engineered version of requests/httpx wrapped in a class.

If the intent was to somehow address the flyteorg/flyte#2317 then with the restriction of the on_cleanup:

The input of clean_up should be the exact same as the input of the workflow.
Source: https://docs.flyte.org/en/v1.14.1/user_guide/development_lifecycle/failure_node.html

then I don't believe it will be useful directly...

Great question, the reason to have it is to provide a simple canonical way to fire endpoints synchronously, without having to create new containers. This makes it possible to have simple notifications directly in your workflow wherever even in on_failure.
The overhead of creating a @task with container can be pretty high, unless you use union and this becomes much more lightweight.

@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 1, 2025

Code Review Agent Run #49f39f

Actionable Suggestions - 7
  • flytekit/extras/webhook/agent.py - 4
    • Consider validating timeout parameter value · Line 41-78
    • Consider adding error handling for requests · Line 56-60
    • Consider adding error handling for requests · Line 56-60
    • Consider adding client cleanup mechanism · Line 30-30
  • tests/flytekit/unit/extras/webhook/test_agent.py - 3
    • Missing assertions for mock call parameters · Line 77-77
    • Consider adding more error test cases · Line 69-74
    • Consider reducing test setup duplication · Line 84-95
Review Details
  • Files reviewed - 4 · Commit Range: 3c9a800..293a704
    • flytekit/extras/webhook/agent.py
    • flytekit/extras/webhook/task.py
    • tests/flytekit/unit/extras/webhook/test_agent.py
    • tests/flytekit/unit/extras/webhook/test_end_to_end.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

Comment on lines +56 to +60
if method == http.HTTPMethod.GET:
response = await self._client.get(url, headers=headers, params=data, timeout=timeout)
else:
response = await self._client.post(url, json=data, headers=headers, timeout=timeout)
return response.status_code, response.text
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for requests

Consider adding error handling for potential httpx request failures. The current implementation may not properly handle network timeouts or connection errors.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if method == http.HTTPMethod.GET:
response = await self._client.get(url, headers=headers, params=data, timeout=timeout)
else:
response = await self._client.post(url, json=data, headers=headers, timeout=timeout)
return response.status_code, response.text
try:
if method == http.HTTPMethod.GET:
response = await self._client.get(url, headers=headers, params=data, timeout=timeout)
else:
response = await self._client.post(url, json=data, headers=headers, timeout=timeout)
return response.status_code, response.text
except (httpx.RequestError, httpx.TimeoutException) as e:
return 500, str(e)

Code Review Run #49f39f


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


def __init__(self, client: Optional[httpx.AsyncClient] = None):
super().__init__(task_type_name=TASK_TYPE)
self._client = client or httpx.AsyncClient()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding client cleanup mechanism

Consider managing the lifecycle of the httpx.AsyncClient() by implementing a cleanup method to properly close the client when it's no longer needed. The client should be closed to free up system resources.

Code suggestion
Check the AI-generated fix before applying
 -    def __init__(self, client: Optional[httpx.AsyncClient] = None):
 -        super().__init__(task_type_name=TASK_TYPE)
 -        self._client = client or httpx.AsyncClient()
 +    def __init__(self, client: Optional[httpx.AsyncClient] = None):
 +        super().__init__(task_type_name=TASK_TYPE)
 +        self._client = client or httpx.AsyncClient()
 +
 +    async def cleanup(self):
 +        if self._client:
 +            await self._client.aclose()

Code Review Run #49f39f


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

mock_httpx_client.post.return_value = mock_response

agent = WebhookAgent(client=mock_httpx_client)
result = await agent.do(mock_task_template, output_prefix="", inputs=LiteralMap({}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing assertions for mock call parameters

Consider adding assertions to verify that mock_httpx_client.post was called with the expected arguments from mock_task_template.

Code suggestion
Check the AI-generated fix before applying
Suggested change
result = await agent.do(mock_task_template, output_prefix="", inputs=LiteralMap({}))
result = await agent.do(mock_task_template, output_prefix="", inputs=LiteralMap({}))
mock_httpx_client.post.assert_called_once_with(
mock_task_template.custom[URL_KEY],
headers=mock_task_template.custom.get(HEADERS_KEY, {}),
json=mock_task_template.custom.get(DATA_KEY, {}))

Code Review Run #49f39f


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +69 to +74
async def test_do_failure(mock_task_template):
mock_response = AsyncMock(name="httpx.Response")
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_httpx_client = AsyncMock(name="httpx.AsyncClient")
mock_httpx_client.post.return_value = mock_response
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more error test cases

Consider adding test cases for different HTTP status codes (4xx, 5xx) to ensure comprehensive error handling coverage in the webhook agent.

Code suggestion
Check the AI-generated fix before applying
Suggested change
async def test_do_failure(mock_task_template):
mock_response = AsyncMock(name="httpx.Response")
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_httpx_client = AsyncMock(name="httpx.AsyncClient")
mock_httpx_client.post.return_value = mock_response
@pytest.mark.parametrize("status_code,error_message", [
(400, "Bad Request"),
(401, "Unauthorized"),
(403, "Forbidden"),
(404, "Not Found"),
(500, "Internal Server Error"),
(503, "Service Unavailable")
])
async def test_do_failure(mock_task_template, status_code, error_message):
mock_response = AsyncMock(name="httpx.Response")
mock_response.status_code = status_code
mock_response.text = error_message
mock_httpx_client = AsyncMock(name="httpx.AsyncClient")
mock_httpx_client.post.return_value = mock_response

Code Review Run #49f39f


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +84 to +95
async def test_do_get_failure(mock_task_template):
mock_task_template.custom[METHOD_KEY] = "GET"
mock_task_template.custom.pop(DATA_KEY)
mock_task_template.custom[SHOW_DATA_KEY] = False

mock_response = AsyncMock(name="httpx.Response")
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_httpx_client = AsyncMock(name="httpx.AsyncClient")
mock_httpx_client.get.return_value = mock_response

agent = WebhookAgent(client=mock_httpx_client)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider reducing test setup duplication

The test case for GET failure duplicates much of the setup code from the POST failure test. Consider using a fixture or helper method to reduce duplication.

Code suggestion
Check the AI-generated fix before applying
  @pytest.fixture
 +def mock_error_response():
 +    mock_response = AsyncMock(name="httpx.Response")
 +    mock_response.status_code = 500
 +    mock_response.text = "Internal Server Error"
 +    return mock_response
 +
 [email protected]
 +def mock_http_client(mock_error_response):
 +    mock_client = AsyncMock(name="httpx.AsyncClient")
 +    mock_client.post.return_value = mock_error_response
 +    mock_client.get.return_value = mock_error_response
 +    return mock_client
 +
  @@ -69,31 +80,19 @@
 -async def test_do_failure(mock_task_template):
 -    mock_response = AsyncMock(name="httpx.Response")
 -    mock_response.status_code = 500
 -    mock_response.text = "Internal Server Error"
 -    mock_httpx_client = AsyncMock(name="httpx.AsyncClient")
 -    mock_httpx_client.post.return_value = mock_response
 +async def test_do_failure(mock_task_template, mock_http_client):
 +    agent = WebhookAgent(client=mock_http_client)
 -    agent = WebhookAgent(client=mock_httpx_client)

  @@ -84,16 +95,8 @@
 -async def test_do_get_failure(mock_task_template):
 +async def test_do_get_failure(mock_task_template, mock_http_client):
      mock_task_template.custom[METHOD_KEY] = "GET"
      mock_task_template.custom.pop(DATA_KEY)
      mock_task_template.custom[SHOW_DATA_KEY] = False
 -
 -    mock_response = AsyncMock(name="httpx.Response")
 -    mock_response.status_code = 500
 -    mock_response.text = "Internal Server Error"
 -    mock_httpx_client = AsyncMock(name="httpx.AsyncClient")
 -    mock_httpx_client.get.return_value = mock_response
 -
 -    agent = WebhookAgent(client=mock_httpx_client)
 +    agent = WebhookAgent(client=mock_http_client)

Code Review Run #49f39f


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants