Skip to content

Commit

Permalink
Opentelemetry changes with optional tracing feature. (#263)
Browse files Browse the repository at this point in the history
* Opentelemetry changes with optional tracing feature.

* Opentelemetry changes for argus client and sender wrapper

* Apply suggestions from code review

* Add space

* Minor fixes

* Review fixes

Co-authored-by: Joel Unzain <[email protected]>
  • Loading branch information
utsavbatra5 and joe94 authored May 4, 2021
1 parent 4360bd4 commit 4655646
Show file tree
Hide file tree
Showing 8 changed files with 541 additions and 46 deletions.
25 changes: 25 additions & 0 deletions caduceus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,28 @@ sender:
# - 1000
# - 10000

# tracing provides configuration around traces using OpenTelemetry.
# (Optional). By default, a 'noop' tracer provider is used and tracing is disabled.
tracing:
# provider is the provider name. Currently, stdout, jaeger and zipkin are supported.
# 'noop' can also be used as provider to explicitly disable tracing.
provider: "noop"

# skipTraceExport only applies when provider is stdout. Set skipTraceExport to true
# so that trace information is not written to stdout.
# skipTraceExport: true

# endpoint is where trace information should be routed. Applies to zipkin and jaeger.
# endpoint: "http://localhost:9411/api/v2/spans"

# timeouts that apply to the Argus HTTP client.
# (Optional) By default, the values below will be used.
argusClientTimeout:
# clientTimeout is the timeout for requests made through this
# HTTP client. This timeout includes connection time, any
# redirects, and reading the response body.
clientTimeout: 50s

# netDialerTimeout is the maximum amount of time the HTTP Client Dialer will
# wait for a connect to complete.
netDialerTimeout: 5s
25 changes: 25 additions & 0 deletions deploy/packaging/caduceus_spruce.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,28 @@ sender:
# - 100
# - 1000
# - 10000

# tracing provides configuration for OpenTelemetry
tracing:
# provider is the provider name. Currently, stdout, jaeger and zipkin are supported.
# 'noop' can also be used as provider to explicitly disable tracing.
provider: (( grab $TRACING_PROVIDER_NAME || "noop" ))

# skipTraceExport only applies when provider is stdout. Set skipTraceExport to true
# so that trace information is not written to stdout.
# skipTraceExport: true

# endpoint is where trace information should be routed. Applies to zipkin and jaeger.
# endpoint: (( grab $TRACING_PROVIDER_ENDPOINT || "http://zipkin:9411/api/v2/spans" ))

# timeouts that apply to the Argus HTTP client.
# (Optional) By default, the values below will be used.
argusClientTimeout:
# clientTimeout is the timeout for requests made through this
# HTTP client. This timeout includes connection time, any
# redirects, and reading the response body.
clientTimeout: 50s

# netDialerTimeout is the maximum amount of time the HTTP Client Dialer will
# wait for a connect to complete.
netDialerTimeout: 5s
15 changes: 10 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ require (
github.com/c9s/goprocinfo v0.0.0-20190309065803-0b2ad9ac246b // indirect
github.com/davecgh/go-spew v1.1.1
github.com/go-kit/kit v0.10.0
github.com/gorilla/mux v1.7.4
github.com/gorilla/mux v1.8.0
github.com/justinas/alice v1.2.0
github.com/satori/go.uuid v1.2.0
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0
github.com/xmidt-org/ancla v0.1.2
github.com/xmidt-org/httpaux v0.1.3 // indirect
github.com/xmidt-org/webpa-common v1.11.5
github.com/xmidt-org/ancla v0.1.4
github.com/xmidt-org/bascule v0.9.0
github.com/xmidt-org/candlelight v0.0.4
github.com/xmidt-org/webpa-common v1.11.6
github.com/xmidt-org/wrp-go/v3 v3.0.2
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.19.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.19.0
go.opentelemetry.io/otel v0.19.0
go.opentelemetry.io/otel/trace v0.19.0
)
335 changes: 328 additions & 7 deletions go.sum

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ type ServerHandler struct {
}

func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
debugLog := log.WithPrefix(sh.Logger, level.Key(), level.DebugValue())
infoLog := log.WithPrefix(sh.Logger, level.Key(), level.InfoValue())
errorLog := log.WithPrefix(sh.Logger, level.Key(), level.ErrorValue())
logger := logging.GetLogger(request.Context())
if logger == logging.DefaultLogger() {
logger = sh.Logger
}
debugLog := level.Debug(logger)
infoLog := level.Info(logger)
errorLog := level.Error(logger)

messageKey := logging.MessageKey()
errorKey := logging.ErrorKey()

Expand Down
146 changes: 121 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,34 @@ package main
import (
"crypto/tls"
"fmt"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/candlelight"
"github.com/xmidt-org/webpa-common/concurrent"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/server"
"github.com/xmidt-org/webpa-common/service/servicecfg"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"io"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"time"

"github.com/go-kit/kit/log"
"github.com/xmidt-org/ancla"

"github.com/go-kit/kit/log/level"
"github.com/xmidt-org/webpa-common/service/servicecfg"

"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/xmidt-org/webpa-common/concurrent"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/server"
)

const (
applicationName = "caduceus"
DEFAULT_KEY_ID = "current"
applicationName = "caduceus"
DEFAULT_KEY_ID = "current"
tracingConfigKey = "tracing"
)

var (
Expand All @@ -52,9 +56,17 @@ var (
BuildTime = "undefined"
)

// httpClientTimeout groups the timeout settings for an HTTP client and for
// the dialer that opens its connections.
type httpClientTimeout struct {
	// ClientTimeout is the overall timeout applied to the HTTP client.
	ClientTimeout time.Duration

	// NetDialerTimeout is the timeout applied to the net dialer.
	NetDialerTimeout time.Duration
}

// caduceus is the driver function for Caduceus. It performs everything main() would do,
// except for obtaining the command-line arguments (which are passed to it).

func caduceus(arguments []string) int {
beginCaduceus := time.Now()

Expand Down Expand Up @@ -82,7 +94,7 @@ func caduceus(arguments []string) int {
return 1
}

log.WithPrefix(logger, level.Key(), level.InfoValue()).Log("configurationFile", v.ConfigFileUsed())
level.Info(logger).Log("configurationFile", v.ConfigFileUsed())

caduceusConfig := new(CaduceusConfig)
err = v.Unmarshal(caduceusConfig)
Expand All @@ -91,13 +103,25 @@ func caduceus(arguments []string) int {
return 1
}

tr := &http.Transport{
tracing, err := loadTracing(v, applicationName)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to build tracing component: %v \n", err)
return 1
}
level.Info(logger).Log(logging.MessageKey(), "tracing status", "enabled", tracing.Enabled)

var tr http.RoundTripper = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: caduceusConfig.Sender.DisableClientHostnameValidation},
MaxIdleConnsPerHost: caduceusConfig.Sender.NumWorkersPerSender,
ResponseHeaderTimeout: caduceusConfig.Sender.ResponseHeaderTimeout,
IdleConnTimeout: caduceusConfig.Sender.IdleConnTimeout,
}

tr = otelhttp.NewTransport(tr,
otelhttp.WithPropagators(tracing.Propagator),
otelhttp.WithTracerProvider(tracing.TracerProvider),
)

caduceusSenderWrapper, err := SenderWrapperFactory{
NumWorkersPerSender: caduceusConfig.Sender.NumWorkersPerSender,
QueueSizePerSender: caduceusConfig.Sender.QueueSizePerSender,
Expand Down Expand Up @@ -132,16 +156,31 @@ func caduceus(arguments []string) int {
modifiedWRPCount: metricsRegistry.NewCounter(ModifiedWRPCounter),
maxOutstanding: 0,
}

caduceusConfig.Webhook.Logger = logger
caduceusConfig.Webhook.MetricsProvider = metricsRegistry
svc, stopWatches, err := ancla.Initialize(caduceusConfig.Webhook, caduceusSenderWrapper)
argusClientTimeout, err := newArgusClientTimeout(v)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to parse argus client timeout config values: %v \n", err)
return 1
}

caduceusConfig.Webhook.Argus.HTTPClient = newHTTPClient(argusClientTimeout, tracing)
svc, stopWatches, err := ancla.Initialize(caduceusConfig.Webhook, GetLogger, caduceusSenderWrapper)
if err != nil {
fmt.Fprintf(os.Stderr, "Webhook service initialization error: %v\n", err)
return 1
}
level.Info(logger).Log(logging.MessageKey(), "Webhook service enabled")

primaryHandler, err := NewPrimaryHandler(logger, v, serverWrapper, svc, metricsRegistry)
rootRouter := mux.NewRouter()
otelMuxOptions := []otelmux.Option{
otelmux.WithPropagators(tracing.Propagator),
otelmux.WithTracerProvider(tracing.TracerProvider),
}
rootRouter.Use(otelmux.Middleware("primary", otelMuxOptions...), candlelight.EchoFirstTraceNodeInfo(tracing.Propagator))

primaryHandler, err := NewPrimaryHandler(logger, v, serverWrapper, svc, metricsRegistry, rootRouter)
if err != nil {
fmt.Fprintf(os.Stderr, "Validator error: %v\n", err)
return 1
Expand All @@ -159,30 +198,30 @@ func caduceus(arguments []string) int {
// Now, initialize the service discovery infrastructure
//
if !v.IsSet("service") {
logger.Log(level.Key(), level.InfoValue(), logging.MessageKey(), "no service discovery configured")
level.Info(logger).Log(logging.MessageKey(), "no service discovery configured")
} else {
e, err := servicecfg.NewEnvironment(logger, v.Sub("service"))
if err != nil {
logger.Log(level.Key(), level.ErrorValue(), logging.MessageKey(), "Unable to initialize service discovery environment", logging.ErrorKey(), err)
level.Error(logger).Log(logging.MessageKey(), "Unable to initialize service discovery environment", logging.ErrorKey(), err)
return 4
}

defer e.Close()
logger.Log(level.Key(), level.InfoValue(), "configurationFile", v.ConfigFileUsed())
level.Info(logger).Log("configurationFile", v.ConfigFileUsed())
e.Register()
}

log.WithPrefix(logger, level.Key(), level.InfoValue()).Log(logging.MessageKey(), "Caduceus is up and running!", "elapsedTime", time.Since(beginCaduceus))
level.Info(logger).Log(logging.MessageKey(), "Caduceus is up and running!", "elapsedTime", time.Since(beginCaduceus))

signals := make(chan os.Signal, 10)
signal.Notify(signals, os.Kill, os.Interrupt)
for exit := false; !exit; {
select {
case s := <-signals:
logger.Log(level.Key(), level.ErrorValue(), logging.MessageKey(), "exiting due to signal", "signal", s)
level.Error(logger).Log(logging.MessageKey(), "exiting due to signal", "signal", s)
exit = true
case <-done:
logger.Log(level.Key(), level.ErrorValue(), logging.MessageKey(), "one or more servers exited")
level.Error(logger).Log(logging.MessageKey(), "one or more servers exited")
exit = true
}
}
Expand All @@ -196,6 +235,63 @@ func caduceus(arguments []string) int {
return 0
}

// loadTracing builds the tracing component for the application.
//
// It unmarshals the configuration found under tracingConfigKey in v, sets the
// configuration's ApplicationName to appName, and asks candlelight to
// configure a tracer provider from it. The returned Tracing always uses a
// TraceContext propagator. Enabled is true only when a provider name is
// configured and it differs from candlelight.DefaultTracerProvider.
//
// A zero-value Tracing and the underlying error are returned when either the
// configuration cannot be unmarshaled or the tracer provider cannot be built.
func loadTracing(v *viper.Viper, appName string) (candlelight.Tracing, error) {
	// Start from the disabled defaults; the provider is replaced below once
	// the configured one has been built successfully.
	result := candlelight.Tracing{
		Enabled:        false,
		Propagator:     propagation.TraceContext{},
		TracerProvider: trace.NewNoopTracerProvider(),
	}

	var cfg candlelight.Config
	if err := v.UnmarshalKey(tracingConfigKey, &cfg); err != nil {
		return candlelight.Tracing{}, err
	}
	cfg.ApplicationName = appName

	provider, err := candlelight.ConfigureTracerProvider(cfg)
	if err != nil {
		return candlelight.Tracing{}, err
	}
	result.TracerProvider = provider

	// Tracing counts as enabled unless the provider is unset or is the default one.
	result.Enabled = !(len(cfg.Provider) == 0 || cfg.Provider == candlelight.DefaultTracerProvider)

	return result, nil
}

// newArgusClientTimeout reads the "argusClientTimeout" configuration section
// from v and returns the resulting timeouts.
//
// Any timeout left at zero falls back to its default: 50 seconds for
// ClientTimeout and 5 seconds for NetDialerTimeout. If the section cannot be
// unmarshaled, a zero-value httpClientTimeout and the error are returned.
func newArgusClientTimeout(v *viper.Viper) (httpClientTimeout, error) {
	const (
		fallbackClientTimeout    = 50 * time.Second
		fallbackNetDialerTimeout = 5 * time.Second
	)

	var cfg httpClientTimeout
	if err := v.UnmarshalKey("argusClientTimeout", &cfg); err != nil {
		return httpClientTimeout{}, err
	}

	if cfg.ClientTimeout == 0 {
		cfg.ClientTimeout = fallbackClientTimeout
	}
	if cfg.NetDialerTimeout == 0 {
		cfg.NetDialerTimeout = fallbackNetDialerTimeout
	}

	return cfg, nil
}

// newHTTPClient returns an HTTP client configured from timeouts and tracing.
//
// timeouts.ClientTimeout becomes the client's overall request timeout and
// timeouts.NetDialerTimeout bounds each connection attempt. The transport is
// wrapped with otelhttp so outgoing requests use tracing.Propagator and
// tracing.TracerProvider.
func newHTTPClient(timeouts httpClientTimeout, tracing candlelight.Tracing) *http.Client {
	dialer := &net.Dialer{
		Timeout: timeouts.NetDialerTimeout,
	}

	// DialContext is used instead of the deprecated Transport.Dial so that a
	// dial in progress can be abandoned as soon as the request's context is
	// canceled or the client timeout fires.
	var transport http.RoundTripper = &http.Transport{
		DialContext: dialer.DialContext,
	}

	transport = otelhttp.NewTransport(transport,
		otelhttp.WithPropagators(tracing.Propagator),
		otelhttp.WithTracerProvider(tracing.TracerProvider),
	)

	return &http.Client{
		Timeout:   timeouts.ClientTimeout,
		Transport: transport,
	}
}

func printVersion(f *pflag.FlagSet, arguments []string) (error, bool) {
printVer := f.BoolP("version", "v", false, "displays the version number")
if err := f.Parse(arguments); err != nil {
Expand Down
28 changes: 23 additions & 5 deletions primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package main

import (
"fmt"
"github.com/xmidt-org/bascule"
"github.com/xmidt-org/candlelight"
"github.com/xmidt-org/webpa-common/logging"
"net/http"

"context"
"github.com/SermoDigital/jose/jwt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/provider"
Expand All @@ -30,10 +34,24 @@ type JWTValidator struct {
Custom secure.JWTValidatorFactory
}

func NewPrimaryHandler(l log.Logger, v *viper.Viper, sw *ServerHandler, webhookSvc ancla.Service, metricsRegistry provider.Provider) (*mux.Router, error) {
var (
router = mux.NewRouter()
)
// SetLogger returns a middleware constructor that attaches a request-scoped
// logger to each request's context before calling the delegate handler.
//
// The request-scoped logger is the given logger decorated with the request
// headers, the escaped request URL path, the HTTP method, and whatever trace
// information candlelight.AppendTraceInfo adds for the request context.
//
// The Authorization header value is masked in the logged headers so that
// credentials are not written to the logs. The request passed on to the
// delegate keeps its original, unmodified headers.
func SetLogger(logger log.Logger) func(delegate http.Handler) http.Handler {
	return func(delegate http.Handler) http.Handler {
		return http.HandlerFunc(
			func(w http.ResponseWriter, r *http.Request) {
				// Work on a copy: the delegate (for example the auth
				// handler) still needs the real Authorization value.
				loggedHeaders := r.Header.Clone()
				if loggedHeaders.Get("Authorization") != "" {
					loggedHeaders.Set("Authorization", "<redacted>")
				}

				kvs := []interface{}{"requestHeaders", loggedHeaders, "requestURL", r.URL.EscapedPath(), "method", r.Method}
				// The second result is intentionally discarded; the returned
				// key/value slice is used as-is either way.
				kvs, _ = candlelight.AppendTraceInfo(r.Context(), kvs)

				ctx := logging.WithLogger(r.Context(), log.With(logger, kvs...))
				delegate.ServeHTTP(w, r.WithContext(ctx))
			})
	}
}

// GetLogger returns the logger that logging.GetLogger resolves for ctx,
// decorated with a "ts" field (log.DefaultTimestampUTC) and a "caller" field
// (log.DefaultCaller).
func GetLogger(ctx context.Context) bascule.Logger {
	return log.With(
		logging.GetLogger(ctx),
		"ts", log.DefaultTimestampUTC,
		"caller", log.DefaultCaller,
	)
}

func NewPrimaryHandler(l log.Logger, v *viper.Viper, sw *ServerHandler, webhookSvc ancla.Service, metricsRegistry provider.Provider, router *mux.Router) (*mux.Router, error) {

validator, err := getValidator(v)
if err != nil {
Expand All @@ -47,7 +65,7 @@ func NewPrimaryHandler(l log.Logger, v *viper.Viper, sw *ServerHandler, webhookS
Logger: l,
}

authorizationDecorator := alice.New(authHandler.Decorate)
authorizationDecorator := alice.New(SetLogger(l), authHandler.Decorate)

return configServerRouter(router, authorizationDecorator, sw, webhookSvc, metricsRegistry), nil
}
Expand Down
Loading

0 comments on commit 4655646

Please sign in to comment.