Skip to content

Commit

Permalink
internal/newtelemetry/internal: created and implemented the Writer
Browse files Browse the repository at this point in the history
centralize all http concerns in the writer struct such as:
* Reusing the same top-level structures when possible
* encoding properly in json the payload
* separating the low level http writing from some higher level things we will concerns ourselves about in the telemetry client itself
* Centralizing a lot of the static data the telemtry client is supposed to send on every request

Signed-off-by: Eliott Bouhana <[email protected]>
  • Loading branch information
eliottness committed Jan 14, 2025
1 parent cce924a commit 27525e2
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 60 deletions.
16 changes: 16 additions & 0 deletions internal/globalconfig/globalconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package globalconfig

import (
"math"
"os"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
Expand Down Expand Up @@ -130,3 +131,18 @@ func HeaderTagsLen() int {
func ClearHeaderTags() {
cfg.headersAsTags.Clear()
}

// InstrumentationInstallID returns the install ID as described in DD_INSTRUMENTATION_INSTALL_ID
func InstrumentationInstallID() string {
return os.Getenv("DD_INSTRUMENTATION_INSTALL_ID")
}

// InstrumentationInstallType returns the install type as described in DD_INSTRUMENTATION_INSTALL_TYPE
func InstrumentationInstallType() string {
return os.Getenv("DD_INSTRUMENTATION_INSTALL_TYPE")
}

// InstrumentationInstallTime returns the install time as described in DD_INSTRUMENTATION_INSTALL_TIME
func InstrumentationInstallTime() string {
return os.Getenv("DD_INSTRUMENTATION_INSTALL_TIME")
}
26 changes: 0 additions & 26 deletions internal/newtelemetry/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,7 @@

package newtelemetry

import (
"net"
"net/http"
"time"
)

var (
// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
// See https://golang.org/pkg/net/http/#DefaultTransport .
//orchestrion:ignore
defaultHTTPClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 5 * time.Second,
}

// agentlessURL is the endpoint used to send telemetry in an agentless environment. It is
// also the default URL in case connecting to the agent URL fails.
agentlessURL = "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry"
Expand Down
16 changes: 16 additions & 0 deletions internal/newtelemetry/internal/tracerconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025 Datadog, Inc.

package internal

// TracerConfig is the configuration for the tracer for the telemetry client.
type TracerConfig struct {
// Service is the name of the service being traced.
Service string
// Env is the environment the service is running in.
Env string
// Version is the version of the service.
Version string
}
36 changes: 2 additions & 34 deletions internal/newtelemetry/internal/transport/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@

package transport

import (
"runtime"

"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/hostname"
"gopkg.in/DataDog/dd-trace-go.v1/internal/osinfo"
tracerversion "gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)

// Application is identifying information about the app itself
type Application struct {
ServiceName string `json:"service_name"`
Expand All @@ -37,38 +28,15 @@ type Host struct {
}

// Body is the common high-level structure encapsulating a telemetry request body
// Described here: https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/main/GeneratedDocumentation/ApiDocs/v2/SchemaDocumentation/Request%20Bodies/telemetry_body.md
type Body struct {
APIVersion string `json:"api_version"`
RequestType RequestType `json:"request_type"`
TracerTime int64 `json:"tracer_time"`
RuntimeID string `json:"runtime_id"`
SeqID int64 `json:"seq_id"`
Debug bool `json:"debug"`
Debug bool `json:"debug,omitempty"`
Payload Payload `json:"payload"`
Application Application `json:"application"`
Host Host `json:"host"`
}

func NewBody(service, env, version string) *Body {
return &Body{
APIVersion: "v2",
RuntimeID: globalconfig.RuntimeID(),
Application: Application{
ServiceName: service,
Env: env,
ServiceVersion: version,
TracerVersion: tracerversion.Tag,
LanguageName: "go",
LanguageVersion: runtime.Version(),
},
Host: Host{
Hostname: hostname.Get(),
OS: osinfo.OSName(),
OSVersion: osinfo.OSVersion(),
Architecture: osinfo.Architecture(),
KernelName: osinfo.KernelName(),
KernelRelease: osinfo.KernelRelease(),
KernelVersion: osinfo.KernelVersion(),
},
}
}
189 changes: 189 additions & 0 deletions internal/newtelemetry/internal/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025 Datadog, Inc.

package internal

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"runtime"
"strconv"
"sync"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/hostname"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal/transport"
"gopkg.in/DataDog/dd-trace-go.v1/internal/osinfo"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)

// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
// See https://golang.org/pkg/net/http/#DefaultTransport .
//
//orchestrion:ignore
var defaultHTTPClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 5 * time.Second,
}

func newBody(config TracerConfig, debugMode bool) *transport.Body {
return &transport.Body{
APIVersion: "v2",
RuntimeID: globalconfig.RuntimeID(),
Debug: debugMode,
Application: transport.Application{
ServiceName: config.Service,
Env: config.Env,
ServiceVersion: config.Version,
TracerVersion: version.Tag,
LanguageName: "go",
LanguageVersion: runtime.Version(),
},
Host: transport.Host{
Hostname: hostname.Get(),
OS: osinfo.OSName(),
OSVersion: osinfo.OSVersion(),
Architecture: osinfo.Architecture(),
KernelName: osinfo.KernelName(),
KernelRelease: osinfo.KernelRelease(),
KernelVersion: osinfo.KernelVersion(),
},
}
}

type Writer interface {
Flush(transport.Payload) error
}

type writer struct {
mu sync.Mutex
body *transport.Body
httpClient *http.Client
request *http.Request
}

type WriterConfig struct {
// TracerConfig is the configuration the tracer sent when the telemetry client was created (required)
TracerConfig
// Request is a proto http request that should contain what URL to send the telemetry data to and more if necessary (required)
Request *http.Request
// HTTPClient is the http client that will be used to send the telemetry data (defaults to the default http client)
HTTPClient *http.Client
// Debug is a flag that indicates whether the telemetry client is in debug mode (defaults to false)
Debug bool
}

func NewWriter(config WriterConfig) Writer {
if config.Request == nil {
panic("telemetry/writer: *http.Request must not be nil")
}

if config.HTTPClient == nil {
config.HTTPClient = defaultHTTPClient
}

body := newBody(config.TracerConfig, config.Debug)
return &writer{
body: body,
httpClient: config.HTTPClient,
request: preBakeRequest(config.Request, body),
}
}

// preBakeRequest adds all the *static* headers that we already know at the time of the creation of the writer.
// This is useful to avoid querying too many things at the time of the request.
// Headers necessary are described here:
// https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/cf17b41a30fbf31d54e2cfbfc983875d58b02fe1/GeneratedDocumentation/ApiDocs/v2/overview.md#required-http-headers
func preBakeRequest(request *http.Request, body *transport.Body) *http.Request {
clonedRequest := request.Clone(context.Background())
for key, val := range map[string]string{
"Content-Type": "application/json",
"DD-Telemetry-API-Version": body.APIVersion,
"DD-Telemetry-Debug-Enabled": strconv.FormatBool(body.Debug),
"DD-Client-Library-Language": body.Application.LanguageName,
"DD-Client-Library-Version": body.Application.TracerVersion,
"DD-Agent-Env": body.Application.Env,
"DD-Agent-Hostname": body.Host.Hostname,
"DD-Agent-Install-Id": globalconfig.InstrumentationInstallID(),
"DD-Agent-Install-Type": globalconfig.InstrumentationInstallType(),
"DD-Agent-Install-Time": globalconfig.InstrumentationInstallTime(),
"Datadog-Container-ID": internal.ContainerID(),
"Datadog-Entity-ID": internal.EntityID(),
// TODO: Add support for Cloud provider/resource-type/resource-id headers in another PR and package
// Described here: https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/cf17b41a30fbf31d54e2cfbfc983875d58b02fe1/GeneratedDocumentation/ApiDocs/v2/overview.md#setting-the-serverless-telemetry-headers
} {
if val == "" {
continue
}
clonedRequest.Header.Add(key, val)
}
return clonedRequest
}

func (w *writer) NewHTTPRequest(payload transport.Payload) *http.Request {
clonedRequest := w.request.Clone(context.Background())
clonedRequest.Header.Add("DD-Telemetry-Request-Type", string(payload.RequestType()))
w.body.SeqID++
w.body.TracerTime = time.Now().Unix()
w.body.RequestType = payload.RequestType()
w.body.Payload = payload
return clonedRequest
}

func (w *writer) Flush(payload transport.Payload) error {
w.mu.Lock()
defer w.mu.Unlock()

var (
request = w.NewHTTPRequest(payload)
pipeReader, pipeWriter = io.Pipe()
done = make(chan struct{})
encodeErr error
)

go func() {
defer pipeWriter.Close()
encodeErr = json.NewEncoder(pipeWriter).Encode(w.body)
done <- struct{}{}
}()

request.Body = pipeReader
response, httpErr := w.httpClient.Do(request)
<-done

if httpErr == nil {
defer response.Body.Close()
}

if httpErr != nil || encodeErr != nil {
return errors.Join(httpErr, encodeErr)
}

if response.StatusCode >= 300 || response.StatusCode < 200 {
respBodyBytes, _ := io.ReadAll(response.Body)
return fmt.Errorf("telemetry/writer: unexpected status code: %q (received body: %q)"+response.Status, string(respBodyBytes))
}

return nil
}

0 comments on commit 27525e2

Please sign in to comment.