Skip to content

Commit

Permalink
Merge remote-tracking branch 'eapolinario-flytepropeller/prepare-for-…
Browse files Browse the repository at this point in the history
…monorepo--v1.1.126' into monorepo--bump-components-versions

Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Sep 26, 2023
2 parents 0879db5 + d38e892 commit 78d83bc
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 59 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/boilerplate/flyte/end2end/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst

.PHONY: end2end_execute
end2end_execute: export FLYTESNACKS_PRIORITIES ?= P0
end2end_execute: export FLYTESNACKS_VERSION ?= $(shell curl --silent "https://api.github.com/repos/flyteorg/flytesnacks/releases/latest" | jq -r .tag_name)
end2end_execute:
./boilerplate/flyte/end2end/end2end.sh ./boilerplate/flyte/end2end/functional-test-config.yaml --return_non_zero_on_failure

Expand Down
9 changes: 2 additions & 7 deletions flytepropeller/boilerplate/flyte/end2end/end2end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@
# ONLY EDIT THIS FILE FROM WITHIN THE 'FLYTEORG/BOILERPLATE' REPOSITORY:
#
# TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst
set -e
set -eu

CONFIG_FILE=$1; shift
EXTRA_FLAGS=( "$@" )

# By default only execute `core` tests
PRIORITIES="${PRIORITIES:-P0}"

LATEST_VERSION=$(curl --silent "https://api.github.com/repos/flyteorg/flytesnacks/releases/latest" | jq -r .tag_name)

python ./boilerplate/flyte/end2end/run-tests.py $LATEST_VERSION $PRIORITIES $CONFIG_FILE ${EXTRA_FLAGS[@]}
python ./boilerplate/flyte/end2end/run-tests.py $FLYTESNACKS_VERSION $FLYTESNACKS_PRIORITIES $CONFIG_FILE ${EXTRA_FLAGS[@]}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
admin:
# For GRPC endpoints you might want to use dns:///flyte.myexample.com
endpoint: localhost:30081
endpoint: dns:///localhost:30080
authType: Pkce
insecure: true
78 changes: 36 additions & 42 deletions flytepropeller/boilerplate/flyte/end2end/run-tests.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#!/usr/bin/env python3

import click
import datetime
import json
import sys
import time
import traceback
from typing import Dict, List, Mapping, Tuple

import click
import requests
from typing import List, Mapping, Tuple, Dict
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
from flytekit.models.core.execution import WorkflowExecutionPhase
from flytekit.configuration import Config, ImageConfig, SerializationSettings
from flytekit.remote import FlyteRemote
from flytekit.remote.executions import FlyteWorkflowExecution


WAIT_TIME = 10
MAX_ATTEMPTS = 200

Expand All @@ -22,15 +22,14 @@
# starting with "core".
FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = {
"lite": [
("basics.hello_world.my_wf", {}),
("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}),
("basics.hello_world.hello_world_wf", {}),
],
"core": [
("basics.deck.wf", {}),
# ("development_lifecycle.decks.image_renderer_wf", {}),
# The chain_workflows example in flytesnacks expects to be running in a sandbox.
# ("control_flow.chain_entities.chain_workflows_wf", {}),
("control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}),
("control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}),
("advanced_composition.chain_entities.chain_workflows_wf", {}),
("advanced_composition.dynamics.wf", {"s1": "Pear", "s2": "Earth"}),
("advanced_composition.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}),
# Workflows that use nested executions cannot be launched via flyteremote.
# This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482.
# ("control_flow.run_conditions.multiplier", {"my_input": 0.5}),
Expand All @@ -41,24 +40,22 @@
# ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}),
# ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}),
# ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}),
("control_flow.subworkflows.parent_wf", {"a": 3}),
("control_flow.subworkflows.nested_parent_wf", {"a": 3}),
("basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}),
("advanced_composition.subworkflows.parent_workflow", {"my_input1": "hello"}),
("advanced_composition.subworkflows.nested_parent_wf", {"a": 3}),
("basics.workflow.simple_wf", {"x": [1, 2, 3], "y": [1, 2, 3]}),
# TODO: enable new files and folders workflows
# ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}),
# ("basics.folders.download_and_rotate", {}),
("basics.hello_world.my_wf", {}),
("basics.lp.my_wf", {"val": 4}),
("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}),
("basics.named_outputs.my_wf", {}),
("basics.hello_world.hello_world_wf", {}),
("basics.named_outputs.simple_wf_with_named_outputs", {}),
# # Getting a 403 for the wikipedia image
# # ("basics.reference_task.wf", {}),
("type_system.custom_objects.wf", {"x": 10, "y": 20}),
("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}),
# Enums are not supported in flyteremote
# ("type_system.enums.enum_wf", {"c": "red"}),
("type_system.schema.df_wf", {"a": 42}),
("type_system.typed_schema.wf", {}),
#("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
("data_types_and_io.schema.df_wf", {"a": 42}),
("data_types_and_io.typed_schema.wf", {}),
# ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
],
"integrations-k8s-spark": [
("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}),
Expand Down Expand Up @@ -97,19 +94,22 @@ def execute_workflow(remote, version, workflow_name, inputs):
wf = remote.fetch_workflow(name=workflow_name, version=version)
return remote.execute(wf, inputs=inputs, wait=False)


def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool:
for executions in executions_by_wfgroup.values():
if not all([execution.is_done for execution in executions]):
return False
return True


def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
try:
for executions in executions_by_wfgroup.values():
for execution in executions:
print(f"About to sync execution_id={execution.id.name}")
remote.sync(execution)
except:
except Exception:
print(traceback.format_exc())
print("GOT TO THE EXCEPT")
print("COUNT THIS!")

Expand All @@ -119,6 +119,7 @@ def report_executions(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecuti
for execution in executions:
print(execution)


def schedule_workflow_groups(
tag: str,
workflow_groups: List[str],
Expand All @@ -139,17 +140,12 @@ def schedule_workflow_groups(

# Wait for all executions to finish
attempt = 0
while attempt == 0 or (
not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS
):
while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS):
attempt += 1
print(
f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s"
)
print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s")
time.sleep(WAIT_TIME)
sync_executions(remote, executions_by_wfgroup)


report_executions(executions_by_wfgroup)

results = {}
Expand Down Expand Up @@ -192,14 +188,17 @@ def run(

# For a given release tag and priority, this function filters the workflow groups from the flytesnacks
# manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ].
manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \
f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
manifest_url = (
"https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
)
r = requests.get(manifest_url)
parsed_manifest = r.json()
workflow_groups = []
workflow_groups = ["lite"] if "lite" in priorities else [
group["name"] for group in parsed_manifest if group["priority"] in priorities
]
workflow_groups = (
["lite"]
if "lite" in priorities
else [group["name"] for group in parsed_manifest if group["priority"] in priorities]
)

results = []
valid_workgroups = []
Expand All @@ -216,10 +215,7 @@ def run(
valid_workgroups.append(workflow_group)

results_by_wfgroup = schedule_workflow_groups(
flytesnacks_release_tag,
valid_workgroups,
remote,
terminate_workflow_on_failure
flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure
)

for workflow_group, succeeded in results_by_wfgroup.items():
Expand Down Expand Up @@ -273,9 +269,7 @@ def cli(
terminate_workflow_on_failure,
):
print(f"return_non_zero_on_failure={return_non_zero_on_failure}")
results = run(
flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure
)
results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure)

# Write a json object in its own line describing the result of this run to stdout
print(f"Result of run:\n{json.dumps(results)}")
Expand Down
4 changes: 1 addition & 3 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,7 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s
}

go flyteworkflowInformerFactory.Start(ctx.Done())
if flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateName != "" {
go informerFactory.Start(ctx.Done())
}
go informerFactory.Start(ctx.Done())

if err = c.Run(ctx); err != nil {
return errors.Wrapf(err, "Error running FlytePropeller.")
Expand Down
12 changes: 11 additions & 1 deletion flytepropeller/pkg/controller/nodes/array/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex"
Expand Down Expand Up @@ -45,7 +46,16 @@ func buildTaskExecutionEvent(_ context.Context, nCtx interfaces.NodeExecutionCon
}

nodeExecutionID := nCtx.NodeExecutionMetadata().GetNodeExecutionID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nodeExecutionID.NodeId)
if err != nil {
return nil, err
}
nodeExecutionID.NodeId = currentNodeUniqueID
}

workflowExecutionID := nodeExecutionID.ExecutionId

return &event.TaskExecutionEvent{
TaskId: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Expand All @@ -54,7 +64,7 @@ func buildTaskExecutionEvent(_ context.Context, nCtx interfaces.NodeExecutionCon
Name: nCtx.NodeID(),
Version: "v1", // this value is irrelevant but necessary for the identifier to be valid
},
ParentNodeExecutionId: nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
ParentNodeExecutionId: nodeExecutionID,
RetryAttempt: 0, // ArrayNode will never retry
Phase: taskPhase,
PhaseVersion: taskPhaseVersion,
Expand Down
11 changes: 7 additions & 4 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,10 +1230,13 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter

err = nCtx.EventsRecorder().RecordNodeEvent(ctx, nev, c.eventConfig)
if err != nil {
if eventsErr.IsTooLarge(err) {
// With large enough dynamic task fanouts the reported node event, which contains the compiled
// workflow closure, can exceed the gRPC message size limit. In this case we immediately
// transition the node to failing to abort the workflow.
if eventsErr.IsTooLarge(err) || eventsErr.IsInvalidArguments(err) {
// we immediately transition to failing if one of two scenarios occur during node event recording:
// (1) the event is too large to be sent over gRPC. this can occur if, for example, a dynamic task
// has a very large fanout and the compiled workflow closure causes the event to exceed the gRPC
// message size limit.
// (2) the event is invalid. this can occur if, for example, a dynamic task compiles a workflow
// which is invalid per admin limits (ex. maximum resources exceeded).
np = v1alpha1.NodePhaseFailing
p = handler.PhaseInfoFailure(core.ExecutionError_USER, "NodeFailed", err.Error(), p.GetInfo())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ type executionCacheItem struct {
ExecutionOutputs *core.LiteralMap
}

func (e executionCacheItem) IsTerminal() bool {
if e.ExecutionClosure == nil {
return false
}
return e.ExecutionClosure.Phase == core.WorkflowExecution_ABORTED || e.ExecutionClosure.Phase == core.WorkflowExecution_FAILED || e.ExecutionClosure.Phase == core.WorkflowExecution_SUCCEEDED
}

func (e executionCacheItem) ID() string {
return e.String()
}
Expand Down
5 changes: 4 additions & 1 deletion flytepropeller/pkg/controller/nodes/task/plugin_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s"
)

const AgentServiceKey = "agent-service"

var once sync.Once

func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) {
Expand All @@ -24,8 +26,9 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
}

// Register the GRPC plugin after the config is loaded
once.Do(func() { agent.RegisterAgentPlugin() })
pluginsConfigMeta, err := cfg.GetEnabledPlugins()
once.Do(func() { agent.RegisterAgentPlugin(pluginsConfigMeta.AllDefaultForTaskTypes[AgentServiceKey]) })

if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 78d83bc

Please sign in to comment.