Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Jan 8, 2024
1 parent 8ac697e commit 429da8f
Show file tree
Hide file tree
Showing 37 changed files with 18,765 additions and 2,301 deletions.
27 changes: 27 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,33 @@ func (w *WorkflowManager) ListWorkflowIdentifiers(ctx context.Context, request a

}

func (w *WorkflowManager) GetDynamicNodeWorkflow(ctx context.Context, request admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
if err := validation.ValidateNodeExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.Id, err)
}

ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, w.db, request.Id)
if err != nil {
logger.Debugf(ctx, "Failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

if nodeExecutionModel.DynamicWorkflowRemoteClosureReference == "" {
return &admin.DynamicNodeWorkflowResponse{}, nil
}

closure := &core.CompiledWorkflowClosure{}
err = w.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecutionModel.DynamicWorkflowRemoteClosureReference), closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
}

return &admin.DynamicNodeWorkflowResponse{CompiledWorkflow: closure}, nil
}

func NewWorkflowManager(
db repoInterfaces.Repository,
config runtimeInterfaces.Configuration,
Expand Down
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/interfaces/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type WorkflowInterface interface {
ListWorkflows(ctx context.Context, request admin.ResourceListRequest) (*admin.WorkflowList, error)
ListWorkflowIdentifiers(ctx context.Context, request admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error)
GetDynamicNodeWorkflow(ctx context.Context, request *admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error)
}
19 changes: 19 additions & 0 deletions flyteadmin/pkg/rpc/adminservice/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet
return response, nil
}

func (m *AdminService) GetDynamicNodeWorkflow(ctx context.Context, request *admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) {
defer m.interceptPanic(ctx, request)
if request == nil {
return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
}

var response *admin.DynamicNodeWorkflowResponse
var err error
// TODO create separate prometheus metric for this endpoint
m.Metrics.workflowEndpointMetrics.get.Time(func() {
response, err = m.WorkflowManager.GetDynamicNodeWorkflow(ctx, request)
})
if err != nil {
return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.get)
}
m.Metrics.workflowEndpointMetrics.get.Success()
return response, nil
}

func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (
*admin.NamedEntityIdentifierList, error) {
defer m.interceptPanic(ctx, request)
Expand Down
48 changes: 48 additions & 0 deletions flyteidl/clients/go/admin/mocks/AdminServiceClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions flyteidl/clients/go/admin/mocks/AdminServiceServer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 429da8f

Please sign in to comment.