Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent ClientSet #4718

Merged
merged 13 commits into from
Jan 31, 2024
Merged

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented Jan 11, 2024

Tracking issue

Fixes #3936

Why are the changes needed?

For better readability and scalability in package agent in the future.

What changes were proposed in this pull request?

  • rename getClientFunc to getAgentClientFunc
  • use a struct ClientFuncSet to store getAgentClientFunc and getAgentMetadataClientFunc
  • pass ClientFuncSet to all related functions and objects
  • add related tests

How was this patch tested?

unit test, integration test, and single binary mode.

Setup process

  1. start the agent server
  2. start the single binary mode
  3. run an agent workflow to test it in remote environment
pyflyte serve agent
make compile
flyte start --config ../flyte-single-binary-local-dev.yaml
pyflyte run --remote sensor_example.py wf

flyte-single-binary-local-dev.yaml

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service

    default-for-task-types:
      sensor: agent-service
      container: container
      container_array: K8S-ARRAY

plugins:
  agent-service:
    supportedTaskTypes:
      - sensor
    defaultAgent:
      endpoint: "dns:///localhost:8000"
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 100s

sensor_example.py

from flytekit.sensor.file_sensor import FileSensor
from flytekit import ImageSpec, task, workflow

sensor = FileSensor(name="test_sensor")

@task()
def t1():
    return

@workflow()
def wf():
    # sensor(path="s3://my-s3-bucket") >> t1()
    sensor(path="/tmp") >> t1()

if __name__ == "__main__":
    wf()

Screenshots

image
image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

v1
Signed-off-by: Future Outlier <[email protected]>
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. enhancement New feature or request labels Jan 11, 2024
Copy link

codecov bot commented Jan 11, 2024

Codecov Report

Attention: 67 lines in your changes are missing coverage. Please review.

Comparison is base (c379b71) 59.02% compared to head (34a4049) 58.99%.

Files Patch % Lines
...yteplugins/go/tasks/plugins/webapi/agent/client.go 45.54% 51 Missing and 4 partials ⚠️
...yteplugins/go/tasks/plugins/webapi/agent/plugin.go 25.00% 12 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4718      +/-   ##
==========================================
- Coverage   59.02%   58.99%   -0.04%     
==========================================
  Files         643      644       +1     
  Lines       55153    55148       -5     
==========================================
- Hits        32555    32535      -20     
- Misses      20018    20039      +21     
+ Partials     2580     2574       -6     
Flag Coverage Δ
unittests 58.99% <42.73%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Kevin Su <[email protected]>
@Future-Outlier
Copy link
Member Author

Future-Outlier commented Jan 24, 2024

generate mock file by mockery

cd flyte/flyteidl/gen/pb-go/flyteidl/service

mockery --name=AsyncAgentServiceClient --output=/mnt/c/code/dev/flyte/flyteplugins/go/tasks/plugins/webapi/agent/mocks/ --outpkg=mocks

Signed-off-by: Future-Outlier <[email protected]>
@dosubot dosubot bot added size:XL This PR changes 500-999 lines, ignoring generated files. and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Jan 24, 2024
@Future-Outlier
Copy link
Member Author

Only integration test needs to be fixed, other test files are correct!

@pingsutw pingsutw changed the title Agent Client Funcion Set Agent ClientSet Jan 24, 2024
@Future-Outlier
Copy link
Member Author

Future-Outlier commented Jan 24, 2024

  • Improve Error Message
    When you set up the agent-service, and run an agent task, while don't have any connected agent, you will have an error.
  agent-service:
    supportedTaskTypes:
      - sensor
      - spark
      - default_task
      - custom_task
      - chatgpt
      - sensor
      - airflow
    # By default, all the request will be sent to the default agent.
    # defaultAgent:
    #   endpoint: "dns:///localhost:8000"
    #   insecure: true
    #   timeouts:
    #     GetTask: 100s
    #   defaultTimeout: 100s

Before Error Message

image

After Error Message

image

Future-Outlier and others added 10 commits January 25, 2024 06:18
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Jan 31, 2024
@Future-Outlier Future-Outlier merged commit 7711df2 into flyteorg:master Jan 31, 2024
47 of 49 checks passed
cfg := GetConfig()
connectionCache := make(map[*Agent]*grpc.ClientConn)
agentRegistry, err := initializeAgentRegistry(cfg, connectionCache, getAgentMetadataClientFunc)
cs, err := initializeClients(context.Background())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please help me understand what happens if one of the agent endpoints is not available when flytepropeller is booting up? Would it just crash?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, it is a very strong assumption that I'm afraid we cannot take.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even the plugin loading is guarded so a single failing plugin would not impact propeller booting, it is still undesired that one failed agent endpoint (out of for example 50 healthy ones) would fail the whole plugin loading. I think some kind of late binding would be nicer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to wait for every endpoint to start; otherwise, FlytePropeller will keep crashing.

Copy link
Member Author

@Future-Outlier Future-Outlier Feb 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice suggestions, I will discuss with @pingsutw and reply the result here.
I think late binding or some kind of endpoint or service detect mechanism will be really helpful.
For example, in k8s, maybe readiness probe can help us realize lazy binding?
I am not 100% familiar with how agent is deployed in flyte cluster, but yes, I will try to help.

Thank you for always providing great advices.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe, only need 1 agent is connected is enough

Copy link
Member

@honnix honnix Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this plugin used to work in a late binding way, meaning the connection was not established until the very first RPC. In this way the propeller does not depend on agents in the wild when booting up.

For example, in k8s, maybe readiness probe can help us realize lazy binding?

I think we cannot make any assumption how agents are deployed, and only interface they expose to the plugin is gRPC endpoints, so we can only make sense out of those.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update the solution to your case with Kevin this week, thank you!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@honnix, We will add a watcher to solve your case!
Thank you for the patience, we will mention you to see the new update, thank you very much.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Future-Outlier That is fantastic. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request lgtm This PR has been approved by a maintainer size:XL This PR changes 500-999 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Core feature] Flyte Agent Ecosystem
3 participants