Skip to content

Commit

Permalink
Dynamic log links (#4774)
Browse files Browse the repository at this point in the history
  • Loading branch information
eapolinario authored Jan 29, 2024
1 parent fb9ffd5 commit 460f0f1
Show file tree
Hide file tree
Showing 25 changed files with 472 additions and 462 deletions.
3 changes: 1 addition & 2 deletions flyteplugins/go/tasks/logs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ type LogConfig struct {
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

IsFlyinEnabled bool `json:"flyin-enabled" pflag:",Enable Log-links to flyin logs"`
FlyinTemplateURI tasklog.TemplateURI `json:"flyin-template-uri" pflag:",Template Uri to use when building flyin log links"`
DynamicLogLinks map[string]tasklog.TemplateLogPlugin `json:"dynamic-log-links" pflag:"-,Map of dynamic log links"`

Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"`
}
Expand Down
2 changes: 0 additions & 2 deletions flyteplugins/go/tasks/logs/logconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 0 additions & 28 deletions flyteplugins/go/tasks/logs/logconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 34 additions & 29 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVars []tasklog.TemplateVar, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand All @@ -39,19 +39,19 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas

logs, err := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: pod.Name,
PodUID: string(pod.GetUID()),
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339),
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionID: taskExecID,
ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
TaskTemplate: taskTemplate,
PodName: pod.Name,
PodUID: string(pod.GetUID()),
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339),
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionID: taskExecID,
ExtraTemplateVars: extraLogTemplateVars,
TaskTemplate: taskTemplate,
},
)

Expand All @@ -63,13 +63,14 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
}

type templateLogPluginCollection struct {
plugins []tasklog.TemplateLogPlugin
plugins []tasklog.TemplateLogPlugin
dynamicPlugins []tasklog.TemplateLogPlugin
}

func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) {
var taskLogs []*core.TaskLog

for _, plugin := range t.plugins {
for _, plugin := range append(t.plugins, t.dynamicPlugins...) {
o, err := plugin.GetTaskLogs(input)
if err != nil {
return tasklog.Output{}, err
Expand All @@ -84,39 +85,43 @@ func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.O
func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
// Use a list to maintain order.
var plugins []tasklog.TemplateLogPlugin
var dynamicPlugins []tasklog.TemplateLogPlugin

if cfg.IsKubernetesEnabled {
if len(cfg.KubernetesTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsCloudwatchEnabled {
if len(cfg.CloudwatchTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsStackDriverEnabled {
if len(cfg.StackDriverTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsFlyinEnabled {
if len(cfg.FlyinTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{cfg.FlyinTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://flyin.%s/logs/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", cfg.GCPProjectName)}, MessageFormat: core.TaskLog_JSON})
}
for logLinkType, dynamicLogLink := range cfg.DynamicLogLinks {
dynamicPlugins = append(
dynamicPlugins,
tasklog.TemplateLogPlugin{
Name: logLinkType,
DisplayName: dynamicLogLink.DisplayName,
DynamicTemplateURIs: dynamicLogLink.TemplateURIs,
MessageFormat: core.TaskLog_JSON,
})
}

plugins = append(plugins, cfg.Templates...)
return templateLogPluginCollection{plugins: plugins}, nil
return templateLogPluginCollection{plugins: plugins, dynamicPlugins: dynamicPlugins}, nil
}
175 changes: 154 additions & 21 deletions flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs",
},
MessageFormat: core.TaskLog_JSON,
Scheme: tasklog.TemplateSchemeTaskExecution,
},
},
}, nil, []*core.TaskLog{
Expand All @@ -351,30 +350,164 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
})
}

func TestGetLogsForContainerInPod_Flyin(t *testing.T) {
assertTestSucceeded(t,
&LogConfig{
IsKubernetesEnabled: true,
KubernetesTemplateURI: "https://k8s.com",
IsFlyinEnabled: true,
FlyinTemplateURI: "https://flyin.mydomain.com:{{ .port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
func TestGetLogsForContainerInPod_Flyteinteractive(t *testing.T) {
tests := []struct {
name string
config *LogConfig
template *core.TaskTemplate
expectedTaskLogs []*core.TaskLog
}{
{
"Flyteinteractive enabled but no task template",
&LogConfig{
DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{
"vscode": tasklog.TemplateLogPlugin{
DisplayName: "vscode link",
TemplateURIs: []tasklog.TemplateURI{
"https://flyteinteractive.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
},
},
},
},
nil,
nil,
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
{
"Flyteinteractive enabled but config not found in task template",
&LogConfig{
DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{
"vscode": tasklog.TemplateLogPlugin{
DisplayName: "vscode link",
TemplateURIs: []tasklog.TemplateURI{
"https://flyteinteractive.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
},
},
},
},
&core.TaskTemplate{},
nil,
},
[]*core.TaskLog{
{
Uri: "https://k8s.com",
MessageFormat: core.TaskLog_JSON,
Name: "Kubernetes Logs my-Suffix",
{
"Flyteinteractive disabled but config present in TaskTemplate",
&LogConfig{},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
},
},
{
Uri: "https://flyin.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "Flyin Logs my-Suffix",
nil,
},
{
"Flyteinteractive - multiple dynamic options",
&LogConfig{
DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{
"vscode": tasklog.TemplateLogPlugin{
DisplayName: "vscode link",
TemplateURIs: []tasklog.TemplateURI{
"https://abc.com:{{ .taskConfig.port }}/{{ .taskConfig.route }}",
},
},
},
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
"route": "a-route",
},
},
[]*core.TaskLog{
{
Uri: "https://abc.com:65535/a-route",
MessageFormat: core.TaskLog_JSON,
Name: "vscode link my-Suffix",
},
},
},
{
"Flyteinteractive - multiple uses of the template (invalid use of ports in a URI)",
&LogConfig{
DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{
"vscode": tasklog.TemplateLogPlugin{
DisplayName: "vscode link",
TemplateURIs: []tasklog.TemplateURI{
"https://abc.com:{{ .taskConfig.port }}:{{ .taskConfig.port}}",
},
},
},
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
},
},
[]*core.TaskLog{
{
Uri: "https://abc.com:65535:65535",
MessageFormat: core.TaskLog_JSON,
Name: "vscode link my-Suffix",
},
},
},
{
"Flyteinteractive disabled and K8s enabled and flyteinteractive config present in TaskTemplate",
&LogConfig{
IsKubernetesEnabled: true,
KubernetesTemplateURI: "https://k8s.com/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
},
},
[]*core.TaskLog{
{
Uri: "https://k8s.com/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "Kubernetes Logs my-Suffix",
},
},
},
{
"Flyteinteractive and K8s enabled",
&LogConfig{
IsKubernetesEnabled: true,
KubernetesTemplateURI: "https://k8s.com/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
DynamicLogLinks: map[string]tasklog.TemplateLogPlugin{
"vscode": tasklog.TemplateLogPlugin{
DisplayName: "vscode link",
TemplateURIs: []tasklog.TemplateURI{
"https://flyteinteractive.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
},
},
},
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
},
},
[]*core.TaskLog{
{
Uri: "https://k8s.com/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "Kubernetes Logs my-Suffix",
},
{
Uri: "https://flyteinteractive.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "vscode link my-Suffix",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assertTestSucceeded(t, tt.config, tt.template, tt.expectedTaskLogs)
})
}
}
7 changes: 4 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/core/phase_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 460f0f1

Please sign in to comment.