-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Webhook tasks using FlyteAgents #3058
Conversation
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #3058 +/- ##
==========================================
- Coverage 92.24% 83.58% -8.66%
==========================================
Files 118 3 -115
Lines 4991 195 -4796
==========================================
- Hits 4604 163 -4441
+ Misses 387 32 -355 ☔ View full report in Codecov by Sentry. |
Code Review Agent Run Status
|
Signed-off-by: Ketan Umare <[email protected]>
Code Review Agent Run #8c3d4cActionable Suggestions - 6
Additional Suggestions - 1
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
flytekit/extras/webhook/constants.py
Outdated
TASK_TYPE = "webhook" | ||
|
||
URL_KEY = "url" | ||
METHOD_KEY = "method" | ||
HEADERS_KEY = "headers" | ||
BODY_KEY = "body" | ||
SHOW_BODY_KEY = "show_body" | ||
SHOW_URL_KEY = "show_url" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding type hints to the constant declarations to improve code maintainability and IDE support. For example: TASK_TYPE: str = "webhook"
Code suggestion
Check the AI-generated fix before applying
TASK_TYPE = "webhook" | |
URL_KEY = "url" | |
METHOD_KEY = "method" | |
HEADERS_KEY = "headers" | |
BODY_KEY = "body" | |
SHOW_BODY_KEY = "show_body" | |
SHOW_URL_KEY = "show_url" | |
TASK_TYPE: str = "webhook" | |
URL_KEY: str = "url" | |
METHOD_KEY: str = "method" | |
HEADERS_KEY: str = "headers" | |
BODY_KEY: str = "body" | |
SHOW_BODY_KEY: str = "show_body" | |
SHOW_URL_KEY: str = "show_url" |
Code Review Run #8c3d4c
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/extras/webhook/agent.py
Outdated
url = final_dict.get(URL_KEY) | ||
body = final_dict.get(BODY_KEY) | ||
headers = final_dict.get(HEADERS_KEY) | ||
method = final_dict.get(METHOD_KEY) | ||
method = http.HTTPMethod(method) | ||
show_body = final_dict.get(SHOW_BODY_KEY, False) | ||
show_url = final_dict.get(SHOW_URL_KEY, False) | ||
|
||
session = await self._get_session() | ||
|
||
text = None | ||
if method == http.HTTPMethod.GET: | ||
response = await session.get(url, headers=headers) | ||
text = await response.text() | ||
else: | ||
response = await session.post(url, json=body, headers=headers) | ||
text = await response.text() | ||
if response.status != 200: | ||
return Resource( | ||
phase=TaskExecution.FAILED, | ||
message=f"Webhook failed with status code {response.status}, response: {text}", | ||
) | ||
final_response = { | ||
"status_code": response.status, | ||
"body": text, | ||
} | ||
if show_body: | ||
final_response["input_body"] = body | ||
if show_url: | ||
final_response["url"] = url | ||
|
||
return Resource( | ||
phase=TaskExecution.SUCCEEDED, | ||
outputs={"info": final_response}, | ||
message="Webhook was successfully invoked!", | ||
) | ||
except Exception as e: | ||
return Resource(phase=TaskExecution.FAILED, message=str(e)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The do
method is quite long and handles multiple responsibilities including HTTP request handling, response processing, and error handling. Consider breaking it down into smaller, focused methods for better maintainability.
Code suggestion
Check the AI-generated fix before applying
url = final_dict.get(URL_KEY) | |
body = final_dict.get(BODY_KEY) | |
headers = final_dict.get(HEADERS_KEY) | |
method = final_dict.get(METHOD_KEY) | |
method = http.HTTPMethod(method) | |
show_body = final_dict.get(SHOW_BODY_KEY, False) | |
show_url = final_dict.get(SHOW_URL_KEY, False) | |
session = await self._get_session() | |
text = None | |
if method == http.HTTPMethod.GET: | |
response = await session.get(url, headers=headers) | |
text = await response.text() | |
else: | |
response = await session.post(url, json=body, headers=headers) | |
text = await response.text() | |
if response.status != 200: | |
return Resource( | |
phase=TaskExecution.FAILED, | |
message=f"Webhook failed with status code {response.status}, response: {text}", | |
) | |
final_response = { | |
"status_code": response.status, | |
"body": text, | |
} | |
if show_body: | |
final_response["input_body"] = body | |
if show_url: | |
final_response["url"] = url | |
return Resource( | |
phase=TaskExecution.SUCCEEDED, | |
outputs={"info": final_response}, | |
message="Webhook was successfully invoked!", | |
) | |
except Exception as e: | |
return Resource(phase=TaskExecution.FAILED, message=str(e)) | |
return await self._process_webhook(final_dict) | |
except Exception as e: | |
return Resource(phase=TaskExecution.FAILED, message=str(e)) | |
async def _make_http_request(self, method: http.HTTPMethod, url: str, headers: dict, body: dict = None) -> tuple: | |
session = await self._get_session() | |
if method == http.HTTPMethod.GET: | |
response = await session.get(url, headers=headers) | |
else: | |
response = await session.post(url, json=body, headers=headers) | |
text = await response.text() | |
return response, text | |
def _build_response(self, response: aiohttp.ClientResponse, text: str, body: dict = None, url: str = None, | |
show_body: bool = False, show_url: bool = False) -> dict: | |
final_response = { | |
"status_code": response.status, | |
"body": text, | |
} | |
if show_body: | |
final_response["input_body"] = body | |
if show_url: | |
final_response["url"] = url | |
return final_response | |
async def _process_webhook(self, final_dict: dict) -> Resource: | |
url = final_dict.get(URL_KEY) | |
body = final_dict.get(BODY_KEY) | |
headers = final_dict.get(HEADERS_KEY) | |
method = http.HTTPMethod(final_dict.get(METHOD_KEY)) | |
show_body = final_dict.get(SHOW_BODY_KEY, False) | |
show_url = final_dict.get(SHOW_URL_KEY, False) | |
response, text = await self._make_http_request(method, url, headers, body) | |
if response.status != 200: | |
return Resource( | |
phase=TaskExecution.FAILED, | |
message=f"Webhook failed with status code {response.status}, response: {text}", | |
) | |
final_response = self._build_response(response, text, body, url, show_body, show_url) | |
return Resource( | |
phase=TaskExecution.SUCCEEDED, | |
outputs={"info": final_response}, | |
message="Webhook was successfully invoked!", | |
) |
Code Review Run #8c3d4c
Consider adding a timeout type check to ensure timeout
is a positive integer before making the HTTP request. The current implementation assumes timeout
is always valid.
Code suggestion
Check the AI-generated fix before applying
@@ -53,8 +53,11 @@ class WebhookAgent(SyncAgentBase):
async def _make_http_request(
self, method: http.HTTPMethod, url: str, headers: dict, data: dict, timeout: int
) -> tuple:
+ if not isinstance(timeout, int) or timeout <= 0:
+ raise ValueError(f'timeout must be a positive integer, got {timeout}')
+
if method == http.HTTPMethod.GET:
response = await self._client.get(url, headers=headers, params=data, timeout=timeout)
Code Review Run #49f39f
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/extras/webhook/agent.py
Outdated
outputs={"info": final_response}, | ||
message="Webhook was successfully invoked!", | ||
) | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catching generic 'Exception' may hide bugs. Consider catching specific exceptions like 'aiohttp.ClientError'.
Code suggestion
Check the AI-generated fix before applying
except Exception as e: | |
except (aiohttp.ClientError, ValueError) as e: |
Code Review Run #8c3d4c
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
async def test_do_post_success(mock_task_template, mock_aiohttp_session): | ||
mock_response = AsyncMock() | ||
mock_response.status = 200 | ||
mock_response.text = "Success" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using AsyncMock(return_value='Success')
directly for text
instead of assigning it as a property. This would better match the async nature of the response.
Code suggestion
Check the AI-generated fix before applying
mock_response.text = "Success" | |
mock_response.text = AsyncMock(return_value="Success") |
Code Review Run #8c3d4c
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
show_url=True | ||
) | ||
|
||
settings = SerializationSettings(image_config=ImageConfig.auto_default_image()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding test cases to verify error handling when invalid inputs are provided to WebhookTask
. For example, testing with invalid URLs, unsupported HTTP methods, or malformed headers/body.
Code suggestion
Check the AI-generated fix before applying
@@ -48,0 +49,25 @@
+ def test_webhook_task_invalid_inputs():
+ # Test invalid URL
+ with pytest.raises(ValueError):
+ WebhookTask(
+ name="test_task",
+ url="invalid-url",
+ method=http.HTTPMethod.POST
+ )
+
+ # Test invalid method
+ with pytest.raises(ValueError):
+ WebhookTask(
+ name="test_task",
+ url="http://example.com",
+ method="INVALID"
+ )
+
+ # Test invalid headers
+ with pytest.raises(ValueError):
+ WebhookTask(
+ name="test_task",
+ url="http://example.com",
+ method=http.HTTPMethod.POST,
+ headers="invalid-headers"
+ )
Code Review Run #8c3d4c
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: Ketan Umare <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
Code Review Agent Run #cec794Actionable Suggestions - 5
Additional Suggestions - 1
Review Details
|
status, text = await self._make_http_request(method, url, headers, body) | ||
if status != 200: | ||
return Resource( | ||
phase=TaskExecution.FAILED, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling for the _process_webhook
call. The method could raise other exceptions besides aiohttp.ClientError
that should be caught.
Code suggestion
Check the AI-generated fix before applying
phase=TaskExecution.FAILED, | |
return Resource(phase=TaskExecution.FAILED, message=f"HTTP client error: {str(e)}") | |
except Exception as e: | |
return Resource(phase=TaskExecution.FAILED, message=f"Webhook processing error: {str(e)}") |
Code Review Run #cec794
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Code Review Agent Run #fa3491Actionable Suggestions - 4
Review Details
|
# It's used to force agent to run in the same event loop in the local execution. | ||
local_agent_loop = asyncio.new_event_loop() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving the local_agent_loop
initialization inside a function or method to avoid potential issues with module-level event loop creation. Module-level event loop initialization might cause problems if the module is imported multiple times or in different contexts.
Code suggestion
Check the AI-generated fix before applying
# It's used to force agent to run in the same event loop in the local execution. | |
local_agent_loop = asyncio.new_event_loop() | |
# It's used to force agent to run in the same event loop in the local execution. | |
_local_agent_loop = None | |
def get_local_agent_loop(): | |
global _local_agent_loop | |
if _local_agent_loop is None: | |
_local_agent_loop = asyncio.new_event_loop() | |
return _local_agent_loop |
Code Review Run #fa3491
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -285,7 +288,7 @@ def execute(self: PythonTask, **kwargs) -> LiteralMap: | |||
output_prefix = ctx.file_access.get_random_remote_directory() | |||
|
|||
agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version) | |||
resource = asyncio.run( | |||
resource = local_agent_loop.run_until_complete( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using asyncio.get_event_loop()
instead of a global local_agent_loop
. Using a global event loop can lead to issues in concurrent scenarios and makes testing more difficult. The event loop should be managed within the scope where it's needed.
Code suggestion
Check the AI-generated fix before applying
-local_agent_loop = asyncio.new_event_loop()
@@ -291,1 +291,1 @@
- resource = local_agent_loop.run_until_complete(
+ resource = asyncio.get_event_loop().run_until_complete(
Code Review Run #fa3491
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
resource_meta = local_agent_loop.run_until_complete( | ||
self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs) | ||
) | ||
resource = asyncio.run(self._get(resource_meta=resource_meta)) | ||
resource = local_agent_loop.run_until_complete(self._get(resource_meta=resource_meta)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a context manager with local_agent_loop
to ensure proper cleanup of resources. The current approach of directly using run_until_complete
could lead to resource leaks if exceptions occur.
Code suggestion
Check the AI-generated fix before applying
- resource_meta = local_agent_loop.run_until_complete(
- self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
- )
- resource = local_agent_loop.run_until_complete(self._get(resource_meta=resource_meta))
+ async with local_agent_loop:
+ resource_meta = await self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
+ resource = await self._get(resource_meta=resource_meta)
Code Review Run #fa3491
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -5,7 +5,7 @@ | |||
|
|||
microlib_name = f"flytekitplugins-{PLUGIN_NAME}" | |||
|
|||
plugin_requires = ["flytekit>=1.11.0", "aioboto3>=12.3.0", "xxhash"] | |||
plugin_requires = ["flytekit>1.14.6", "aioboto3>=12.3.0", "xxhash"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using >=
instead of >
for the flytekit
dependency version. Using >
might exclude compatible patch versions unnecessarily. The pattern >=1.14.6
would be more conventional and allow for patch updates while maintaining compatibility.
Code suggestion
Check the AI-generated fix before applying
plugin_requires = ["flytekit>1.14.6", "aioboto3>=12.3.0", "xxhash"] | |
plugin_requires = ["flytekit>=1.14.6", "aioboto3>=12.3.0", "xxhash"] |
Code Review Run #fa3491
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
We should probably delete great expectations plugin? |
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Code Review Agent Run #7908fcActionable Suggestions - 0Additional Suggestions - 10
Review Details
|
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Code Review Agent Run #809ba3Actionable Suggestions - 1
Review Details
|
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | ||
print("Execution Error:", execution.error) | ||
assert execution.error is None, f"Execution failed with error: {execution.error}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider keeping the timeout at 10 minutes instead of reducing to 5 minutes. The test may need more time to complete in certain environments. Additionally, the error assertion has been improved to provide better failure messages, which is good.
Code suggestion
Check the AI-generated fix before applying
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | |
print("Execution Error:", execution.error) | |
assert execution.error is None, f"Execution failed with error: {execution.error}" | |
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=10)) | |
assert execution.error is None, f"Execution failed with error: {execution.error}" |
Code Review Run #809ba3
Should Bito avoid suggestions like this for future reviews? (Manage Rules)
- Yes, avoid them
Signed-off-by: Kevin Su <[email protected]>
Code Review Agent Run #3dcbb2Actionable Suggestions - 0Review Details
|
Signed-off-by: Kevin Su <[email protected]>
Code Review Agent Run #28b84fActionable Suggestions - 0Additional Suggestions - 1
Review Details
|
There have been numerous requests from folks to support invoking arbitrary apis or using webhooks to notify.
This agent supports calling webhooks like slack, github etc directly from a flyte workflow
Possible to write workflows like,
Summary by Bito
This PR enhances Flytekit with WebhookTask and WebhookAgent implementations, enabling direct HTTP requests to external services like Slack and GitHub. It includes httpx integration, caching system improvements, and enhanced agent execution model. The update increases timeout duration from 15 to 20 minutes for remote execution tests and improves the testing framework with better error assertions.Unit tests added: True
Estimated effort to review (1-5, lower is better): 5