Commit 6b43ba6
Improve go logging and remove unused code (#413)
* Improve go logging
* Remove unused code
Amir Omidi authored Feb 16, 2021
1 parent ce0008c commit 6b43ba6
Showing 9 changed files with 89 additions and 318 deletions.
5 changes: 3 additions & 2 deletions workflow-manager/batchpath/batchpath.go
@@ -2,12 +2,13 @@ package batchpath

 import (
 	"fmt"
-	"log"
 	"sort"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/rs/zerolog/log"
+
 	wftime "github.com/letsencrypt/prio-server/workflow-manager/time"
 	"github.com/letsencrypt/prio-server/workflow-manager/utils"
 )
@@ -155,7 +156,7 @@ func ReadyBatches(files []string, infix string) (List, error) {
 		if v.isComplete() {
 			output = append(output, v)
 		} else {
-			log.Printf("ignoring incomplete batch %s", v)
+			log.Info().Msgf("ignoring incomplete batch %s", v)
 		}
 	}
 	sort.Sort(List(output))
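For readers unfamiliar with zerolog, the swap above is mechanical: a stdlib log.Printf call becomes a leveled event (log.Info()) that still accepts printf-style formatting via Msgf. A minimal, self-contained sketch of the new call style (the batch path is made up; this is not code from the commit):

package main

import "github.com/rs/zerolog/log"

func main() {
	// Pick a level, optionally attach fields, then emit the message.
	// Msgf keeps printf-style formatting, so old call sites port directly.
	log.Info().Msgf("ignoring incomplete batch %s", "example-aggregation/2021/02/16/00/00")
	// With zerolog's default global logger this emits one JSON object per event, roughly:
	// {"level":"info","time":"2021-02-16T00:00:00Z","message":"ignoring incomplete batch example-aggregation/2021/02/16/00/00"}
}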
22 changes: 0 additions & 22 deletions workflow-manager/errors/errors.go

This file was deleted.

6 changes: 1 addition & 5 deletions workflow-manager/go.mod
@@ -7,10 +7,6 @@ require (
 	cloud.google.com/go/storage v1.13.0
 	github.com/aws/aws-sdk-go v1.37.12
 	github.com/prometheus/client_golang v1.9.0
+	github.com/rs/zerolog v1.20.0
 	google.golang.org/api v0.40.0
-	gopkg.in/retry.v1 v1.0.3
-	k8s.io/api v0.19.3
-	k8s.io/apimachinery v0.20.2
-	k8s.io/client-go v0.19.3
-	k8s.io/utils v0.0.0-20201015054608-420da100c033 // indirect
 )
148 changes: 6 additions & 142 deletions workflow-manager/go.sum

Large diffs are not rendered by default.

109 changes: 0 additions & 109 deletions workflow-manager/kubernetes/kubernetes.go

This file was deleted.

91 changes: 65 additions & 26 deletions workflow-manager/main.go
@@ -9,10 +9,13 @@ package main
 import (
 	"flag"
 	"fmt"
-	"log"
 	"os"
+	"strings"
 	"time"
 
+	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
+
 	"github.com/letsencrypt/prio-server/workflow-manager/batchpath"
 	"github.com/letsencrypt/prio-server/workflow-manager/monitor"
 	"github.com/letsencrypt/prio-server/workflow-manager/storage"
@@ -41,7 +44,6 @@ var peerValidationIdentity = flag.String("peer-validation-identity", "", "Identi
 var aggregationPeriod = flag.String("aggregation-period", "3h", "How much time each aggregation covers")
 var gracePeriod = flag.String("grace-period", "1h", "Wait this amount of time after the end of an aggregation timeslice to run the aggregation")
 var pushGateway = flag.String("push-gateway", "", "Set this to the gateway to use with prometheus. If left empty, workflow-manager will not use prometheus.")
-var kubeconfigPath = flag.String("kube-config-path", "", "Path to the kubeconfig file to be used to authenticate to Kubernetes API")
 var dryRun = flag.Bool("dry-run", false, "If set, no operations with side effects will be done.")
 var taskQueueKind = flag.String("task-queue-kind", "", "Which task queue kind to use.")
 var intakeTasksTopic = flag.String("intake-tasks-topic", "", "Name of the topic to which intake-batch tasks should be published")
@@ -63,7 +65,6 @@ var awsSNSIdentity = flag.String("aws-sns-identity", "", "AWS IAM ARN of the rol
 // monitoring things
 var (
 	intakesStarted            monitor.GaugeMonitor = &monitor.NoopGauge{}
-	intakesSkippedDueToAge    monitor.GaugeMonitor = &monitor.NoopGauge{}
 	intakesSkippedDueToMarker monitor.GaugeMonitor = &monitor.NoopGauge{}
 
 	aggregationsStarted monitor.GaugeMonitor = &monitor.NoopGauge{}
@@ -76,29 +77,41 @@ (

 func fail(format string, args ...interface{}) {
 	workflowManagerLastFailure.SetToCurrentTime()
-	log.Fatalf(format, args...)
+	log.Fatal().Msgf(format, args...)
 }
 
+func prepareLogger() {
+	zerolog.LevelFieldName = "severity"
+	zerolog.TimestampFieldName = "timestamp"
+	zerolog.TimeFieldFormat = time.RFC3339Nano
+}
+
 func main() {
+	prepareLogger()
 	startTime := time.Now()
-	log.Printf("starting %s version %s. Args: %s", os.Args[0], BuildInfo, os.Args[1:])
+	log.Info().
+		Str("app", os.Args[0]).
+		Str("version", BuildInfo).
+		Str("Args", strings.Join(os.Args[1:], ",")).
+		Msgf("starting %s version %s. Args: %s", os.Args[0], BuildInfo, os.Args[1:])
 	flag.Parse()
 
 	if *pushGateway != "" {
 		pusher := push.New(*pushGateway, "workflow-manager").
 			Gatherer(prometheus.DefaultGatherer).
 			Grouping("locality", *k8sNS).
 			Grouping("ingestor", *ingestorLabel)
+		defer func() {
+			err := pusher.Push()
+			if err != nil {
+				log.Err(err).Msg("error occurred with pushing to prometheus")
+			}
+		}()
 
-		defer pusher.Push()
 		intakesStarted = promauto.NewGauge(prometheus.GaugeOpts{
 			Name: "workflow_manager_intake_tasks_scheduled",
 			Help: "The number of intake-batch tasks successfully scheduled",
 		})
-		intakesSkippedDueToAge = promauto.NewGauge(prometheus.GaugeOpts{
-			Name: "workflow_manager_intake_tasks_skipped_due_to_age",
-			Help: "The number of intake-batch tasks not scheduled because the batch was too old",
-		})
 		intakesSkippedDueToMarker = promauto.NewGauge(prometheus.GaugeOpts{
 			Name: "workflow_manager_intake_tasks_skipped_due_to_marker",
 			Help: "The number of intake-batch tasks not scheduled because a task marker was found",
@@ -257,7 +270,7 @@ }
 	}
 
 	for _, aggregationID := range aggregationIDs {
-		scheduleTasks(scheduleTasksConfig{
+		err = scheduleTasks(scheduleTasksConfig{
 			aggregationID: aggregationID,
 			isFirst:       *isFirst,
 			clock:         wftime.DefaultClock(),
@@ -271,13 +284,18 @@ func main() {
 			gracePeriod: gracePeriodParsed,
 		})
 
+		if err != nil {
+			log.Err(err).Str("aggregation ID", aggregationID).Msgf("Failed to schedule aggregation tasks: %s", err)
+			return
+		}
+
 		workflowManagerLastSuccess.SetToCurrentTime()
 		endTime := time.Now()
 
 		workflowManagerRuntime.Set(endTime.Sub(startTime).Seconds())
 	}
 
-	log.Print("done")
+	log.Info().Msg("done")
 }
 
 type scheduleTasksConfig struct {
@@ -330,7 +348,8 @@ func scheduleTasks(config scheduleTasksConfig) error {
 	}
 
 	aggInterval := wftime.AggregationInterval(config.clock, config.aggregationPeriod, config.gracePeriod)
-	log.Printf("looking for batches to aggregate in interval %s", aggInterval)
+
+	log.Info().Str("aggregation interval", aggInterval.String()).Msgf("looking for batches to aggregate in interval %s", aggInterval)
 
 	ownValidationFiles, err := config.ownValidationBucket.ListBatchFiles(config.aggregationID, aggInterval)
 	if err != nil {
@@ -343,7 +362,7 @@ func scheduleTasks(config scheduleTasksConfig) error {
 		return err
 	}
 
-	log.Printf("found %d own validations", len(ownValidationBatches))
+	log.Info().Int("own validations", len(ownValidationBatches)).Msgf("found %d own validations", len(ownValidationBatches))
 
 	peerValidationFiles, err := config.peerValidationBucket.ListBatchFiles(config.aggregationID, aggInterval)
 	if err != nil {
@@ -356,7 +375,7 @@ func scheduleTasks(config scheduleTasksConfig) error {
 		return err
 	}
 
-	log.Printf("found %d peer validations", len(peerValidationBatches))
+	log.Info().Int("peer validations", len(peerValidationBatches)).Msgf("found %d peer validations", len(peerValidationBatches))
 
 	// Take the intersection of the sets of own validations and peer validations
 	// to get the list of batches we can aggregate.
@@ -413,7 +432,7 @@ func enqueueAggregationTask(
 	enqueuer task.Enqueuer,
 ) error {
 	if len(readyBatches) == 0 {
-		log.Printf("no batches to aggregate")
+		log.Info().Msg("no batches to aggregate")
 		return nil
 	}

@@ -441,23 +460,33 @@ func enqueueAggregationTask(
 	}
 
 	if _, ok := taskMarkers[aggregationTask.Marker()]; ok {
-		log.Printf("skipped aggregation task due to marker")
+		log.Info().
+			Str("aggregation ID", aggregationID).
+			Msg("skipped aggregation task due to marker")
 		aggregationsSkippedDueToMarker.Inc()
 		return nil
 	}
 
-	log.Printf("scheduling aggregation task over interval %s for aggregation ID %s over %d batches",
-		aggregationWindow, aggregationID, batchCount)
+	log.Info().
+		Str("aggregation ID", aggregationID).
+		Str("aggregation window", aggregationWindow.String()).
+		Int("batch count", batchCount).
+		Msg("Scheduling aggregation task")
+
 	enqueuer.Enqueue(aggregationTask, func(err error) {
 		if err != nil {
-			log.Printf("failed to enqueue aggregation task: %s", err)
+			log.Err(err).
+				Str("aggregation ID", aggregationID).
+				Msgf("failed to enqueue aggregation task: %s", err)
 			return
 		}
 
 		// Write a marker to cloud storage to ensure we don't schedule redundant
 		// tasks
 		if err := ownValidationBucket.WriteTaskMarker(aggregationTask.Marker()); err != nil {
-			log.Printf("failed to write aggregation task marker: %s", err)
+			log.Err(err).
+				Str("aggregation ID", aggregationID).
+				Msgf("failed to write aggregation task marker: %s", err)
 		}
 
 		aggregationsStarted.Inc()
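The enqueue callback above also illustrates the ordering that makes task markers safe: the marker is written only after a successful enqueue. A self-contained sketch of that pattern (all names here are stand-ins, not the repository's API):

package main

import "github.com/rs/zerolog/log"

type task struct{ marker string }

// schedule enqueues t unless a marker shows an earlier run already did.
func schedule(t task, markers map[string]struct{}, enqueue func(task) error, writeMarker func(string) error) {
	if _, ok := markers[t.marker]; ok {
		log.Info().Msg("skipped task due to marker")
		return
	}
	if err := enqueue(t); err != nil {
		// No marker is written on failure, so a later run will retry.
		log.Err(err).Msg("failed to enqueue task")
		return
	}
	// Write the marker only after a successful enqueue: a crash between the
	// two steps produces a duplicate task (harmless) rather than a lost one.
	if err := writeMarker(t.marker); err != nil {
		log.Err(err).Msg("failed to write task marker")
	}
}

func main() {
	markers := map[string]struct{}{}
	enqueue := func(task) error { return nil }
	writeMarker := func(m string) error { markers[m] = struct{}{}; return nil }
	schedule(task{marker: "intake-2021-02-16-00-00"}, markers, enqueue, writeMarker)
	schedule(task{marker: "intake-2021-02-16-00-00"}, markers, enqueue, writeMarker) // skipped
}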
@@ -488,26 +517,36 @@ func enqueueIntakeTasks(
 			continue
 		}
 
-		log.Printf("scheduling intake task for batch %s", batch)
+		log.Info().
+			Str("aggregation ID", batch.AggregationID).
+			Str("batch", batch.String()).
+			Msg("scheduling intake task for batch")
+
 		scheduled++
 		enqueuer.Enqueue(intakeTask, func(err error) {
 			if err != nil {
-				log.Printf("failed to enqueue intake task: %s", err)
+				log.Err(err).
+					Str("aggregation ID", batch.AggregationID).
+					Msg("failed to enqueue intake task")
 				return
 			}
 			// Write a marker to cloud storage to ensure we don't schedule
 			// redundant tasks
 			if err := ownValidationBucket.WriteTaskMarker(intakeTask.Marker()); err != nil {
-				log.Printf("failed to write intake task marker: %s", err)
+				log.Err(err).
+					Str("aggregation ID", batch.AggregationID).
+					Msg("failed to write intake task marker")
 				return
 			}
 
 			intakesStarted.Inc()
 		})
 	}
 
-	log.Printf("skipped %d batches with existing tasks. Scheduled %d new intake tasks.",
-		skippedDueToMarker, scheduled)
+	log.Info().
+		Int("skipped batches", skippedDueToMarker).
+		Int("scheduled batches", scheduled).
+		Msg("skipped and scheduled intake tasks")
 
 	return nil
 }
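The final hunk shows the main payoff of the change: counts move out of the message string and into queryable fields. A self-contained before/after sketch (the numbers are illustrative):

package main

import (
	stdlog "log"

	"github.com/rs/zerolog/log"
)

func main() {
	// Before: one opaque string; the counts are only recoverable by parsing it.
	stdlog.Printf("skipped %d batches with existing tasks. Scheduled %d new intake tasks.", 3, 7)

	// After: each count is a separate JSON field a log index can filter on.
	log.Info().
		Int("skipped batches", 3).
		Int("scheduled batches", 7).
		Msg("skipped and scheduled intake tasks")
}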
(Diffs for the remaining 3 changed files are not shown.)