diff --git a/CHANGELOG.md b/CHANGELOG.md index a97e606..0c75b89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [v0.1.2] +- bug fixes + ## [v0.1.1] - Inital relase - Removed integration tests and fixed travis [#2](https://github.com/xmidt-org/caduceator/pull/2) diff --git a/caduceator.yaml b/caduceator.yaml index c5cfe97..3b5e5ba 100644 --- a/caduceator.yaml +++ b/caduceator.yaml @@ -276,7 +276,7 @@ webhook: # Authorization Basic {basic} # # (Optional) - basic: "Basic dXNlcjpwYXNz" + basic: "" # jwt provides a way to use Bearer Authorization when registering to a # webhook. If the below values are all provided, a request is made to the @@ -301,4 +301,4 @@ webhook: # buffer is the length of time before a token expires to get a new token. # (Optional) - buffer: "5s" + buffer: "1m" diff --git a/deploy/docker-compose/docFiles/caduceator.yaml b/deploy/docker-compose/docFiles/caduceator.yaml index c5cfe97..5b454b9 100644 --- a/deploy/docker-compose/docFiles/caduceator.yaml +++ b/deploy/docker-compose/docFiles/caduceator.yaml @@ -162,6 +162,8 @@ vegetaConfig: postURL: "http://caduceus:6000/api/v3/notify" + sleepTime: "3s" + ######################################## # Prometheus Related Configuration ######################################## @@ -214,11 +216,11 @@ webhook: # registrationInterval provides the time to wait between each registration to # the webhook. If this is set to 0, no registration is done. # (Optional) - registrationInterval: "" + registrationInterval: "4m" # timeout provides the length of time the request should wait before timing - # out (in seconds) - timeout: 5 + # out + timeout: "1m" # registrationURL provides the place to register the webhook. registrationURL: "http://caduceus:6000/hook" diff --git a/main.go b/main.go index d2babda..44c9384 100644 --- a/main.go +++ b/main.go @@ -66,6 +66,7 @@ type VegetaConfig struct { Duration time.Duration MaxRoutines int PostURL string + SleepTime time.Duration } type Request struct { @@ -81,8 +82,8 @@ type WebhookConfig struct { } type Webhook struct { - RegistrationInterval string - Timeout int + RegistrationInterval time.Duration + Timeout time.Duration RegistrationURL string Request Request Basic string @@ -95,9 +96,10 @@ type Secret struct { } type JWT struct { - AuthURL string - Timeout string - Buffer string + RequestHeaders map[string]string + AuthURL string + Timeout time.Duration + Buffer time.Duration } type PrometheusConfig struct { @@ -106,8 +108,25 @@ type PrometheusConfig struct { MetricsURL string } +func vegetaStarter(metrics vegeta.Metrics, config *Config, attacker *vegeta.Attacker, acquirer *acquire.RemoteBearerTokenAcquirer, logger log.Logger) { + rate := vegeta.Rate{Freq: config.VegetaConfig.Frequency, Per: time.Second} + duration := config.VegetaConfig.Duration * time.Minute + for res := range attacker.Attack(Start(0, acquirer, logger, config.VegetaConfig.PostURL), rate, duration, "Big Bang!") { + metrics.Add(res) + } + + metricsReporter := vegeta.NewTextReporter(&metrics) + + err := metricsReporter.Report(os.Stdout) + + if err != nil { + logging.Error(logger).Log(logging.MessageKey(), "vegeta failed", logging.ErrorKey(), err.Error()) + os.Exit(1) + } +} + // Start function is used to send events to Caduceus -func Start(id uint64, acquirer *acquire.FixedValueAcquirer, logger log.Logger, requestURL string) vegeta.Targeter { +func Start(id uint64, acquirer *acquire.RemoteBearerTokenAcquirer, logger log.Logger, requestURL string) vegeta.Targeter { return func(target *vegeta.Target) (err error) { @@ -133,14 +152,13 @@ func Start(id uint64, acquirer *acquire.FixedValueAcquirer, logger log.Logger, r if err := encoder.Encode(&message); err != nil { logging.Error(logger).Log(logging.MessageKey(), "failed to encode payload", logging.ErrorKey(), err.Error()) - return err } req, err := http.NewRequest("POST", requestURL, &buffer) - if err != nil { logging.Error(logger).Log(logging.MessageKey(), "failed to create new request", logging.ErrorKey(), err.Error()) return err + } req.Header.Add("Content-type", "application/msgpack") @@ -148,6 +166,7 @@ func Start(id uint64, acquirer *acquire.FixedValueAcquirer, logger log.Logger, r if err != nil { logging.Error(logger).Log(logging.MessageKey(), "failed to acquire", logging.ErrorKey(), err.Error()) return err + } req.Header.Add("Authorization", authValue) @@ -155,10 +174,10 @@ func Start(id uint64, acquirer *acquire.FixedValueAcquirer, logger log.Logger, r resp, err := http.DefaultClient.Do(req) if err != nil { - logging.Error(logger).Log(logging.MessageKey(), "failed while making HTTP request", logging.ErrorKey(), err.Error()) + logging.Error(logger).Log(logging.MessageKey(), "failed while making HTTP request: ", logging.ErrorKey(), err.Error()) return err } - resp.Body.Close() + defer resp.Body.Close() return err } @@ -172,8 +191,18 @@ func main() { logger, metricsRegistry, caduceator, err = server.Initialize(applicationName, os.Args, f, v, basculechecks.Metrics, basculemetrics.Metrics, Metrics) ) + if err != nil { + logging.Error(logger).Log(logging.MessageKey(), "failed to initialize", logging.ErrorKey(), err.Error()) + } + config := new(Config) - v.Unmarshal(config) + err = v.Unmarshal(config) + if err != nil { + logging.Error(logger).Log(logging.MessageKey(), "failed to unmarshal config", logging.ErrorKey(), err.Error()) + } + + logging.Info(logger).Log(logging.MessageKey(), "vegeta frequency") + logging.Info(logger).Log(logging.MessageKey(), config.VegetaConfig.Frequency) // use constant secret for hash secretGetter := secretGetter.NewConstantSecret(config.Webhook.Request.WebhookConfig.Secret) @@ -194,7 +223,7 @@ func main() { // set up the registerer basicConfig := webhookClient.BasicConfig{ - Timeout: 5 * time.Second, + Timeout: config.Webhook.Timeout, RegistrationURL: config.Webhook.RegistrationURL, Request: webhook.W{ Config: webhook.Config{ @@ -205,9 +234,18 @@ func main() { }, } - acquirer, err := acquire.NewFixedAuthAcquirer(config.Webhook.Basic) + acquireConfig := acquire.RemoteBearerTokenAcquirerOptions{ + AuthURL: config.Webhook.JWT.AuthURL, + Timeout: config.Webhook.JWT.Timeout, + Buffer: config.Webhook.JWT.Buffer, + RequestHeaders: config.Webhook.JWT.RequestHeaders, + } + + acquirer, err := acquire.NewRemoteBearerTokenAcquirer(acquireConfig) + + // acquirer, err = acquire.NewFixedAuthAcquirer(config.Webhook.Basic) if err != nil { - logging.Error(logger).Log(logging.MessageKey(), "failed to create basic auth plain text acquirer:", logging.ErrorKey(), err.Error()) + logging.Error(logger).Log(logging.MessageKey(), "failed to create bearer auth plain text acquirer:", logging.ErrorKey(), err.Error()) os.Exit(1) } @@ -216,7 +254,8 @@ func main() { logging.Error(logger).Log(logging.MessageKey(), "failed to setup registerer", logging.ErrorKey(), err.Error()) os.Exit(1) } - periodicRegisterer := webhookClient.NewPeriodicRegisterer(registerer, 4*time.Minute, logger) + + periodicRegisterer := webhookClient.NewPeriodicRegisterer(registerer, config.Webhook.RegistrationInterval, logger) // start the registerer periodicRegisterer.Start() @@ -239,6 +278,7 @@ func main() { queryURL: config.PrometheusConfig.QueryURL, queryExpression: config.PrometheusConfig.QueryExpression, metricsURL: config.PrometheusConfig.MetricsURL, + sleepTime: config.VegetaConfig.SleepTime, } // start listening @@ -253,24 +293,26 @@ func main() { waitGroup, shutdown, err := concurrent.Execute(runnable) if err != nil { logging.Error(logger).Log(logging.MessageKey(), "failed to execute additional process", logging.ErrorKey(), err.Error()) + os.Exit(1) } // send events to Caduceus using vegeta var metrics vegeta.Metrics - rate := vegeta.Rate{Freq: config.VegetaConfig.Frequency, Per: time.Second} - duration := config.VegetaConfig.Duration * time.Minute - - for res := range attacker.Attack(Start(0, acquirer, logger, config.VegetaConfig.PostURL), rate, duration, "Big Bang!") { - metrics.Add(res) - } + go vegetaStarter(metrics, config, attacker, acquirer, logger) + // rate := vegeta.Rate{Freq: config.VegetaConfig.Frequency, Per: time.Second} + // duration := config.VegetaConfig.Duration * time.Minute + // for res := range attacker.Attack(Start(0, acquirer, logger, config.VegetaConfig.PostURL), rate, duration, "Big Bang!") { + // metrics.Add(res) + // } - metricsReporter := vegeta.NewTextReporter(&metrics) + // metricsReporter := vegeta.NewTextReporter(&metrics) - err = metricsReporter.Report(os.Stdout) + // err = metricsReporter.Report(os.Stdout) - if err != nil { - logging.Error(logger).Log(logging.MessageKey(), "vegeta failed", logging.ErrorKey(), err.Error()) - } + // if err != nil { + // logging.Error(logger).Log(logging.MessageKey(), "vegeta failed", logging.ErrorKey(), err.Error()) + // os.Exit(1) + // } signals := make(chan os.Signal, 10) signal.Notify(signals) @@ -290,6 +332,7 @@ func main() { } metrics.Close() + periodicRegisterer.Stop() close(shutdown) waitGroup.Wait() logging.Info(logger).Log(logging.MessageKey(), "Caduceator has shut down") diff --git a/primaryHandler.go b/primaryHandler.go index 9789869..f24a485 100644 --- a/primaryHandler.go +++ b/primaryHandler.go @@ -48,6 +48,7 @@ type App struct { queryURL string queryExpression string metricsURL string + sleepTime time.Duration } const ( @@ -88,6 +89,7 @@ func (app *App) receiveEvents(writer http.ResponseWriter, req *http.Request) { writer.WriteHeader(http.StatusBadRequest) return } + time.Sleep(app.sleepTime) writer.WriteHeader(http.StatusAccepted) } diff --git a/queueTimer.go b/queueTimer.go index 7e11a64..e885c6e 100644 --- a/queueTimer.go +++ b/queueTimer.go @@ -54,11 +54,12 @@ func (app *App) calculateDuration(cutoffTime time.Time) { Loop: for { + currentTime := time.Now() + encodedQuery := &url.URL{Path: app.queryExpression} res, err := http.Get(app.queryURL + "?query=" + encodedQuery.String()) - currentTime := time.Now() if err != nil { logging.Error(app.logger).Log(logging.MessageKey(), "failed to query prometheus", logging.ErrorKey(), err.Error()) } else {