diff --git a/flyteplugins/go/tasks/logs/config.go b/flyteplugins/go/tasks/logs/config.go index b802844a4a..c8eb293035 100644 --- a/flyteplugins/go/tasks/logs/config.go +++ b/flyteplugins/go/tasks/logs/config.go @@ -28,8 +28,7 @@ type LogConfig struct { StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` - IsFlyinEnabled bool `json:"flyin-enabled" pflag:",Enable Log-links to flyin logs"` - FlyinTemplateURI tasklog.TemplateURI `json:"flyin-template-uri" pflag:",Template Uri to use when building flyin log links"` + DynamicLogLinks map[string]tasklog.TemplateLogPlugin `json:"dynamic-log-links" pflag:"-,Map of dynamic log links"` Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"` } diff --git a/flyteplugins/go/tasks/logs/logconfig_flags.go b/flyteplugins/go/tasks/logs/logconfig_flags.go index de8ba022dc..00c08a8a58 100755 --- a/flyteplugins/go/tasks/logs/logconfig_flags.go +++ b/flyteplugins/go/tasks/logs/logconfig_flags.go @@ -61,7 +61,5 @@ func (cfg LogConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gcp-project"), DefaultConfig.GCPProjectName, "Name of the project in GCP") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "stackdriver-logresourcename"), DefaultConfig.StackdriverLogResourceName, "Name of the logresource in stackdriver") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "stackdriver-template-uri"), DefaultConfig.StackDriverTemplateURI, "Template Uri to use when building stackdriver log links") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "flyin-enabled"), DefaultConfig.IsFlyinEnabled, "Enable Log-links to flyin logs") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "flyin-template-uri"), DefaultConfig.FlyinTemplateURI, "Template Uri to use when building flyin log links") return cmdFlags } diff --git a/flyteplugins/go/tasks/logs/logconfig_flags_test.go b/flyteplugins/go/tasks/logs/logconfig_flags_test.go index dfbee43c69..8bb775df1f 100755 --- a/flyteplugins/go/tasks/logs/logconfig_flags_test.go +++ b/flyteplugins/go/tasks/logs/logconfig_flags_test.go @@ -253,32 +253,4 @@ func TestLogConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_flyin-enabled", func(t *testing.T) { - - t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("flyin-enabled", testValue) - if vBool, err := cmdFlags.GetBool("flyin-enabled"); err == nil { - testDecodeJson_LogConfig(t, fmt.Sprintf("%v", vBool), &actual.IsFlyinEnabled) - - } else { - assert.FailNow(t, err.Error()) - } - }) - }) - t.Run("Test_flyin-template-uri", func(t *testing.T) { - - t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("flyin-template-uri", testValue) - if vString, err := cmdFlags.GetString("flyin-template-uri"); err == nil { - testDecodeJson_LogConfig(t, fmt.Sprintf("%v", vString), &actual.FlyinTemplateURI) - - } else { - assert.FailNow(t, err.Error()) - } - }) - }) } diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 20f3522e27..04d23f4ec8 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -14,7 +14,7 @@ import ( ) // Internal -func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) { +func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVars []tasklog.TemplateVar, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) { if logPlugin == nil { return nil, nil } @@ -39,19 +39,19 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas logs, err := logPlugin.GetTaskLogs( tasklog.Input{ - PodName: pod.Name, - PodUID: string(pod.GetUID()), - Namespace: pod.Namespace, - ContainerName: pod.Spec.Containers[index].Name, - ContainerID: pod.Status.ContainerStatuses[index].ContainerID, - LogName: nameSuffix, - PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339), - PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339), - PodUnixStartTime: startTime, - PodUnixFinishTime: finishTime, - TaskExecutionID: taskExecID, - ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme, - TaskTemplate: taskTemplate, + PodName: pod.Name, + PodUID: string(pod.GetUID()), + Namespace: pod.Namespace, + ContainerName: pod.Spec.Containers[index].Name, + ContainerID: pod.Status.ContainerStatuses[index].ContainerID, + LogName: nameSuffix, + PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339), + PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339), + PodUnixStartTime: startTime, + PodUnixFinishTime: finishTime, + TaskExecutionID: taskExecID, + ExtraTemplateVars: extraLogTemplateVars, + TaskTemplate: taskTemplate, }, ) @@ -63,13 +63,14 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas } type templateLogPluginCollection struct { - plugins []tasklog.TemplateLogPlugin + plugins []tasklog.TemplateLogPlugin + dynamicPlugins []tasklog.TemplateLogPlugin } func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) { var taskLogs []*core.TaskLog - for _, plugin := range t.plugins { + for _, plugin := range append(t.plugins, t.dynamicPlugins...) { o, err := plugin.GetTaskLogs(input) if err != nil { return tasklog.Output{}, err @@ -84,39 +85,43 @@ func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.O func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { // Use a list to maintain order. var plugins []tasklog.TemplateLogPlugin + var dynamicPlugins []tasklog.TemplateLogPlugin if cfg.IsKubernetesEnabled { if len(cfg.KubernetesTemplateURI) > 0 { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsCloudwatchEnabled { if len(cfg.CloudwatchTemplateURI) > 0 { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsStackDriverEnabled { if len(cfg.StackDriverTemplateURI) > 0 { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON}) } } - if cfg.IsFlyinEnabled { - if len(cfg.FlyinTemplateURI) > 0 { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{cfg.FlyinTemplateURI}, MessageFormat: core.TaskLog_JSON}) - } else { - plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://flyin.%s/logs/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", cfg.GCPProjectName)}, MessageFormat: core.TaskLog_JSON}) - } + for logLinkType, dynamicLogLink := range cfg.DynamicLogLinks { + dynamicPlugins = append( + dynamicPlugins, + tasklog.TemplateLogPlugin{ + Name: logLinkType, + DisplayName: dynamicLogLink.DisplayName, + DynamicTemplateURIs: dynamicLogLink.TemplateURIs, + MessageFormat: core.TaskLog_JSON, + }) } plugins = append(plugins, cfg.Templates...) - return templateLogPluginCollection{plugins: plugins}, nil + return templateLogPluginCollection{plugins: plugins, dynamicPlugins: dynamicPlugins}, nil } diff --git a/flyteplugins/go/tasks/logs/logging_utils_test.go b/flyteplugins/go/tasks/logs/logging_utils_test.go index 46eb682201..946d069d50 100644 --- a/flyteplugins/go/tasks/logs/logging_utils_test.go +++ b/flyteplugins/go/tasks/logs/logging_utils_test.go @@ -334,7 +334,6 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) { "https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs", }, MessageFormat: core.TaskLog_JSON, - Scheme: tasklog.TemplateSchemeTaskExecution, }, }, }, nil, []*core.TaskLog{ @@ -351,30 +350,164 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) { }) } -func TestGetLogsForContainerInPod_Flyin(t *testing.T) { - assertTestSucceeded(t, - &LogConfig{ - IsKubernetesEnabled: true, - KubernetesTemplateURI: "https://k8s.com", - IsFlyinEnabled: true, - FlyinTemplateURI: "https://flyin.mydomain.com:{{ .port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", +func TestGetLogsForContainerInPod_Flyteinteractive(t *testing.T) { + tests := []struct { + name string + config *LogConfig + template *core.TaskTemplate + expectedTaskLogs []*core.TaskLog + }{ + { + "Flyteinteractive enabled but no task template", + &LogConfig{ + DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{ + "vscode": tasklog.TemplateLogPlugin{ + DisplayName: "vscode link", + TemplateURIs: []tasklog.TemplateURI{ + "https://flyteinteractive.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", + }, + }, + }, + }, + nil, + nil, }, - &core.TaskTemplate{ - Config: map[string]string{ - "link_type": "vscode", - "port": "65535", + { + "Flyteinteractive enabled but config not found in task template", + &LogConfig{ + DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{ + "vscode": tasklog.TemplateLogPlugin{ + DisplayName: "vscode link", + TemplateURIs: []tasklog.TemplateURI{ + "https://flyteinteractive.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", + }, + }, + }, }, + &core.TaskTemplate{}, + nil, }, - []*core.TaskLog{ - { - Uri: "https://k8s.com", - MessageFormat: core.TaskLog_JSON, - Name: "Kubernetes Logs my-Suffix", + { + "Flyteinteractive disabled but config present in TaskTemplate", + &LogConfig{}, + &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "65535", + }, }, - { - Uri: "https://flyin.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID", - MessageFormat: core.TaskLog_JSON, - Name: "Flyin Logs my-Suffix", + nil, + }, + { + "Flyteinteractive - multiple dynamic options", + &LogConfig{ + DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{ + "vscode": tasklog.TemplateLogPlugin{ + DisplayName: "vscode link", + TemplateURIs: []tasklog.TemplateURI{ + "https://abc.com:{{ .taskConfig.port }}/{{ .taskConfig.route }}", + }, + }, + }, + }, + &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "65535", + "route": "a-route", + }, + }, + []*core.TaskLog{ + { + Uri: "https://abc.com:65535/a-route", + MessageFormat: core.TaskLog_JSON, + Name: "vscode link my-Suffix", + }, + }, + }, + { + "Flyteinteractive - multiple uses of the template (invalid use of ports in a URI)", + &LogConfig{ + DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{ + "vscode": tasklog.TemplateLogPlugin{ + DisplayName: "vscode link", + TemplateURIs: []tasklog.TemplateURI{ + "https://abc.com:{{ .taskConfig.port }}:{{ .taskConfig.port}}", + }, + }, + }, + }, + &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "65535", + }, + }, + []*core.TaskLog{ + { + Uri: "https://abc.com:65535:65535", + MessageFormat: core.TaskLog_JSON, + Name: "vscode link my-Suffix", + }, + }, + }, + { + "Flyteinteractive disabled and K8s enabled and flyteinteractive config present in TaskTemplate", + &LogConfig{ + IsKubernetesEnabled: true, + KubernetesTemplateURI: "https://k8s.com/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", + }, + &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "65535", + }, + }, + []*core.TaskLog{ + { + Uri: "https://k8s.com/my-namespace/my-pod/ContainerName/ContainerID", + MessageFormat: core.TaskLog_JSON, + Name: "Kubernetes Logs my-Suffix", + }, + }, + }, + { + "Flyteinteractive and K8s enabled", + &LogConfig{ + IsKubernetesEnabled: true, + KubernetesTemplateURI: "https://k8s.com/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", + DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{ + "vscode": tasklog.TemplateLogPlugin{ + DisplayName: "vscode link", + TemplateURIs: []tasklog.TemplateURI{ + "https://flyteinteractive.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", + }, + }, + }, }, + &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "65535", + }, + }, + []*core.TaskLog{ + { + Uri: "https://k8s.com/my-namespace/my-pod/ContainerName/ContainerID", + MessageFormat: core.TaskLog_JSON, + Name: "Kubernetes Logs my-Suffix", + }, + { + Uri: "https://flyteinteractive.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID", + MessageFormat: core.TaskLog_JSON, + Name: "vscode link my-Suffix", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assertTestSucceeded(t, tt.config, tt.template, tt.expectedTaskLogs) }) + } } diff --git a/flyteplugins/go/tasks/pluginmachinery/core/phase_enumer.go b/flyteplugins/go/tasks/pluginmachinery/core/phase_enumer.go index bd26c4d560..caa4d86250 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/phase_enumer.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/phase_enumer.go @@ -6,9 +6,9 @@ import ( "fmt" ) -const _PhaseName = "PhaseUndefinedPhaseNotReadyPhaseWaitingForResourcesPhaseQueuedPhaseInitializingPhaseRunningPhaseSuccessPhaseRetryableFailurePhasePermanentFailurePhaseWaitingForCache" +const _PhaseName = "PhaseUndefinedPhaseNotReadyPhaseWaitingForResourcesPhaseQueuedPhaseInitializingPhaseRunningPhaseSuccessPhaseRetryableFailurePhasePermanentFailurePhaseWaitingForCachePhaseAborted" -var _PhaseIndex = [...]uint8{0, 14, 27, 51, 62, 79, 91, 103, 124, 145, 165} +var _PhaseIndex = [...]uint8{0, 14, 27, 51, 62, 79, 91, 103, 124, 145, 165, 177} func (i Phase) String() string { if i < 0 || i >= Phase(len(_PhaseIndex)-1) { @@ -17,7 +17,7 @@ func (i Phase) String() string { return _PhaseName[_PhaseIndex[i]:_PhaseIndex[i+1]] } -var _PhaseValues = []Phase{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} +var _PhaseValues = []Phase{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} var _PhaseNameToValueMap = map[string]Phase{ _PhaseName[0:14]: 0, @@ -30,6 +30,7 @@ var _PhaseNameToValueMap = map[string]Phase{ _PhaseName[103:124]: 7, _PhaseName[124:145]: 8, _PhaseName[145:165]: 9, + _PhaseName[165:177]: 10, } // PhaseString retrieves an enum value from the enum constants string name. diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go index da2357a6d9..1caf8a2bac 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go @@ -7,16 +7,6 @@ import ( pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" ) -//go:generate enumer --type=TemplateScheme --trimprefix=TemplateScheme -json -yaml - -type TemplateScheme int - -const ( - TemplateSchemePod TemplateScheme = iota - TemplateSchemeTaskExecution - TemplateSchemeFlyin -) - // TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. type TemplateURI = string @@ -25,32 +15,23 @@ type TemplateVar struct { Value string } -type TemplateVars []TemplateVar - -type TemplateVarsByScheme struct { - Common TemplateVars - Pod TemplateVars - TaskExecution TemplateVars - Flyin TemplateVars -} - // Input contains all available information about task's execution that a log plugin can use to construct task's // log links. type Input struct { - HostName string - PodName string - Namespace string - ContainerName string - ContainerID string - LogName string - PodRFC3339StartTime string - PodRFC3339FinishTime string - PodUnixStartTime int64 - PodUnixFinishTime int64 - PodUID string - TaskExecutionID pluginsCore.TaskExecutionID - ExtraTemplateVarsByScheme *TemplateVarsByScheme - TaskTemplate *core.TaskTemplate + HostName string + PodName string + Namespace string + ContainerName string + ContainerID string + LogName string + PodRFC3339StartTime string + PodRFC3339FinishTime string + PodUnixStartTime int64 + PodUnixFinishTime int64 + PodUID string + TaskExecutionID pluginsCore.TaskExecutionID + ExtraTemplateVars []TemplateVar + TaskTemplate *core.TaskTemplate } // Output contains all task logs a plugin generates for a given Input. @@ -65,8 +46,9 @@ type Plugin interface { } type TemplateLogPlugin struct { - DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` - TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` - MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` - Scheme TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."` + Name string `json:"name" pflag:",Name of the plugin."` + DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` + TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` + DynamicTemplateURIs []TemplateURI `json:"dynamictemplateUris" pflag:",URI Templates for generating dynamic task log links."` + MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` } diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 3b319f291e..e5481ecfbd 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -13,6 +13,12 @@ func MustCreateRegex(varName string) *regexp.Regexp { return regexp.MustCompile(fmt.Sprintf(`(?i){{\s*[\.$]%s\s*}}`, varName)) } +var taskConfigVarRegex = regexp.MustCompile(`(?i){{\s*.taskConfig[\.$]([a-zA-Z_]+)\s*}}`) + +func MustCreateDynamicLogRegex(varName string) *regexp.Regexp { + return regexp.MustCompile(fmt.Sprintf(`(?i){{\s*.taskConfig[\.$]%s\s*}}`, varName)) +} + type templateRegexes struct { LogName *regexp.Regexp PodName *regexp.Regexp @@ -35,7 +41,6 @@ type templateRegexes struct { ExecutionProject *regexp.Regexp ExecutionDomain *regexp.Regexp GeneratedName *regexp.Regexp - Port *regexp.Regexp } func initDefaultRegexes() templateRegexes { @@ -61,13 +66,12 @@ func initDefaultRegexes() templateRegexes { MustCreateRegex("executionProject"), MustCreateRegex("executionDomain"), MustCreateRegex("generatedName"), - MustCreateRegex("port"), } } var defaultRegexes = initDefaultRegexes() -func replaceAll(template string, vars TemplateVars) string { +func replaceAll(template string, vars []TemplateVar) string { for _, v := range vars { if len(v.Value) > 0 { template = v.Regex.ReplaceAllLiteralString(template, v.Value) @@ -76,48 +80,33 @@ func replaceAll(template string, vars TemplateVars) string { return template } -func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { - vars := TemplateVars{ - {defaultRegexes.LogName, input.LogName}, +func (input Input) templateVars() []TemplateVar { + vars := []TemplateVar{ + TemplateVar{defaultRegexes.LogName, input.LogName}, } - gotExtraTemplateVars := input.ExtraTemplateVarsByScheme != nil + gotExtraTemplateVars := input.ExtraTemplateVars != nil if gotExtraTemplateVars { - vars = append(vars, input.ExtraTemplateVarsByScheme.Common...) + vars = append(vars, input.ExtraTemplateVars...) } - switch scheme { - case TemplateSchemeFlyin: - port := input.TaskTemplate.GetConfig()["port"] - if port == "" { - port = "8080" - } - vars = append( - vars, - TemplateVar{defaultRegexes.Port, port}, - ) - fallthrough - case TemplateSchemePod: - // Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log - // stream. Therefore, we must also strip the prefix. - containerID := input.ContainerID - stripDelimiter := "://" - if split := strings.Split(input.ContainerID, stripDelimiter); len(split) > 1 { - containerID = split[1] - } - vars = append( - vars, - TemplateVar{defaultRegexes.PodName, input.PodName}, - TemplateVar{defaultRegexes.PodUID, input.PodUID}, - TemplateVar{defaultRegexes.Namespace, input.Namespace}, - TemplateVar{defaultRegexes.ContainerName, input.ContainerName}, - TemplateVar{defaultRegexes.ContainerID, containerID}, - TemplateVar{defaultRegexes.Hostname, input.HostName}, - ) - if gotExtraTemplateVars { - vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...) - } - case TemplateSchemeTaskExecution: + // Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log + // stream. Therefore, we must also strip the prefix. + containerID := input.ContainerID + stripDelimiter := "://" + if split := strings.Split(input.ContainerID, stripDelimiter); len(split) > 1 { + containerID = split[1] + } + vars = append( + vars, + TemplateVar{defaultRegexes.PodName, input.PodName}, + TemplateVar{defaultRegexes.PodUID, input.PodUID}, + TemplateVar{defaultRegexes.Namespace, input.Namespace}, + TemplateVar{defaultRegexes.ContainerName, input.ContainerName}, + TemplateVar{defaultRegexes.ContainerID, containerID}, + TemplateVar{defaultRegexes.Hostname, input.HostName}, + ) + if input.TaskExecutionID != nil { taskExecutionIdentifier := input.TaskExecutionID.GetID() vars = append( vars, @@ -172,9 +161,6 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { }, ) } - if gotExtraTemplateVars { - vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...) - } } vars = append( @@ -194,25 +180,25 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { return vars } +func getDynamicLogLinkTypes(taskTemplate *core.TaskTemplate) []string { + if taskTemplate == nil { + return nil + } + config := taskTemplate.GetConfig() + if config == nil { + return nil + } + linkType := config["link_type"] + if linkType == "" { + return nil + } + return strings.Split(linkType, ",") +} + func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { - templateVars := input.templateVarsForScheme(p.Scheme) + templateVars := input.templateVars() taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs)) - - // Grab metadata from task template and check if key "link_type" is set to "vscode". - // If so, add a vscode link to the task logs. - isFlyin := false - if input.TaskTemplate != nil && input.TaskTemplate.GetConfig() != nil { - config := input.TaskTemplate.GetConfig() - if config != nil && config["link_type"] == "vscode" { - isFlyin = true - } - } for _, templateURI := range p.TemplateURIs { - // Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template. - // This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section. - if p.DisplayName == "Flyin Logs" && !isFlyin { - continue - } taskLogs = append(taskLogs, &core.TaskLog{ Uri: replaceAll(templateURI, templateVars), Name: p.DisplayName + input.LogName, @@ -220,5 +206,24 @@ func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { }) } + for _, dynamicLogLinkType := range getDynamicLogLinkTypes(input.TaskTemplate) { + for _, dynamicTemplateURI := range p.DynamicTemplateURIs { + if p.Name == dynamicLogLinkType { + for _, match := range taskConfigVarRegex.FindAllStringSubmatch(dynamicTemplateURI, -1) { + if len(match) > 1 { + if value, found := input.TaskTemplate.GetConfig()[match[1]]; found { + templateVars = append(templateVars, TemplateVar{MustCreateDynamicLogRegex(match[1]), value}) + } + } + } + taskLogs = append(taskLogs, &core.TaskLog{ + Uri: replaceAll(dynamicTemplateURI, templateVars), + Name: p.DisplayName + input.LogName, + MessageFormat: p.MessageFormat, + }) + } + } + } + return Output{TaskLogs: taskLogs}, nil } diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index eedc2e622c..42226bd7c0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -43,19 +43,23 @@ func dummyTaskExecID() pluginCore.TaskExecutionID { return tID } -func Test_Input_templateVarsForScheme(t *testing.T) { +func Test_Input_templateVars(t *testing.T) { testRegexes := struct { - Foo *regexp.Regexp - Bar *regexp.Regexp - Baz *regexp.Regexp - Ham *regexp.Regexp - Spam *regexp.Regexp + Foo *regexp.Regexp + Bar *regexp.Regexp + Baz *regexp.Regexp + Ham *regexp.Regexp + Spam *regexp.Regexp + LinkType *regexp.Regexp + Port *regexp.Regexp }{ MustCreateRegex("foo"), MustCreateRegex("bar"), MustCreateRegex("baz"), MustCreateRegex("ham"), MustCreateRegex("spam"), + MustCreateDynamicLogRegex("link_type"), + MustCreateDynamicLogRegex("port"), } podBase := Input{ HostName: "my-host", @@ -78,40 +82,20 @@ func Test_Input_templateVarsForScheme(t *testing.T) { PodUnixStartTime: 123, PodUnixFinishTime: 12345, } - flyinBase := Input{ - HostName: "my-host", - PodName: "my-pod", - PodUID: "my-pod-uid", - Namespace: "my-namespace", - ContainerName: "my-container", - ContainerID: "docker://containerID", - LogName: "main_logs", - PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", - PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - PodUnixStartTime: 123, - PodUnixFinishTime: 12345, - TaskTemplate: &core.TaskTemplate{ - Config: map[string]string{ - "port": "1234", - }, - }, - } tests := []struct { name string - scheme TemplateScheme baseVars Input - extraVars *TemplateVarsByScheme - exact TemplateVars - contains TemplateVars - notContains TemplateVars + extraVars []TemplateVar + exact []TemplateVar + contains []TemplateVar + notContains []TemplateVar }{ { "pod happy path", - TemplateSchemePod, podBase, nil, - TemplateVars{ + []TemplateVar{ {defaultRegexes.LogName, "main_logs"}, {defaultRegexes.PodName, "my-pod"}, {defaultRegexes.PodUID, "my-pod-uid"}, @@ -129,49 +113,32 @@ func Test_Input_templateVarsForScheme(t *testing.T) { }, { "pod with extra vars", - TemplateSchemePod, podBase, - &TemplateVarsByScheme{ - Common: TemplateVars{ - {testRegexes.Foo, "foo"}, - }, - Pod: TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, - nil, - TemplateVars{ + []TemplateVar{ {testRegexes.Foo, "foo"}, {testRegexes.Bar, "bar"}, {testRegexes.Baz, "baz"}, }, nil, - }, - { - "pod with unused extra vars", - TemplateSchemePod, - podBase, - &TemplateVarsByScheme{ - TaskExecution: TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, - nil, - nil, - TemplateVars{ + []TemplateVar{ + {testRegexes.Foo, "foo"}, {testRegexes.Bar, "bar"}, {testRegexes.Baz, "baz"}, }, + nil, }, { "task execution happy path", - TemplateSchemeTaskExecution, taskExecutionBase, nil, - TemplateVars{ + []TemplateVar{ {defaultRegexes.LogName, "main_logs"}, + {defaultRegexes.PodName, ""}, + {defaultRegexes.PodUID, ""}, + {defaultRegexes.Namespace, ""}, + {defaultRegexes.ContainerName, ""}, + {defaultRegexes.ContainerID, ""}, + {defaultRegexes.Hostname, ""}, {defaultRegexes.NodeID, "n0-0-n0"}, {defaultRegexes.GeneratedName, "generated-name"}, {defaultRegexes.TaskRetryAttempt, "1"}, @@ -192,92 +159,36 @@ func Test_Input_templateVarsForScheme(t *testing.T) { }, { "task execution with extra vars", - TemplateSchemeTaskExecution, taskExecutionBase, - &TemplateVarsByScheme{ - Common: TemplateVars{ - {testRegexes.Foo, "foo"}, - }, - TaskExecution: TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, - nil, - TemplateVars{ + []TemplateVar{ {testRegexes.Foo, "foo"}, {testRegexes.Bar, "bar"}, {testRegexes.Baz, "baz"}, }, nil, - }, - { - "task execution with unused extra vars", - TemplateSchemeTaskExecution, - taskExecutionBase, - &TemplateVarsByScheme{ - Pod: TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, - nil, - nil, - TemplateVars{ + []TemplateVar{ + {testRegexes.Foo, "foo"}, {testRegexes.Bar, "bar"}, {testRegexes.Baz, "baz"}, }, - }, - { - "flyin happy path", - TemplateSchemeFlyin, - flyinBase, - nil, - nil, - TemplateVars{ - {defaultRegexes.Port, "1234"}, - }, - nil, - }, - { - "flyin and pod happy path", - TemplateSchemeFlyin, - flyinBase, - nil, - TemplateVars{ - {defaultRegexes.LogName, "main_logs"}, - {defaultRegexes.Port, "1234"}, - {defaultRegexes.PodName, "my-pod"}, - {defaultRegexes.PodUID, "my-pod-uid"}, - {defaultRegexes.Namespace, "my-namespace"}, - {defaultRegexes.ContainerName, "my-container"}, - {defaultRegexes.ContainerID, "containerID"}, - {defaultRegexes.Hostname, "my-host"}, - {defaultRegexes.PodRFC3339StartTime, "1970-01-01T01:02:03+01:00"}, - {defaultRegexes.PodRFC3339FinishTime, "1970-01-01T04:25:45+01:00"}, - {defaultRegexes.PodUnixStartTime, "123"}, - {defaultRegexes.PodUnixFinishTime, "12345"}, - }, - nil, nil, }, { "pod with port not affected", - TemplateSchemePod, podBase, nil, nil, nil, - TemplateVars{ - {defaultRegexes.Port, "1234"}, + []TemplateVar{ + {testRegexes.Port, "1234"}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { base := tt.baseVars - base.ExtraTemplateVarsByScheme = tt.extraVars - got := base.templateVarsForScheme(tt.scheme) + base.ExtraTemplateVars = tt.extraVars + got := base.templateVars() if tt.exact != nil { assert.Equal(t, got, tt.exact) } @@ -476,7 +387,6 @@ func TestTemplateLogPlugin(t *testing.T) { { "task-with-task-execution-identifier", TemplateLogPlugin{ - Scheme: TemplateSchemeTaskExecution, TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs"}, MessageFormat: core.TaskLog_JSON, }, @@ -508,7 +418,6 @@ func TestTemplateLogPlugin(t *testing.T) { { "mapped-task-with-task-execution-identifier", TemplateLogPlugin{ - Scheme: TemplateSchemeTaskExecution, TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .subtaskParentRetryAttempt }}/mappedIndex/{{ .subtaskExecutionIndex }}/mappedAttempt/{{ .subtaskRetryAttempt }}/view/logs"}, MessageFormat: core.TaskLog_JSON, }, @@ -525,12 +434,10 @@ func TestTemplateLogPlugin(t *testing.T) { PodUnixStartTime: 123, PodUnixFinishTime: 12345, TaskExecutionID: dummyTaskExecID(), - ExtraTemplateVarsByScheme: &TemplateVarsByScheme{ - TaskExecution: TemplateVars{ - {MustCreateRegex("subtaskExecutionIndex"), "1"}, - {MustCreateRegex("subtaskRetryAttempt"), "1"}, - {MustCreateRegex("subtaskParentRetryAttempt"), "0"}, - }, + ExtraTemplateVars: []TemplateVar{ + {MustCreateRegex("subtaskExecutionIndex"), "1"}, + {MustCreateRegex("subtaskRetryAttempt"), "1"}, + {MustCreateRegex("subtaskParentRetryAttempt"), "0"}, }, }, }, @@ -545,11 +452,11 @@ func TestTemplateLogPlugin(t *testing.T) { }, }, { - "flyin", + "flyteinteractive", TemplateLogPlugin{ - Scheme: TemplateSchemeFlyin, - TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, - MessageFormat: core.TaskLog_JSON, + Name: "vscode", + DynamicTemplateURIs: []TemplateURI{"vscode://flyteinteractive:{{ .taskConfig.port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -565,54 +472,54 @@ func TestTemplateLogPlugin(t *testing.T) { Output{ TaskLogs: []*core.TaskLog{ { - Uri: "vscode://flyin:1234/my-pod-name", + Uri: "vscode://flyteinteractive:1234/my-pod-name", MessageFormat: core.TaskLog_JSON, }, }, }, }, { - "flyin - default port", + "flyteinteractive - no link_type in task template", TemplateLogPlugin{ - Scheme: TemplateSchemeFlyin, - TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, - MessageFormat: core.TaskLog_JSON, + Name: "vscode", + DynamicTemplateURIs: []TemplateURI{"vscode://flyteinteractive:{{ .taskConfig.port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + DisplayName: "Flyteinteractive Logs", }, args{ input: Input{ PodName: "my-pod-name", - TaskTemplate: &core.TaskTemplate{ - Config: map[string]string{ - "link_type": "vscode", - }, - }, }, }, Output{ - TaskLogs: []*core.TaskLog{ - { - Uri: "vscode://flyin:8080/my-pod-name", - MessageFormat: core.TaskLog_JSON, - }, - }, + TaskLogs: []*core.TaskLog{}, }, }, { - "flyin - no link_type in task template", + "kubernetes", TemplateLogPlugin{ - Scheme: TemplateSchemeFlyin, - TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + TemplateURIs: []TemplateURI{"https://dashboard.k8s.net/#!/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}"}, MessageFormat: core.TaskLog_JSON, - DisplayName: "Flyin Logs", }, args{ input: Input{ - PodName: "my-pod-name", + PodName: "flyteexamples-development-task-name", + PodUID: "pod-uid", + Namespace: "flyteexamples-development", + ContainerName: "ignore", + ContainerID: "ignore", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, }, }, - Output{ - TaskLogs: []*core.TaskLog{}, - }, + Output{TaskLogs: []*core.TaskLog{{ + Uri: "https://dashboard.k8s.net/#!/log/flyteexamples-development/flyteexamples-development-task-name/pod?namespace=flyteexamples-development", + MessageFormat: core.TaskLog_JSON, + Name: "main_logs", + }}}, }, } for _, tt := range tests { diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go deleted file mode 100644 index c1f4d668c0..0000000000 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go +++ /dev/null @@ -1,85 +0,0 @@ -// Code generated by "enumer --type=TemplateScheme --trimprefix=TemplateScheme -json -yaml"; DO NOT EDIT. - -package tasklog - -import ( - "encoding/json" - "fmt" -) - -const _TemplateSchemeName = "PodTaskExecutionFlyin" - -var _TemplateSchemeIndex = [...]uint8{0, 3, 16, 21} - -func (i TemplateScheme) String() string { - if i < 0 || i >= TemplateScheme(len(_TemplateSchemeIndex)-1) { - return fmt.Sprintf("TemplateScheme(%d)", i) - } - return _TemplateSchemeName[_TemplateSchemeIndex[i]:_TemplateSchemeIndex[i+1]] -} - -var _TemplateSchemeValues = []TemplateScheme{0, 1, 2} - -var _TemplateSchemeNameToValueMap = map[string]TemplateScheme{ - _TemplateSchemeName[0:3]: 0, - _TemplateSchemeName[3:16]: 1, - _TemplateSchemeName[16:21]: 2, -} - -// TemplateSchemeString retrieves an enum value from the enum constants string name. -// Throws an error if the param is not part of the enum. -func TemplateSchemeString(s string) (TemplateScheme, error) { - if val, ok := _TemplateSchemeNameToValueMap[s]; ok { - return val, nil - } - return 0, fmt.Errorf("%s does not belong to TemplateScheme values", s) -} - -// TemplateSchemeValues returns all values of the enum -func TemplateSchemeValues() []TemplateScheme { - return _TemplateSchemeValues -} - -// IsATemplateScheme returns "true" if the value is listed in the enum definition. "false" otherwise -func (i TemplateScheme) IsATemplateScheme() bool { - for _, v := range _TemplateSchemeValues { - if i == v { - return true - } - } - return false -} - -// MarshalJSON implements the json.Marshaler interface for TemplateScheme -func (i TemplateScheme) MarshalJSON() ([]byte, error) { - return json.Marshal(i.String()) -} - -// UnmarshalJSON implements the json.Unmarshaler interface for TemplateScheme -func (i *TemplateScheme) UnmarshalJSON(data []byte) error { - var s string - if err := json.Unmarshal(data, &s); err != nil { - return fmt.Errorf("TemplateScheme should be a string, got %s", data) - } - - var err error - *i, err = TemplateSchemeString(s) - return err -} - -// MarshalYAML implements a YAML Marshaler for TemplateScheme -func (i TemplateScheme) MarshalYAML() (interface{}, error) { - return i.String(), nil -} - -// UnmarshalYAML implements a YAML Unmarshaler for TemplateScheme -func (i *TemplateScheme) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - - var err error - *i, err = TemplateSchemeString(s) - return err -} diff --git a/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/sync_plugin.go b/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/sync_plugin.go new file mode 100644 index 0000000000..cfe5d38090 --- /dev/null +++ b/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/sync_plugin.go @@ -0,0 +1,88 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + core "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" + mock "github.com/stretchr/testify/mock" + + webapi "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/webapi" +) + +// SyncPlugin is an autogenerated mock type for the SyncPlugin type +type SyncPlugin struct { + mock.Mock +} + +type SyncPlugin_Do struct { + *mock.Call +} + +func (_m SyncPlugin_Do) Return(phase core.PhaseInfo, err error) *SyncPlugin_Do { + return &SyncPlugin_Do{Call: _m.Call.Return(phase, err)} +} + +func (_m *SyncPlugin) OnDo(ctx context.Context, tCtx webapi.TaskExecutionContext) *SyncPlugin_Do { + c_call := _m.On("Do", ctx, tCtx) + return &SyncPlugin_Do{Call: c_call} +} + +func (_m *SyncPlugin) OnDoMatch(matchers ...interface{}) *SyncPlugin_Do { + c_call := _m.On("Do", matchers...) + return &SyncPlugin_Do{Call: c_call} +} + +// Do provides a mock function with given fields: ctx, tCtx +func (_m *SyncPlugin) Do(ctx context.Context, tCtx webapi.TaskExecutionContext) (core.PhaseInfo, error) { + ret := _m.Called(ctx, tCtx) + + var r0 core.PhaseInfo + if rf, ok := ret.Get(0).(func(context.Context, webapi.TaskExecutionContext) core.PhaseInfo); ok { + r0 = rf(ctx, tCtx) + } else { + r0 = ret.Get(0).(core.PhaseInfo) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, webapi.TaskExecutionContext) error); ok { + r1 = rf(ctx, tCtx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type SyncPlugin_GetConfig struct { + *mock.Call +} + +func (_m SyncPlugin_GetConfig) Return(_a0 webapi.PluginConfig) *SyncPlugin_GetConfig { + return &SyncPlugin_GetConfig{Call: _m.Call.Return(_a0)} +} + +func (_m *SyncPlugin) OnGetConfig() *SyncPlugin_GetConfig { + c_call := _m.On("GetConfig") + return &SyncPlugin_GetConfig{Call: c_call} +} + +func (_m *SyncPlugin) OnGetConfigMatch(matchers ...interface{}) *SyncPlugin_GetConfig { + c_call := _m.On("GetConfig", matchers...) + return &SyncPlugin_GetConfig{Call: c_call} +} + +// GetConfig provides a mock function with given fields: +func (_m *SyncPlugin) GetConfig() webapi.PluginConfig { + ret := _m.Called() + + var r0 webapi.PluginConfig + if rf, ok := ret.Get(0).(func() webapi.PluginConfig); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(webapi.PluginConfig) + } + + return r0 +} diff --git a/flyteplugins/go/tasks/plugins/array/core/phase_enumer.go b/flyteplugins/go/tasks/plugins/array/core/phase_enumer.go index c659c7bcfe..a5cc4569bc 100644 --- a/flyteplugins/go/tasks/plugins/array/core/phase_enumer.go +++ b/flyteplugins/go/tasks/plugins/array/core/phase_enumer.go @@ -6,9 +6,9 @@ import ( "fmt" ) -const _PhaseName = "PhaseStartPhasePreLaunchPhaseLaunchPhaseWaitingForResourcesPhaseCheckingSubTaskExecutionsPhaseAssembleFinalOutputPhaseWriteToDiscoveryPhaseWriteToDiscoveryThenFailPhaseSuccessPhaseAssembleFinalErrorPhaseRetryableFailurePhasePermanentFailure" +const _PhaseName = "PhaseStartPhasePreLaunchPhaseLaunchPhaseWaitingForResourcesPhaseCheckingSubTaskExecutionsPhaseAssembleFinalOutputPhaseWriteToDiscoveryPhaseWriteToDiscoveryThenFailPhaseSuccessPhaseAssembleFinalErrorPhaseRetryableFailurePhasePermanentFailurePhaseAbortSubTasks" -var _PhaseIndex = [...]uint8{0, 10, 24, 35, 59, 89, 113, 134, 163, 175, 198, 219, 240} +var _PhaseIndex = [...]uint16{0, 10, 24, 35, 59, 89, 113, 134, 163, 175, 198, 219, 240, 258} func (i Phase) String() string { if i >= Phase(len(_PhaseIndex)-1) { @@ -17,7 +17,7 @@ func (i Phase) String() string { return _PhaseName[_PhaseIndex[i]:_PhaseIndex[i+1]] } -var _PhaseValues = []Phase{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} +var _PhaseValues = []Phase{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12} var _PhaseNameToValueMap = map[string]Phase{ _PhaseName[0:10]: 0, @@ -32,6 +32,7 @@ var _PhaseNameToValueMap = map[string]Phase{ _PhaseName[175:198]: 9, _PhaseName[198:219]: 10, _PhaseName[219:240]: 11, + _PhaseName[240:258]: 12, } // PhaseString retrieves an enum value from the enum constants string name. diff --git a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go index ab26028b09..9404bdfb72 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go @@ -67,6 +67,7 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta tID := &mocks.TaskExecutionID{} tID.OnGetGeneratedName().Return("notfound") + tID.On("GetUniqueNodeID").Return("an-unique-id") tID.OnGetID().Return(core2.TaskExecutionIdentifier{ TaskId: &core2.Identifier{ ResourceType: core2.ResourceType_TASK, diff --git a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go index 8ac4d6edc0..77b3ac6501 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go @@ -187,22 +187,20 @@ var logTemplateRegexes = struct { tasklog.MustCreateRegex("subtaskParentRetryAttempt"), } -func (s SubTaskExecutionID) TemplateVarsByScheme() *tasklog.TemplateVarsByScheme { - return &tasklog.TemplateVarsByScheme{ - TaskExecution: tasklog.TemplateVars{ - {Regex: logTemplateRegexes.ParentName, Value: s.parentName}, - { - Regex: logTemplateRegexes.ExecutionIndex, - Value: strconv.FormatUint(uint64(s.executionIndex), 10), - }, - { - Regex: logTemplateRegexes.RetryAttempt, - Value: strconv.FormatUint(s.subtaskRetryAttempt, 10), - }, - { - Regex: logTemplateRegexes.ParentRetryAttempt, - Value: strconv.FormatUint(uint64(s.taskRetryAttempt), 10), - }, +func (s SubTaskExecutionID) TemplateVarsByScheme() []tasklog.TemplateVar { + return []tasklog.TemplateVar{ + {Regex: logTemplateRegexes.ParentName, Value: s.parentName}, + { + Regex: logTemplateRegexes.ExecutionIndex, + Value: strconv.FormatUint(uint64(s.executionIndex), 10), + }, + { + Regex: logTemplateRegexes.RetryAttempt, + Value: strconv.FormatUint(s.subtaskRetryAttempt, 10), + }, + { + Regex: logTemplateRegexes.ParentRetryAttempt, + Value: strconv.FormatUint(uint64(s.taskRetryAttempt), 10), }, } } diff --git a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context_test.go b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context_test.go index c3e213e403..103980fab0 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context_test.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context_test.go @@ -36,13 +36,11 @@ func TestSubTaskExecutionContext(t *testing.T) { assert.Equal(t, storage.DataReference("/prefix/"), stCtx.OutputWriter().GetOutputPrefixPath()) assert.Equal(t, storage.DataReference("/raw_prefix/5/1"), stCtx.OutputWriter().GetRawOutputPrefix()) assert.Equal(t, - &tasklog.TemplateVarsByScheme{ - TaskExecution: tasklog.TemplateVars{ - {Regex: logTemplateRegexes.ParentName, Value: "notfound"}, - {Regex: logTemplateRegexes.ExecutionIndex, Value: "0"}, - {Regex: logTemplateRegexes.RetryAttempt, Value: "1"}, - {Regex: logTemplateRegexes.ParentRetryAttempt, Value: "0"}, - }, + []tasklog.TemplateVar{ + {Regex: logTemplateRegexes.ParentName, Value: "notfound"}, + {Regex: logTemplateRegexes.ExecutionIndex, Value: "0"}, + {Regex: logTemplateRegexes.RetryAttempt, Value: "1"}, + {Regex: logTemplateRegexes.ParentRetryAttempt, Value: "0"}, }, stCtx.TaskExecutionMetadata().GetTaskExecutionID().(SubTaskExecutionID).TemplateVarsByScheme(), ) diff --git a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go index 576740af93..e7d43a2256 100644 --- a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go @@ -179,6 +179,7 @@ func dummyDaskTaskContext(taskTemplate *core.TaskTemplate, resources *v1.Resourc }, }) tID.On("GetGeneratedName").Return(testTaskID) + tID.On("GetUniqueNodeID").Return("an-unique-id") taskExecutionMetadata := &mocks.TaskExecutionMetadata{} taskExecutionMetadata.OnGetTaskExecutionID().Return(tID) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go index bf7d537416..d0e154835c 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go @@ -211,8 +211,9 @@ func TestGetLogsTemplateUri(t *testing.T) { taskCtx := dummyTaskContext() pytorchJobObjectMeta := meta_v1.ObjectMeta{ - Name: "test", - Namespace: "pytorch-namespace", + Name: "test", + Namespace: "pytorch-" + + "namespace", CreationTimestamp: meta_v1.Time{ Time: time.Date(2022, time.January, 1, 12, 0, 0, 0, time.UTC), }, @@ -317,6 +318,8 @@ func dummyTaskContext() pluginsCore.TaskExecutionContext { }, RetryAttempt: 0, }) + tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") taskExecutionMetadata := &mocks.TaskExecutionMetadata{} taskExecutionMetadata.OnGetTaskExecutionID().Return(tID) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go index d009c7c887..29fe47a446 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go @@ -148,6 +148,7 @@ func dummyMPITaskContext(taskTemplate *core.TaskTemplate, resources *corev1.Reso }, }) tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.OnGetResources().Return(resources) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go index e0606b1020..0700644578 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go @@ -154,6 +154,7 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate, resources *corev1. }, }) tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.OnGetResources().Return(resources) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go index c3252183fc..2402bd4c5a 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go @@ -149,6 +149,7 @@ func dummyTensorFlowTaskContext(taskTemplate *core.TaskTemplate, resources *core }, }) tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.OnGetResources().Return(resources) diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go b/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go index eae0ac98b7..f72d4eb1d7 100644 --- a/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go +++ b/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go @@ -146,7 +146,7 @@ func (p plugin) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContex return p.GetTaskPhaseWithLogs(ctx, pluginContext, r, logPlugin, " (User)", nil) } -func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.PluginContext, r client.Object, logPlugin tasklog.Plugin, logSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) (pluginsCore.PhaseInfo, error) { +func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.PluginContext, r client.Object, logPlugin tasklog.Plugin, logSuffix string, extraLogTemplateVarsByScheme []tasklog.TemplateVar) (pluginsCore.PhaseInfo, error) { pluginState := k8s.PluginState{} _, err := pluginContext.PluginStateReader().Get(&pluginState) if err != nil { diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go b/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go index 0c728780d9..4977695a1a 100644 --- a/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go @@ -84,6 +84,7 @@ func dummySidecarTaskMetadata(resources *v1.ResourceRequirements, extendedResour }, }) tID.On("GetGeneratedName").Return("my_project:my_domain:my_name") + tID.On("GetUniqueNodeID").Return("an-unique-id") taskMetadata.On("GetTaskExecutionID").Return(tID) to := &pluginsCoreMock.TaskOverrides{} diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index f6abb58c93..3f8d678ef5 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -457,13 +457,13 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() input := tasklog.Input{ - Namespace: rayJob.Namespace, - TaskExecutionID: taskExecID, - ExtraTemplateVarsByScheme: &tasklog.TemplateVarsByScheme{}, + Namespace: rayJob.Namespace, + TaskExecutionID: taskExecID, + ExtraTemplateVars: []tasklog.TemplateVar{}, } if rayJob.Status.JobId != "" { - input.ExtraTemplateVarsByScheme.Common = append( - input.ExtraTemplateVarsByScheme.Common, + input.ExtraTemplateVars = append( + input.ExtraTemplateVars, tasklog.TemplateVar{ Regex: logTemplateRegexes.RayJobID, Value: rayJob.Status.JobId, @@ -471,8 +471,8 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon ) } if rayJob.Status.RayClusterName != "" { - input.ExtraTemplateVarsByScheme.Common = append( - input.ExtraTemplateVarsByScheme.Common, + input.ExtraTemplateVars = append( + input.ExtraTemplateVars, tasklog.TemplateVar{ Regex: logTemplateRegexes.RayClusterName, Value: rayJob.Status.RayClusterName, diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index b171ae9aa8..beab938768 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -738,7 +738,6 @@ func TestGetEventInfo_LogTemplates(t *testing.T) { TemplateURIs: []tasklog.TemplateURI{ "http://test/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}", }, - Scheme: tasklog.TemplateSchemeTaskExecution, }, expectedTaskLogs: []*core.TaskLog{ { @@ -823,7 +822,6 @@ func TestGetEventInfo_DashboardURL(t *testing.T) { dashboardURLTemplate: tasklog.TemplateLogPlugin{ DisplayName: "Ray Dashboard", TemplateURIs: []tasklog.TemplateURI{"http://test/{{.generatedName}}"}, - Scheme: tasklog.TemplateSchemeTaskExecution, }, expectedTaskLogs: []*core.TaskLog{ { diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go index b07ab0ef33..264f9514e9 100644 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go @@ -379,6 +379,7 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate, interruptible bool) }, }) tID.On("GetGeneratedName").Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.On("GetResources").Return(&corev1.ResourceRequirements{})