diff --git a/integrity_test.go b/integrity_test.go index 5fc6da1..7890dd1 100644 --- a/integrity_test.go +++ b/integrity_test.go @@ -84,6 +84,25 @@ func (flog *ParquetFlog) Deref() Flog { // - Download parquet files from the store created by Parseable for the minute // - Compare the sent logs with the ones loaded from the downloaded parquet func TestIntegrity(t *testing.T) { + + flogs := createAndIngest(t) + parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig) + actualFlogs := loadFlogsFromParquetFiles(parquetFiles) + + rowCount := len(actualFlogs) + + for i, expectedFlog := range flogs { + // The rows in parquet written by Parseable will be latest first, so we + // compare the first of ours with the last of what we got from Parseable's + // store. + actualFlog := actualFlogs[rowCount-i-1].Deref() + require.Equal(t, actualFlog, expectedFlog) + } + + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func createAndIngest(t *testing.T) []Flog { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) iterations := 2 flogsPerIteration := 100 @@ -127,20 +146,7 @@ func TestIntegrity(t *testing.T) { // XXX: We don't need to sleep for the entire minute, just until the next minute boundary. } - parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig) - actualFlogs := loadFlogsFromParquetFiles(parquetFiles) - - rowCount := len(actualFlogs) - - for i, expectedFlog := range flogs { - // The rows in parquet written by Parseable will be latest first, so we - // compare the first of ours with the last of what we got from Parseable's - // store. - actualFlog := actualFlogs[rowCount-i-1].Deref() - require.Equal(t, actualFlog, expectedFlog) - } - - DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + return flogs } func ingestFlogs(flogs []Flog, stream string) error { diff --git a/quest_test.go b/quest_test.go index 1ec7047..6b71685 100644 --- a/quest_test.go +++ b/quest_test.go @@ -18,6 +18,7 @@ package main import ( "bytes" + "encoding/json" "fmt" "os/exec" "strings" @@ -402,6 +403,174 @@ func TestSmokeGetRetention(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } +func TestActivateHotTier(t *testing.T) { + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + activateHotTier(t, "", true) + disableHotTier(t, false) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestActivateNonExistentHotTier(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + status, _ := activateHotTier(t, "", false) + require.NotEqualf(t, status, 200, "Hot tier was activated for a non-existent stream.") +} + +func TestHotTierWithTimePartition(t *testing.T) { + time_partition_stream := NewGlob.Stream + "timepartition" + timeHeader := map[string]string{"X-P-Time-Partition": "source_time", "X-P-Time-Partition-Limit": "365d"} + CreateStreamWithHeader(t, NewGlob.QueryClient, time_partition_stream, timeHeader) + + payload := StreamHotTier{ + Size: "20 GiB", + } + jsonPayload, _ := json.Marshal(payload) + + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+time_partition_stream+"/hottier", bytes.NewBuffer(jsonPayload)) + req.Header.Set("Content-Type", "application/json") + response, _ := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + + require.NotEqualf(t, response.StatusCode, 200, "Hot tier activation succeeded for time partition with message: %s, but was expected to fail", body) +} + +func TestHotTierHugeDiskSize(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + status, err := activateHotTier(t, "1000GiB", false) // activate hot tier with huge disk size + require.NotEqualf(t, status, 200, "Hot tier was activated for a huge disk size with message: %s", err) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestHotTierIncreaseSize(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + activateHotTier(t, "", false) + status, err := activateHotTier(t, "30 GiB", false) // increase disk size + require.Equalf(t, 200, status, "Increasing disk size of hot tier failed with error: %s", err) + disableHotTier(t, false) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestHotTierDecreaseSize(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + activateHotTier(t, "", false) + status, message := activateHotTier(t, "10 GiB", false) // decrease disk size + require.NotEqualf(t, 200, status, "Decreasing disk size of hot tier should fail but succeeded with message: %s", message) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestGetNonExistentHotTier(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + getHotTierStatus(t, true) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestDisableNonExistentHotTier(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + disableHotTier(t, true) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +// create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier +func TestHotTierGetsLogs(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + // DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + createAndIngest(t) + activateHotTier(t, "", true) + time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync + + htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + disableHotTier(t, false) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + + require.Equalf(t, htCount, `[{"count":200}]`, "Ingested 200 logs, but hot tier contains %s", htCount) +} + +// create stream, ingest data for a duration, set hot tier, wait for 2-3 mins to see if all data is available in hot tier +func TestHotTierGetsLogsAfter(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + // create a stream without hot tier + createAndIngest(t) + time.Sleep(2 * 60 * time.Second) + prevCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + + // create a second stream with hot tier + createAndIngest(t) + activateHotTier(t, "", true) + time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync + + htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + disableHotTier(t, false) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + + require.Equalf(t, prevCount, htCount, "With hot tier disabled, the count was %s but with it, the count is %s", prevCount, htCount) +} + +// create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match +func TestHotTierLogCount(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + createAndIngest(t) + countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + + activateHotTier(t, "", true) + time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync + + countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200) + + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + require.Equalf(t, countBefore, countAfter, "Ingested %s, but hot tier contains only %s", countBefore, countAfter) +} + +// create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both +func TestOldestHotTierEntry(t *testing.T) { + if NewGlob.IngestorUrl.String() == "" { + t.Skip("Skipping in standalone mode") + } + + createAndIngest(t) + streamInfo := getStreamInfo(t) + + activateHotTier(t, "", true) + time.Sleep(60 * 2 * time.Second) + + hottier := getHotTierStatus(t, false) + + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + require.Equalf(t, streamInfo.FirstEventAt, hottier.OldestDateTimeEntry, "The first event at in the stream info is %s but the oldest entry in hot tier is %s", *streamInfo.FirstEventAt, *hottier.OldestDateTimeEntry) +} + // This test calls all the User API endpoints // in a sequence to check if they work as expected. func TestSmoke_AllUsersAPI(t *testing.T) { diff --git a/test_utils.go b/test_utils.go index 7c7bb70..302c958 100644 --- a/test_utils.go +++ b/test_utils.go @@ -33,6 +33,23 @@ const ( sleepDuration = 2 * time.Second ) +type StreamHotTier struct { + Size string `json:"size"` + UsedSize *string `json:"used_size,omitempty"` + AvailableSize *string `json:"available_size,omitempty"` + OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"` +} + +type StreamInfo struct { + CreatedAt string `json:"created-at"` + FirstEventAt *string `json:"first-event-at"` + CacheEnabled *bool `json:"cache_enabled"` + TimePartition *string `json:"time_partition"` + TimePartitionLimit *string `json:"time_partition_limit"` + CustomPartition *string `json:"custom_partition"` + StaticSchemaFlag *string `json:"static_schema_flag"` +} + func flogStreamFields() []string { return []string{ "p_timestamp", @@ -68,8 +85,9 @@ func Sleep() { func CreateStream(t *testing.T, client HTTPClient, stream string) { req, _ := client.NewRequest("PUT", "logstream/"+stream, nil) response, err := client.Do(req) + body := readAsString(response.Body) require.NoErrorf(t, err, "Request failed: %s", err) - require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status) + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s with response: %s", response.Status, body) } func CreateStreamWithHeader(t *testing.T, client HTTPClient, stream string, header map[string]string) { @@ -245,7 +263,7 @@ func IngestOneEventForStaticSchemaStream_SameFieldsInLog(t *testing.T, client HT require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s resp %s", response.Status, readAsString(response.Body)) } -func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) { +func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) string { // Query last 30 minutes of data only endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano) startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano) @@ -263,6 +281,7 @@ func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count u require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) expected := fmt.Sprintf(`[{"count":%d}]`, count) require.Equalf(t, expected, body, "Query count incorrect; Expected %s, Actual %s", expected, body) + return body } func QueryLogStreamCount_Historical(t *testing.T, client HTTPClient, stream string, count uint64) { @@ -561,3 +580,91 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string) require.Equalf(t, 403, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body)) } } + +func activateHotTier(t *testing.T, size string, verify bool) (int, string) { + if size == "" { + size = "20 GiB" // default hot tier size + } + + payload := StreamHotTier{ + Size: size, + } + jsonPayload, _ := json.Marshal(payload) + + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", bytes.NewBuffer(jsonPayload)) + req.Header.Set("Content-Type", "application/json") + response, err := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + + if verify { + if NewGlob.IngestorUrl.String() != "" { + require.Equalf(t, 200, response.StatusCode, "Server returned unexpected http code: %s and response: %s", response.Status, body) + require.NoErrorf(t, err, "Activating hot tier failed in distributed mode: %s", err) + } else { + // right now, hot tier is unavailable in standalone so anything other than 200 is fine + require.NotEqualf(t, 200, response.StatusCode, "Hot tier has been activated in standalone mode: %s and response: %s", response.Status, body) + } + } + + return response.StatusCode, body +} + +func getHotTierStatus(t *testing.T, shouldFail bool) *StreamHotTier { + req, err := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil) + require.NoError(t, err, "Failed to create request") + + req.Header.Set("Content-Type", "application/json") + + response, err := NewGlob.QueryClient.Do(req) + require.NoError(t, err, "Failed to execute GET /hottier") + defer response.Body.Close() + + body := readAsString(response.Body) + + if shouldFail { + require.NotEqualf(t, 200, response.StatusCode, "Hot tier was expected to fail but succeeded with body: %s", body) + return &StreamHotTier{Size: "0"} + } else { + require.Equalf(t, 200, response.StatusCode, "GET hot tier failed with status code: %d & body: %s", response.StatusCode, body) + } + + var hotTierStatus StreamHotTier + err = json.Unmarshal([]byte(body), &hotTierStatus) + require.NoError(t, err, "The response from GET /hottier isn't of expected schema: %s", body) + + return &hotTierStatus +} + +func disableHotTier(t *testing.T, shouldFail bool) { + req, _ := NewGlob.QueryClient.NewRequest("DELETE", "logstream/"+NewGlob.Stream+"/hottier", nil) + response, err := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + + if shouldFail { + require.NotEqualf(t, 200, response.StatusCode, "Non-existent hot tier was disabled with response: %s", body) + } else { + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) + } + require.NoErrorf(t, err, "Disabling hot tier failed: %s", err) +} + +func getStreamInfo(t *testing.T) *StreamInfo { + req, err := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/info", nil) + require.NoError(t, err, "Failed to create request") + + req.Header.Set("Content-Type", "application/json") + + response, err := NewGlob.QueryClient.Do(req) + require.NoError(t, err, "Failed to execute GET /logstream/{stream_name}/info") + defer response.Body.Close() + + body := readAsString(response.Body) + + require.Equal(t, 200, response.StatusCode, "GET /logstream/{stream_name}/info failed with status code: %d & body: %s", response.StatusCode, body) + + var streamInfo StreamInfo + err = json.Unmarshal([]byte(body), &streamInfo) + require.NoError(t, err, "The response from GET /info isn't of expected schema: %s", body) + + return &streamInfo +}