Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add test coverage for hot tier #71

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
34 changes: 20 additions & 14 deletions integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ func (flog *ParquetFlog) Deref() Flog {
// - Download parquet files from the store created by Parseable for the minute
// - Compare the sent logs with the ones loaded from the downloaded parquet
func TestIntegrity(t *testing.T) {

flogs := createAndIngest(t)
parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

rowCount := len(actualFlogs)

for i, expectedFlog := range flogs {
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func createAndIngest(t *testing.T) []Flog {
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
iterations := 2
flogsPerIteration := 100
Expand Down Expand Up @@ -127,20 +146,7 @@ func TestIntegrity(t *testing.T) {
// XXX: We don't need to sleep for the entire minute, just until the next minute boundary.
}

parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

rowCount := len(actualFlogs)

for i, expectedFlog := range flogs {
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
return flogs
}

func ingestFlogs(flogs []Flog, stream string) error {
Expand Down
67 changes: 67 additions & 0 deletions quest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,73 @@ func TestSmokeGetRetention(t *testing.T) {
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestActivateHotTier(t *testing.T) {
activateHotTier(t)
}

func TestHotTierGetsLogs(t *testing.T) {
// create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}
}

// create stream, ingest data for a duration, set hot tier, wait for 2-3 mins to see if all data is available in hot tier
func TestHotTierGetsLogsAfter(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

logs := createAndIngest(t)

activateHotTier(t)
time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync

// fetch the logs from hot tier
req, _ := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil)
response, err := NewGlob.QueryClient.Do(req)
require.NoErrorf(t, err, "Fetching hot tier stream failed: %s", err)

// ascertain that they are in expected schema. prolly will be, just to be sure
body, err := readJsonBody[StreamHotTier](response.Body)
require.NoErrorf(t, err, "Hot tier response not in correct schema: %s", err)

// get total byte count of ingested logs
size := 0
for _, expectedlog := range logs {
size = size + int(expectedlog.ByteCount)
}

// ascertain that the ingested all the ingested logs are present in hot tier
require.Equalf(t, size, "%d", *body.UsedSize, "Total size of ingested logs is %d GiB but hot tier contains %d GiB", size, body.UsedSize)

disableHotTier(t)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

// create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match
func TestHotTierLogCount(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

createAndIngest(t)
countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50)

activateHotTier(t)
time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync

countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 50)
require.Equalf(t, countBefore, countAfter, "Ingested %s, but hot tier contains only %s", countBefore, countAfter)
}

func TestOldestHotTierEntry(t *testing.T) {
// create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}
}

// This test calls all the User API endpoints
// in a sequence to check if they work as expected.
func TestSmoke_AllUsersAPI(t *testing.T) {
Expand Down
39 changes: 38 additions & 1 deletion test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"os/exec"
"strings"
"testing"
Expand All @@ -33,6 +34,13 @@ const (
sleepDuration = 2 * time.Second
)

type StreamHotTier struct {
Size string `json:"size"`
UsedSize *string `json:"used_size,omitempty"`
AvailableSize *string `json:"available_size,omitempty"`
OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"`
}

func flogStreamFields() []string {
return []string{
"p_timestamp",
Expand Down Expand Up @@ -245,7 +253,7 @@ func IngestOneEventForStaticSchemaStream_SameFieldsInLog(t *testing.T, client HT
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s resp %s", response.Status, readAsString(response.Body))
}

func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) {
func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) string {
// Query last 30 minutes of data only
endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
Expand All @@ -263,6 +271,7 @@ func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count u
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
expected := fmt.Sprintf(`[{"count":%d}]`, count)
require.Equalf(t, expected, body, "Query count incorrect; Expected %s, Actual %s", expected, body)
return body
}

func QueryLogStreamCount_Historical(t *testing.T, client HTTPClient, stream string, count uint64) {
Expand Down Expand Up @@ -561,3 +570,31 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string)
require.Equalf(t, 403, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body))
}
}

func activateHotTier(t *testing.T) {
payload := StreamHotTier{
Size: fmt.Sprintf("%d", int64(20*math.Pow(1024, 3))), // set hot tier size to be 20 GB
}
json, _ := json.Marshal(payload)

req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", bytes.NewBuffer(json))
req.Header.Set("Content-Type", "application/json")
response, err := NewGlob.QueryClient.Do(req)
body := readAsString(response.Body)

if NewGlob.IngestorUrl.String() != "" {
require.Equalf(t, 200, response.StatusCode, "Server returned unexpected http code: %s and response: %s", response.Status, body)
require.NoErrorf(t, err, "Activating hot tier failed in distributed mode: %s", err)
} else {
// right now, hot tier is unavailable in standalone so anything other than 200 is fine
require.NotEqualf(t, 200, response.StatusCode, "Hot tier has been activated in standalone mode: %s and response: %s", response.Status, body)
}
}

func disableHotTier(t *testing.T) {
req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil)
vishalkrishnads marked this conversation as resolved.
Show resolved Hide resolved
response, err := NewGlob.QueryClient.Do(req)
body := readAsString(response.Body)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
require.NoErrorf(t, err, "Disabling hot tier failed: %s", err)
}
Loading