Skip to content

Commit

Permalink
Add metrics for WatchDocuments and enhance pushpull metrics (#1008)
Browse files Browse the repository at this point in the history
This change improves our monitoring capabilities in two ways:

1. Enhances pushpull metrics with project and hostname labels:
   - Allows separation of metrics by project and hostname
   - Improves visibility for the Grafana dashboard

2. Adds new WatchDocuments metrics:
   - Event count: Increments on watch document events
   - Payload bytes: Measures size of broadcast events
   - Both metrics include labels for project, hostname, and doc event type

These additions will provide more granular insights into system performance
and usage patterns across different projects and event types.

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
emplam27 and hackerwins authored Sep 11, 2024
1 parent 3e49afb commit 9e36425
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 62 deletions.
5 changes: 5 additions & 0 deletions api/types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ type DocEventBody struct {
Topic string
Payload []byte
}

// PayloadLen returns the size of the payload.
func (b *DocEventBody) PayloadLen() int {
return len(b.Payload)
}
11 changes: 6 additions & 5 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,18 @@ func PushPull(

// 01. push changes: filter out the changes that are already saved in the database.
cpAfterPush, pushedChanges := pushChanges(ctx, clientInfo, docInfo, reqPack, initialServerSeq)
be.Metrics.AddPushPullReceivedChanges(reqPack.ChangesLen())
be.Metrics.AddPushPullReceivedOperations(reqPack.OperationsLen())
hostname := be.Config.Hostname
be.Metrics.AddPushPullReceivedChanges(hostname, project, reqPack.ChangesLen())
be.Metrics.AddPushPullReceivedOperations(hostname, project, reqPack.OperationsLen())

// 02. pull pack: pull changes or a snapshot from the database and create a response pack.
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, opts.Mode)
if err != nil {
return nil, err
}
be.Metrics.AddPushPullSentChanges(respPack.ChangesLen())
be.Metrics.AddPushPullSentOperations(respPack.OperationsLen())
be.Metrics.AddPushPullSnapshotBytes(respPack.SnapshotLen())
be.Metrics.AddPushPullSentChanges(hostname, project, respPack.ChangesLen())
be.Metrics.AddPushPullSentOperations(hostname, project, respPack.OperationsLen())
be.Metrics.AddPushPullSnapshotBytes(hostname, project, respPack.SnapshotLen())

// 03. update the client's document and checkpoint.
docRefKey := docInfo.RefKey()
Expand Down
189 changes: 139 additions & 50 deletions server/profiling/prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import (
)

const (
namespace = "yorkie"
sdkTypeLabel = "sdk_type"
sdkVersionLabel = "sdk_version"
methodLabel = "grpc_method"
projectIDLabel = "project_id"
projectNameLabel = "project_name"
hostnameLabel = "hostname"
taskTypeLabel = "task_type"
namespace = "yorkie"
sdkTypeLabel = "sdk_type"
sdkVersionLabel = "sdk_version"
methodLabel = "grpc_method"
projectIDLabel = "project_id"
projectNameLabel = "project_name"
hostnameLabel = "hostname"
taskTypeLabel = "task_type"
docEventTypeLabel = "doc_event_type"
)

var (
Expand All @@ -54,18 +55,19 @@ type Metrics struct {
serverVersion *prometheus.GaugeVec
serverHandledCounter *prometheus.CounterVec

backgroundGoroutinesTotal *prometheus.GaugeVec

pushPullResponseSeconds prometheus.Histogram
pushPullReceivedChangesTotal prometheus.Counter
pushPullSentChangesTotal prometheus.Counter
pushPullReceivedOperationsTotal prometheus.Counter
pushPullSentOperationsTotal prometheus.Counter
pushPullReceivedChangesTotal *prometheus.CounterVec
pushPullSentChangesTotal *prometheus.CounterVec
pushPullReceivedOperationsTotal *prometheus.CounterVec
pushPullSentOperationsTotal *prometheus.CounterVec
pushPullSnapshotDurationSeconds prometheus.Histogram
pushPullSnapshotBytesTotal prometheus.Counter
pushPullSnapshotBytesTotal *prometheus.CounterVec

backgroundGoroutinesTotal *prometheus.GaugeVec

watchDocumentConnectionTotal *prometheus.GaugeVec
watchDocumentPayloadBytesTotal *prometheus.GaugeVec
watchDocumentConnectionsTotal *prometheus.GaugeVec
watchDocumentEventsTotal *prometheus.CounterVec
watchDocumentEventPayloadBytesTotal *prometheus.CounterVec

userAgentTotal *prometheus.CounterVec
}
Expand Down Expand Up @@ -101,56 +103,73 @@ func NewMetrics() (*Metrics, error) {
Name: "response_seconds",
Help: "The response time of PushPull.",
}),
pushPullReceivedChangesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
pushPullReceivedChangesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "pushpull",
Name: "received_changes_total",
Help: "The total count of changes included in request packs in PushPull.",
}, []string{
projectIDLabel,
projectNameLabel,
hostnameLabel,
}),
pushPullSentChangesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
pushPullSentChangesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "pushpull",
Name: "sent_changes_total",
Help: "The total count of changes included in response packs in PushPull.",
}, []string{
projectIDLabel,
projectNameLabel,
hostnameLabel,
}),
pushPullReceivedOperationsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "pushpull",
Name: "received_operations_total",
Help: "The total count of operations included in request packs in PushPull.",
}, []string{
projectIDLabel,
projectNameLabel,
hostnameLabel,
}),
pushPullReceivedOperationsTotal: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "pushpull",
Name: "received_operations_total",
Help: "The total count of operations included in request" +
" packs in PushPull.",
}),
pushPullSentOperationsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
pushPullSentOperationsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "pushpull",
Name: "sent_operations_total",
Help: "The total count of operations included in response" +
" packs in PushPull.",
Help: "The total count of operations included in response packs in PushPull.",
}, []string{
projectIDLabel,
projectNameLabel,
hostnameLabel,
}),
pushPullSnapshotDurationSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: "pushpull",
Name: "snapshot_duration_seconds",
Help: "The creation time of snapshot for response packs in PushPull.",
}),
pushPullSnapshotBytesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
pushPullSnapshotBytesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "pushpull",
Name: "snapshot_bytes_total",
Help: "The total bytes of snapshots for response packs in PushPull.",
}, []string{
projectIDLabel,
projectNameLabel,
hostnameLabel,
}),
backgroundGoroutinesTotal: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "background",
Name: "goroutines_total",
Help: "The total number of goroutines attached by a particular background task.",
}, []string{taskTypeLabel}),
watchDocumentConnectionTotal: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
watchDocumentConnectionsTotal: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "stream",
Name: "watch_document_stream_connection_total",
Help: "The total number of document watch stream connection.",
Name: "watch_document_stream_connections_total",
Help: "The total number of document watch stream connections.",
}, []string{
projectIDLabel,
projectNameLabel,
Expand All @@ -169,6 +188,28 @@ func NewMetrics() (*Metrics, error) {
projectNameLabel,
hostnameLabel,
}),
watchDocumentEventsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "stream",
Name: "watch_document_events_total",
Help: "The total number of events in document watch stream connections.",
}, []string{
projectIDLabel,
projectNameLabel,
hostnameLabel,
docEventTypeLabel,
}),
watchDocumentEventPayloadBytesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "stream",
Name: "watch_document_event_payload_bytes_total",
Help: "The total bytes of event payloads in document watch stream connections.",
}, []string{
projectIDLabel,
projectNameLabel,
hostnameLabel,
docEventTypeLabel,
}),
}

metrics.serverVersion.With(prometheus.Labels{
Expand All @@ -186,26 +227,42 @@ func (m *Metrics) ObservePushPullResponseSeconds(seconds float64) {

// AddPushPullReceivedChanges sets the number of changes
// included in the request pack of PushPull.
func (m *Metrics) AddPushPullReceivedChanges(count int) {
m.pushPullReceivedChangesTotal.Add(float64(count))
func (m *Metrics) AddPushPullReceivedChanges(hostname string, project *types.Project, count int) {
m.pushPullReceivedChangesTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
}).Add(float64(count))
}

// AddPushPullSentChanges adds the number of changes
// included in the response pack of PushPull.
func (m *Metrics) AddPushPullSentChanges(count int) {
m.pushPullSentChangesTotal.Add(float64(count))
func (m *Metrics) AddPushPullSentChanges(hostname string, project *types.Project, count int) {
m.pushPullSentChangesTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
}).Add(float64(count))
}

// AddPushPullReceivedOperations sets the number of operations
// included in the request pack of PushPull.
func (m *Metrics) AddPushPullReceivedOperations(count int) {
m.pushPullReceivedOperationsTotal.Add(float64(count))
func (m *Metrics) AddPushPullReceivedOperations(hostname string, project *types.Project, count int) {
m.pushPullReceivedOperationsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
}).Add(float64(count))
}

// AddPushPullSentOperations adds the number of operations
// included in the response pack of PushPull.
func (m *Metrics) AddPushPullSentOperations(count int) {
m.pushPullSentOperationsTotal.Add(float64(count))
func (m *Metrics) AddPushPullSentOperations(hostname string, project *types.Project, count int) {
m.pushPullSentOperationsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
}).Add(float64(count))
}

// ObservePushPullSnapshotDurationSeconds adds an observation
Expand All @@ -215,8 +272,12 @@ func (m *Metrics) ObservePushPullSnapshotDurationSeconds(seconds float64) {
}

// AddPushPullSnapshotBytes adds the snapshot byte size of response pack.
func (m *Metrics) AddPushPullSnapshotBytes(bytes int) {
m.pushPullSnapshotBytesTotal.Add(float64(bytes))
func (m *Metrics) AddPushPullSnapshotBytes(hostname string, project *types.Project, bytes int) {
m.pushPullSnapshotBytesTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
}).Add(float64(bytes))
}

// AddUserAgent adds the number of user agent.
Expand Down Expand Up @@ -270,24 +331,52 @@ func (m *Metrics) RemoveBackgroundGoroutines(taskType string) {
}).Dec()
}

// AddWatchDocumentConnection adds the number of document watch stream connection.
func (m *Metrics) AddWatchDocumentConnection(hostname string, project *types.Project) {
m.watchDocumentConnectionTotal.With(prometheus.Labels{
// AddWatchDocumentConnections adds the number of document watch stream connection.
func (m *Metrics) AddWatchDocumentConnections(hostname string, project *types.Project) {
m.watchDocumentConnectionsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
}).Inc()
}

// RemoveWatchDocumentConnection removes the number of document watch stream connection.
func (m *Metrics) RemoveWatchDocumentConnection(hostname string, project *types.Project) {
m.watchDocumentConnectionTotal.With(prometheus.Labels{
// RemoveWatchDocumentConnections removes the number of document watch stream connection.
func (m *Metrics) RemoveWatchDocumentConnections(hostname string, project *types.Project) {
m.watchDocumentConnectionsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
}).Dec()
}

// AddWatchDocumentEvents adds the number of events in document watch stream connections.
func (m *Metrics) AddWatchDocumentEvents(hostname string, project *types.Project, docEventType types.DocEventType) {
m.watchDocumentEventsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
docEventTypeLabel: string(docEventType),
}).Inc()
}

// AddWatchDocumentEventPayloadBytes adds the bytes of event payload in document watch stream connections.
func (m *Metrics) AddWatchDocumentEventPayloadBytes(hostname string, project *types.Project,
docEventType types.DocEventType, bytes int) {
m.watchDocumentEventsTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
docEventTypeLabel: string(docEventType),
}).Inc()

m.watchDocumentEventPayloadBytesTotal.With(prometheus.Labels{
projectIDLabel: project.ID.String(),
projectNameLabel: project.Name,
hostnameLabel: hostname,
docEventTypeLabel: string(docEventType),
}).Add(float64(bytes))
}

// Registry returns the registry of this metrics.
func (m *Metrics) Registry() *prometheus.Registry {
return m.registry
Expand Down
Loading

0 comments on commit 9e36425

Please sign in to comment.