Skip to content

Commit

Permalink
ingest: device functions metric
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 16, 2024
1 parent e414e24 commit 16a2fa7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 16 deletions.
10 changes: 10 additions & 0 deletions ingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ var (
return ingestedMessages.WithLabelValues(destinationId, status, errorType)
}

deviceFunctions = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "bulkerapp",
Subsystem: "ingest",
Name: "device_functions",
Help: "Device Functions enrichment status by destination Id",
}, []string{"destinationId", "status", "errorType"})
DeviceFunctions = func(destinationId, status string) prometheus.Counter {
return ingestedMessages.WithLabelValues(destinationId, status)
}

repositoryErrors = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "ingest",
Subsystem: "repository",
Expand Down
39 changes: 23 additions & 16 deletions ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"net/http/pprof"
"regexp"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -318,31 +319,37 @@ func (r *Router) processSyncDestination(message *IngestMessage, stream *StreamWi
if len(functionDestinations) > 0 {
var err error
ids := utils.ArrayMap(functionDestinations, func(d *ShortDestinationConfig) string { return d.ConnectionId })
defer func() {
defer func(ids []string) {
for _, id := range ids {
if err != nil {
IngestedMessages(id, "error", "device functions error").Inc()
DeviceFunctions(id, "error").Inc()
} else {
DeviceFunctions(id, "success").Inc()
}
}
}()
}(ids)
req, err := http.NewRequest("POST", r.config.RotorURL+"/func/multi?ids="+strings.Join(ids, ","), bytes.NewReader(messageBytes))
if err != nil {
r.Errorf("failed to create rotor request for connections: %s: %v", ids, err)
}
req.Header.Set("Content-Type", "application/json")
res, err := r.httpClient.Do(req)
if err != nil {
r.Errorf("failed to send rotor request for device functions for connections: %s: %v", ids, err)
} else {
defer res.Body.Close()
//get body
body, err := io.ReadAll(res.Body)
if res.StatusCode != 200 || err != nil {
r.Errorf("Failed to send rotor request for device functions for connections: %s: status: %v body: %s", ids, res.StatusCode, string(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Request-Timeout-Ms", strconv.Itoa(r.config.DeviceFunctionsTimeoutMs))
var res *http.Response
res, err = r.httpClient.Do(req)
if err != nil {
r.Errorf("failed to send rotor request for device functions for connections: %s: %v", ids, err)
} else {
err = json.Unmarshal(body, &functionsResults)
if err != nil {
r.Errorf("Failed to unmarshal rotor response for connections: %s: %v", ids, err)
defer res.Body.Close()
//get body
var body []byte
body, err = io.ReadAll(res.Body)
if res.StatusCode != 200 || err != nil {
r.Errorf("Failed to send rotor request for device functions for connections: %s: status: %v body: %s", ids, res.StatusCode, string(body))
} else {
err = json.Unmarshal(body, &functionsResults)
if err != nil {
r.Errorf("Failed to unmarshal rotor response for connections: %s: %v", ids, err)
}
}
}
}
Expand Down

0 comments on commit 16a2fa7

Please sign in to comment.