Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: add prometheus server and client prometheus metrics #642

Merged
merged 7 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions api_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"math/rand"
"reflect"

proto "github.com/sourcegraph/zoekt/grpc/v1"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/protos/zoekt/webserver/v1"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -243,11 +243,11 @@ func (lfm *LineFragmentMatch) ToProto() *proto.LineFragmentMatch {

func FlushReasonFromProto(p proto.FlushReason) FlushReason {
switch p {
case proto.FlushReason_TIMER_EXPIRED:
case proto.FlushReason_FLUSH_REASON_TIMER_EXPIRED:
return FlushReasonTimerExpired
case proto.FlushReason_FINAL_FLUSH:
case proto.FlushReason_FLUSH_REASON_FINAL_FLUSH:
return FlushReasonFinalFlush
case proto.FlushReason_MAX_SIZE:
case proto.FlushReason_FLUSH_REASON_MAX_SIZE:
return FlushReasonMaxSize
default:
return FlushReason(0)
Expand All @@ -257,13 +257,13 @@ func FlushReasonFromProto(p proto.FlushReason) FlushReason {
func (fr FlushReason) ToProto() proto.FlushReason {
switch fr {
case FlushReasonTimerExpired:
return proto.FlushReason_TIMER_EXPIRED
return proto.FlushReason_FLUSH_REASON_TIMER_EXPIRED
case FlushReasonFinalFlush:
return proto.FlushReason_FINAL_FLUSH
return proto.FlushReason_FLUSH_REASON_FINAL_FLUSH
case FlushReasonMaxSize:
return proto.FlushReason_MAX_SIZE
return proto.FlushReason_FLUSH_REASON_MAX_SIZE
default:
return proto.FlushReason_UNKNOWN
return proto.FlushReason_FLUSH_REASON_UNKNOWN_UNSPECIFIED
}
}

Expand Down Expand Up @@ -345,6 +345,14 @@ func (p *Progress) ToProto() *proto.Progress {
}
}

func SearchResultFromStreamProto(p *proto.StreamSearchResponse, repoURLs, lineFragments map[string]string) *SearchResult {
if p == nil {
return nil
}

return SearchResultFromProto(p.GetResponseChunk(), repoURLs, lineFragments)
}

func SearchResultFromProto(p *proto.SearchResponse, repoURLs, lineFragments map[string]string) *SearchResult {
if p == nil {
return nil
Expand Down Expand Up @@ -384,6 +392,14 @@ func (sr *SearchResult) ToProto() *proto.SearchResponse {
}
}

func (sr *SearchResult) ToStreamProto() *proto.StreamSearchResponse {
if sr == nil {
return nil
}

return &proto.StreamSearchResponse{ResponseChunk: sr.ToProto()}
}

func RepositoryBranchFromProto(p *proto.RepositoryBranch) RepositoryBranch {
return RepositoryBranch{
Name: p.GetName(),
Expand Down
57 changes: 39 additions & 18 deletions api_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
webserverproto "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/protos/zoekt/webserver/v1"
"google.golang.org/protobuf/proto"

v1 "github.com/sourcegraph/zoekt/grpc/v1"
)

func TestProtoRoundtrip(t *testing.T) {
Expand Down Expand Up @@ -133,23 +132,45 @@ func TestProtoRoundtrip(t *testing.T) {
})

t.Run("SearchResult", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string
t.Run("unary", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string

if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
}

if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
p1 := f1.ToProto()
f2 := SearchResultFromProto(p1, repoURLs, lineFragments)

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
})

p1 := f1.ToProto()
f2 := SearchResultFromProto(p1, repoURLs, lineFragments)
t.Run("stream", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
}

p1 := f1.ToStreamProto()
f2 := SearchResultFromStreamProto(p1, repoURLs, lineFragments)

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
})
})

t.Run("Repository", func(t *testing.T) {
Expand Down Expand Up @@ -396,8 +417,8 @@ var (
exampleSearchResultBytes []byte

// The proto struct representation of the search result
exampleSearchResultProto = func() *v1.SearchResponse {
sr := new(v1.SearchResponse)
exampleSearchResultProto = func() *webserverproto.SearchResponse {
sr := new(webserverproto.SearchResponse)
err := proto.Unmarshal(exampleSearchResultBytes, sr)
if err != nil {
panic(err)
Expand Down Expand Up @@ -451,7 +472,7 @@ func BenchmarkProtoRoundtrip(b *testing.B) {
}

for _, buf := range buffers {
res := new(v1.SearchResponse)
res := new(webserverproto.SearchResponse)
err := proto.Unmarshal(buf, res)
if err != nil {
b.Fatal(err)
Expand Down
3 changes: 1 addition & 2 deletions cmd/zoekt-sourcegraph-indexserver/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"

"github.com/sourcegraph/log/logtest"

proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -28,7 +28,6 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"github.com/sourcegraph/zoekt"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

func TestIterateIndexOptions_Fingerprint(t *testing.T) {
Expand Down
28 changes: 27 additions & 1 deletion cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"text/tabwriter"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2"
"github.com/keegancsmith/tmpfriend"
"github.com/peterbourgon/ff/v3/ffcli"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
sglog "github.com/sourcegraph/log"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"github.com/sourcegraph/zoekt/grpc/internalerrs"
"github.com/sourcegraph/zoekt/grpc/messagesize"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -49,7 +51,6 @@ import (

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/build"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"github.com/sourcegraph/zoekt/debugserver"
"github.com/sourcegraph/zoekt/internal/profiler"
)
Expand Down Expand Up @@ -126,6 +127,9 @@ var (
Name: "index_num_stopped_tracking_total",
Help: "Counts the number of repos we stopped tracking.",
})

clientMetricsOnce sync.Once
clientMetrics *grpc_prometheus.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
Expand Down Expand Up @@ -1457,14 +1461,18 @@ func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB

func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
metrics := mustGetClientMetrics()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainStreamInterceptor(
grpc_prometheus.StreamClientInterceptor(metrics),
internalActorStreamInterceptor(),
internalerrs.LoggingStreamClientInterceptor(logger),
internalerrs.PrometheusStreamClientInterceptor,
),
grpc.WithChainUnaryInterceptor(
grpc_prometheus.UnaryClientInterceptor(metrics),
internalActorUnaryInterceptor(),
internalerrs.LoggingUnaryClientInterceptor(logger),
internalerrs.PrometheusUnaryClientInterceptor,
Expand Down Expand Up @@ -1494,6 +1502,24 @@ func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.Dia
return client, nil
}

// mustGetClientMetrics returns a singleton instance of the client metrics
// that are shared across all gRPC clients that this process creates.
//
// This function panics if the metrics cannot be registered with the default
// Prometheus registry.
func mustGetClientMetrics() *grpc_prometheus.ClientMetrics {
clientMetricsOnce.Do(func() {
clientMetrics = grpc_prometheus.NewRegisteredClientMetrics(prometheus.DefaultRegisterer,
grpc_prometheus.WithClientCounterOptions(),
grpc_prometheus.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
grpc_prometheus.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
grpc_prometheus.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
)
})

return clientMetrics
}

// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
Expand Down
3 changes: 1 addition & 2 deletions cmd/zoekt-sourcegraph-indexserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (

sglog "github.com/sourcegraph/log"
"github.com/sourcegraph/log/logtest"

proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
ggilmore marked this conversation as resolved.
Show resolved Hide resolved
"github.com/xeipuuv/gojsonschema"
"google.golang.org/grpc"

"github.com/google/go-cmp/cmp"

"github.com/sourcegraph/zoekt"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

func TestServer_defaultArgs(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/zoekt-sourcegraph-indexserver/sg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

"github.com/go-git/go-git/v5"
retryablehttp "github.com/hashicorp/go-retryablehttp"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"golang.org/x/net/trace"
"google.golang.org/grpc"

"github.com/sourcegraph/zoekt"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

// SourcegraphListResult is the return value of Sourcegraph.List. It is its
Expand Down
File renamed without changes.
7 changes: 7 additions & 0 deletions cmd/zoekt-webserver/grpc/protos/buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULT
Loading
Loading