Skip to content

Commit

Permalink
Performance test (#4)
Browse files Browse the repository at this point in the history
* performance test

* performance test

* comments

* fixed auth acquirer

* performance test

* changes

* sending events to Caduceus

* able to register webhook

* using server

* metric config

* time struct

* logging and metrics

* logs

* logs

* changes to logging

* logging

* metrics fixes

* debug changes

* caduceus receiving events

* changing prometheus query location

* querying prometheus

* querying prometheus

* querying prometheus

* adding channel for durations

* configurating caduceator

* configuring caduceator

* prometheus config

* vegeta changes

* channel looping

* adding mutex locks

* adding mutex

* added configurations

* fixed parsing url

* testing configs

* removed channel

* added buckets

* edits

* config changes

* updated

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* changes
  • Loading branch information
gargidb authored Mar 11, 2020
1 parent 340cccc commit aef41b3
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 32 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.1.2]
- bug fixes

## [v0.1.1]
- Inital relase
- Removed integration tests and fixed travis [#2](https://github.com/xmidt-org/caduceator/pull/2)
Expand Down
4 changes: 2 additions & 2 deletions caduceator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ webhook:
# Authorization Basic {basic}
#
# (Optional)
basic: "Basic dXNlcjpwYXNz"
basic: ""

# jwt provides a way to use Bearer Authorization when registering to a
# webhook. If the below values are all provided, a request is made to the
Expand All @@ -301,4 +301,4 @@ webhook:

# buffer is the length of time before a token expires to get a new token.
# (Optional)
buffer: "5s"
buffer: "1m"
8 changes: 5 additions & 3 deletions deploy/docker-compose/docFiles/caduceator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ vegetaConfig:

postURL: "http://caduceus:6000/api/v3/notify"

sleepTime: "3s"

########################################
# Prometheus Related Configuration
########################################
Expand Down Expand Up @@ -214,11 +216,11 @@ webhook:
# registrationInterval provides the time to wait between each registration to
# the webhook. If this is set to 0, no registration is done.
# (Optional)
registrationInterval: ""
registrationInterval: "4m"

# timeout provides the length of time the request should wait before timing
# out (in seconds)
timeout: 5
# out
timeout: "1m"

# registrationURL provides the place to register the webhook.
registrationURL: "http://caduceus:6000/hook"
Expand Down
95 changes: 69 additions & 26 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type VegetaConfig struct {
Duration time.Duration
MaxRoutines int
PostURL string
SleepTime time.Duration
}

type Request struct {
Expand All @@ -81,8 +82,8 @@ type WebhookConfig struct {
}

type Webhook struct {
RegistrationInterval string
Timeout int
RegistrationInterval time.Duration
Timeout time.Duration
RegistrationURL string
Request Request
Basic string
Expand All @@ -95,9 +96,10 @@ type Secret struct {
}

type JWT struct {
AuthURL string
Timeout string
Buffer string
RequestHeaders map[string]string
AuthURL string
Timeout time.Duration
Buffer time.Duration
}

type PrometheusConfig struct {
Expand All @@ -106,8 +108,25 @@ type PrometheusConfig struct {
MetricsURL string
}

func vegetaStarter(metrics vegeta.Metrics, config *Config, attacker *vegeta.Attacker, acquirer *acquire.RemoteBearerTokenAcquirer, logger log.Logger) {
rate := vegeta.Rate{Freq: config.VegetaConfig.Frequency, Per: time.Second}
duration := config.VegetaConfig.Duration * time.Minute
for res := range attacker.Attack(Start(0, acquirer, logger, config.VegetaConfig.PostURL), rate, duration, "Big Bang!") {
metrics.Add(res)
}

metricsReporter := vegeta.NewTextReporter(&metrics)

err := metricsReporter.Report(os.Stdout)

if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "vegeta failed", logging.ErrorKey(), err.Error())
os.Exit(1)
}
}

// Start function is used to send events to Caduceus
func Start(id uint64, acquirer *acquire.FixedValueAcquirer, logger log.Logger, requestURL string) vegeta.Targeter {
func Start(id uint64, acquirer *acquire.RemoteBearerTokenAcquirer, logger log.Logger, requestURL string) vegeta.Targeter {

return func(target *vegeta.Target) (err error) {

Expand All @@ -133,32 +152,32 @@ func Start(id uint64, acquirer *acquire.FixedValueAcquirer, logger log.Logger, r

if err := encoder.Encode(&message); err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to encode payload", logging.ErrorKey(), err.Error())
return err
}

req, err := http.NewRequest("POST", requestURL, &buffer)

if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to create new request", logging.ErrorKey(), err.Error())
return err

}
req.Header.Add("Content-type", "application/msgpack")

authValue, err := acquirer.Acquire()
if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to acquire", logging.ErrorKey(), err.Error())
return err

}

req.Header.Add("Authorization", authValue)

resp, err := http.DefaultClient.Do(req)

if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed while making HTTP request", logging.ErrorKey(), err.Error())
logging.Error(logger).Log(logging.MessageKey(), "failed while making HTTP request: ", logging.ErrorKey(), err.Error())
return err
}
resp.Body.Close()
defer resp.Body.Close()

return err
}
Expand All @@ -172,8 +191,18 @@ func main() {
logger, metricsRegistry, caduceator, err = server.Initialize(applicationName, os.Args, f, v, basculechecks.Metrics, basculemetrics.Metrics, Metrics)
)

if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to initialize", logging.ErrorKey(), err.Error())
}

config := new(Config)
v.Unmarshal(config)
err = v.Unmarshal(config)
if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to unmarshal config", logging.ErrorKey(), err.Error())
}

logging.Info(logger).Log(logging.MessageKey(), "vegeta frequency")
logging.Info(logger).Log(logging.MessageKey(), config.VegetaConfig.Frequency)

// use constant secret for hash
secretGetter := secretGetter.NewConstantSecret(config.Webhook.Request.WebhookConfig.Secret)
Expand All @@ -194,7 +223,7 @@ func main() {

// set up the registerer
basicConfig := webhookClient.BasicConfig{
Timeout: 5 * time.Second,
Timeout: config.Webhook.Timeout,
RegistrationURL: config.Webhook.RegistrationURL,
Request: webhook.W{
Config: webhook.Config{
Expand All @@ -205,9 +234,18 @@ func main() {
},
}

acquirer, err := acquire.NewFixedAuthAcquirer(config.Webhook.Basic)
acquireConfig := acquire.RemoteBearerTokenAcquirerOptions{
AuthURL: config.Webhook.JWT.AuthURL,
Timeout: config.Webhook.JWT.Timeout,
Buffer: config.Webhook.JWT.Buffer,
RequestHeaders: config.Webhook.JWT.RequestHeaders,
}

acquirer, err := acquire.NewRemoteBearerTokenAcquirer(acquireConfig)

// acquirer, err = acquire.NewFixedAuthAcquirer(config.Webhook.Basic)
if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to create basic auth plain text acquirer:", logging.ErrorKey(), err.Error())
logging.Error(logger).Log(logging.MessageKey(), "failed to create bearer auth plain text acquirer:", logging.ErrorKey(), err.Error())
os.Exit(1)
}

Expand All @@ -216,7 +254,8 @@ func main() {
logging.Error(logger).Log(logging.MessageKey(), "failed to setup registerer", logging.ErrorKey(), err.Error())
os.Exit(1)
}
periodicRegisterer := webhookClient.NewPeriodicRegisterer(registerer, 4*time.Minute, logger)

periodicRegisterer := webhookClient.NewPeriodicRegisterer(registerer, config.Webhook.RegistrationInterval, logger)

// start the registerer
periodicRegisterer.Start()
Expand All @@ -239,6 +278,7 @@ func main() {
queryURL: config.PrometheusConfig.QueryURL,
queryExpression: config.PrometheusConfig.QueryExpression,
metricsURL: config.PrometheusConfig.MetricsURL,
sleepTime: config.VegetaConfig.SleepTime,
}

// start listening
Expand All @@ -253,24 +293,26 @@ func main() {
waitGroup, shutdown, err := concurrent.Execute(runnable)
if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to execute additional process", logging.ErrorKey(), err.Error())
os.Exit(1)
}

// send events to Caduceus using vegeta
var metrics vegeta.Metrics
rate := vegeta.Rate{Freq: config.VegetaConfig.Frequency, Per: time.Second}
duration := config.VegetaConfig.Duration * time.Minute

for res := range attacker.Attack(Start(0, acquirer, logger, config.VegetaConfig.PostURL), rate, duration, "Big Bang!") {
metrics.Add(res)
}
go vegetaStarter(metrics, config, attacker, acquirer, logger)
// rate := vegeta.Rate{Freq: config.VegetaConfig.Frequency, Per: time.Second}
// duration := config.VegetaConfig.Duration * time.Minute
// for res := range attacker.Attack(Start(0, acquirer, logger, config.VegetaConfig.PostURL), rate, duration, "Big Bang!") {
// metrics.Add(res)
// }

metricsReporter := vegeta.NewTextReporter(&metrics)
// metricsReporter := vegeta.NewTextReporter(&metrics)

err = metricsReporter.Report(os.Stdout)
// err = metricsReporter.Report(os.Stdout)

if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "vegeta failed", logging.ErrorKey(), err.Error())
}
// if err != nil {
// logging.Error(logger).Log(logging.MessageKey(), "vegeta failed", logging.ErrorKey(), err.Error())
// os.Exit(1)
// }

signals := make(chan os.Signal, 10)
signal.Notify(signals)
Expand All @@ -290,6 +332,7 @@ func main() {
}

metrics.Close()
periodicRegisterer.Stop()
close(shutdown)
waitGroup.Wait()
logging.Info(logger).Log(logging.MessageKey(), "Caduceator has shut down")
Expand Down
2 changes: 2 additions & 0 deletions primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type App struct {
queryURL string
queryExpression string
metricsURL string
sleepTime time.Duration
}

const (
Expand Down Expand Up @@ -88,6 +89,7 @@ func (app *App) receiveEvents(writer http.ResponseWriter, req *http.Request) {
writer.WriteHeader(http.StatusBadRequest)
return
}
time.Sleep(app.sleepTime)
writer.WriteHeader(http.StatusAccepted)
}

Expand Down
3 changes: 2 additions & 1 deletion queueTimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ func (app *App) calculateDuration(cutoffTime time.Time) {
Loop:
for {

currentTime := time.Now()

encodedQuery := &url.URL{Path: app.queryExpression}

res, err := http.Get(app.queryURL + "?query=" + encodedQuery.String())

currentTime := time.Now()
if err != nil {
logging.Error(app.logger).Log(logging.MessageKey(), "failed to query prometheus", logging.ErrorKey(), err.Error())
} else {
Expand Down

0 comments on commit aef41b3

Please sign in to comment.