-
Notifications
You must be signed in to change notification settings - Fork 682
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Agent Sync Plugin #4107
Agent Sync Plugin #4107
Conversation
Signed-off-by: Future Outlier <[email protected]>
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #4107 +/- ##
=======================================
Coverage 59.02% 59.03%
=======================================
Files 622 622
Lines 52780 52793 +13
=======================================
+ Hits 31153 31164 +11
- Misses 19140 19142 +2
Partials 2487 2487
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
func (c CorePlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { | ||
taskTemplate, err := tCtx.TaskReader().Read(ctx) | ||
|
||
if taskTemplate.Type == "dispatcher" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't taskTemplate.Type
being used for agent routing in in agent/plugin.go
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, taskTemplate.Type is indeed utilized there.
However, we also need to ensure the plugin is directed to use the sync plugin interface.
This is why we have to make the routing here.
I chose not to incorporate a cache mechanism for the sync plugin.
And all cases of do task can go through the dispatcher
task type.
You can refer this pr: flyteorg/flytekit#1822
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for replying.
I'm not sure how routing is supposed to be done for a sync agent. If I understood correctly, it would be utilizing custom
of task template: https://github.com/flyteorg/flytekit/pull/1822/files#diff-cac07a79ac0ced0778cd505ccfb5f8f1b4e33f78e1c40a91bba926171f9d2246R39-R41, is that correct?
I'm also confused by https://github.com/flyteorg/flytekit/pull/1822/files#diff-9f7af27264f8773b069e8200804c224fe19a6fcaaf9dc33edc644f5351cbb3beR137. Does it mean you are assuming there is at most one "dispatcher" (i.e task_template.type == "dispatcher") agent connected to flyte? If so, I think this is a very strong assumption which at least won't work in our setup, because we won't have a centralized agent but multiple ones developed by different teams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could extend task template proto to include a new field indicating it is a "dispatcher", or encode this information in custom
with a preserved key, so here we don't need to rely on type
, and all the routing is handled by agent plugin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
Yes you are right!
In the case of thechatgpt
task, the task type is designated asdispatcher
. We then utilize theChatGPTDispatcher
as specified in the configuration. -
To illustrate a practical scenario:
Consider a situation where there are two dispatchers - theChatGPTDispatcher
and theLangChainDispatcher
.
When either is invoked to call theagent-service
, the task type remainsdispatcher
.
And both of them can be used at the same time with only one server!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More clarification: I meant we have multiple gRPC endpoints, with everything being "dispatcher", agent plugin will not be able to know which endpoint to route to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any new development on this? I still think it is not a safe assumption that there is one and only one "dispatcher" typed agent endpoint in the system.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pingsutw and I is currently trying to fix this!
Maybe we can have a discussion on slack.
And yes, you are right, we will improve the implementation of it.
We will definitely not use only one "dispatcher" typed agent endpoint in the system.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is great! Sure please feel free to pull me to a Slack chat and I will follow up tomorrow. Sorry I am not able to discuss today.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem, I will book a time with you, thanks for your attention!
Signed-off-by: Future Outlier <[email protected]>
… agent-sync-plugin Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
… agent-sync-plugin Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
@honnix if possible, please take a look at this PR and the relevant flytekit PR. |
@@ -20,7 +20,7 @@ import ( | |||
|
|||
// A Lazy loading function, that will load the plugin. Plugins should be initialized in this method. It is guaranteed | |||
// that the plugin loader will be called before any Handle/Abort/Finalize functions are invoked | |||
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, error) | |||
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (Plugin, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a breaking change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, thanks for the review.
with this change, users who previously used AsyncPlugin will now need to modify their code in the same way as the BigQuery plugin does.
BigQuery example: https://github.com/flyteorg/flyte/pull/4107/files#diff-6a9d13d1a2c7f030da6eaa91b837eee26b780d7c26e1084d1a76f2349977ee2dR562
I get it might feel a bit forceful making users adjust their private plugins. But I genuinely think this route is smoother. Going the other way, like suggesting a change where existing plugins (take BigQuery, for example) need deeper tweaks, doesn't seem as clean. Imagine this:
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, SyncPlugin, error)
Here's the previous version:
adc1c2c#diff-3119631e7cd5a6d4fb999fc50bbb8509bb685347c4a6e3cceda14acec6aa0eb3R25
In comparison, the current plugin interface is far more easier for users to tweak their code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in general we should be very cautious with breaking changes given the number of flyte deployments and possible plugins in the wild. Unless it is absolutely necessary, we may not want to do that. This case to me doesn't seem to be so convincing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered this approach?
// Deprecated, please use AsyncPluginLoader instead for new plugin development
type PluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, error)
type AsyncPluginLoader func(ctx context.Context, iCtx PluginSetupContext) (AsyncPlugin, error)
type SyncPluginLoader func(ctx context.Context, iCtx PluginSetupContext) (SyncPlugin, error)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good advice, looking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not take my words as the proposal, I was just trying to explore how to ship this feature without introducing breaking changes, because I believe we should be able to achieve that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think your thought is pretty awesome, I didn't think of this solution before.
I will discuss this with @pingsutw and tell you our thoughts.
If the implementation doesn't work, we will tell you why, and we will update the latest discussion with you tomorrow, thanks a lot for the review!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, we face a problem that one of flytekit sync plugin, chatgpt, will need more than 10 seconds for the reply if we use openai API and gpt-4 as the model.
However, the grpc agent server shouldn't wait that long, so we need to either
add a new server
to handle a task like this, or we need to
set the timeout limit in the request
to the openai server.
If you have time, can you help us to think about the solution?
I think you are really worth learning for me, thanks a lot!
If you don't have time, I can understand your difficulty, since you might be busy.
The below are the 2 solutions' example:
(solution 1) add a new server
CREATE:
- return a task ID to get the result from the API_TASK server,
- API_TASK server create a thread to execute
OPENAI REST API REQUEST
GET:
- Use the task ID to check if the thread finish the execute, and store the output to a place.
- if
finished
returnresult
else
returnRUNNING
DELETE:
- Kill the Thread
(solution 2) set the timeout limit in the request
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
url=openai_url, headers=get_header(openai_organization=self._openai_organization), data=data
) as resp:
if resp.status != 200:
raise Exception(f"Failed to execute chatgpt job with error: {resp.reason}")
response = await resp.json()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will solve this in the future, reviewers can ignore this now.
… agent-sync-plugin
Signed-off-by: Future Outlier <[email protected]>
|
||
// Use the sync plugin to execute the task if the task template has the sync plugin flavor. | ||
if taskTemplate.GetMetadata().GetRuntime().GetFlavor() == syncPlugin { | ||
phaseInfo, err := c.p.(webapi.SyncPlugin).Do(ctx, tCtx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen if the contract is broken? For example the tasks does not correctly set the flavour that is expected by the plugin?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The task will be executed by AsyncPlugin.
It is implemented by flytekit this PR.
We will set the variable use_sync_plugin
to True.
https://github.com/flyteorg/flytekit/pull/1822/files#diff-63787311eeb5747eb3170136f7c05abe69fe01da23d94222a39baf798273e687R20-R52
And then eventually set the flavor variable to here.
https://github.com/flyteorg/flytekit/pull/1822/files#diff-d123cf5b0acf27c386c1ceb74fd4b0de0775378fe1864c76908b5a334f58fffaR173-R440
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand what's happening in flytekit, but there are also tasks not developed in the flytekit repository so we cannot assume they would all be designed correctly. The backend side should be resilient to this type of contract-breaking situation. I was just wondering what if task doesn't set the flag while the plugin handling the task expects that, and vice versa.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the agent plugin task will return an error.
As we know, flytepropeller will call the function invokePlugin
, and if the plugin is agent service, it will send a grpc request to agent server
, start by pyflyte serve
.
I think we can write 2 things to help users understand how to use the sync and async plugins easily.
- add
logger.errorf
when the plugin doesn't execute successfully, tell users to check whether they use the correct ones or not. - write more annotation for the function and the
use_sync_plugin
variable in both flytekit and flyteplugins.
Do you think this is a good solution?
Or there's any other ideas?
I would like to improve it, thanks a lot for your time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure we should provide good ways for users to figured out the error.
Maybe a more concrete question, what happens if the cast c.p.(webapi.SyncPlugin)
fails here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't try it before, but I totally agree that we should add a cast to check if it is correct!
Maybe something like this. (please ignore the implementation error here)
plugin, err := c.p.(webapi.SyncPlugin)
if err != nil {
logger.Errorf("please check if the sync plugin interface is implemented or not")
return core.UnknownTransition, err
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this is the direction to go, and there might be other things to consider as well. Generally, if there exists a contract we need to be more protective on the backend side to avoid a total failure and have a better isolation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot, as a backend beginner, I really appreciate your advice!
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
from flytekit import task, workflow, ImageSpec
from flytekitplugins.chatgpt import ChatGPTTask
chatgpt_job = ChatGPTTask(
name="chatgpt",
config={
"openai_organization": "org-NayNG68kGnVXMJ8Ak4PMgQv7",
"chatgpt_conf": {
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
},
)
@workflow
def wf() -> str:
message = chatgpt_job(message="Who are you?")
return t1(s=message)
@task(
# container_image=image_spec,
)
def t1(s: str) -> str:
s = "Repsonse: " + s
return s |
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
… agent-sync-plugin Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
…/flyte into agent-sync-plugin
Tracking issue
#3936
flyteorg/flytekit#1822
Describe your changes
DoTask
function inagent.proto
runtime metadata's flavor
intasks.proto
https://github.com/flyteorg/flyte/pull/4107/files#diff-36edb1a8db489fabd420007c4542f657ed7e6d4f9d11052f81536c35f422ac14R91-R104
https://github.com/flyteorg/flyte/pull/4107/files#diff-d9c6122ef023db0768b904e6d6307684dfc35b4d03f0b2152ff7791344178a14R85
sync plugin
routing mechanism
to support sync pluginstype Plugin interface
herehttps://github.com/flyteorg/flyte/pull/4107/files#diff-3119631e7cd5a6d4fb999fc50bbb8509bb685347c4a6e3cceda14acec6aa0eb3R100-R103
Add a helper function to get
sync/async plugin
and handle the errorhttps://github.com/flyteorg/flyte/pull/4107/files#diff-da53a6f262afa680bad44d5bc3e5a6a95f1e8b8764002e0db9a7f015af7d2831R70-R86
Implement
webapi/agent's sync plugin
method to supportapi_task
like ChatGPTNote: you can find similar logic in other functions in this file.
(Create, Get, Delete)
https://github.com/flyteorg/flyte/pull/4107/files#diff-5e3f52f33e7ff5f718bc03e33dd574152be1a784901745e7c652b37e0bfc46a7R63-R114
Split
Handle
function into 2 functions,syncHandle
andasyncHandle
Add test for the above changes
Finish above without introducing breaking changes
Note:
api_task
is a task type which has onlyDo Task Function
, the sync plugin meets it's need.How to test it?
Configuration Example
To set up agent sync plugins, users can now utilize the following configuration:
Run in Remote Environment
Routing Mechanism
To determine the route to the sync plugin, we're leveraging the
task's metadata flavor
.The logic can be found in the PR's changes:
Here's the code
if taskTemplate.GetMetadata().GetRuntime().GetFlavor() == syncPlugin
https://github.com/flyteorg/flyte/pull/4107/files#diff-da53a6f262afa680bad44d5bc3e5a6a95f1e8b8764002e0db9a7f015af7d2831R77
Note to reviewers
Please also take a look at this PR.
flyteorg/flytekit#1822
Both PRs are interconnected and address related functionalities.
Screen Shot
Note: You can find that the ChatGPT task is an
api_task
.How it works at high level
Additional Information
The typecasting for the
webapi.Plugin
towebapi.SyncPlugin
orwebapi.AsyncPlugin
might be a little bit tricky,webapi.Plugin
's interface has a common function withwebapi.SyncPlugin
andwebapi.AsyncPlugin
.In the file
flyte/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go
, since the functionp.GetConfig()
performances the same no matter the plugin iswebapi.AsyncPlugin
orwebapi.SyncPlugin
, and all the parts forwebapi.AsyncPlugin
andwebapi.SyncPlugin
are the same, it is OK to share the same snippets in the function, but don't need to create morePluginLoaders
interface as the conversation below.flyte/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go
Lines 231 to 283 in c5bcd99