Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: push tsdump upload failed metrics to datadog logs #137250

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions pkg/cli/tsdump_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ var (
targetURLFormat = "https://api.%s/api/v2/series"
datadogDashboardURLFormat = "https://us5.datadoghq.com/dashboard/bif-kwe-gx2/self-hosted-db-console-tsdump?" +
"tpl_var_cluster=%s&tpl_var_upload_id=%s&tpl_var_upload_day=%d&tpl_var_upload_month=%d&tpl_var_upload_year=%d&from_ts=%d&to_ts=%d"
zipFileSignature = []byte{0x50, 0x4B, 0x03, 0x04}
zipFileSignature = []byte{0x50, 0x4B, 0x03, 0x04}
logMessageFormat            = "tsdump upload to datadog partially failed for metric: %s"
partialFailureMessageFormat = "The Tsdump upload to Datadog succeeded but %d metrics partially failed to upload." +
" These failures can be due to transient network errors. If any of these metrics are critical for your investigation," +
" please re-upload the Tsdump:\n%s\n"
datadogLogsURLFormat = "https://us5.datadoghq.com/logs?query=cluster_label:%s+upload_id:%s"
)

// DatadogPoint is a single metric point in Datadog format
Expand Down Expand Up @@ -318,6 +323,7 @@ func (d *datadogWriter) flush(data []DatadogSeries) error {
}
}
return err

}

func (d *datadogWriter) upload(fileName string) error {
Expand Down Expand Up @@ -421,10 +427,12 @@ func (d *datadogWriter) upload(fileName string) error {
fmt.Printf("\nUpload status: %s!\n", uploadStatus)

if metricsUploadState.isSingleUploadSucceeded {
var isDatadogUploadFailed = false
markDatadogUploadFailedOnce := sync.OnceFunc(func() {
isDatadogUploadFailed = true
})
if len(metricsUploadState.uploadFailedMetrics) != 0 {
fmt.Printf("The Tsdump upload to Datadog succeeded but %d metrics partially failed to upload."+
" These failures can be due to transietnt network errors. If any of these metrics are critical for your investigation,"+
" please re-upload the Tsdump:\n%s\n", len(metricsUploadState.uploadFailedMetrics), strings.Join(func() []string {
fmt.Printf(partialFailureMessageFormat, len(metricsUploadState.uploadFailedMetrics), strings.Join(func() []string {
var failedMetricsList []string
index := 1
for metric := range metricsUploadState.uploadFailedMetrics {
Expand All @@ -434,9 +442,41 @@ func (d *datadogWriter) upload(fileName string) error {
}
return failedMetricsList
}(), "\n"))
}

fmt.Println("\nupload id: ", d.uploadID)
tags := strings.Join(getUploadTags(d), ",")
fmt.Println("\nPushing logs of metric upload failures to datadog...")
for metric := range metricsUploadState.uploadFailedMetrics {
wg.Add(1)
go func(metric string) {
logMessage := fmt.Sprintf(logMessageFormat, metric)

logEntryJSON, _ := json.Marshal(struct {
Message any `json:"message,omitempty"`
Tags string `json:"ddtags,omitempty"`
Source string `json:"ddsource,omitempty"`
}{
Message: logMessage,
Tags: tags,
Source: "tsdump_upload",
})

_, err := uploadLogsToDatadog(logEntryJSON, d.apiKey, debugTimeSeriesDumpOpts.ddSite)
if err != nil {
markDatadogUploadFailedOnce()
}
wg.Done()
}(metric)
}

wg.Wait()
if isDatadogUploadFailed {
fmt.Println("Failed to push some metrics to datadog logs. Please refer to the CLI output for all failed metrics.")
} else {
fmt.Println("Pushing logs of metric upload failures to datadog...done")
fmt.Printf("datadog logs for metric upload failures link: %s\n", fmt.Sprintf(datadogLogsURLFormat, debugTimeSeriesDumpOpts.clusterLabel, d.uploadID))
}
}
fmt.Println("\nupload id:", d.uploadID)
fmt.Printf("datadog dashboard link: %s\n", dashboardLink)
} else {
fmt.Println("All metric uploads failed. Please re-upload the Tsdump.")
Expand Down
16 changes: 8 additions & 8 deletions pkg/cli/zip_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func newProfileUploadReq(
return nil, err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, makeDDURL(datadogProfileUploadURLTmpl), &body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, makeDDURL(datadogProfileUploadURLTmpl, debugZipUploadOpts.ddSite), &body)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -655,7 +655,7 @@ func setupDDArchive(ctx context.Context, pathPrefix, archiveName string) error {
}

req, err := http.NewRequestWithContext(
ctx, http.MethodPost, makeDDURL(datadogCreateArchiveURLTmpl), bytes.NewReader(rawPayload),
ctx, http.MethodPost, makeDDURL(datadogCreateArchiveURLTmpl, debugZipUploadOpts.ddSite), bytes.NewReader(rawPayload),
)
if err != nil {
return err
Expand Down Expand Up @@ -798,18 +798,18 @@ func ddLogUpload(ctx context.Context, sig logUploadSig) (int, error) {
buf.Write(bytes.Join(sig.logLines, []byte(",")))
buf.WriteByte(']')

return uploadLogsToDatadog(ctx, buf.Bytes())
return uploadLogsToDatadog(buf.Bytes(), debugZipUploadOpts.ddAPIKey, debugZipUploadOpts.ddSite)
}

// uploadLogsToDatadog is a generic function that uploads the given payload of
// logs to datadog. This exists because artifacts other than logs might also
// need to be uploaded to datadog in the form of logs (example: table dumps,
// events etc.).
func uploadLogsToDatadog(ctx context.Context, payload []byte) (int, error) {
func uploadLogsToDatadog(payload []byte, ddApiKey string, ddSite string) (int, error) {
var (
compressedLogs bytes.Buffer
compressedlogWriter = gzip.NewWriter(&compressedLogs)
url = makeDDURL(datadogLogIntakeURLTmpl)
url = makeDDURL(datadogLogIntakeURLTmpl, ddSite)
)

if _, err := compressedlogWriter.Write(payload); err != nil {
Expand All @@ -832,7 +832,7 @@ func uploadLogsToDatadog(ctx context.Context, payload []byte) (int, error) {

req.Header.Set(httputil.ContentTypeHeader, httputil.JSONContentType)
req.Header.Set(httputil.ContentEncodingHeader, httputil.GzipEncoding)
req.Header.Set(datadogAPIKeyHeader, debugZipUploadOpts.ddAPIKey)
req.Header.Set(datadogAPIKeyHeader, ddApiKey)

if _, err = doUploadReq(req); err == nil {
break
Expand Down Expand Up @@ -1077,8 +1077,8 @@ You will receive an email notification once the rehydration is complete.
// placeholder in the template. This is a simple convenience
// function. It assumes that the site is valid. This assumption is
// fine because we are validating the site early on in the flow.
func makeDDURL(tmpl string) string {
return fmt.Sprintf(tmpl, ddSiteToHostMap[debugZipUploadOpts.ddSite])
func makeDDURL(tmpl string, ddSite string) string {
return fmt.Sprintf(tmpl, ddSiteToHostMap[ddSite])
}

// humanReadableSize converts the given number of bytes to a human readable
Expand Down
Loading