Agent Metadata Servicer #4511

Merged: 63 commits merged into flyteorg:master on Dec 20, 2023

Conversation

Future-Outlier
Member

@Future-Outlier Future-Outlier commented Dec 1, 2023

Tracking issue

#3936

Note to reviewers

  1. Here's the flytekit implementation:
    Agent Metadata Servicer flytekit#2012
  2. The sync agent will be updated after this PR is merged.
    (We will use the IsSyncTask map as the routing mechanism.)
    flytekit: https://github.com/flyteorg/flytekit/pull/2012/files#diff-9f7af27264f8773b069e8200804c224fe19a6fcaaf9dc33edc644f5351cbb3beR157
  3. The integration test will be added after the sync agent PR and the get agent secret PR are merged.
  4. We need to ensure the agent server is started before the propeller is started because the propeller needs to communicate with the agent server to get the agent Metadata.
  5. getAsyncClientFunc, getAgentMetadataClientFunc, and getSyncClientFunc can be refactored into two steps:
    1. Get the gRPC connection.
    2. Get the service client with one line of code:
      service.NewAgentMetadataServiceClient(conn)

    This will be updated in the agent integration test in the future.
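
The two-step refactoring from item 5 might look roughly like the sketch below. It is stdlib-only so it can run on its own: `Conn` is a stand-in for `*grpc.ClientConn`, and `getConn`, `AsyncClient`, `MetadataClient`, and their constructors are hypothetical names, not the plugin's actual API. In the real code, step 2 would be the generated constructor, e.g. `service.NewAgentMetadataServiceClient(conn)`.

```go
package main

import "fmt"

// Agent is a hypothetical subset of an agent deployment config.
type Agent struct {
	Endpoint string
	Insecure bool
}

// Conn is a stand-in for *grpc.ClientConn so the sketch needs no external modules.
type Conn struct{ target string }

// AsyncClient and MetadataClient are hypothetical; generated gRPC code provides the real clients.
type AsyncClient struct{ conn *Conn }

type MetadataClient struct{ conn *Conn }

func NewAsyncClient(c *Conn) *AsyncClient {
	return &AsyncClient{conn: c}
}

func NewMetadataClient(c *Conn) *MetadataClient {
	return &MetadataClient{conn: c}
}

// getConn is step 1: return the cached connection for an agent, creating it only once.
func getConn(agent *Agent, cache map[*Agent]*Conn) (*Conn, error) {
	if conn, ok := cache[agent]; ok {
		return conn, nil
	}
	if agent.Endpoint == "" {
		return nil, fmt.Errorf("agent has no endpoint configured")
	}
	conn := &Conn{target: agent.Endpoint} // real code would dial the gRPC endpoint here
	cache[agent] = conn
	return conn, nil
}

func main() {
	cache := map[*Agent]*Conn{}
	agent := &Agent{Endpoint: "dns:///localhost:8000", Insecure: true}

	conn, err := getConn(agent, cache)
	if err != nil {
		panic(err)
	}
	// Step 2: every service client is a one-line constructor over the shared connection.
	async := NewAsyncClient(conn)
	meta := NewMetadataClient(conn)
	fmt.Println(async.conn == meta.conn, len(cache))
}
```

With this split, the three per-service helper functions would differ only in the constructor they call, so the connection handling lives in one place.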

Describe your changes

  • Use AgentMetadata Proto written by @pingsutw
  • supported task types
  • a map[string]bool for sync plugin routing mechanism
    (We will use the

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Setup Process

Supported Task Types

  • Use this config in the single-binary YAML.
    Note: I didn't use the supportedTaskTypes section.
tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
      - container
      - sidecar
      - K8S-ARRAY
    default-for-task-types:
      api_task: agent-service
      airflow: agent-service
      sensor: agent-service
      spark: agent-service
      bigquery_query_job_task: agent-service
      custom_task: agent-service
      container: container
      container_array: K8S-ARRAY

plugins:
  agent-service:
    # supportedTaskTypes:
      # - spark
      # - default_task
      # - custom_task
      # - api_task
      # - sensor
      # - airflow
    # By default, all requests will be sent to the default agent.
    defaultAgent:
      endpoint: "dns:///localhost:8000"
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 100s
    agents:
      custom_agent:
        endpoint: "dns:///localhost:8001"
        insecure: true
        defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
        timeouts:
          DoTask: 300s
          GetTask: 100s
        defaultTimeout: 300s
    agentForTaskTypes:
      # It will override the default agent for custom_task, which means propeller will send the request to this agent.
      - custom_task: custom_agent 
      - api_task: custom_agent
  • Start 2 agent servers to test a real-world scenario.
pyflyte serve agent --port 8000
pyflyte serve agent --port 8001

Note: I changed the code around prometheus_client's start_http_server because two different agent servers can't use the same HTTP server.
flyteorg/flytekit@a670fd2#diff-3fb315ad3aeb0e3eff4edd799cf4ec7c9e934ea12537e8f6f50d56828a12a410R46-R57

Screenshots

  • server port 8000
    image

  • server port 8001
    image

  • sync task type
    image

{"json":{"src":"plugin.go:393"},"level":"info","msg":"supported task types: [default_task sensor spark airflow]","ts":"2023-12-04T23:33:21+08:00"}
{"json":{"src":"plugin.go:394"},"level":"info","msg":"is sync task: map[airflow:false sensor:false spark:false]","ts":"2023-12-04T23:33:21+08:00"}
  • test GetAgent
    image
    image
    image

Related PRs

AgentMetadataProto by pingsutw : #4500
AgentMetadataServicer : flyteorg/flytekit#2012

Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier marked this pull request as draft December 1, 2023 01:26
@dosubot dosubot bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Dec 1, 2023
@Future-Outlier Future-Outlier self-assigned this Dec 1, 2023

codecov bot commented Dec 1, 2023

Codecov Report

Attention: 35 lines in your changes are missing coverage. Please review.

Comparison is base (6c6656c) 59.03% compared to head (c1e3492) 58.98%.

Files Patch % Lines
...yteplugins/go/tasks/plugins/webapi/agent/plugin.go 51.38% 30 Missing and 5 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4511      +/-   ##
==========================================
- Coverage   59.03%   58.98%   -0.06%     
==========================================
  Files         622      622              
  Lines       52687    52739      +52     
==========================================
+ Hits        31104    31106       +2     
- Misses      19101    19148      +47     
- Partials     2482     2485       +3     
Flag Coverage Δ
unittests 58.98% <53.94%> (-0.06%) ⬇️


Future Outlier and others added 18 commits December 2, 2023 13:47
Signed-off-by: Future Outlier <[email protected]>
… agent-metadata-proto-service

Signed-off-by: Future Outlier <[email protected]>
flyteplugins/go/tasks/plugins/webapi/agent/plugin.go (outdated, resolved)

  return webapi.PluginEntry{
      ID: "agent-service",
-     SupportedTaskTypes: supportedTaskTypes,
+     SupportedTaskTypes: cfg.SupportedTaskTypes,
Member

Suggested change
- SupportedTaskTypes: cfg.SupportedTaskTypes,
+ SupportedTaskTypes: agentMetadata.SupportedTaskTypes,

@Future-Outlier Future-Outlier changed the title [WIP] Agent Servicer [WIP] Agent Metadata Servicer Dec 10, 2023
Future Outlier added 3 commits December 10, 2023 16:50
Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier changed the title [wip] Agent Metadata Servicer Agent Metadata Servicer Dec 18, 2023
@Future-Outlier Future-Outlier enabled auto-merge (squash) December 18, 2023 06:28

type Plugin struct {
	metricScope     promutils.Scope
	cfg             *Config
	getClient       GetClientFunc
	connectionCache map[*Agent]*grpc.ClientConn
	agentRegistry   map[string]map[bool]*Agent // map[taskType][isSync] => Agent
Member

Where is this used? Or would that be addressed in follow-up PRs?

Member Author

@Future-Outlier Future-Outlier Dec 18, 2023

Yes, this will be addressed in follow-up PRs. We want one task type to be able to have both a sync and an async agent.
Please ask if anything is unclear.
Thank you!

Member Author

@Future-Outlier Future-Outlier Dec 19, 2023

Update: we will remove the second dimension (the isSync key). This will improve the user experience in the future, and we will have follow-up PRs for this registry. Thank you.

return context.WithTimeout(ctx, timeout)
}

func initializeAgentRegistry(cfg *Config, connectionCache map[*Agent]*grpc.ClientConn, getAgentMetadataClientFunc GetAgentMetadataClientFunc) (map[string]map[bool]*Agent, error) {
agentRegistry := make(map[string]map[bool]*Agent)
Member

Suggested change
- agentRegistry := make(map[string]map[bool]*Agent)
+ agentRegistry := make(map[string]*Agent)
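
To make the difference between the two registry shapes discussed above concrete, here is a minimal sketch comparing lookups in the nested `map[string]map[bool]*Agent` and in the suggested flat `map[string]*Agent`. The `Agent` struct is reduced to one field, and the `nestedRegistry` and `flatRegistry` type names and `lookup` methods are hypothetical, introduced only for illustration.

```go
package main

import "fmt"

// Agent is a hypothetical, reduced stand-in for the plugin's agent config.
type Agent struct {
	Endpoint string
}

// nestedRegistry is the shape in this PR: map[taskType][isSync] => Agent.
type nestedRegistry map[string]map[bool]*Agent

// flatRegistry is the suggested shape: map[taskType] => Agent.
type flatRegistry map[string]*Agent

// lookup needs both the task type and the sync flag.
// Indexing a missing (nil) inner map is safe in Go and simply reports ok == false.
func (r nestedRegistry) lookup(taskType string, isSync bool) (*Agent, bool) {
	agent, ok := r[taskType][isSync]
	return agent, ok
}

// lookup needs only the task type.
func (r flatRegistry) lookup(taskType string) (*Agent, bool) {
	agent, ok := r[taskType]
	return agent, ok
}

func main() {
	agent := &Agent{Endpoint: "dns:///localhost:8000"}
	nested := nestedRegistry{"spark": {false: agent}}
	flat := flatRegistry{"spark": agent}

	// The nested registry only has an async entry for spark, so a sync lookup misses.
	_, okNested := nested.lookup("spark", true)
	_, okFlat := flat.lookup("spark")
	fmt.Println(okNested, okFlat)
}
```

The flat shape drops the extra key that every caller would otherwise have to supply, at the cost of allowing only one agent per task type.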

@@ -170,7 +170,7 @@ jobs:
cpu: "0"
memory: "0"
EOF
- flytectl demo start --image flyte-sandbox-bundled:local --imagePullPolicy Never
+ flytectl demo start --image flyte-sandbox-bundled:local --disable-agent --imagePullPolicy Never
Member

revert

@Future-Outlier
Member Author

Future-Outlier commented Dec 19, 2023

Update:
Use the gRPC status code to handle the case where the ListAgent method is Unimplemented.
Users can keep using the old agent settings!
image
image

@Future-Outlier
Member Author

Future-Outlier commented Dec 19, 2023

It works well with agent metadata server.

  agent-service:
    # supportedTaskTypes:
    #   - sensor
    #   - spark
    #   - default_task
    #   - custom_task
    #   - api_task
    #   - sensor
    #   - airflow
    # By default, all requests will be sent to the default agent.
    defaultAgent:
      endpoint: "dns:///localhost:8000"
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 100s
    # agents:
    #   custom_agent:
    #     endpoint: "dns:///localhost:8001"
    #     insecure: true
    #     defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
    #     timeouts:
    #       DoTask: 300s
    #       GetTask: 100s
    #     defaultTimeout: 300s
    agentForTaskTypes:
      # It will override the default agent for custom_task, which means propeller will send the request to this agent.
      - custom_task: custom_agent 
      - default_task: custom_agent

image

@Future-Outlier
Member Author

It will still panic when the agent server is not started.
image

  agent-service:
    # supportedTaskTypes:
    #   - sensor
    #   - spark
    #   - default_task
    #   - custom_task
    #   - api_task
    #   - sensor
    #   - airflow
    # By default, all requests will be sent to the default agent.
    defaultAgent:
      endpoint: "dns:///localhost:8000"
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 100s
    # agents:
    #   custom_agent:
    #     endpoint: "dns:///localhost:8001"
    #     insecure: true
    #     defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
    #     timeouts:
    #       DoTask: 300s
    #       GetTask: 100s
    #     defaultTimeout: 300s
    agentForTaskTypes:
      # It will override the default agent for custom_task, which means propeller will send the request to this agent.
      - custom_task: custom_agent 
      - default_task: custom_agent

image

@Future-Outlier
Member Author

Future-Outlier commented Dec 19, 2023

I built an image for this PR to test whether enabling and disabling the agent both work correctly.

Enable Agent

flytectl demo start --image futureoutlier/flyte-sandbox:metadata-1219-0411 --force

It panics when the agent server deployment has not started.
image
image
The Flyte cluster starts once the agent server deployment has started.
image

Disable Agent

flytectl demo start --image futureoutlier/flyte-sandbox:metadata-1219-0411 --force --disable-agent

It starts properly without the agentService section.
image
image

flyteidl/protos/flyteidl/service/agent.proto (outdated, resolved)
message DeleteTaskResponse {}

// A message containing the agent metadata.
message Agent {
Contributor

Not adding async here?

Member Author

This is for both sync and async agents. Do you think we need to add it?

Future-Outlier and others added 2 commits December 20, 2023 10:17
Co-authored-by: Haytham Abuelfutuh <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
@Future-Outlier
Member Author

Updated the log for the ListAgents method (previously ListAgent).

{
   "json":{
      "src":"plugin.go:383"
   },
   "level":"info",
   "msg":"Agent supports task types: [sensor spark airflow task_type_1 task_type_2]",
   "ts":"2023-12-20T10:45:38+08:00"
}

@pingsutw pingsutw merged commit 3648440 into flyteorg:master Dec 20, 2023
45 of 47 checks passed
for _, agentDeployment := range agentDeployments {
	client, err := getAgentMetadataClientFunc(context.Background(), agentDeployment, connectionCache)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to agent [%v] with error: [%v]", agentDeployment, err)
Member

Will this fail flytepropeller startup? If so, I think we need to be more resilient and not fail the whole propeller due to a single misbehaving agent deployment.

Member

We could issue a warning and move on to the next deployment. When routing tasks later, we will also need to be more resilient and fail if a mapping is not found.

Insecure: true,
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
},
// AsyncPlugin should be registered to at least one task type.
// Reference: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/pluginmachinery/registry.go#L27
SupportedTaskTypes: []string{"task_type_1", "task_type_2"},
Member

Maybe we should just remove SupportedTaskTypes to ship the breaking change and from now on we will solely rely on metadata as the single source of truth. If we still want to support this config, we will definitely need to be more resilient when applying the routing to find agent deployment because users might have a bad configuration, more specifically I am referring to getFinalAgent function.
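
As one possible reading of "more resilient" routing, the sketch below returns an error when a task type is mapped to an agent ID that is not defined, instead of handing back a nil agent. The `Config` fields mirror the YAML keys shown earlier (defaultAgent, agents, agentForTaskTypes), but the struct and the `resolveAgent` function are hypothetical and are not the actual getFinalAgent.

```go
package main

import "fmt"

// Agent is a hypothetical, reduced stand-in for an agent deployment config.
type Agent struct {
	Endpoint string
}

// Config mirrors the YAML keys defaultAgent, agents, and agentForTaskTypes (assumed shape).
type Config struct {
	DefaultAgent      Agent
	Agents            map[string]*Agent
	AgentForTaskTypes map[string]string
}

// resolveAgent picks the agent for a task type and reports a bad mapping
// as an error rather than returning a nil agent.
func resolveAgent(cfg *Config, taskType string) (*Agent, error) {
	id, mapped := cfg.AgentForTaskTypes[taskType]
	if !mapped {
		// No override configured: use the default agent.
		return &cfg.DefaultAgent, nil
	}
	agent, defined := cfg.Agents[id]
	if !defined || agent == nil {
		return nil, fmt.Errorf("task type %q is mapped to undefined agent %q", taskType, id)
	}
	return agent, nil
}

func main() {
	cfg := &Config{
		DefaultAgent: Agent{Endpoint: "dns:///localhost:8000"},
		// custom_agent is referenced below but never defined under Agents.
		Agents:            map[string]*Agent{},
		AgentForTaskTypes: map[string]string{"custom_task": "custom_agent"},
	}

	agent, err := resolveAgent(cfg, "spark")
	fmt.Println(agent.Endpoint, err)

	_, err = resolveAgent(cfg, "custom_task")
	fmt.Println(err)
}
```

Whether an undefined mapping should fail the task or fall back to the default agent is a design choice left open in this thread; the sketch fails explicitly so the misconfiguration is visible.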

@honnix
Member

honnix commented Dec 21, 2023

We need to ensure the agent server is started before the propeller is started because the propeller needs to communicate with the agent server to get the agent Metadata.

As I commented, I'm afraid this is too strong an assumption in this imperfect world :).
