[BUG] ContainerTask with compile time PodTemplate and outputs never ends #4813

Open
2 tasks done
LotemAm opened this issue Feb 1, 2024 · 2 comments
Labels
backlogged For internal use. Reserved for contributor team workflow. bug Something isn't working

Comments

@LotemAm

LotemAm commented Feb 1, 2024

Describe the bug

I have a ContainerTask with a custom PodTemplate, and Flyte seems unable to detect when the task's container has finished and written its output file. A different ContainerTask that doesn't use a PodTemplate works fine.
In the execution's task tab, the working task shows container.dataConfig, while the hanging task shows k8sPod.dataConfig. However, when I open the task definition of the hanging task itself, I don't see a dataConfig anywhere.
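To make the comparison above reproducible, the serialized task templates can be checked programmatically. The sketch below uses a hypothetical helper, find_data_config, that is not part of flytekit. It assumes you have each task template as a JSON-style dict using the camelCase keys the Flyte console displays (container, k8sPod, dataConfig), and it reports which of the two locations carries a dataConfig. The example templates are trimmed, illustrative stand-ins, not real console output.

```python
from typing import Any, Dict, Optional


def find_data_config(task_template: Dict[str, Any]) -> Optional[str]:
    """Return the dotted path of the dataConfig in a task template dict, or None.

    Hypothetical helper: assumes the camelCase layout shown in the Flyte console,
    where a plain container task has container.dataConfig and a pod-template
    task has k8sPod.dataConfig.
    """
    for target in ("container", "k8sPod"):
        section = task_template.get(target) or {}
        if "dataConfig" in section:
            return f"{target}.dataConfig"
    return None


# Illustrative, heavily trimmed templates (not real console output).
working = {"container": {"image": "lookup:latest", "dataConfig": {"enabled": True}}}
hanging = {"k8sPod": {"podSpec": {}, "dataConfig": {"enabled": True}}}
definition_without = {"k8sPod": {"podSpec": {}}}

print(find_data_config(working))             # container.dataConfig
print(find_data_config(hanging))             # k8sPod.dataConfig
print(find_data_config(definition_without))  # None
```

Running the same check against the execution view and the registered definition of video_extractor would show whether the dataConfig is dropped between the two.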

Expected behavior

After the container finishes, its outputs are passed to the next task and the workflow continues.

Additional context to reproduce

In the following code, the first task, lookup, without PodTemplate works fine, but the second task, video_extractor, keeps running forever, long after the primary container has finished.

Code

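import kubernetes.client as k8s
from flytekit import ContainerTask, PodTemplate, Resources, Secret, kwtypes
from flytekit.types.file import CSVFile, FlyteFile

GCP_BUCKET = 'workflow-bucket'
# Placeholders for names the snippet references but does not define;
# substitute your own registry and secret definitions.
GCP_REGISTRY = 'us-docker.pkg.dev/my-project/my-repo'  # hypothetical
mysql_secrets = [Secret(group='mysql', key='password')]  # hypothetical
proxy_secret = Secret(group='proxy', key='token')  # hypothetical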


def get_pod_template():
    tpl = PodTemplate(
        annotations={'gke-gcsfuse/volumes': 'true'},
        pod_spec=k8s.V1PodSpec(
            termination_grace_period_seconds=120,  # Long time to allow large files to flush
            containers=[
                k8s.V1Container(
                    name='primary',
                    volume_mounts=[
                        k8s.V1VolumeMount(name='data', mount_path='/data')
                    ],
                    readiness_probe=k8s.V1Probe(
                        _exec=k8s.V1ExecAction(command=['cat', '/tmp/ready']),
                        period_seconds=2,
                        initial_delay_seconds=1,
                    )
                )
            ],
            volumes=[
                k8s.V1Volume(
                    name='data',
                    csi=k8s.V1CSIVolumeSource(
                        driver='gcsfuse.csi.storage.gke.io',
                        volume_attributes={
                            'bucketName': GCP_BUCKET,
                            'mountOptions': 'implicit-dirs'
                        }
                    )
                )
            ]
        )
    )

    return tpl


# This task works fine
lookup = ContainerTask(
    name='lookup',
    # input_data_dir='/var/inputs',
    output_data_dir='/var/outputs',
    inputs=kwtypes(region=str),
    outputs=kwtypes(videos=FlyteFile['json']),
    enable_deck=True,
    image=f'{GCP_REGISTRY}/lookup:latest',
    secret_requests=[*mysql_secrets, proxy_secret],
    command=['python', '-m', 'service'],
    arguments=[
        'lookup',
        '--region',
        '{{.inputs.region}}',
        '--flow-output-path',
        '/var/outputs/videos',
    ],
    requests=Resources(cpu='0.2', mem='256M'),
)


# This task never ends, and Flyte complains it can't find output
# even after container exited successfully

video_extractor = ContainerTask(
    name='video_extractor',
    output_data_dir='/var/outputs',
    inputs=kwtypes(
        region=str,
    ),
    outputs=kwtypes(extractions=CSVFile),
    image=f'{GCP_REGISTRY}/video_extractor:latest',
    secret_requests=mysql_secrets,
    command=[
        'python',
        '-m',
        'service',
    ],
    arguments=[
        'video-extractor',
        '--region',
        '{{.inputs.region}}',
        '--flow-output-path',
        '/var/outputs/extractions',
    ],
    requests=Resources(gpu='1', mem='5G'),
    limits=Resources(mem='6G'),
    pod_template=get_pod_template(),
    environment={'AN_ENV': 'VALUE'}
)
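Both tasks rely on the same contract: the container writes each declared output to a file named after the output inside output_data_dir, which is what the --flow-output-path /var/outputs/... arguments arrange. The sketch below is a hypothetical stand-in for the service module's output writer (its real implementation is not shown in this issue). It writes the CSV for the extractions output and confirms the file exists before the process exits, which helps rule out a missing output file as the cause of the hang. The helper name write_flow_output and the column names are assumptions.

```python
import csv
import os
import tempfile
from typing import Iterable, Sequence


def write_flow_output(path: str, header: Sequence[str], rows: Iterable[Sequence[object]]) -> str:
    """Write a CSV output to `path` and verify it is on disk (hypothetical helper)."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(rows)
        fh.flush()
        os.fsync(fh.fileno())  # push bytes to disk before the container exits
    if not os.path.isfile(path):
        raise RuntimeError(f"expected output file missing: {path}")
    return path


# Usage: a temp dir stands in for /var/outputs; the file name must match the
# declared output name ("extractions").
out_dir = tempfile.mkdtemp()
out_path = write_flow_output(
    os.path.join(out_dir, "extractions"),
    header=["video_id", "region"],
    rows=[["v1", "eu"], ["v2", "eu"]],
)
with open(out_path, newline="") as fh:
    print(list(csv.reader(fh)))  # [['video_id', 'region'], ['v1', 'eu'], ['v2', 'eu']]
```

If the file is present at /var/outputs/extractions when the primary container exits and the task still hangs, the problem is more likely on the Flyte side than in the container.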

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@LotemAm LotemAm added bug Something isn't working untriaged This issues has not yet been looked at by the Maintainers labels Feb 1, 2024

welcome bot commented Feb 1, 2024

Thank you for opening your first issue here! 🛠


dosubot bot commented Feb 1, 2024

🤖

Hey there @LotemAm! Great to see you diving into another challenge. Hope all's been well since our last encounter. 👋

The issue you're experiencing might be related to how Flyte determines the task phase for a pod. Whether Flyte looks only at the primary container or at all containers depends on whether the pod carries the primary container annotation. Without that annotation, the task is not marked complete until every container in the pod has finished, so if the primary container has exited but other containers (such as sidecars) are still running, the task stays in the running state.

This behavior is defined in the GetTaskPhaseWithLogs function in the flyteplugins/go/tasks/plugins/k8s/pod/plugin.go file. Here's the relevant code snippet:

// if the primary container annotation does not exist, then the task requires all containers
// to succeed to declare success. therefore, if the pod is not in one of the above states we
// fallback to declaring the task as 'running'.
phaseInfo = pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info)
if len(info.Logs) > 0 {
	phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1)
}

In this code, if the primary container annotation does not exist, all containers must succeed before the task is declared successful. So if the pod is not in one of the phases handled earlier in the function (for example, Succeeded or Failed), the plugin falls back to reporting the task as 'running'.

To avoid this, you can use the PrimaryContainerKey annotation to specify the primary container. If this annotation is set, Flyte will use the status of the specified container to determine the task phase.

// if the primary container annotation exists, we use the status of the specified container
phaseInfo = flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info)

In this case, once the primary container has finished execution, the task will be marked as complete, regardless of the status of other containers in the pod.
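The two branches described above can be modeled in a few lines of Python. This is a simplified, hypothetical restatement of the Go logic for illustration only: the phase strings, the dict-based container statuses (exit_code None meaning "still running"), the task_phase function, and the annotation key value are all assumptions. The real GetTaskPhaseWithLogs and DeterminePrimaryContainerPhase handle more cases, such as pending or unschedulable pods and log versioning.

```python
from typing import Dict, List, Optional

# Assumed value of the annotation Flyte's Go code calls PrimaryContainerKey.
PRIMARY_CONTAINER_KEY = "primary_container_name"


def task_phase(pod_phase: str,
               annotations: Dict[str, str],
               container_statuses: List[dict]) -> str:
    """Simplified model of how the pod plugin maps a pod to a task phase."""
    # Terminal pod phases are handled first, as in the Go switch statement.
    if pod_phase == "Succeeded":
        return "SUCCESS"
    if pod_phase == "Failed":
        return "FAILURE"

    primary: Optional[str] = annotations.get(PRIMARY_CONTAINER_KEY)
    if primary is None:
        # No annotation: every container must finish, so a pod that is still
        # Running (e.g. because a sidecar is alive) stays RUNNING.
        return "RUNNING"

    # Annotation present: only the named container's state matters.
    for status in container_statuses:
        if status["name"] == primary:
            exit_code = status.get("exit_code")
            if exit_code is None:
                return "RUNNING"
            return "SUCCESS" if exit_code == 0 else "FAILURE"
    # Primary not found: this sketch reports RUNNING; the real implementation
    # may treat a missing primary container differently.
    return "RUNNING"


statuses = [
    {"name": "primary", "exit_code": 0},     # primary exited cleanly
    {"name": "sidecar", "exit_code": None},  # a sidecar is still running
]
print(task_phase("Running", {}, statuses))                                   # RUNNING
print(task_phase("Running", {PRIMARY_CONTAINER_KEY: "primary"}, statuses))  # SUCCESS
```

The same pod state yields different task phases depending only on whether the annotation is present, which matches the symptom of a task that keeps running after its primary container has exited.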

I hope this helps! If you have any other questions or need further clarification, feel free to ask.

Sources


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.


@eapolinario eapolinario added backlogged For internal use. Reserved for contributor team workflow. and removed untriaged This issues has not yet been looked at by the Maintainers labels Feb 1, 2024