Skip to content

Commit

Permalink
grpc: zoekt-webserver: stream search: break up file matches across mu…
Browse files Browse the repository at this point in the history
…ltiple messages (#636)
  • Loading branch information
ggilmore authored Aug 16, 2023
1 parent 956d775 commit fcb279a
Show file tree
Hide file tree
Showing 10 changed files with 1,134 additions and 531 deletions.
23 changes: 12 additions & 11 deletions api_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (p *Progress) ToProto() *proto.Progress {
}
}

func SearchResultFromProto(p *proto.SearchResponse) *SearchResult {
func SearchResultFromProto(p *proto.SearchResponse, repoURLs, lineFragments map[string]string) *SearchResult {
if p == nil {
return nil
}
Expand All @@ -356,11 +356,13 @@ func SearchResultFromProto(p *proto.SearchResponse) *SearchResult {
}

return &SearchResult{
Stats: StatsFromProto(p.GetStats()),
Progress: ProgressFromProto(p.GetProgress()),
Files: files,
RepoURLs: p.RepoUrls,
LineFragments: p.LineFragments,
Stats: StatsFromProto(p.GetStats()),
Progress: ProgressFromProto(p.GetProgress()),

Files: files,

RepoURLs: repoURLs,
LineFragments: lineFragments,
}
}

Expand All @@ -375,11 +377,10 @@ func (sr *SearchResult) ToProto() *proto.SearchResponse {
}

return &proto.SearchResponse{
Stats: sr.Stats.ToProto(),
Progress: sr.Progress.ToProto(),
Files: files,
RepoUrls: sr.RepoURLs,
LineFragments: sr.LineFragments,
Stats: sr.Stats.ToProto(),
Progress: sr.Progress.ToProto(),

Files: files,
}
}

Expand Down
13 changes: 11 additions & 2 deletions api_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,17 @@ func TestProtoRoundtrip(t *testing.T) {

t.Run("SearchResult", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string

if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
}

p1 := f1.ToProto()
f2 := SearchResultFromProto(p1)
f2 := SearchResultFromProto(p1, repoURLs, lineFragments)

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
Expand Down Expand Up @@ -397,7 +406,7 @@ var (
}()

// The non-proto struct representation of the search result
exampleSearchResultGo = SearchResultFromProto(exampleSearchResultProto)
exampleSearchResultGo = SearchResultFromProto(exampleSearchResultProto, nil, nil)
)

func BenchmarkGobRoundtrip(b *testing.B) {
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ require (
github.com/RoaringBitmap/roaring v1.3.0
github.com/andygrunwald/go-gerrit v0.0.0-20230628115649-c44fe2fbf2ca
github.com/bmatcuk/doublestar v1.3.4
github.com/dustin/go-humanize v1.0.1
github.com/edsrzf/mmap-go v1.1.0
github.com/felixge/fgprof v0.9.3
github.com/fsnotify/fsnotify v1.6.0
Expand All @@ -30,6 +31,7 @@ require (
github.com/sourcegraph/go-ctags v0.0.0-20230111110657-c27675da7f71
github.com/sourcegraph/log v0.0.0-20230523201558-ad2d71b4d2ee
github.com/sourcegraph/mountinfo v0.0.0-20230106004439-7026e28cef67
github.com/stretchr/testify v1.8.3
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/xanzy/go-gitlab v0.86.0
Expand Down Expand Up @@ -73,6 +75,7 @@ require (
github.com/cockroachdb/errors v1.10.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/getsentry/sentry-go v0.22.0 // indirect
Expand Down Expand Up @@ -106,6 +109,7 @@ require (
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
Expand Down Expand Up @@ -133,6 +137,7 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20230628200519-e449d1ea0e82 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230628200519-e449d1ea0e82 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.18
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819 h1:RIB4cRk+lBqKK3Oy0r2gRX4ui7tuhiZq2SuTtTCi0/0=
Expand Down
100 changes: 100 additions & 0 deletions grpc/chunk/chunker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Package chunk provides a utility for sending sets of protobuf messages in
// groups of smaller chunks. This is useful for gRPC, which has limitations around the maximum
// size of a message that you can send.
//
// This code is adapted from the gitaly project, which is licensed
// under the MIT license. A copy of that license text can be found at
// https://mit-license.org/.
//
// The code this file was based off can be found here: https://gitlab.com/gitlab-org/gitaly/-/blob/v16.2.0/internal/helper/chunk/chunker.go
package chunk

import (
"google.golang.org/protobuf/proto"
)

// New returns a new Chunker that will use the given sendFunc to send chunks of messages.
func New[T proto.Message](sendFunc func([]T) error) *Chunker[T] {
return &Chunker[T]{sendFunc: sendFunc}
}

// Chunker lets you spread items you want to send over multiple chunks.
// This type is not thread-safe.
type Chunker[T proto.Message] struct {
sendFunc func([]T) error // sendFunc is the function that will be invoked when a chunk is ready to be sent.

buffer []T // buffer stores the items that will be sent when the sendFunc is invoked.
sizeBytes int // sizeBytes is the size of the current chunk in bytes.
}

// maxMessageSize is the maximum size per protobuf message
const maxMessageSize = 1 * 1024 * 1024 // 1 MiB

// Send will append the provided items to the current chunk, and send the chunk if it is full.
//
// Callers should ensure that they call Flush() after the last call to Send().
func (c *Chunker[T]) Send(items ...T) error {
for _, item := range items {
if err := c.sendOne(item); err != nil {
return err
}
}

return nil
}

func (c *Chunker[T]) sendOne(item T) error {
itemSize := proto.Size(item)

if itemSize+c.sizeBytes >= maxMessageSize {
if err := c.sendResponseMsg(); err != nil {
return err
}
}

c.buffer = append(c.buffer, item)
c.sizeBytes += itemSize

return nil
}

func (c *Chunker[T]) sendResponseMsg() error {
c.sizeBytes = 0

err := c.sendFunc(c.buffer)
if err != nil {
return err
}

c.buffer = c.buffer[:0]
return nil
}

// Flush sends remaining items in the current chunk, if any.
func (c *Chunker[T]) Flush() error {
if len(c.buffer) == 0 {
return nil
}

err := c.sendResponseMsg()
if err != nil {
return err
}

return nil
}

// SendAll is a convenience function that immediately sends all provided items in smaller chunks using the provided
// sendFunc.
//
// See the documentation for Chunker.Send() for more information.
func SendAll[T proto.Message](sendFunc func([]T) error, items ...T) error {
c := New(sendFunc)

err := c.Send(items...)
if err != nil {
return err
}

return c.Flush()
}
Loading

0 comments on commit fcb279a

Please sign in to comment.