Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flyte-core] Flyte Connection #5126

Draft
wants to merge 43 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
dfffe71
Add secret
pingsutw Mar 27, 2024
fa7573c
Create mock class
pingsutw Mar 27, 2024
0e235e1
update mock
pingsutw Mar 27, 2024
60754bc
feat: Add Secrets support to webapi plugin requests
pingsutw Mar 27, 2024
1e82896
wip
pingsutw Apr 3, 2024
18e91a2
merged master
pingsutw Apr 4, 2024
42bbe73
secretPtr
pingsutw Apr 4, 2024
a61fa8c
wip
pingsutw Apr 5, 2024
d895c71
Add connection
pingsutw Apr 17, 2024
61263ad
update proto
pingsutw Apr 17, 2024
0e69505
merged master
pingsutw Apr 17, 2024
e928f35
lint
pingsutw Apr 17, 2024
0b80b41
cleanup
pingsutw Apr 17, 2024
249fbe9
wip
pingsutw Apr 17, 2024
d525cf9
wip
pingsutw Apr 29, 2024
79302e5
go generate
pingsutw Apr 29, 2024
df830ea
fix tests
pingsutw Apr 29, 2024
9574f59
test
pingsutw Apr 29, 2024
7dedb8f
more tests
pingsutw Apr 29, 2024
23357c5
fix test
pingsutw Apr 29, 2024
307c06c
lint
pingsutw Apr 30, 2024
9429ac2
Merge branch 'master' of github.com:flyteorg/flyte into agent-secret
pingsutw Apr 30, 2024
aecfaa5
lint
pingsutw Apr 30, 2024
7afda69
merged master
pingsutw May 1, 2024
0e25e5b
Add interface
pingsutw May 2, 2024
1f01450
wip
pingsutw May 10, 2024
0808ec0
Merge branch 'master' of github.com:flyteorg/flyte into agent-secret
pingsutw May 10, 2024
b4f264c
Add ExternalResourceAttributes
pingsutw May 10, 2024
466c61c
nit
pingsutw May 10, 2024
b19daea
update test
pingsutw May 13, 2024
090a10b
update test
pingsutw May 13, 2024
d96975d
fix test
pingsutw May 13, 2024
22966d0
fix test
pingsutw May 13, 2024
54e5cc8
fix test
pingsutw May 13, 2024
a8b7d06
fix test
pingsutw May 13, 2024
78b7dfc
Add more tests
pingsutw May 13, 2024
bf9b39f
install tabulate
pingsutw May 13, 2024
f7e79b4
Address comment
pingsutw May 15, 2024
d12a889
nit
pingsutw May 15, 2024
ff3ece0
Merge branch 'master' of github.com:flyteorg/flyte into agent-secret
pingsutw May 15, 2024
0c0f723
rename connection_ref
pingsutw May 16, 2024
39093e1
make generate
pingsutw May 31, 2024
ad0a88f
merged master
pingsutw Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,37 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad
}, nil
}

func (m *ExecutionManager) getExternalResourceAttributes(ctx context.Context, request *admin.ExecutionCreateRequest) (
*admin.ExternalResourceAttributes, error) {
if request.Spec.ExternalResourceAttributes != nil {
return request.Spec.ExternalResourceAttributes, nil
}

resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: request.Project,
Domain: request.Domain,
ResourceType: admin.MatchableResource_EXTERNAL_RESOURCE,
})
if err != nil && !errors.IsDoesNotExistError(err) {
logger.Errorf(ctx, "Failed to get external resource with error: %v", err)
return nil, err
}
if resource != nil && resource.Attributes.GetExternalResourceAttributes() != nil {
return resource.Attributes.GetExternalResourceAttributes(), nil
}

externalResource := m.config.ExternalResourceConfiguration().GetExternalResource()
connections := make(map[string]*core.Connection)
for key, connection := range externalResource.Connections {
connections[key] = &core.Connection{
Secrets: connection.Secrets,
Configs: connection.Configs,
}
}

return &admin.ExternalResourceAttributes{Connections: connections}, nil
}

func (m *ExecutionManager) launchSingleTaskExecution(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
Expand Down Expand Up @@ -957,23 +988,29 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return nil, nil, nil, err
}

externalResourceAttribute, err := m.getExternalResourceAttributes(ctx, &request)
if err != nil {
return nil, nil, err
}

var executionClusterLabel *admin.ExecutionClusterLabel
if requestSpec.ExecutionClusterLabel != nil {
executionClusterLabel = requestSpec.ExecutionClusterLabel
}

executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
ExecutionClusterLabel: executionClusterLabel,
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
ExecutionClusterLabel: executionClusterLabel,
ExternalResourceAttributes: externalResourceAttribute,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
Expand Down
87 changes: 87 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5495,6 +5495,93 @@ func TestGetClusterAssignment(t *testing.T) {
})
}

func TestGetExternalResourceAttribute(t *testing.T) {
externalResourceAttribute := &admin.ExternalResourceAttributes{
Connections: map[string]*core.Connection{
"conn1": {
Secrets: map[string]string{"key1": "value1"},
Configs: map[string]string{"key2": "value2"},
},
},
}
resourceManager := managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_EXTERNAL_RESOURCE,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ExternalResourceAttributes{
ExternalResourceAttributes: externalResourceAttribute,
},
},
}, nil
}

executionManager := ExecutionManager{resourceManager: &resourceManager}

t.Run("value from db", func(t *testing.T) {
era, err := executionManager.getExternalResourceAttributes(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
})
assert.NoError(t, err)
assert.True(t, proto.Equal(era, externalResourceAttribute))
})
t.Run("value from request", func(t *testing.T) {
reqExternalResourceAttribute := &admin.ExternalResourceAttributes{
Connections: map[string]*core.Connection{
"conn2": {
Secrets: map[string]string{"key1": "value1"},
Configs: map[string]string{"key2": "value2"},
},
},
}
era, err := executionManager.getExternalResourceAttributes(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{
ExternalResourceAttributes: reqExternalResourceAttribute,
},
})
assert.NoError(t, err)
assert.True(t, proto.Equal(era, reqExternalResourceAttribute))
})
t.Run("value from config", func(t *testing.T) {
name := "conn3"
connections := map[string]runtimeInterfaces.Connection{
name: {
Secrets: map[string]string{"key1": "value1"},
Configs: map[string]string{"key2": "value2"},
},
}
externalResourceProvider := &runtimeIFaceMocks.ExternalResourceConfiguration{}
externalResourceProvider.OnGetExternalResource().Return(runtimeInterfaces.ExternalResource{
Connections: connections,
})
mockConfig := getMockExecutionsConfigProvider()
mockConfig.(*runtimeMocks.MockConfigurationProvider).AddExternalResourceConfiguration(externalResourceProvider)

executionManager := ExecutionManager{
resourceManager: &managerMocks.MockResourceManager{},
config: mockConfig,
}

era, err := executionManager.getExternalResourceAttributes(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
})
assert.NoError(t, err)
assert.Equal(t, connections[name].Secrets, era.GetConnections()[name].Secrets)
assert.Equal(t, connections[name].Configs, era.GetConnections()[name].Configs)
})
}

func TestResolvePermissions(t *testing.T) {
assumableIamRole := "role"
k8sServiceAccount := "sa"
Expand Down
6 changes: 6 additions & 0 deletions flyteadmin/pkg/runtime/configuration_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ConfigurationProvider struct {
namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration
qualityOfServiceConfiguration interfaces.QualityOfServiceConfiguration
clusterPoolAssignmentConfiguration interfaces.ClusterPoolAssignmentConfiguration
externalResourceConfiguration interfaces.ExternalResourceConfiguration
}

func (p *ConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration {
Expand Down Expand Up @@ -58,6 +59,10 @@ func (p *ConfigurationProvider) ClusterPoolAssignmentConfiguration() interfaces.
return p.clusterPoolAssignmentConfiguration
}

func (p *ConfigurationProvider) ExternalResourceConfiguration() interfaces.ExternalResourceConfiguration {
return p.externalResourceConfiguration
}

func NewConfigurationProvider() interfaces.Configuration {
return &ConfigurationProvider{
applicationConfiguration: NewApplicationConfigurationProvider(),
Expand All @@ -70,5 +75,6 @@ func NewConfigurationProvider() interfaces.Configuration {
namespaceMappingConfiguration: NewNamespaceMappingConfigurationProvider(),
qualityOfServiceConfiguration: NewQualityOfServiceConfigProvider(),
clusterPoolAssignmentConfiguration: NewClusterPoolAssignmentConfigurationProvider(),
externalResourceConfiguration: NewExternalResourceConfigurationProvider(),
}
}
21 changes: 21 additions & 0 deletions flyteadmin/pkg/runtime/external_resource_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package runtime

import (
"github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flytestdlib/config"
)

const externalResourcesKey = "externalResources"

var externalResourcesConfig = config.MustRegisterSection(externalResourcesKey, &interfaces.ExternalResourceConfig{})

// ExternalResourcesConfigurationProvider Implementation of an interfaces.ExternalResourceConfiguration
type ExternalResourcesConfigurationProvider struct{}

func (e ExternalResourcesConfigurationProvider) GetExternalResource() interfaces.ExternalResource {
return externalResourcesConfig.GetConfig().(*interfaces.ExternalResourceConfig).ExternalResource
}

func NewExternalResourceConfigurationProvider() interfaces.ExternalResourceConfiguration {
return &ExternalResourcesConfigurationProvider{}
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/runtime/interfaces/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type Configuration interface {
NamespaceMappingConfiguration() NamespaceMappingConfiguration
QualityOfServiceConfiguration() QualityOfServiceConfiguration
ClusterPoolAssignmentConfiguration() ClusterPoolAssignmentConfiguration
ExternalResourceConfiguration() ExternalResourceConfiguration
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package interfaces

//go:generate mockery -name ExternalResourceConfiguration -output=mocks -case=underscore

type Connection struct {
Secrets map[string]string `json:"secrets"`
Configs map[string]string `json:"configs"`
}

type ExternalResource struct {
Connections map[string]Connection `json:"connections"`
}

type ExternalResourceConfig struct {
ExternalResource ExternalResource `json:"externalResource"`
}

type ExternalResourceConfiguration interface {
GetExternalResource() ExternalResource
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions flyteadmin/pkg/runtime/mocks/mock_configuration_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type MockConfigurationProvider struct {
namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration
qualityOfServiceConfiguration interfaces.QualityOfServiceConfiguration
clusterPoolAssignmentConfiguration interfaces.ClusterPoolAssignmentConfiguration
externalResourceConfiguration interfaces.ExternalResourceConfiguration
}

func (p *MockConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration {
Expand Down Expand Up @@ -79,6 +80,14 @@ func (p *MockConfigurationProvider) AddClusterPoolAssignmentConfiguration(cfg in
p.clusterPoolAssignmentConfiguration = cfg
}

func (p *MockConfigurationProvider) ExternalResourceConfiguration() interfaces.ExternalResourceConfiguration {
return p.externalResourceConfiguration
}

func (p *MockConfigurationProvider) AddExternalResourceConfiguration(cfg interfaces.ExternalResourceConfiguration) {
p.externalResourceConfiguration = cfg
}

func NewMockConfigurationProvider(
applicationConfiguration interfaces.ApplicationConfiguration,
queueConfiguration interfaces.QueueConfiguration,
Expand All @@ -94,6 +103,9 @@ func NewMockConfigurationProvider(
mockClusterPoolAssignmentConfiguration := &ifaceMocks.ClusterPoolAssignmentConfiguration{}
mockClusterPoolAssignmentConfiguration.OnGetClusterPoolAssignments().Return(make(map[string]interfaces.ClusterPoolAssignment))

mockExternalResourceConfiguration := &ifaceMocks.ExternalResourceConfiguration{}
mockExternalResourceConfiguration.OnGetExternalResource().Return(interfaces.ExternalResource{})

return &MockConfigurationProvider{
applicationConfiguration: applicationConfiguration,
queueConfiguration: queueConfiguration,
Expand All @@ -103,5 +115,6 @@ func NewMockConfigurationProvider(
namespaceMappingConfiguration: namespaceMappingConfiguration,
qualityOfServiceConfiguration: mockQualityOfServiceConfiguration,
clusterPoolAssignmentConfiguration: mockClusterPoolAssignmentConfiguration,
externalResourceConfiguration: mockExternalResourceConfiguration,
}
}
19 changes: 17 additions & 2 deletions flyteadmin/pkg/workflowengine/impl/prepare_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func addPermissions(securityCtx *core.SecurityContext, roleNameKey string, flyte

func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride,
workflowExecutionConfig *admin.WorkflowExecutionConfig, recoveryExecution *core.WorkflowExecutionIdentifier,
taskResources *interfaces.TaskResources, flyteWf *v1alpha1.FlyteWorkflow) {
taskResources *interfaces.TaskResources, externalResource *admin.ExternalResourceAttributes, flyteWf *v1alpha1.FlyteWorkflow) {
executionConfig := v1alpha1.ExecutionConfig{
TaskPluginImpls: make(map[string]v1alpha1.TaskPluginOverride),
RecoveryExecution: v1alpha1.WorkflowExecutionIdentifier{
Expand Down Expand Up @@ -107,6 +107,21 @@ func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride,
Limits: limits,
}
}

if externalResource != nil && externalResource.GetConnections() != nil {
connections := make(map[string]v1alpha1.Connection)
for name, connection := range externalResource.GetConnections() {
connections[name] = v1alpha1.Connection{
Secrets: connection.GetSecrets(),
Configs: connection.GetConfigs(),
}
}

executionConfig.ExternalResourceAttribute = v1alpha1.ExternalResourceAttributes{
Connections: connections,
}
}

flyteWf.ExecutionConfig = executionConfig
}

Expand Down Expand Up @@ -140,7 +155,7 @@ func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1
}
flyteWorkflow.WorkflowMeta.EventVersion = v1alpha1.EventVersion(data.ExecutionParameters.EventVersion)
addExecutionOverrides(data.ExecutionParameters.TaskPluginOverrides, data.ExecutionParameters.ExecutionConfig,
data.ExecutionParameters.RecoveryExecution, data.ExecutionParameters.TaskResources, flyteWorkflow)
data.ExecutionParameters.RecoveryExecution, data.ExecutionParameters.TaskResources, data.ExecutionParameters.ExternalResourceAttributes, flyteWorkflow)

if data.ExecutionParameters.RawOutputDataConfig != nil {
flyteWorkflow.RawOutputDataConfig = v1alpha1.RawOutputDataConfig{
Expand Down
Loading
Loading