Skip to content

Commit

Permalink
v2 unit tests refactoring, validate responses
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski committed Dec 27, 2024
1 parent 31a39e1 commit e4d71b0
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 41 deletions.
65 changes: 56 additions & 9 deletions quesma/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ func Test_Main(m *testing.T) {
_ = buildIngestOnlyQuesma()
}

func emitRequests(stop chan os.Signal) {
func emitRequests(stop chan os.Signal, t *testing.T, testData []struct {
url string
expectedResponse string
}) {
go func() {
time.Sleep(1 * time.Second)
requestBody := []byte(`{"query": {"match_all": {}}}`)
sendRequest("http://localhost:8888/_bulk", requestBody)
sendRequest("http://localhost:8888/_doc", requestBody)
sendRequest("http://localhost:8888/_search", requestBody)
sendRequest("http://localhost:8888/_search", requestBody)
var resp string
var err error
for _, test := range testData {
resp, err = sendRequest(test.url, requestBody)
assert.NoError(t, err)
assert.Contains(t, test.expectedResponse, resp)
}
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
close(stop)
}()
Expand Down Expand Up @@ -144,7 +150,16 @@ func Test_fallbackScenario(t *testing.T) {
q1, _ := qBuilder.Build()
q1.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "unknown\n"},
{"http://localhost:8888/_doc", "unknown\n"},
{"http://localhost:8888/_search", "unknown\n"},
{"http://localhost:8888/_search", "unknown\n"},
}
emitRequests(stop, t, testData)
<-stop
q1.Stop(context.Background())
atomic.LoadInt32(&fallbackCalled)
Expand All @@ -155,7 +170,20 @@ func Test_scenario1(t *testing.T) {
q1 := ab_testing_scenario()
q1.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", `bulk->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
bulk->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
`},
{"http://localhost:8888/_doc", `doc->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
doc->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
`},
{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are equal\n"},
{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are not equal\n"},
}
emitRequests(stop, t, testData)
<-stop
q1.Stop(context.Background())
}
Expand Down Expand Up @@ -215,7 +243,17 @@ func Test_middleware(t *testing.T) {
quesmaBuilder.Build()
quesmaBuilder.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "middleware\n"},
{"http://localhost:8888/_doc", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
}
emitRequests(stop, t, testData)

<-stop
quesmaBuilder.Stop(context.Background())
atomic.LoadInt32(&middlewareCallCount)
Expand All @@ -227,7 +265,16 @@ func Test_middleware(t *testing.T) {
quesmaBuilder.Build()
quesmaBuilder.Start()
stop := make(chan os.Signal, 1)
emitRequests(stop)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "middleware\n"},
{"http://localhost:8888/_doc", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
}
emitRequests(stop, t, testData)
<-stop
quesmaBuilder.Stop(context.Background())
atomic.LoadInt32(&middlewareCallCount)
Expand Down
16 changes: 5 additions & 11 deletions quesma/processors/ab_test_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ func (p *ABTestProcessor) compare(json1 string, json2 string) (bool, string) {

diff := cmp.Diff(obj1, obj2)
if diff == "" {
fmt.Println("JSON objects are equal")
return true, ""
}
fmt.Println("JSON objects are not equal:", diff)
return false, diff
}

Expand All @@ -77,8 +75,6 @@ func (p *ABTestProcessor) Handle(metadata map[string]interface{}, message ...any

data = append(data, strconv.Itoa(level)...)
data = append(data, []byte(p.GetId())...)
data = append(data, []byte(",correlationId:")...)
data = append(data, []byte(correlationId)...)
data = append(data, []byte("\n")...)
}

Expand All @@ -88,16 +84,14 @@ func (p *ABTestProcessor) Handle(metadata map[string]interface{}, message ...any
resp := make([]byte, 0)
for _, messages := range p.messageStorage {
if len(messages) == 2 {
equal, diff := p.compare(string(messages[0]), string(messages[1]))
equal, _ := p.compare(string(messages[0]), string(messages[1]))
if equal {
resp = append(resp, []byte("ABTestProcessor processor: Responses are equal\n\n")...)
resp = append(resp, []byte("\n")...)
resp = append(resp, []byte(diff)...)
resp = append(resp, []byte("ABTestProcessor processor: Responses are equal\n")...)
//resp = append(resp, []byte(diff)...)

} else {
resp = append(resp, []byte("ABTestProcessor processor: Responses are not equal\n\n")...)
resp = append(resp, []byte("\n")...)
resp = append(resp, []byte(diff)...)
resp = append(resp, []byte("ABTestProcessor processor: Responses are not equal\n")...)
//resp = append(resp, []byte(diff)...)
}
// clean storage
p.messageStorage = make(map[string][][]byte)
Expand Down
15 changes: 6 additions & 9 deletions quesma/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,20 @@ import (
"net/http"
)

func sendRequest(url string, requestBody []byte) {
func sendRequest(url string, requestBody []byte) (string, error) {
// Send POST request
resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
if err != nil {
fmt.Println("Error sending request:", err)
return
return "", err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
if err != nil {
fmt.Println(err)
} else {
respBody, err := io.ReadAll(resp.Body)
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
if err != nil {
fmt.Println(err)
} else {
fmt.Println(string(respBody))
}
fmt.Println(string(respBody))
}
return string(respBody), nil
}
17 changes: 5 additions & 12 deletions quesma/v2_test_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"quesma/frontend_connectors"
"quesma/processors"
quesma_api "quesma_v2/core"
"strconv"
"sync/atomic"
)

Expand Down Expand Up @@ -86,7 +85,7 @@ func bulk(_ context.Context, request *quesma_api.Request, _ http.ResponseWriter)
}
metadata := quesma_api.MakeNewMetadata()
metadata["level"] = 0
resp := []byte("bulk\n")
resp := []byte("bulk->")
atomic.AddInt64(&correlationId, 1)
quesma_api.SetCorrelationId(metadata, correlationId)
return &quesma_api.Result{Meta: metadata, GenericResult: resp, StatusCode: 200}, nil
Expand All @@ -101,7 +100,7 @@ func doc(_ context.Context, request *quesma_api.Request, _ http.ResponseWriter)
metadata["level"] = 0
atomic.AddInt64(&correlationId, 1)
quesma_api.SetCorrelationId(metadata, correlationId)
resp := []byte("doc\n")
resp := []byte("doc->")

return &quesma_api.Result{Meta: metadata, GenericResult: resp, StatusCode: 200}, nil
}
Expand Down Expand Up @@ -149,10 +148,8 @@ func (p *IngestProcessor) Handle(metadata map[string]interface{}, message ...any
panic("IngestProcessor: invalid message type")
}

level := metadata["level"].(int)
data = append(data, strconv.Itoa(level)...)
data = append(data, []byte(p.GetId())...)
data = append(data, []byte("\n")...)
data = append(data, []byte("->")...)
}
return metadata, data, nil
}
Expand Down Expand Up @@ -262,10 +259,8 @@ func (p *InnerIngestProcessor2) Handle(metadata map[string]interface{}, message
if err != nil {
panic("InnerIngestProcessor2: invalid message type")
}
level := metadata["level"].(int)
data = append(data, strconv.Itoa(level)...)
data = append(data, []byte(p.GetId())...)
data = append(data, []byte("\n")...)
data = append(data, []byte("->")...)
}
return metadata, data, nil
}
Expand Down Expand Up @@ -296,10 +291,8 @@ func (p *InnerIngestProcessor1) Handle(metadata map[string]interface{}, message
if err != nil {
panic("InnerIngestProcessor1: invalid message type")
}
level := metadata["level"].(int)
data = append(data, strconv.Itoa(level)...)
data = append(data, []byte(p.GetId())...)
data = append(data, []byte("\n")...)
data = append(data, []byte("->")...)
}
return metadata, data, nil
}
Expand Down

0 comments on commit e4d71b0

Please sign in to comment.