From 27525e24343642a370f24a88688e290e9e0d495c Mon Sep 17 00:00:00 2001 From: Eliott Bouhana Date: Tue, 14 Jan 2025 19:05:13 +0100 Subject: [PATCH] internal/newtelemetry/internal: created and implemented the Writer centralize all http concerns in the writer struct such as: * Reusing the same top-level structures when possible * encoding properly in json the payload * separating the low level http writing from some higher level things we will concerns ourselves about in the telemetry client itself * Centralizing a lot of the static data the telemtry client is supposed to send on every request Signed-off-by: Eliott Bouhana --- internal/globalconfig/globalconfig.go | 16 ++ internal/newtelemetry/defaults.go | 26 --- .../newtelemetry/internal/tracerconfig.go | 16 ++ .../newtelemetry/internal/transport/body.go | 36 +--- internal/newtelemetry/internal/writer.go | 189 ++++++++++++++++++ 5 files changed, 223 insertions(+), 60 deletions(-) create mode 100644 internal/newtelemetry/internal/tracerconfig.go create mode 100644 internal/newtelemetry/internal/writer.go diff --git a/internal/globalconfig/globalconfig.go b/internal/globalconfig/globalconfig.go index a36f50035f..2d6d21e35e 100644 --- a/internal/globalconfig/globalconfig.go +++ b/internal/globalconfig/globalconfig.go @@ -9,6 +9,7 @@ package globalconfig import ( "math" + "os" "sync" "gopkg.in/DataDog/dd-trace-go.v1/internal" @@ -130,3 +131,18 @@ func HeaderTagsLen() int { func ClearHeaderTags() { cfg.headersAsTags.Clear() } + +// InstrumentationInstallID returns the install ID as described in DD_INSTRUMENTATION_INSTALL_ID +func InstrumentationInstallID() string { + return os.Getenv("DD_INSTRUMENTATION_INSTALL_ID") +} + +// InstrumentationInstallType returns the install type as described in DD_INSTRUMENTATION_INSTALL_TYPE +func InstrumentationInstallType() string { + return os.Getenv("DD_INSTRUMENTATION_INSTALL_TYPE") +} + +// InstrumentationInstallTime returns the install time as described in DD_INSTRUMENTATION_INSTALL_TIME +func InstrumentationInstallTime() string { + return os.Getenv("DD_INSTRUMENTATION_INSTALL_TIME") +} diff --git a/internal/newtelemetry/defaults.go b/internal/newtelemetry/defaults.go index 58fa3ac11f..8a0ff20d30 100644 --- a/internal/newtelemetry/defaults.go +++ b/internal/newtelemetry/defaults.go @@ -5,33 +5,7 @@ package newtelemetry -import ( - "net" - "net/http" - "time" -) - var ( - // We copy the transport to avoid using the default one, as it might be - // augmented with tracing and we don't want these calls to be recorded. - // See https://golang.org/pkg/net/http/#DefaultTransport . - //orchestrion:ignore - defaultHTTPClient = &http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - ForceAttemptHTTP2: true, - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - }, - Timeout: 5 * time.Second, - } - // agentlessURL is the endpoint used to send telemetry in an agentless environment. It is // also the default URL in case connecting to the agent URL fails. agentlessURL = "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry" diff --git a/internal/newtelemetry/internal/tracerconfig.go b/internal/newtelemetry/internal/tracerconfig.go new file mode 100644 index 0000000000..439ff430f3 --- /dev/null +++ b/internal/newtelemetry/internal/tracerconfig.go @@ -0,0 +1,16 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025 Datadog, Inc. + +package internal + +// TracerConfig is the configuration for the tracer for the telemetry client. +type TracerConfig struct { + // Service is the name of the service being traced. + Service string + // Env is the environment the service is running in. + Env string + // Version is the version of the service. + Version string +} diff --git a/internal/newtelemetry/internal/transport/body.go b/internal/newtelemetry/internal/transport/body.go index b420b1a3bd..1015df61e5 100644 --- a/internal/newtelemetry/internal/transport/body.go +++ b/internal/newtelemetry/internal/transport/body.go @@ -5,15 +5,6 @@ package transport -import ( - "runtime" - - "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" - "gopkg.in/DataDog/dd-trace-go.v1/internal/hostname" - "gopkg.in/DataDog/dd-trace-go.v1/internal/osinfo" - tracerversion "gopkg.in/DataDog/dd-trace-go.v1/internal/version" -) - // Application is identifying information about the app itself type Application struct { ServiceName string `json:"service_name"` @@ -37,38 +28,15 @@ type Host struct { } // Body is the common high-level structure encapsulating a telemetry request body +// Described here: https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/main/GeneratedDocumentation/ApiDocs/v2/SchemaDocumentation/Request%20Bodies/telemetry_body.md type Body struct { APIVersion string `json:"api_version"` RequestType RequestType `json:"request_type"` TracerTime int64 `json:"tracer_time"` RuntimeID string `json:"runtime_id"` SeqID int64 `json:"seq_id"` - Debug bool `json:"debug"` + Debug bool `json:"debug,omitempty"` Payload Payload `json:"payload"` Application Application `json:"application"` Host Host `json:"host"` } - -func NewBody(service, env, version string) *Body { - return &Body{ - APIVersion: "v2", - RuntimeID: globalconfig.RuntimeID(), - Application: Application{ - ServiceName: service, - Env: env, - ServiceVersion: version, - TracerVersion: tracerversion.Tag, - LanguageName: "go", - LanguageVersion: runtime.Version(), - }, - Host: Host{ - Hostname: hostname.Get(), - OS: osinfo.OSName(), - OSVersion: osinfo.OSVersion(), - Architecture: osinfo.Architecture(), - KernelName: osinfo.KernelName(), - KernelRelease: osinfo.KernelRelease(), - KernelVersion: osinfo.KernelVersion(), - }, - } -} diff --git a/internal/newtelemetry/internal/writer.go b/internal/newtelemetry/internal/writer.go new file mode 100644 index 0000000000..ceac1b2b45 --- /dev/null +++ b/internal/newtelemetry/internal/writer.go @@ -0,0 +1,189 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025 Datadog, Inc. + +package internal + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "runtime" + "strconv" + "sync" + "time" + + "gopkg.in/DataDog/dd-trace-go.v1/internal" + "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" + "gopkg.in/DataDog/dd-trace-go.v1/internal/hostname" + "gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal/transport" + "gopkg.in/DataDog/dd-trace-go.v1/internal/osinfo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/version" +) + +// We copy the transport to avoid using the default one, as it might be +// augmented with tracing and we don't want these calls to be recorded. +// See https://golang.org/pkg/net/http/#DefaultTransport . +// +//orchestrion:ignore +var defaultHTTPClient = &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + Timeout: 5 * time.Second, +} + +func newBody(config TracerConfig, debugMode bool) *transport.Body { + return &transport.Body{ + APIVersion: "v2", + RuntimeID: globalconfig.RuntimeID(), + Debug: debugMode, + Application: transport.Application{ + ServiceName: config.Service, + Env: config.Env, + ServiceVersion: config.Version, + TracerVersion: version.Tag, + LanguageName: "go", + LanguageVersion: runtime.Version(), + }, + Host: transport.Host{ + Hostname: hostname.Get(), + OS: osinfo.OSName(), + OSVersion: osinfo.OSVersion(), + Architecture: osinfo.Architecture(), + KernelName: osinfo.KernelName(), + KernelRelease: osinfo.KernelRelease(), + KernelVersion: osinfo.KernelVersion(), + }, + } +} + +type Writer interface { + Flush(transport.Payload) error +} + +type writer struct { + mu sync.Mutex + body *transport.Body + httpClient *http.Client + request *http.Request +} + +type WriterConfig struct { + // TracerConfig is the configuration the tracer sent when the telemetry client was created (required) + TracerConfig + // Request is a proto http request that should contain what URL to send the telemetry data to and more if necessary (required) + Request *http.Request + // HTTPClient is the http client that will be used to send the telemetry data (defaults to the default http client) + HTTPClient *http.Client + // Debug is a flag that indicates whether the telemetry client is in debug mode (defaults to false) + Debug bool +} + +func NewWriter(config WriterConfig) Writer { + if config.Request == nil { + panic("telemetry/writer: *http.Request must not be nil") + } + + if config.HTTPClient == nil { + config.HTTPClient = defaultHTTPClient + } + + body := newBody(config.TracerConfig, config.Debug) + return &writer{ + body: body, + httpClient: config.HTTPClient, + request: preBakeRequest(config.Request, body), + } +} + +// preBakeRequest adds all the *static* headers that we already know at the time of the creation of the writer. +// This is useful to avoid querying too many things at the time of the request. +// Headers necessary are described here: +// https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/cf17b41a30fbf31d54e2cfbfc983875d58b02fe1/GeneratedDocumentation/ApiDocs/v2/overview.md#required-http-headers +func preBakeRequest(request *http.Request, body *transport.Body) *http.Request { + clonedRequest := request.Clone(context.Background()) + for key, val := range map[string]string{ + "Content-Type": "application/json", + "DD-Telemetry-API-Version": body.APIVersion, + "DD-Telemetry-Debug-Enabled": strconv.FormatBool(body.Debug), + "DD-Client-Library-Language": body.Application.LanguageName, + "DD-Client-Library-Version": body.Application.TracerVersion, + "DD-Agent-Env": body.Application.Env, + "DD-Agent-Hostname": body.Host.Hostname, + "DD-Agent-Install-Id": globalconfig.InstrumentationInstallID(), + "DD-Agent-Install-Type": globalconfig.InstrumentationInstallType(), + "DD-Agent-Install-Time": globalconfig.InstrumentationInstallTime(), + "Datadog-Container-ID": internal.ContainerID(), + "Datadog-Entity-ID": internal.EntityID(), + // TODO: Add support for Cloud provider/resource-type/resource-id headers in another PR and package + // Described here: https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/cf17b41a30fbf31d54e2cfbfc983875d58b02fe1/GeneratedDocumentation/ApiDocs/v2/overview.md#setting-the-serverless-telemetry-headers + } { + if val == "" { + continue + } + clonedRequest.Header.Add(key, val) + } + return clonedRequest +} + +func (w *writer) NewHTTPRequest(payload transport.Payload) *http.Request { + clonedRequest := w.request.Clone(context.Background()) + clonedRequest.Header.Add("DD-Telemetry-Request-Type", string(payload.RequestType())) + w.body.SeqID++ + w.body.TracerTime = time.Now().Unix() + w.body.RequestType = payload.RequestType() + w.body.Payload = payload + return clonedRequest +} + +func (w *writer) Flush(payload transport.Payload) error { + w.mu.Lock() + defer w.mu.Unlock() + + var ( + request = w.NewHTTPRequest(payload) + pipeReader, pipeWriter = io.Pipe() + done = make(chan struct{}) + encodeErr error + ) + + go func() { + defer pipeWriter.Close() + encodeErr = json.NewEncoder(pipeWriter).Encode(w.body) + done <- struct{}{} + }() + + request.Body = pipeReader + response, httpErr := w.httpClient.Do(request) + <-done + + if httpErr == nil { + defer response.Body.Close() + } + + if httpErr != nil || encodeErr != nil { + return errors.Join(httpErr, encodeErr) + } + + if response.StatusCode >= 300 || response.StatusCode < 200 { + respBodyBytes, _ := io.ReadAll(response.Body) + return fmt.Errorf("telemetry/writer: unexpected status code: %q (received body: %q)"+response.Status, string(respBodyBytes)) + } + + return nil +}