Skip to content

Commit

Permalink
Add shallow unmarshal step to MultiSearch
Browse files Browse the repository at this point in the history
  • Loading branch information
ddelemeny committed Apr 22, 2024
1 parent b273a7a commit e463295
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 17 deletions.
11 changes: 8 additions & 3 deletions pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ConfiguredFields struct {

// Client represents a client which can interact with elasticsearch api
type Client interface {
ExecuteMultisearch(r []*SearchRequest) (*MultiSearchResponse, error)
ExecuteMultisearch(r []*SearchRequest) ([]*json.RawMessage, error)
}

var logger = log.New()
Expand Down Expand Up @@ -77,7 +77,12 @@ func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []by
return req, nil
}

func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSearchResponse, error) {
// Multisearch uses a shallow unmarshalled struct to defer the decoding to downstream handlers
type MultiSearchResponse struct {
Responses []*json.RawMessage `json:"responses"`
}

func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) ([]*json.RawMessage, error) {
req, err := c.createMultiSearchRequest(requests, c.index)
if err != nil {
return nil, err
Expand Down Expand Up @@ -122,7 +127,7 @@ func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSe
elapsed := time.Since(start)
logger.Debug("Decoded multisearch json response", "took", elapsed)

return &msr, nil
return msr.Responses, nil
}

func (c *baseClientImpl) makeMultiSearchPayload(searchRequests []*SearchRequest, index string) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/quickwit/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) {

assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "fixed_interval").MustString())

require.Len(t, res.Responses, 1)
require.Len(t, res, 1)
})
}

Expand Down
5 changes: 0 additions & 5 deletions pkg/quickwit/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ type SearchResponse struct {
Hits *SearchResponseHits `json:"hits"`
}

// MultiSearchResponse represents a multi search response
type MultiSearchResponse struct {
Responses []*SearchResponse `json:"responses"`
}

// Query represents a query
type Query struct {
Bool *BoolQuery `json:"bool"`
Expand Down
6 changes: 3 additions & 3 deletions pkg/quickwit/data_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,9 +1667,9 @@ func newFakeClient() *fakeClient {
}
}

func (c *fakeClient) ExecuteMultisearch(r []*es.SearchRequest) (*es.MultiSearchResponse, error) {
func (c *fakeClient) ExecuteMultisearch(r []*es.SearchRequest) ([]*json.RawMessage, error) {
c.multisearchRequests = append(c.multisearchRequests, r)
return c.multiSearchResponse, c.multiSearchError
return c.multiSearchResponse.Responses, c.multiSearchError
}

func newDataQuery(body string) (backend.QueryDataRequest, error) {
Expand Down Expand Up @@ -1721,5 +1721,5 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time)
return &backend.QueryDataResponse{}, err
}

return parseResponse(res.Responses, queries, configuredFields)
return parseResponse(res, queries, configuredFields)
}
2 changes: 1 addition & 1 deletion pkg/quickwit/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func queryData(ctx context.Context, dataQueries []backend.DataQuery, dsInfo *es.
return &backend.QueryDataResponse{}, err
}

return parseResponse(res.Responses, queries, dsInfo.ConfiguredFields)
return parseResponse(res, queries, dsInfo.ConfiguredFields)
}

func handleQuickwitErrors(err error) (*backend.QueryDataResponse, error) {
Expand Down
14 changes: 10 additions & 4 deletions pkg/quickwit/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,23 @@ const (

var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString))

func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
if responses == nil {
if rawResponses == nil {
return &result, nil
}

for i, res := range responses {
target := targets[i]
for i, rawRes := range rawResponses {
var res *es.SearchResponse
err := json.Unmarshal([]byte(*rawRes), &res)
if nil != err {
qwlog.Debug("Failed to unmarshal response", "err", err.Error(), "byteRes", *rawRes)
continue
}

target := targets[i]
if res.Error != nil {
errResult := getErrorFromElasticResponse(res)
result.Responses[target.RefID] = backend.DataResponse{
Expand Down

0 comments on commit e463295

Please sign in to comment.