Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite to allow easier error handeling #3

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 198 additions & 28 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package quickwit

import (
"bytes"
"encoding/json"
"fmt"

"github.com/imroc/req/v3"
"io"
"net/http"
"strings"
)

type ErrorMessage struct {
Expand All @@ -16,30 +19,15 @@ func (msg *ErrorMessage) Error() string {
}

type QuickwitClient struct {
*req.Client
endpoint string
client *http.Client
}

// endpoint is the root url of the quickwit server.
func NewQuickwitClient(endpoint string) *QuickwitClient {
return &QuickwitClient{
Client: req.C().
SetBaseURL(endpoint).
SetCommonErrorResult(&ErrorMessage{}).
EnableDumpEachRequest().
OnAfterResponse(func(client *req.Client, resp *req.Response) error {
if resp.Err != nil { // There is an underlying error, e.g. network error or unmarshal error.
return nil
}
if errMsg, ok := resp.ErrorResult().(*ErrorMessage); ok {
resp.Err = errMsg // Convert api error into go error
return nil
}
if !resp.IsSuccessState() {
// Neither a success response nor a error response, record details to help troubleshooting
resp.Err = fmt.Errorf("bad status: %s\nraw content:\n%s", resp.Status, resp.Dump())
}
return nil
}),
endpoint: endpoint,
client: &http.Client{},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is often desirable to make the underlying http client configurable. What do you think about exposing both the endpoint and the client, so that they can be set/overridden externally?

type QuickwitClient struct {
	Endpoint string
	Client   *http.Client
}

The current

func NewQuickwitClient(endpoint string) *QuickwitClient{
  …
}

could stay as-is for now until more options are needed.

}
}

Expand All @@ -60,11 +48,193 @@ type SearchResponse struct {
Aggregations interface{} `json:"aggregations,omitempty"`
}

func (c *QuickwitClient) Search(indexId string, searchRequest SearchRequest) (searchResponse *SearchResponse, err error) {
_, err = c.R().
SetPathParam("indexId", indexId).
SetBody(searchRequest).
SetSuccessResult(&searchResponse).
Post("/api/v1/{indexId}/search")
return
type FieldMapping struct {
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Fast bool `json:"fast,omitempty"`
InputFormat string `json:"input_format,omitempty"`
InputFormats []string `json:"input_formats,omitempty"`
FastPrecision string `json:"fast_precision,omitempty"`
Record string `json:"record,omitempty"`
Description string `json:"description,omitempty"`
Stored bool `json:"stored,omitempty"`
Indexed bool `json:"indexed,omitempty"`
Tokenizer string `json:"tokenizer,omitempty"`
Fieldnorms bool `json:"fieldnorms,omitempty"`
OutputFormat string `json:"output_format,omitempty"`
}

type DocMapping struct {
FieldMappings []*FieldMapping `json:"field_mappings,omitempty"`
Mode string `json:"mode,omitempty"`
DynamicMapping interface{} `json:"dynamic_mapping,omitempty"`
TagFields []string `json:"tag_fields,omitempty"`
StoreSource bool `json:"store_source,omitempty"`
TimestampField string `json:"timestamp_field,omitempty"`
PartitionKey string `json:"partition_key,omitempty"`
MaxNumPartitions uint64 `json:"max_num_partitions,omitempty"`
IndexFieldPresence bool `json:"index_field_presence,omitempty"`
}
type MergePolicy struct {
Type string `json:"type,omitempty"`
MaxMergeOps uint64 `json:"max_merge_ops,omitempty"`
MergeFactor uint64 `json:"merge_factor,omitempty"`
MaxMergeFactor uint64 `json:"max_merge_factor,omitempty"`
}
type Resources struct {
HeapSize string `json:"heap_size,omitempty"`
MaxMergeWriteThroughput uint64 `json:"max_merge_write_throughput,omitempty"`
}
type IndexSettings struct {
CommitTimeoutSecs uint64 `json:"commit_timeout_secs,omitempty"`
SplitNumDocsTarget uint64 `json:"split_num_docs_target,omitempty"`
MergePolicy *MergePolicy `json:"merge_policy,omitempty"`
Resources *Resources `json:"resources,omitempty"`
DocstoreCompressionLevel uint64 `json:"docstore_compression_level,omitempty"`
DocstoreBlocksize uint64 `json:"docstore_blocksize,omitempty"`
}
type SearchSettings struct {
DefaultSearchFields []string `json:"default_search_fields,omitempty"`
}

type Retention struct {
Period string `json:"period,omitempty"`
Schedule string `json:"schedule,omitempty"`
}

type CreateIndexRequest struct {
Version string `json:"version,omitempty"`
IndexId string `json:"index_id,omitempty"`
IndexUri string `json:"index_uri,omitempty"`
DocMapping *DocMapping `json:"doc_mapping,omitempty"`
IndexingSettings *IndexSettings `json:"indexing_settings,omitempty"`
SearchSettings *SearchSettings `json:"search_settings,omitempty"`
Retention *Retention `json:"retention,omitempty"`
}

type DeleteIndexResponse struct {
SplitId string `json:"split_id"`
NumDocs uint64 `json:"num_docs"`
UncompressedDocsSizeBytes uint64 `json:"uncompressed_docs_size_bytes"`
FileName string `json:"file_name"`
FileSizeBytes uint64 `json:"file_size_bytes"`
}

type IngestRequest struct {
Documents []interface{}
}

type IngestResponse struct {
NumDocsForProcessing uint64 `json:"num_docs_for_processing"`
}

func (c *QuickwitClient) SearchMultiIndex(indexIds []string, searchRequest SearchRequest) (*SearchResponse, error) {
return c.Search(strings.Join(indexIds, ","), searchRequest)
}

func (c *QuickwitClient) Search(indexId string, searchRequest SearchRequest) (*SearchResponse, error) {
BinaryData, err := json.Marshal(searchRequest)
if err != nil {
return nil, err
}
post, err := c.client.Post(fmt.Sprintf("%s/api/v1/%s/search", c.endpoint, indexId), "application/json", bytes.NewReader(BinaryData))
if err != nil {
return nil, err
}
defer post.Body.Close()
if post.StatusCode != http.StatusOK {
msg, err := io.ReadAll(post.Body)
if err != nil {
return nil, err
}
var errmsg ErrorMessage
err = json.Unmarshal(msg, &errmsg)
if err != nil {
return nil, fmt.Errorf("bad status:%d\n raw body: %s", post.StatusCode, msg)
}
return nil, &errmsg
}
return &SearchResponse{}, nil
Copy link

@pims pims Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems that the quickwit response is never returned.
A small change like the following should get us a functional client! (tests needed of course)

diff --git a/client.go b/client.go
index 28b02c8..8da87ec 100644
--- a/client.go
+++ b/client.go
@@ -142,6 +142,7 @@ func (c *QuickwitClient) Search(indexId string, searchRequest SearchRequest) (*S
                return nil, err
        }
        defer post.Body.Close()
+
        if post.StatusCode != http.StatusOK {
                msg, err := io.ReadAll(post.Body)
                if err != nil {
@@ -154,7 +155,9 @@ func (c *QuickwitClient) Search(indexId string, searchRequest SearchRequest) (*S
                }
                return nil, &errmsg
        }
-       return &SearchResponse{}, nil
+       var searchResponse SearchResponse
+       err = json.NewDecoder(post.Body).Decode(&searchResponse)
+       return &searchResponse, err
 }

There's probably a way to reduce some amount of duplication across all methods but that doesn't have to be done today.

}

func (c *QuickwitClient) Ingest(indexId string, ingestRequest IngestRequest) (*IngestResponse, error) {
var postdata []byte
for _, doc := range ingestRequest.Documents {
docBytes, err := json.Marshal(doc)
if err != nil {
return nil, err
}
postdata = append(postdata, docBytes...)
postdata = append(postdata, '\n')

}
post, err := c.client.Post(fmt.Sprintf("%s/api/v1/%s/ingest", c.endpoint, indexId), "application/json", bytes.NewReader(postdata))
if err != nil {
return nil, err
}
defer post.Body.Close()
if post.StatusCode != http.StatusOK {
msg, err := io.ReadAll(post.Body)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("bad status:%d\n raw body: %s", post.StatusCode, msg)
}
var ingestResponse IngestResponse
err = json.NewDecoder(post.Body).Decode(&ingestResponse)
if err != nil {
return nil, err
}
return &ingestResponse, nil
}

func (c *QuickwitClient) DeleteIndex(indexId string) (*[]DeleteIndexResponse, error) {
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/api/v1/%s", c.endpoint, indexId), nil)
if err != nil {
return nil, err
}
do, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer do.Body.Close()
if do.StatusCode != http.StatusOK {
msg, err := io.ReadAll(do.Body)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("bad status:%d\n raw body: %s", do.StatusCode, msg)
}
var deleteIndexResponse []DeleteIndexResponse
err = json.NewDecoder(do.Body).Decode(&deleteIndexResponse)
if err != nil {
return nil, err
}
return &deleteIndexResponse, nil

}

func (c *QuickwitClient) CreateIndex(createIndexRequest CreateIndexRequest) error {
BinaryData, err := json.Marshal(createIndexRequest)
if err != nil {
return err
}
post, err := c.client.Post(fmt.Sprintf("%s/api/v1/indexes", c.endpoint), "application/json", bytes.NewReader(BinaryData))
if err != nil {
return err
}
defer post.Body.Close()
if post.StatusCode != http.StatusOK {
msg, err := io.ReadAll(post.Body)
if err != nil {
return err
}
var errmsg ErrorMessage
err = json.Unmarshal(msg, &errmsg)
if err != nil {
return fmt.Errorf("bad status:%d\n raw body: %s", post.StatusCode, msg)
}
return &errmsg
}
return nil
}
92 changes: 91 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestClient_Search(t *testing.T) {
qClient := NewQuickwitClient("http://localhost:7280")
httpmock.ActivateNonDefault(qClient.GetClient())
httpmock.ActivateNonDefault(qClient.client)
httpmock.RegisterResponder("POST", "http://localhost:7280/api/v1/otel-logs-v0/search", func(request *http.Request) (*http.Response, error) {
respBody := `{"hits": [], "num_hits": 0, "elapsed_time_micros": 100}`
resp := httpmock.NewStringResponse(http.StatusOK, respBody)
Expand All @@ -25,3 +25,93 @@ func TestClient_Search(t *testing.T) {
require.Equal(t, uint64(0), searchResponse.NumHits)
require.Equal(t, 0, len(searchResponse.Hits))
}

func TestClient_ExpectedError(t *testing.T) {
qClient := NewQuickwitClient("http://localhost:7280")
httpmock.ActivateNonDefault(qClient.client)
httpmock.RegisterResponder("POST", "http://localhost:7280/api/v1/otel-logs-v0/search", func(request *http.Request) (*http.Response, error) {
respBody := `{"message":"test error"}`
resp := httpmock.NewStringResponse(http.StatusInternalServerError, respBody)
resp.Header.Set("Content-Type", "application/json; charset=utf-8")
return resp, nil
})
_, err := qClient.Search("otel-logs-v0", SearchRequest{Query: "test"})
require.Error(t, err)
require.Equal(t, "API Error: test error", err.Error())
}

func TestClient_UnExpectedError(t *testing.T) {
qClient := NewQuickwitClient("http://localhost:7280")
httpmock.ActivateNonDefault(qClient.client)
httpmock.RegisterResponder("POST", "http://localhost:7280/api/v1/otel-logs-v0/search", func(request *http.Request) (*http.Response, error) {
respBody := `test unexpected error`
resp := httpmock.NewStringResponse(http.StatusInternalServerError, respBody)
resp.Header.Set("Content-Type", "application/json; charset=utf-8")
return resp, nil
})
_, err := qClient.Search("otel-logs-v0", SearchRequest{Query: "test"})
require.Error(t, err)
require.Equal(t, "bad status:500\n raw body: test unexpected error", err.Error())
}

func TestClient_Ingest(t *testing.T) {
qClient := NewQuickwitClient("http://localhost:7280")
httpmock.ActivateNonDefault(qClient.client)
httpmock.RegisterResponder("POST", "http://localhost:7280/api/v1/otel-logs-v0/ingest", func(request *http.Request) (*http.Response, error) {
respBody := `{"num_docs_for_processing": 3}`
resp := httpmock.NewStringResponse(http.StatusOK, respBody)
resp.Header.Set("Content-Type", "application/json; charset=utf-8")
return resp, nil
})
type dummy struct {
Url string `json:"url"`
Title string `json:"title"`
Body string `json:"body"`
}

ingest := IngestRequest{
Documents: []interface{}{
dummy{
Url: "https://en.wikipedia.org/wiki?id=1",
Title: "foo",
Body: "foo",
},
dummy{
Url: "https://en.wikipedia.org/wiki?id=2",
Title: "bar",
Body: "bar",
},
dummy{
Url: "https://en.wikipedia.org/wiki?id=3",
Title: "baz",
Body: "baz",
},
},
}

ingestResponse, err := qClient.Ingest("otel-logs-v0", ingest)
if err != nil {
t.Error(err)
}
require.NoError(t, err)
require.Equal(t, uint64(3), ingestResponse.NumDocsForProcessing)
}

func TestClient_DeleteIndex(t *testing.T) {
qClient := NewQuickwitClient("http://localhost:7280")
httpmock.ActivateNonDefault(qClient.client)
httpmock.RegisterResponder("DELETE", "http://localhost:7280/api/v1/otel-logs-v0", func(request *http.Request) (*http.Response, error) {
respBody := `[ { "split_id": "01GK1XNAECH7P14850S9VV6P94", "num_docs": 1337, "uncompressed_docs_size_bytes": 23933408, "file_name": "01GK1XNAECH7P14850S9VV6P94.split", "file_size_bytes": 2991676 } ]`
resp := httpmock.NewStringResponse(http.StatusOK, respBody)
resp.Header.Set("Content-Type", "application/json; charset=utf-8")
return resp, nil
})
deleteResponse, err := qClient.DeleteIndex("otel-logs-v0")
if err != nil {
t.Error(err)
}
require.NoError(t, err)
require.Equal(t, 1, len(*deleteResponse))
require.Equal(t, "01GK1XNAECH7P14850S9VV6P94", (*deleteResponse)[0].SplitId)
require.Equal(t, uint64(1337), (*deleteResponse)[0].NumDocs)
}
28 changes: 6 additions & 22 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,34 +1,18 @@
module github.com/quickwit-oss/quickwit-go

go 1.18
go 1.21

toolchain go1.22.1

require (
github.com/imroc/req/v3 v3.33.1
github.com/jarcoal/httpmock v1.3.0
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/onsi/ginkgo/v2 v2.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-18 v0.2.0 // indirect
github.com/quic-go/qtls-go1-19 v0.2.0 // indirect
github.com/quic-go/qtls-go1-20 v0.1.0 // indirect
github.com/quic-go/quic-go v0.32.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading