Skip to content

Commit

Permalink
[core][executor] fine-tuning operator logs
Browse files Browse the repository at this point in the history
A bunch of OPS logs was moved to SUPPORT or DEVEL, while some of the OPS logs were made simpler.

OCTRL-978
  • Loading branch information
knopers8 committed Jan 31, 2025
1 parent a699776 commit 5126853
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))

GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
TEST_DIRS := ./apricot/local ./common/gera ./common/utils ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
GO_TEST_DIRS := ./core/repos ./core/integration/dcs ./common/monitoring

coverage:COVERAGE_PREFIX := ./coverage_results
Expand Down
28 changes: 28 additions & 0 deletions common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,31 @@ func TruncateString(str string, length int) string {

return string([]rune(str)[:length])
}

var taskClassNameRgx = regexp.MustCompile(`(?:.*/)?([^/@]+)@`) // Captures the segment between the last '/' and '@'

// ExtractTaskClassName extracts the task class name from the provided task name string
// For example, we extract "readout" from the following string:
// "alio2-cr1-hv-gw01.cern.ch:/opt/git/ControlWorkflows/tasks/readout@12b11ac4bb652e1835e3e94806a688c951691d5f#2sP21PjpfCQ"
func ExtractTaskClassName(taskName string) (string, error) {

matches := taskClassNameRgx.FindStringSubmatch(taskName)
if len(matches) < 2 {
return "", fmt.Errorf("failed to extract task class name from '%s'", taskName)
}

return matches[1], nil
}

// TrimJitPrefix removes the JIT prefix from task class names.
// For example, "jit-ad6f2b64b7502198430d7d7f93f15bf94c088cab-qc-pp-TPC-CalibQC_long" becomes "qc-pp-TPC-CalibQC_long".
// If input does not contain a JIT prefix, it is returned as it is.
func TrimJitPrefix(taskClassName string) string {
if strings.HasPrefix(taskClassName, "jit-") {
parts := strings.SplitN(taskClassName, "-", 3)
if len(parts) > 2 {
return parts[2]
}
}
return taskClassName
}
2 changes: 1 addition & 1 deletion core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ func (env *Environment) subscribeToWfState(taskman *task.Manager) {
time.AfterFunc(500*time.Millisecond, func() { // wait 0.5s for any other tasks to go to ERROR/INACTIVE
log.WithField("partition", env.id).
WithField("level", infologger.IL_Ops).
Warn("one of the critical tasks went into ERROR state, transitioning the environment into ERROR")
Error("one of the critical tasks went into ERROR state, transitioning the environment into ERROR")
err := env.TryTransition(NewGoErrorTransition(taskman))
if err != nil {
if env.Sm.Current() == "ERROR" {
Expand Down
14 changes: 13 additions & 1 deletion core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,10 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

log.WithFields(logrus.Fields{
"partition": environmentId.String(),
infologger.Level: infologger.IL_Ops,
}).Info("environment teardown complete")
return err
}

Expand Down Expand Up @@ -935,7 +939,7 @@ func (envs *Manager) Environment(environmentId uid.ID) (env *Environment, err er

func (envs *Manager) environment(environmentId uid.ID) (env *Environment, err error) {
if len(environmentId) == 0 { // invalid id
return nil, fmt.Errorf("invalid id: %s", environmentId)
return nil, fmt.Errorf("empty env ID")
}
envs.mu.RLock()
defer envs.mu.RUnlock()
Expand Down Expand Up @@ -1050,6 +1054,7 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
env, err := envs.environment(t.GetEnvironmentId())
if err != nil {
log.WithPrefix("scheduler").
WithField(infologger.Level, infologger.IL_Devel).
WithError(err).
Error("cannot find environment for DeviceEvent")
}
Expand All @@ -1060,11 +1065,13 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
} else {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField(infologger.Level, infologger.IL_Devel).
Error("DeviceEvent BASIC_TASK_TERMINATED received for task with no parent role")
}
} else {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField(infologger.Level, infologger.IL_Devel).
Debug("cannot find task for DeviceEvent BASIC_TASK_TERMINATED")
}

Expand All @@ -1084,6 +1091,7 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField(infologger.Level, infologger.IL_Devel).
Debug("cannot find task for DeviceEvent END_OF_STREAM")
return
}
Expand All @@ -1092,12 +1100,14 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField(infologger.Level, infologger.IL_Devel).
WithError(err).
Error("cannot find environment for DeviceEvent")
} else {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField("role", t.GetParent().GetName()).
WithField("envState", env.CurrentState()).
Debug("received END_OF_STREAM event from task, trying to stop the run")
if env.CurrentState() == "RUNNING" {
Expand Down Expand Up @@ -1132,6 +1142,7 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField(infologger.Level, infologger.IL_Devel).
WithError(err).
Error("cannot find environment for DeviceEvent")
} else {
Expand All @@ -1140,6 +1151,7 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
WithField("taskId", taskId.Value).
WithField("taskRole", t.GetParentRolePath()).
WithField("envState", env.CurrentState()).
WithField(infologger.Level, infologger.IL_Support).
Debug("received TASK_INTERNAL_ERROR event from task, trying to stop the run")
if env.CurrentState() == "RUNNING" {
go func() {
Expand Down
2 changes: 2 additions & 0 deletions core/environment/transition_startactivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (t StartActivityTransition) do(env *Environment) (err error) {

log.WithField(infologger.Run, runNumber).
WithField("partition", env.Id().String()).
WithField(infologger.Level, infologger.IL_Support).
Info("starting new run")

cleanupCount := 0
Expand Down Expand Up @@ -123,6 +124,7 @@ func (t StartActivityTransition) do(env *Environment) (err error) {

log.WithField(infologger.Run, env.currentRunNumber).
WithField("partition", env.Id().String()).
WithField(infologger.Level, infologger.IL_Support).
Info("run started")
env.sendEnvironmentEvent(&event.EnvironmentEvent{
EnvironmentID: env.Id().String(),
Expand Down
6 changes: 6 additions & 0 deletions core/environment/transition_stopactivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (t StopActivityTransition) do(env *Environment) (err error) {

log.WithField(infologger.Run, env.currentRunNumber).
WithField("partition", env.Id().String()).
WithField(infologger.Level, infologger.IL_Support).
Info("stopping run")

args := controlcommands.PropertyMap{}
Expand Down Expand Up @@ -89,5 +90,10 @@ func (t StopActivityTransition) do(env *Environment) (err error) {
}
env.sendEnvironmentEvent(&event.EnvironmentEvent{EnvironmentID: env.Id().String(), State: "CONFIGURED"})

log.WithField(infologger.Run, env.currentRunNumber).
WithField("partition", env.Id().String()).
WithField(infologger.Level, infologger.IL_Support).
Info("run stopped")

return
}
2 changes: 2 additions & 0 deletions core/integration/odc/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
} else {
log.WithField("partition", envId).
WithField("call", "PartitionInitialize").
WithField(infologger.Level, infologger.IL_Support).
Info("odc_extract_topology_resources is set to true, plugin and resources will not be included in the ODC Run request")
}

Expand Down Expand Up @@ -1264,6 +1265,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithField("partition", envId).
WithField("call", "Configure").
WithField("runType", runType).
WithField(infologger.Level, infologger.IL_Support).
Infof("overriding run start time (orbit-reset-time) to %s for SYNTHETIC run", pdpOverrideRunStartTime)
} else {
log.WithField("partition", envId).
Expand Down
22 changes: 12 additions & 10 deletions executor/executable/basictaskcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"bytes"
"encoding/json"
"errors"
"github.com/AliceO2Group/Control/common/utils"
"io"
"os/exec"
"syscall"
Expand Down Expand Up @@ -178,19 +179,20 @@ func (t *basicTaskBase) startBasicTask() (err error) {
}

if err != nil {
taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name)
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("level", infologger.IL_Ops).
Errorf("task '%s' terminated with error: %s", taskClassName, err.Error())
log.WithField("partition", t.knownEnvironmentId.String()).
WithFields(logrus.Fields{
"id": t.ti.TaskID.Value,
"task": t.ti.Name,
"error": err.Error(),
"level": infologger.IL_Devel,
"id": t.ti.TaskID.Value,
"task": t.ti.Name,
"command": tciCommandStr,
"error": err.Error(),
"level": infologger.IL_Devel,
}).
Error("task terminated with error")
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("level", infologger.IL_Ops).
Errorf("task terminated with error: %s %s",
tciCommandStr,
err.Error())
Error("task terminated with error (details)")
pendingState = mesos.TASK_FAILED
}

Expand Down
13 changes: 6 additions & 7 deletions executor/executable/controllabletask.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,11 @@ func (t *ControllableTask) Launch() error {

pendingState := mesos.TASK_FINISHED
if err != nil {
taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name)
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("level", infologger.IL_Ops).
Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error())
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithFields(logrus.Fields{
Expand All @@ -582,13 +587,7 @@ func (t *ControllableTask) Launch() error {
"error": err.Error(),
"level": infologger.IL_Devel,
}).
Error("task terminated with error")
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("level", infologger.IL_Ops).
Errorf("task terminated with error: %s %s",
truncatedCmd,
err.Error())
Error("task terminated with error (details):")
pendingState = mesos.TASK_FAILED
}

Expand Down

0 comments on commit 5126853

Please sign in to comment.