Skip to content

Commit

Permalink
GetDynamicNodeWorkflow endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Jan 10, 2024
1 parent 8ac697e commit e45bd5a
Show file tree
Hide file tree
Showing 40 changed files with 18,843 additions and 2,310 deletions.
27 changes: 27 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,33 @@ func (w *WorkflowManager) ListWorkflowIdentifiers(ctx context.Context, request a

}

func (w *WorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
if err := validation.ValidateNodeExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.Id, err)
}

ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, w.db, request.Id)
if err != nil {
logger.Debugf(ctx, "Failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

if nodeExecutionModel.DynamicWorkflowRemoteClosureReference == "" {
return &admin.DynamicNodeWorkflowResponse{}, nil
}

closure := &core.CompiledWorkflowClosure{}
err = w.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecutionModel.DynamicWorkflowRemoteClosureReference), closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
}

func NewWorkflowManager(
db repoInterfaces.Repository,
config runtimeInterfaces.Configuration,
Expand Down
64 changes: 64 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -370,6 +371,69 @@ func TestGetWorkflow_TransformerError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(adminErrors.FlyteAdminError).Code())
}

func Test_GetDynamicNodeWorkflow_Success(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
expectedClosure := testutils.GetWorkflowClosure().CompiledWorkflow
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
bytes, err := proto.Marshal(expectedClosure)
require.NoError(t, err)
return proto.Unmarshal(bytes, msg)
}
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{
CompiledWorkflow: expectedClosure,
}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: ""}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: nil}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func TestListWorkflows(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
workflowListFunc := func(input interfaces.ListResourceInput) (interfaces.WorkflowCollectionOutput, error) {
Expand Down
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/interfaces/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type WorkflowInterface interface {
ListWorkflows(ctx context.Context, request admin.ResourceListRequest) (*admin.WorkflowList, error)
ListWorkflowIdentifiers(ctx context.Context, request admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error)
GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error)
}
4 changes: 4 additions & 0 deletions flyteadmin/pkg/manager/mocks/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ func (r *MockWorkflowManager) ListWorkflowIdentifiers(ctx context.Context, reque
*admin.NamedEntityIdentifierList, error) {
return nil, nil
}

func (r *MockWorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
return nil, nil
}
20 changes: 11 additions & 9 deletions flyteadmin/pkg/rpc/adminservice/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ type taskExecutionEndpointMetrics struct {
type workflowEndpointMetrics struct {
scope promutils.Scope

create util.RequestMetrics
get util.RequestMetrics
list util.RequestMetrics
listIds util.RequestMetrics
create util.RequestMetrics
get util.RequestMetrics
list util.RequestMetrics
listIds util.RequestMetrics
getDynamicNodeWorkflow util.RequestMetrics
}

type descriptionEntityEndpointMetrics struct {
Expand Down Expand Up @@ -212,11 +213,12 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics {
list: util.NewRequestMetrics(adminScope, "list_task_execution"),
},
workflowEndpointMetrics: workflowEndpointMetrics{
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_workflow"),
get: util.NewRequestMetrics(adminScope, "get_workflow"),
list: util.NewRequestMetrics(adminScope, "list_workflow"),
listIds: util.NewRequestMetrics(adminScope, "list_workflow_ids"),
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_workflow"),
get: util.NewRequestMetrics(adminScope, "get_workflow"),
list: util.NewRequestMetrics(adminScope, "list_workflow"),
listIds: util.NewRequestMetrics(adminScope, "list_workflow_ids"),
getDynamicNodeWorkflow: util.NewRequestMetrics(adminScope, "get_dynamic_node_workflow"),
},

descriptionEntityMetrics: descriptionEntityEndpointMetrics{
Expand Down
18 changes: 18 additions & 0 deletions flyteadmin/pkg/rpc/adminservice/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet
return response, nil
}

func (m *AdminService) GetDynamicNodeWorkflow(ctx context.Context, request *admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}

var response *admin.DynamicNodeWorkflowResponse
var err error
m.Metrics.workflowEndpointMetrics.getDynamicNodeWorkflow.Time(func() {
response, err = m.WorkflowManager.GetDynamicNodeWorkflow(ctx, *request)
})
if err != nil {
return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.get)
}
m.Metrics.workflowEndpointMetrics.getDynamicNodeWorkflow.Success()
return response, nil
}

func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error) {
defer m.interceptPanic(ctx, request)
Expand Down
48 changes: 48 additions & 0 deletions flyteidl/clients/go/admin/mocks/AdminServiceClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions flyteidl/clients/go/admin/mocks/AdminServiceServer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e45bd5a

Please sign in to comment.