Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Buf to publish on artifacts branch push #4449

Closed
wants to merge 19 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Artf/switch event (#4428)
Most of the changes here are generated proto changes. Actual code changes are:
IDL
* Remove the supplemental fields in the CloudEventTaskExecution object and move them to CloudEventNodeExecution object.
* Remove some fields that the artifact service ended up not using (parent_node_execution and scheduled_at)

in the cloudevent publisher, change the code filling in of the aforementioned supplemental information to happen for node execution events instead of task execution events.
* Remove the deleted fields.

On the event handling side, move the logic to the handling of the node event instead of the task event.
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
wild-endeavor authored Nov 15, 2023
commit 484e144da5aa636bdeb208e5ddd07b9e3b505a73
172 changes: 123 additions & 49 deletions flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@ import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flytestdlib/contextutils"

"reflect"
"time"

@@ -200,47 +203,89 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
}, nil
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
func getNodeExecutionContext(ctx context.Context, identifier *core.NodeExecutionIdentifier) context.Context {
ctx = contextutils.WithProjectDomain(ctx, identifier.ExecutionId.Project, identifier.ExecutionId.Domain)
ctx = contextutils.WithExecutionID(ctx, identifier.ExecutionId.Name)
return contextutils.WithNodeID(ctx, identifier.NodeId)
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {
// This is a rough copy of the ListTaskExecutions function in TaskExecutionManager. It can be deprecated once we move the processing out of Admin itself.
// Just return the highest retry attempt.
func (c *CloudEventWrappedPublisher) getLatestTaskExecutions(ctx context.Context, nodeExecutionID core.NodeExecutionIdentifier) (*admin.TaskExecution, error) {
ctx = getNodeExecutionContext(ctx, &nodeExecutionID)

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
identifierFilters, err := util.GetNodeExecutionIdentifierFilters(ctx, nodeExecutionID)
if err != nil {
return nil, err
}

// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.TaskExecution_SUCCEEDED {
return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
sort := admin.Sort{
Key: "retry_attempt",
Direction: 0,
}
sortParameter, err := common.NewSortParameter(&sort, models.TaskExecutionColumns)
if err != nil {
return nil, err
}

output, err := c.db.TaskExecutionRepo().List(ctx, repositoryInterfaces.ListResourceInput{
InlineFilters: identifierFilters,
Offset: 0,
Limit: 1,
SortParameter: sortParameter,
})
if err != nil {
return nil, err
}
if output.TaskExecutions == nil || len(output.TaskExecutions) == 0 {
logger.Debugf(ctx, "no task executions found for node exec id [%+v]", nodeExecutionID)
return nil, nil
}

taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform task execution models for node exec id [%+v] with err: %v", nodeExecutionID, err)
return nil, err
}

return taskExecutionList[0], nil
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
if rawEvent == nil || rawEvent.Id == nil {
return nil, fmt.Errorf("nothing to publish, NodeExecution event or ID is nil")
}

// Skip nodes unless they're succeeded and not start nodes
if rawEvent.Phase != core.NodeExecution_SUCCEEDED {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
} else if rawEvent.Id.NodeId == "start-node" {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
}
// metric

// This gets the parent workflow execution metadata
executionModel, err := c.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.ParentNodeExecutionId.ExecutionId.Project,
Domain: rawEvent.ParentNodeExecutionId.ExecutionId.Domain,
Name: rawEvent.ParentNodeExecutionId.ExecutionId.Name,
Project: rawEvent.Id.ExecutionId.Project,
Domain: rawEvent.Id.ExecutionId.Domain,
Name: rawEvent.Id.ExecutionId.Name,
})
if err != nil {
logger.Infof(ctx, "couldn't find execution [%+v] to save termination cause", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.Id.ExecutionId)
return nil, err
}

@@ -250,19 +295,9 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.TaskId.Project,
Domain: rawEvent.TaskId.Domain,
Name: rawEvent.TaskId.Name,
Version: rawEvent.TaskId.Version,
})
if err != nil {
// TODO: metric this
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", rawEvent.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)

// Get inputs/outputs
// This will likely need to move to the artifact service side, given message size limits.
// Replace with call to GetNodeExecutionData
var inputs *core.LiteralMap
if rawEvent.GetInputData() != nil {
inputs = rawEvent.GetInputData()
@@ -273,9 +308,10 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
fmt.Printf("Error fetching input literal map %v", rawEvent)
}
} else {
logger.Infof(ctx, "Task execution for node exec [%+v] has no input data", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "Node execution for node exec [%+v] has no input data", rawEvent.Id)
}

// This will likely need to move to the artifact service side, given message size limits.
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
@@ -289,16 +325,53 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
}
}

// Fetch the latest task execution if any, and pull out the task interface, if applicable.
// These are optional fields... if the node execution doesn't have a task execution then these will be empty.
var taskExecID *core.TaskExecutionIdentifier
var typedInterface *core.TypedInterface

lte, err := c.getLatestTaskExecutions(ctx, *rawEvent.Id)
if err != nil {
logger.Errorf(ctx, "failed to get latest task execution for node exec id [%+v] with err: %v", rawEvent.Id, err)
return nil, err
}
if lte != nil {
taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: lte.Id.TaskId.Project,
Domain: lte.Id.TaskId.Domain,
Name: lte.Id.TaskId.Name,
Version: lte.Id.TaskId.Version,
})
if err != nil {
// TODO: metric this
// metric
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", lte.Id.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)
typedInterface = task.Closure.CompiledTask.Template.Interface
taskExecID = lte.Id
}

return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
TaskExecId: taskExecID,
OutputData: outputs,
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
LaunchPlanId: spec.LaunchPlan,
}, nil
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
}

return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: task.Closure.CompiledTask.Template.Interface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,
}, nil
}

@@ -359,6 +432,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
phase = e.Phase.String()
eventTime = e.OccurredAt.AsTime()
eventID = fmt.Sprintf("%v.%v", executionID, phase)
eventSource = common.FlyteURLKeyFromNodeExecutionID(*msgType.Event.Id)
finalMsg, err = c.TransformNodeExecutionEvent(ctx, e)
case *event.CloudEventExecutionStart:
topic = "cloudevents.ExecutionStart"
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
@@ -206,7 +206,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
}

go func() {
ceCtx := context.Background()
ceCtx := context.TODO()
if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
63 changes: 46 additions & 17 deletions flyteartifacts/pkg/server/processor/events_handler.go
Original file line number Diff line number Diff line change
@@ -33,13 +33,14 @@ func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2
return s.HandleEventTaskExec(ctx, source, msgType)
case *event.CloudEventNodeExecution:
logger.Debugf(ctx, "Handling CloudEventNodeExecution [%v]", msgType.RawEvent.Id)
return s.HandleEventNodeExec(ctx, msgType)
return s.HandleEventNodeExec(ctx, source, msgType)
default:
return fmt.Errorf("HandleEvent found unknown message type [%T]", msgType)
}
}

func (s *ServiceCallHandler) HandleEventExecStart(_ context.Context, _ *event.CloudEventExecutionStart) error {
// metric
return nil
}

@@ -171,18 +172,57 @@ func getPartitionsAndTag(ctx context.Context, partialID core.ArtifactID, variabl
return partitions, tag, nil
}

func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source string, evt *event.CloudEventTaskExecution) error {
func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, _ string, evt *event.CloudEventTaskExecution) error {

if evt.RawEvent.Phase != core.TaskExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
// metric

execID := evt.RawEvent.ParentNodeExecutionId.ExecutionId
return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(ctx context.Context, source string, evt *event.CloudEventNodeExecution) error {
if evt.RawEvent.Phase != core.NodeExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
if evt.RawEvent.Id.NodeId == "end-node" {
logger.Debug(ctx, "Skipping end node for %s", evt.RawEvent.Id.ExecutionId.Name)
return nil
}
// metric

execID := evt.RawEvent.Id.ExecutionId
if evt.GetOutputData().GetLiterals() == nil || len(evt.OutputData.Literals) == 0 {
logger.Debugf(ctx, "No output data to process for task event from [%v]: %s", execID, evt.RawEvent.TaskId.Name)
logger.Debugf(ctx, "No output data to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
}

if evt.OutputInterface == nil {
if evt.GetOutputData() != nil {
// metric this as error
logger.Errorf(ctx, "No output interface to process for task event from [%s] node %s, but output data is not nil", execID, evt.RawEvent.Id.NodeId)
}
logger.Debugf(ctx, "No output interface to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

if evt.RawEvent.GetTaskNodeMetadata() != nil {
if evt.RawEvent.GetTaskNodeMetadata().CacheStatus == core.CatalogCacheStatus_CACHE_HIT {
logger.Debugf(ctx, "Skipping cache hit for %s", evt.RawEvent.Id)
return nil
}
}
var taskExecID *core.TaskExecutionIdentifier
if taskExecID = evt.GetTaskExecId(); taskExecID == nil {
logger.Debugf(ctx, "No task execution id to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

// See note on the cloudevent_publisher side, we'll have to call one of the get data endpoints to get the actual data
// rather than reading them here. But read here for now.

// Iterate through the output interface. For any outputs that have an artifact ID specified, grab the
// output Literal and construct a Create request and call the service.
for varName, variable := range evt.OutputInterface.Outputs.Variables {
@@ -191,14 +231,8 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str

output := evt.OutputData.Literals[varName]

taskExecID := core.TaskExecutionIdentifier{
TaskId: evt.RawEvent.TaskId,
NodeExecutionId: evt.RawEvent.ParentNodeExecutionId,
RetryAttempt: evt.RawEvent.RetryAttempt,
}

// Add a tracking tag to the Literal before saving.
version := fmt.Sprintf("%s/%s", source, varName)
version := fmt.Sprintf("%s/%d/%s", source, taskExecID.RetryAttempt, varName)
trackingTag := fmt.Sprintf("%s/%s/%s", execID.Project, execID.Domain, version)
if output.Metadata == nil {
output.Metadata = make(map[string]string, 1)
@@ -208,7 +242,7 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
spec := artifact.ArtifactSpec{
Value: output,
Type: evt.OutputInterface.Outputs.Variables[varName].Type,
TaskExecution: &taskExecID,
TaskExecution: taskExecID,
Execution: execID,
}

@@ -253,11 +287,6 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
logger.Debugf(ctx, "Created artifact id [%+v] for key %s", resp.Artifact.ArtifactId, varName)
}
}

return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(_ context.Context, _ *event.CloudEventNodeExecution) error {
return nil
}

6 changes: 3 additions & 3 deletions flyteidl/gen/pb-cpp/flyteidl/artifact/artifacts.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading