Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetDynamicNodeWorkflow endpoint #4689

Merged
merged 9 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewCache: executors.NewCache,
NewClient: executors.NewClient,
Metrics: metricsserver.Options{
// Disable metrics serving
Expand Down
27 changes: 27 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,33 @@ func (w *WorkflowManager) ListWorkflowIdentifiers(ctx context.Context, request a

}

func (w *WorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
if err := validation.ValidateNodeExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.Id, err)
}

ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, w.db, request.Id)
if err != nil {
logger.Debugf(ctx, "Failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

if nodeExecutionModel.DynamicWorkflowRemoteClosureReference == "" {
return &admin.DynamicNodeWorkflowResponse{}, nil
iaroslav-ciupin marked this conversation as resolved.
Show resolved Hide resolved
}

closure := &core.CompiledWorkflowClosure{}
err = w.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecutionModel.DynamicWorkflowRemoteClosureReference), closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
}

func NewWorkflowManager(
db repoInterfaces.Repository,
config runtimeInterfaces.Configuration,
Expand Down
124 changes: 124 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -370,6 +371,129 @@ func TestGetWorkflow_TransformerError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(adminErrors.FlyteAdminError).Code())
}

func Test_GetDynamicNodeWorkflow_Success(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
expectedClosure := testutils.GetWorkflowClosure().CompiledWorkflow
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
bytes, err := proto.Marshal(expectedClosure)
require.NoError(t, err)
return proto.Unmarshal(bytes, msg)
}
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{
CompiledWorkflow: expectedClosure,
}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_DBError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
expectedErr := errors.New("failure")
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{}, expectedErr
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.Equal(t, expectedErr, err)
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: ""}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))
expected := &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: nil}

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.NoError(t, err)
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
return errors.New("failure")
}
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.Internal, st.Code())
assert.Equal(t, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Empty(t, resp)
}

func TestListWorkflows(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
workflowListFunc := func(input interfaces.ListResourceInput) (interfaces.WorkflowCollectionOutput, error) {
Expand Down
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/interfaces/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type WorkflowInterface interface {
ListWorkflows(ctx context.Context, request admin.ResourceListRequest) (*admin.WorkflowList, error)
ListWorkflowIdentifiers(ctx context.Context, request admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error)
GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error)
}
4 changes: 4 additions & 0 deletions flyteadmin/pkg/manager/mocks/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ func (r *MockWorkflowManager) ListWorkflowIdentifiers(ctx context.Context, reque
*admin.NamedEntityIdentifierList, error) {
return nil, nil
}

func (r *MockWorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
return nil, nil
}
20 changes: 11 additions & 9 deletions flyteadmin/pkg/rpc/adminservice/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ type taskExecutionEndpointMetrics struct {
type workflowEndpointMetrics struct {
scope promutils.Scope

create util.RequestMetrics
get util.RequestMetrics
list util.RequestMetrics
listIds util.RequestMetrics
create util.RequestMetrics
get util.RequestMetrics
list util.RequestMetrics
listIds util.RequestMetrics
getDynamicNodeWorkflow util.RequestMetrics
}

type descriptionEntityEndpointMetrics struct {
Expand Down Expand Up @@ -212,11 +213,12 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics {
list: util.NewRequestMetrics(adminScope, "list_task_execution"),
},
workflowEndpointMetrics: workflowEndpointMetrics{
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_workflow"),
get: util.NewRequestMetrics(adminScope, "get_workflow"),
list: util.NewRequestMetrics(adminScope, "list_workflow"),
listIds: util.NewRequestMetrics(adminScope, "list_workflow_ids"),
scope: adminScope,
create: util.NewRequestMetrics(adminScope, "create_workflow"),
get: util.NewRequestMetrics(adminScope, "get_workflow"),
list: util.NewRequestMetrics(adminScope, "list_workflow"),
listIds: util.NewRequestMetrics(adminScope, "list_workflow_ids"),
getDynamicNodeWorkflow: util.NewRequestMetrics(adminScope, "get_dynamic_node_workflow"),
},

descriptionEntityMetrics: descriptionEntityEndpointMetrics{
Expand Down
18 changes: 18 additions & 0 deletions flyteadmin/pkg/rpc/adminservice/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@
return response, nil
}

func (m *AdminService) GetDynamicNodeWorkflow(ctx context.Context, request *admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}

Check warning on line 61 in flyteadmin/pkg/rpc/adminservice/workflow.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/rpc/adminservice/workflow.go#L57-L61

Added lines #L57 - L61 were not covered by tests

var response *admin.DynamicNodeWorkflowResponse
var err error
m.Metrics.workflowEndpointMetrics.getDynamicNodeWorkflow.Time(func() {
response, err = m.WorkflowManager.GetDynamicNodeWorkflow(ctx, *request)
})
if err != nil {
return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.get)
}
m.Metrics.workflowEndpointMetrics.getDynamicNodeWorkflow.Success()
return response, nil

Check warning on line 72 in flyteadmin/pkg/rpc/adminservice/workflow.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/rpc/adminservice/workflow.go#L63-L72

Added lines #L63 - L72 were not covered by tests
}

func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error) {
defer m.interceptPanic(ctx, request)
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/tests/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils"
"github.com/flyteorg/flyte/flyteidl/clients/go/coreutils"
Expand Down Expand Up @@ -48,7 +49,7 @@ func insertTasksForTests(t *testing.T, client service.AdminServiceClient) {
}

_, err := client.CreateTask(ctx, &req)
assert.Nil(t, err)
require.NoError(t, err)
}
}
}
Expand Down Expand Up @@ -105,7 +106,7 @@ func insertWorkflowsForTests(t *testing.T, client service.AdminServiceClient) {
}

_, err := client.CreateWorkflow(ctx, &req)
assert.Nil(t, err, "Failed to create workflow test data with err %v", err)
require.NoError(t, err, "Failed to create workflow test data with err %v", err)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/tests/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
ptypesStruct "github.com/golang/protobuf/ptypes/struct"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils"
Expand Down Expand Up @@ -51,7 +52,7 @@ func createTaskAndNodeExecution(
Id: taskIdentifier,
Spec: testutils.GetValidTaskRequest().Spec,
})
assert.Nil(t, err)
require.NoError(t, err)

_, err = client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{
RequestId: "request id",
Expand All @@ -64,7 +65,7 @@ func createTaskAndNodeExecution(
OccurredAt: occurredAtProto,
},
})
assert.Nil(t, err)
require.NoError(t, err)
}

func TestCreateTaskExecution(t *testing.T) {
Expand Down
Loading
Loading