From 88aa1615b64b348e9abd7b0e6675f69da0d3885b Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Mon, 12 Aug 2024 22:49:16 +0530 Subject: [PATCH 01/10] groundwork for logical tests --- integrity_test.go | 34 ++++++++++++++++++++-------------- quest_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ test_utils.go | 16 ++++++++++++++++ 3 files changed, 77 insertions(+), 14 deletions(-) diff --git a/integrity_test.go b/integrity_test.go index 5fc6da1..7890dd1 100644 --- a/integrity_test.go +++ b/integrity_test.go @@ -84,6 +84,25 @@ func (flog *ParquetFlog) Deref() Flog { // - Download parquet files from the store created by Parseable for the minute // - Compare the sent logs with the ones loaded from the downloaded parquet func TestIntegrity(t *testing.T) { + + flogs := createAndIngest(t) + parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig) + actualFlogs := loadFlogsFromParquetFiles(parquetFiles) + + rowCount := len(actualFlogs) + + for i, expectedFlog := range flogs { + // The rows in parquet written by Parseable will be latest first, so we + // compare the first of ours with the last of what we got from Parseable's + // store. + actualFlog := actualFlogs[rowCount-i-1].Deref() + require.Equal(t, actualFlog, expectedFlog) + } + + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func createAndIngest(t *testing.T) []Flog { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) iterations := 2 flogsPerIteration := 100 @@ -127,20 +146,7 @@ func TestIntegrity(t *testing.T) { // XXX: We don't need to sleep for the entire minute, just until the next minute boundary. } - parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig) - actualFlogs := loadFlogsFromParquetFiles(parquetFiles) - - rowCount := len(actualFlogs) - - for i, expectedFlog := range flogs { - // The rows in parquet written by Parseable will be latest first, so we - // compare the first of ours with the last of what we got from Parseable's - // store. - actualFlog := actualFlogs[rowCount-i-1].Deref() - require.Equal(t, actualFlog, expectedFlog) - } - - DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + return flogs } func ingestFlogs(flogs []Flog, stream string) error { diff --git a/quest_test.go b/quest_test.go index 1ec7047..1af5f8c 100644 --- a/quest_test.go +++ b/quest_test.go @@ -34,6 +34,13 @@ const ( events_count = "5" ) +type StreamHotTier struct { + Size string `json:"size"` + UsedSize *string `json:"used_size,omitempty"` + AvailableSize *string `json:"available_size,omitempty"` + OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"` +} + func TestSmokeListLogStream(t *testing.T) { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) req, err := NewGlob.QueryClient.NewRequest("GET", "logstream", nil) @@ -402,6 +409,40 @@ func TestSmokeGetRetention(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } +func TestHotTierGetsLogs(t *testing.T) { + // create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier +} + +func TestHotTierGetsLogsAfter(t *testing.T) { + logs := createAndIngest(t) + + activateHotTier(t) + time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync + + // fetch the logs from hot tier + req, _ := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil) + response, err := NewGlob.QueryClient.Do(req) + require.NoErrorf(t, err, "Fetching hot tier stream failed: %s", err) + + // ascertain that they are in expected schema. prolly will be, just to be sure + body, err := readJsonBody[StreamHotTier](response.Body) + require.NoErrorf(t, err, "Hot tier response not in correct schema: %s", err) + + // ascertain that the ingested all the ingested logs are present in hot tier + require.Equalf(t, len(logs), "%d", body.Size, "Total no. of ingested logs is %d but hot tier contains %d logs", len(logs), body.Size) + + disableHotTier(t) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestHotTierLogCount(t *testing.T) { + // create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match +} + +func TestOldestHotTierEntry(t *testing.T) { + // create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both +} + // This test calls all the User API endpoints // in a sequence to check if they work as expected. func TestSmoke_AllUsersAPI(t *testing.T) { diff --git a/test_utils.go b/test_utils.go index 7c7bb70..12f39f3 100644 --- a/test_utils.go +++ b/test_utils.go @@ -561,3 +561,19 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string) require.Equalf(t, 403, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body)) } } + +func activateHotTier(t *testing.T) { + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil) + response, err := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) + require.NoErrorf(t, err, "Activating hot tier failed: %s", err) +} + +func disableHotTier(t *testing.T) { + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil) + response, err := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) + require.NoErrorf(t, err, "Disabling hot tier failed: %s", err) +} From a58901add67d211ccd541608be876d8fc1f21675 Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Tue, 13 Aug 2024 09:57:55 +0530 Subject: [PATCH 02/10] changed test logic --- quest_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/quest_test.go b/quest_test.go index 1af5f8c..046c07b 100644 --- a/quest_test.go +++ b/quest_test.go @@ -428,8 +428,14 @@ func TestHotTierGetsLogsAfter(t *testing.T) { body, err := readJsonBody[StreamHotTier](response.Body) require.NoErrorf(t, err, "Hot tier response not in correct schema: %s", err) + // get total byte count of ingested logs + size := 0 + for _, expectedlog := range logs { + size = size + int(expectedlog.ByteCount) + } + // ascertain that the ingested all the ingested logs are present in hot tier - require.Equalf(t, len(logs), "%d", body.Size, "Total no. of ingested logs is %d but hot tier contains %d logs", len(logs), body.Size) + require.Equalf(t, size, "%d", *body.UsedSize, "Total size of ingested logs is %d GiB but hot tier contains %d GiB", size, body.UsedSize) disableHotTier(t) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) From d8de52f1b44d2110049a05769ed9244cf549a4ca Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Tue, 13 Aug 2024 10:35:51 +0530 Subject: [PATCH 03/10] added log count test --- quest_test.go | 10 +++++++++- test_utils.go | 3 ++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/quest_test.go b/quest_test.go index 046c07b..2f9278b 100644 --- a/quest_test.go +++ b/quest_test.go @@ -441,8 +441,16 @@ func TestHotTierGetsLogsAfter(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } +// create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match func TestHotTierLogCount(t *testing.T) { - // create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match + createAndIngest(t) + countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50) + + activateHotTier(t) + time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync + + countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50) + require.Equalf(t, countBefore, countAfter, "Ingested %s, but hot tier contains only %s", countBefore, countAfter) } func TestOldestHotTierEntry(t *testing.T) { diff --git a/test_utils.go b/test_utils.go index 12f39f3..7c3ea52 100644 --- a/test_utils.go +++ b/test_utils.go @@ -245,7 +245,7 @@ func IngestOneEventForStaticSchemaStream_SameFieldsInLog(t *testing.T, client HT require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s resp %s", response.Status, readAsString(response.Body)) } -func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) { +func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) string { // Query last 30 minutes of data only endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano) startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano) @@ -263,6 +263,7 @@ func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count u require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) expected := fmt.Sprintf(`[{"count":%d}]`, count) require.Equalf(t, expected, body, "Query count incorrect; Expected %s, Actual %s", expected, body) + return body } func QueryLogStreamCount_Historical(t *testing.T, client HTTPClient, stream string, count uint64) { From c73ed1549e2f68f7c40c4ddb86b7a8d40b478fa6 Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Tue, 13 Aug 2024 11:40:52 +0530 Subject: [PATCH 04/10] endpoint test #1 --- quest_test.go | 19 +++++++++++++++++++ test_utils.go | 10 ++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/quest_test.go b/quest_test.go index 2f9278b..0a792e8 100644 --- a/quest_test.go +++ b/quest_test.go @@ -409,11 +409,23 @@ func TestSmokeGetRetention(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } +func TestActivateHotTier(t *testing.T) { + activateHotTier(t) +} + func TestHotTierGetsLogs(t *testing.T) { // create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } } +// create stream, ingest data for a duration, set hot tier, wait for 2-3 mins to see if all data is available in hot tier func TestHotTierGetsLogsAfter(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + logs := createAndIngest(t) activateHotTier(t) @@ -443,6 +455,10 @@ func TestHotTierGetsLogsAfter(t *testing.T) { // create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match func TestHotTierLogCount(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + createAndIngest(t) countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50) @@ -455,6 +471,9 @@ func TestHotTierLogCount(t *testing.T) { func TestOldestHotTierEntry(t *testing.T) { // create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } } // This test calls all the User API endpoints diff --git a/test_utils.go b/test_utils.go index 7c3ea52..911d887 100644 --- a/test_utils.go +++ b/test_utils.go @@ -567,8 +567,14 @@ func activateHotTier(t *testing.T) { req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil) response, err := NewGlob.QueryClient.Do(req) body := readAsString(response.Body) - require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) - require.NoErrorf(t, err, "Activating hot tier failed: %s", err) + + if NewGlob.IngestorUrl.String() != "" { + require.Equalf(t, 200, response.StatusCode, "Server returned unexpected http code: %s and response: %s", response.Status, body) + require.NoErrorf(t, err, "Activating hot tier failed in distributed mode: %s", err) + } else { + // right now, hot tier is unavailable in standalone so anything other than 200 is fine + require.NotEqualf(t, 200, response.StatusCode, "Hot tier has been activated in standalone mode: %s and response: %s", response.Status, body) + } } func disableHotTier(t *testing.T) { From fdba13853c4b4d28f515d03717634a43cf2658d6 Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Tue, 13 Aug 2024 12:07:03 +0530 Subject: [PATCH 05/10] corrected request payload --- quest_test.go | 7 ------- test_utils.go | 16 +++++++++++++++- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/quest_test.go b/quest_test.go index 0a792e8..3596287 100644 --- a/quest_test.go +++ b/quest_test.go @@ -34,13 +34,6 @@ const ( events_count = "5" ) -type StreamHotTier struct { - Size string `json:"size"` - UsedSize *string `json:"used_size,omitempty"` - AvailableSize *string `json:"available_size,omitempty"` - OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"` -} - func TestSmokeListLogStream(t *testing.T) { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) req, err := NewGlob.QueryClient.NewRequest("GET", "logstream", nil) diff --git a/test_utils.go b/test_utils.go index 911d887..9ecd60f 100644 --- a/test_utils.go +++ b/test_utils.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io" + "math" "os/exec" "strings" "testing" @@ -33,6 +34,13 @@ const ( sleepDuration = 2 * time.Second ) +type StreamHotTier struct { + Size string `json:"size"` + UsedSize *string `json:"used_size,omitempty"` + AvailableSize *string `json:"available_size,omitempty"` + OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"` +} + func flogStreamFields() []string { return []string{ "p_timestamp", @@ -564,7 +572,13 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string) } func activateHotTier(t *testing.T) { - req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil) + payload := StreamHotTier{ + Size: fmt.Sprintf("%d", int64(20*math.Pow(1024, 3))), // set hot tier size to be 20 GB + } + json, _ := json.Marshal(payload) + + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", bytes.NewBuffer(json)) + req.Header.Set("Content-Type", "application/json") response, err := NewGlob.QueryClient.Do(req) body := readAsString(response.Body) From cdd8cdfd7c47813ad44a0f193ef466f5f04096ad Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Tue, 13 Aug 2024 14:22:59 +0530 Subject: [PATCH 06/10] corrected disable hot tier api call --- test_utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_utils.go b/test_utils.go index 9ecd60f..e171f62 100644 --- a/test_utils.go +++ b/test_utils.go @@ -592,7 +592,7 @@ func activateHotTier(t *testing.T) { } func disableHotTier(t *testing.T) { - req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil) + req, _ := NewGlob.QueryClient.NewRequest("DELETE", "logstream/"+NewGlob.Stream+"/hottier", nil) response, err := NewGlob.QueryClient.Do(req) body := readAsString(response.Body) require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) From a9dc47b0ece10240aa4591e63fb3ff498ce36aa3 Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Wed, 14 Aug 2024 19:53:00 +0530 Subject: [PATCH 07/10] corrected test logic --- quest_test.go | 38 ++++++++++++++++---------------------- test_utils.go | 6 +++--- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/quest_test.go b/quest_test.go index 3596287..1d74e50 100644 --- a/quest_test.go +++ b/quest_test.go @@ -403,7 +403,10 @@ func TestSmokeGetRetention(t *testing.T) { } func TestActivateHotTier(t *testing.T) { + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) activateHotTier(t) + disableHotTier(t) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } func TestHotTierGetsLogs(t *testing.T) { @@ -419,31 +422,22 @@ func TestHotTierGetsLogsAfter(t *testing.T) { t.Skip("Skipping in standalone mode") } - logs := createAndIngest(t) + // create a stream without hot tier + createAndIngest(t) + time.Sleep(2 * 60 * time.Second) + prevCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + // create a second stream with hot tier + createAndIngest(t) activateHotTier(t) - time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync - - // fetch the logs from hot tier - req, _ := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil) - response, err := NewGlob.QueryClient.Do(req) - require.NoErrorf(t, err, "Fetching hot tier stream failed: %s", err) - - // ascertain that they are in expected schema. prolly will be, just to be sure - body, err := readJsonBody[StreamHotTier](response.Body) - require.NoErrorf(t, err, "Hot tier response not in correct schema: %s", err) - - // get total byte count of ingested logs - size := 0 - for _, expectedlog := range logs { - size = size + int(expectedlog.ByteCount) - } - - // ascertain that the ingested all the ingested logs are present in hot tier - require.Equalf(t, size, "%d", *body.UsedSize, "Total size of ingested logs is %d GiB but hot tier contains %d GiB", size, body.UsedSize) + time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync + htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) disableHotTier(t) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + + require.Equalf(t, prevCount, htCount, "With hot tier disabled, the count was %s but with it, the count is %s", prevCount, htCount) } // create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match @@ -453,12 +447,12 @@ func TestHotTierLogCount(t *testing.T) { } createAndIngest(t) - countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50) + countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) activateHotTier(t) time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync - countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50) + countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) require.Equalf(t, countBefore, countAfter, "Ingested %s, but hot tier contains only %s", countBefore, countAfter) } diff --git a/test_utils.go b/test_utils.go index e171f62..7d70e9d 100644 --- a/test_utils.go +++ b/test_utils.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "io" - "math" "os/exec" "strings" "testing" @@ -76,8 +75,9 @@ func Sleep() { func CreateStream(t *testing.T, client HTTPClient, stream string) { req, _ := client.NewRequest("PUT", "logstream/"+stream, nil) response, err := client.Do(req) + body := readAsString(response.Body) require.NoErrorf(t, err, "Request failed: %s", err) - require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status) + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s with response: %s", response.Status, body) } func CreateStreamWithHeader(t *testing.T, client HTTPClient, stream string, header map[string]string) { @@ -573,7 +573,7 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string) func activateHotTier(t *testing.T) { payload := StreamHotTier{ - Size: fmt.Sprintf("%d", int64(20*math.Pow(1024, 3))), // set hot tier size to be 20 GB + Size: "20 GiB", // set hot tier size to be 20 GB } json, _ := json.Marshal(payload) From 8328baaff89311dffd74c8672494da8411f2ce71 Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Mon, 19 Aug 2024 12:26:43 +0530 Subject: [PATCH 08/10] more logical tests --- quest_test.go | 38 ++++++++++++++++++++++++++++++------- test_utils.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 7 deletions(-) diff --git a/quest_test.go b/quest_test.go index 1d74e50..b64792e 100644 --- a/quest_test.go +++ b/quest_test.go @@ -409,11 +409,22 @@ func TestActivateHotTier(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } +// create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier func TestHotTierGetsLogs(t *testing.T) { - // create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier if NewGlob.IngestorUrl.String() == "" { t.Skip("Skipping in standalone mode") } + + // DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + createAndIngest(t) + activateHotTier(t) + time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync + + htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + disableHotTier(t) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + + require.Equalf(t, htCount, `[{"count":200}]`, "Ingested 200 logs, but hot tier contains %s", htCount) } // create stream, ingest data for a duration, set hot tier, wait for 2-3 mins to see if all data is available in hot tier @@ -453,15 +464,28 @@ func TestHotTierLogCount(t *testing.T) { time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) require.Equalf(t, countBefore, countAfter, "Ingested %s, but hot tier contains only %s", countBefore, countAfter) } -func TestOldestHotTierEntry(t *testing.T) { - // create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both - if NewGlob.IngestorUrl.String() == "" { - t.Skip("Skipping in standalone mode") - } -} +// create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both +// func TestOldestHotTierEntry(t *testing.T) { +// if NewGlob.IngestorUrl.String() == "" { +// t.Skip("Skipping in standalone mode") +// } + +// createAndIngest(t) +// streamInfo := getStreamInfo(t) + +// activateHotTier(t) +// time.Sleep(60 * 2 * time.Second) + +// hottier := getHotTierStatus(t) + +// DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +// require.Equalf(t, streamInfo.FirstEventAt, hottier.OldestDateTimeEntry, "The first event at in the stream info is %s but the oldest entry in hot tier is %s", *streamInfo.FirstEventAt, *hottier.OldestDateTimeEntry) +// } // This test calls all the User API endpoints // in a sequence to check if they work as expected. diff --git a/test_utils.go b/test_utils.go index 7d70e9d..44079bd 100644 --- a/test_utils.go +++ b/test_utils.go @@ -40,6 +40,16 @@ type StreamHotTier struct { OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"` } +type StreamInfo struct { + CreatedAt string `json:"created-at"` + FirstEventAt *string `json:"first-event-at"` + CacheEnabled *bool `json:"cache_enabled"` + TimePartition *string `json:"time_partition"` + TimePartitionLimit *string `json:"time_partition_limit"` + CustomPartition *string `json:"custom_partition"` + StaticSchemaFlag *string `json:"static_schema_flag"` +} + func flogStreamFields() []string { return []string{ "p_timestamp", @@ -591,6 +601,27 @@ func activateHotTier(t *testing.T) { } } +func getHotTierStatus(t *testing.T) *StreamHotTier { + req, err := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil) + require.NoError(t, err, "Failed to create request") + + req.Header.Set("Content-Type", "application/json") + + response, err := NewGlob.QueryClient.Do(req) + require.NoError(t, err, "Failed to execute GET /hottier") + defer response.Body.Close() + + body := readAsString(response.Body) + + require.Equal(t, 200, response.StatusCode, "GET hot tier failed with status code: %d & body: %s", response.StatusCode, body) + + var hotTierStatus StreamHotTier + err = json.Unmarshal([]byte(body), &hotTierStatus) + require.NoError(t, err, "The response from GET /hottier isn't of expected schema: %s", body) + + return &hotTierStatus +} + func disableHotTier(t *testing.T) { req, _ := NewGlob.QueryClient.NewRequest("DELETE", "logstream/"+NewGlob.Stream+"/hottier", nil) response, err := NewGlob.QueryClient.Do(req) @@ -598,3 +629,24 @@ func disableHotTier(t *testing.T) { require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) require.NoErrorf(t, err, "Disabling hot tier failed: %s", err) } + +func getStreamInfo(t *testing.T) *StreamInfo { + req, err := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/info", nil) + require.NoError(t, err, "Failed to create request") + + req.Header.Set("Content-Type", "application/json") + + response, err := NewGlob.QueryClient.Do(req) + require.NoError(t, err, "Failed to execute GET /logstream/{stream_name}/info") + defer response.Body.Close() + + body := readAsString(response.Body) + + require.Equal(t, 200, response.StatusCode, "GET /logstream/{stream_name}/info failed with status code: %d & body: %s", response.StatusCode, body) + + var streamInfo StreamInfo + err = json.Unmarshal([]byte(body), &streamInfo) + require.NoError(t, err, "The response from GET /info isn't of expected schema: %s", body) + + return &streamInfo +} From 1e4677affa6e4893d1281e10ec6aa0231820dc5b Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Mon, 19 Aug 2024 14:13:46 +0530 Subject: [PATCH 09/10] endpoint tests --- quest_test.go | 93 +++++++++++++++++++++++++++++++++++++++++++++++---- test_utils.go | 45 +++++++++++++++++-------- 2 files changed, 117 insertions(+), 21 deletions(-) diff --git a/quest_test.go b/quest_test.go index b64792e..f7086a9 100644 --- a/quest_test.go +++ b/quest_test.go @@ -18,6 +18,7 @@ package main import ( "bytes" + "encoding/json" "fmt" "os/exec" "strings" @@ -404,11 +405,89 @@ func TestSmokeGetRetention(t *testing.T) { func TestActivateHotTier(t *testing.T) { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) - activateHotTier(t) - disableHotTier(t) + activateHotTier(t, "", true) + disableHotTier(t, false) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } +func TestActivateNonExistentHotTier(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + status, _ := activateHotTier(t, "", false) + require.NotEqualf(t, status, 200, "Hot tier was activated for a non-existent stream.") +} + +func TestHotTierWithTimePartition(t *testing.T) { + time_partition_stream := NewGlob.Stream + "timepartition" + timeHeader := map[string]string{"X-P-Time-Partition": "source_time", "X-P-Time-Partition-Limit": "365d"} + CreateStreamWithHeader(t, NewGlob.QueryClient, time_partition_stream, timeHeader) + + payload := StreamHotTier{ + Size: "20 Gib", + } + jsonPayload, _ := json.Marshal(payload) + + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+time_partition_stream+"/hottier", bytes.NewBuffer(jsonPayload)) + req.Header.Set("Content-Type", "application/json") + response, _ := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + + require.NotEqualf(t, response.StatusCode, 200, "Hot tier activation succeeded for time partition with message: %s, but was expected to fail", body) +} + +func TestHotTierHugeDiskSize(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + status, _ := activateHotTier(t, "500GiB", false) // activate hot tier with huge disk size + require.NotEqualf(t, status, 200, "Hot tier was activated for a non-existent stream.") + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestHotTierIncreaseSize(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + activateHotTier(t, "", true) + status, err := activateHotTier(t, "30 GiB", false) // increase disk size + require.Equalf(t, 200, status, "Increasing disk size of hot tier failed with error: %s", err) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestHotTierDecreaseSize(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + activateHotTier(t, "", true) + status, message := activateHotTier(t, "10 GiB", false) // decrease disk size + require.NotEqualf(t, 200, status, "Decreasing disk size of hot tier should fail but succeeded with message: %s", message) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestGetNonExistentHotTier(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + getHotTierStatus(t, true) +} + +func DisableNonExistentHotTier(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + disableHotTier(t, true) +} + // create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier func TestHotTierGetsLogs(t *testing.T) { if NewGlob.IngestorUrl.String() == "" { @@ -417,11 +496,11 @@ func TestHotTierGetsLogs(t *testing.T) { // DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) createAndIngest(t) - activateHotTier(t) + activateHotTier(t, "", true) time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) - disableHotTier(t) + disableHotTier(t, false) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) require.Equalf(t, htCount, `[{"count":200}]`, "Ingested 200 logs, but hot tier contains %s", htCount) @@ -441,11 +520,11 @@ func TestHotTierGetsLogsAfter(t *testing.T) { // create a second stream with hot tier createAndIngest(t) - activateHotTier(t) + activateHotTier(t, "", true) time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) - disableHotTier(t) + disableHotTier(t, false) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) require.Equalf(t, prevCount, htCount, "With hot tier disabled, the count was %s but with it, the count is %s", prevCount, htCount) @@ -460,7 +539,7 @@ func TestHotTierLogCount(t *testing.T) { createAndIngest(t) countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) - activateHotTier(t) + activateHotTier(t, "", true) time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) diff --git a/test_utils.go b/test_utils.go index 44079bd..6cbc08d 100644 --- a/test_utils.go +++ b/test_utils.go @@ -581,27 +581,35 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string) } } -func activateHotTier(t *testing.T) { +func activateHotTier(t *testing.T, size string, verify bool) (int, string) { + if size == "" { + size = "20 GiB" // default hot tier size + } + payload := StreamHotTier{ - Size: "20 GiB", // set hot tier size to be 20 GB + Size: size, } - json, _ := json.Marshal(payload) + jsonPayload, _ := json.Marshal(payload) - req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", bytes.NewBuffer(json)) + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", bytes.NewBuffer(jsonPayload)) req.Header.Set("Content-Type", "application/json") response, err := NewGlob.QueryClient.Do(req) body := readAsString(response.Body) - if NewGlob.IngestorUrl.String() != "" { - require.Equalf(t, 200, response.StatusCode, "Server returned unexpected http code: %s and response: %s", response.Status, body) - require.NoErrorf(t, err, "Activating hot tier failed in distributed mode: %s", err) - } else { - // right now, hot tier is unavailable in standalone so anything other than 200 is fine - require.NotEqualf(t, 200, response.StatusCode, "Hot tier has been activated in standalone mode: %s and response: %s", response.Status, body) + if verify { + if NewGlob.IngestorUrl.String() != "" { + require.Equalf(t, 200, response.StatusCode, "Server returned unexpected http code: %s and response: %s", response.Status, body) + require.NoErrorf(t, err, "Activating hot tier failed in distributed mode: %s", err) + } else { + // right now, hot tier is unavailable in standalone so anything other than 200 is fine + require.NotEqualf(t, 200, response.StatusCode, "Hot tier has been activated in standalone mode: %s and response: %s", response.Status, body) + } } + + return response.StatusCode, body } -func getHotTierStatus(t *testing.T) *StreamHotTier { +func getHotTierStatus(t *testing.T, shouldFail bool) *StreamHotTier { req, err := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil) require.NoError(t, err, "Failed to create request") @@ -613,7 +621,11 @@ func getHotTierStatus(t *testing.T) *StreamHotTier { body := readAsString(response.Body) - require.Equal(t, 200, response.StatusCode, "GET hot tier failed with status code: %d & body: %s", response.StatusCode, body) + if shouldFail { + require.NotEqualf(t, 200, response.StatusCode, "Hot tier was expected to fail but succeeded with body: %s", body) + } else { + require.Equalf(t, 200, response.StatusCode, "GET hot tier failed with status code: %d & body: %s", response.StatusCode, body) + } var hotTierStatus StreamHotTier err = json.Unmarshal([]byte(body), &hotTierStatus) @@ -622,11 +634,16 @@ func getHotTierStatus(t *testing.T) *StreamHotTier { return &hotTierStatus } -func disableHotTier(t *testing.T) { +func disableHotTier(t *testing.T, shouldFail bool) { req, _ := NewGlob.QueryClient.NewRequest("DELETE", "logstream/"+NewGlob.Stream+"/hottier", nil) response, err := NewGlob.QueryClient.Do(req) body := readAsString(response.Body) - require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) + + if shouldFail { + require.NotEqualf(t, 200, response.StatusCode, "Non-existent hot tier was disabled with response: %s", body) + } else { + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) + } require.NoErrorf(t, err, "Disabling hot tier failed: %s", err) } From 3fce12fd8cbc0c8134283bf5a5eb6e4fb87a0fc1 Mon Sep 17 00:00:00 2001 From: vishalkrishnads <321vishalds@gmail.com> Date: Mon, 19 Aug 2024 14:56:53 +0530 Subject: [PATCH 10/10] corrected some tests --- quest_test.go | 41 +++++++++++++++++++++++------------------ test_utils.go | 1 + 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/quest_test.go b/quest_test.go index f7086a9..6b71685 100644 --- a/quest_test.go +++ b/quest_test.go @@ -425,7 +425,7 @@ func TestHotTierWithTimePartition(t *testing.T) { CreateStreamWithHeader(t, NewGlob.QueryClient, time_partition_stream, timeHeader) payload := StreamHotTier{ - Size: "20 Gib", + Size: "20 GiB", } jsonPayload, _ := json.Marshal(payload) @@ -443,8 +443,8 @@ func TestHotTierHugeDiskSize(t *testing.T) { } CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) - status, _ := activateHotTier(t, "500GiB", false) // activate hot tier with huge disk size - require.NotEqualf(t, status, 200, "Hot tier was activated for a non-existent stream.") + status, err := activateHotTier(t, "1000GiB", false) // activate hot tier with huge disk size + require.NotEqualf(t, status, 200, "Hot tier was activated for a huge disk size with message: %s", err) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } @@ -454,9 +454,10 @@ func TestHotTierIncreaseSize(t *testing.T) { } CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) - activateHotTier(t, "", true) + activateHotTier(t, "", false) status, err := activateHotTier(t, "30 GiB", false) // increase disk size require.Equalf(t, 200, status, "Increasing disk size of hot tier failed with error: %s", err) + disableHotTier(t, false) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } @@ -466,7 +467,7 @@ func TestHotTierDecreaseSize(t *testing.T) { } CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) - activateHotTier(t, "", true) + activateHotTier(t, "", false) status, message := activateHotTier(t, "10 GiB", false) // decrease disk size require.NotEqualf(t, 200, status, "Decreasing disk size of hot tier should fail but succeeded with message: %s", message) DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) @@ -477,15 +478,19 @@ func TestGetNonExistentHotTier(t *testing.T) { t.Skip("Skipping in standalone mode") } + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) getHotTierStatus(t, true) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } -func DisableNonExistentHotTier(t *testing.T) { +func TestDisableNonExistentHotTier(t *testing.T) { if NewGlob.IngestorUrl.String() == "" { t.Skip("Skipping in standalone mode") } + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) disableHotTier(t, true) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } // create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier @@ -549,22 +554,22 @@ func TestHotTierLogCount(t *testing.T) { } // create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both -// func TestOldestHotTierEntry(t *testing.T) { -// if NewGlob.IngestorUrl.String() == "" { -// t.Skip("Skipping in standalone mode") -// } +func TestOldestHotTierEntry(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } -// createAndIngest(t) -// streamInfo := getStreamInfo(t) + createAndIngest(t) + streamInfo := getStreamInfo(t) -// activateHotTier(t) -// time.Sleep(60 * 2 * time.Second) + activateHotTier(t, "", true) + time.Sleep(60 * 2 * time.Second) -// hottier := getHotTierStatus(t) + hottier := getHotTierStatus(t, false) -// DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) -// require.Equalf(t, streamInfo.FirstEventAt, hottier.OldestDateTimeEntry, "The first event at in the stream info is %s but the oldest entry in hot tier is %s", *streamInfo.FirstEventAt, *hottier.OldestDateTimeEntry) -// } + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + require.Equalf(t, streamInfo.FirstEventAt, hottier.OldestDateTimeEntry, "The first event at in the stream info is %s but the oldest entry in hot tier is %s", *streamInfo.FirstEventAt, *hottier.OldestDateTimeEntry) +} // This test calls all the User API endpoints // in a sequence to check if they work as expected. diff --git a/test_utils.go b/test_utils.go index 6cbc08d..302c958 100644 --- a/test_utils.go +++ b/test_utils.go @@ -623,6 +623,7 @@ func getHotTierStatus(t *testing.T, shouldFail bool) *StreamHotTier { if shouldFail { require.NotEqualf(t, 200, response.StatusCode, "Hot tier was expected to fail but succeeded with body: %s", body) + return &StreamHotTier{Size: "0"} } else { require.Equalf(t, 200, response.StatusCode, "GET hot tier failed with status code: %d & body: %s", response.StatusCode, body) }