Skip to content

Commit

Permalink
chore: store fuzz test
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jan 17, 2025
1 parent 886008a commit 2e4d391
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 94 deletions.
174 changes: 81 additions & 93 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ import (
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"

. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
)

Expand Down Expand Up @@ -1207,111 +1210,96 @@ func TestJobsdbSanitizeJSON(t *testing.T) {
}
}

// Fuzzer represents a test fuzzer for analytics events
type Fuzzer struct {
testData []string
}

// NewFuzzer creates a new Fuzzer instance
func NewFuzzer() *Fuzzer {
return &Fuzzer{
testData: make([]string, 0),
}
}

// Add adds a test case to the fuzzer
func (f *Fuzzer) Add(data string) {
f.testData = append(f.testData, data)
}

// TestFuzzCorpus is the main test function that sets up the test corpus
func GenerateFuzzCorpus() []string {
f := NewFuzzer()

// Basic event types
f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"title":"Home | RudderStack","url":"https://www.rudderstack.com"}}`)
func FuzzTestStore(f *testing.F) {
_ = startPostgres(f)

// Add page views
f.Add(`{"type":"page","name":"Home","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Add screen views
f.Add(`{"type":"screen","name":"Main","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
columnTypes := []string{"jsonb", "text", "bytea"}
for _, columnType := range columnTypes {
f.Add(columnType, `{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
f.Add(columnType, `{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"title":"Home | RudderStack","url":"https://www.rudderstack.com"}}`)

// Add page views
f.Add(columnType, `{"type":"page","name":"Home","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Add screen views
f.Add(columnType, `{"type":"screen","name":"Main","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Add group events
f.Add(columnType, `{"type":"group","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","groupId":"groupId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Add track events
f.Add(columnType, `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Test column names with special characters and edge cases
columnNames := []string{
// SQL keywords
"select", "from", "where", "and", "or", "not", "insert", "update", "delete",
// Special characters
"column name", "column-name", "column.name", "column@name", "column#name",
// Unicode characters
"columnñame", "colûmnname", "columnнаме", "列名", "カラム名",
// Very long names
"this_is_a_very_long_column_name_that_exceeds_the_maximum_allowed_length",
}

// Add group events
f.Add(`{"type":"group","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","groupId":"groupId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
for _, columnName := range columnNames {
f.Add(columnType, fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"test","properties":{"%s":"test_value"}}`, columnName))
}

// Add track events
f.Add(`{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
// Test event names with special cases
eventNames := []string{
"omega", "omega v2 ", "9mega", "mega&", "ome$ga",
"select", "drop", "create", "alter", "index",
"name with spaces", "name@with@special@chars",
"序列化", "テーブル", "таблица",
}

// Test column names with special characters and edge cases
columnNames := []string{
// SQL keywords
"select", "from", "where", "and", "or", "not", "insert", "update", "delete",
// Special characters
"column name", "column-name", "column.name", "column@name", "column#name",
// Unicode characters
"columnñame", "colûmnname", "columnнаме", "列名", "カラム名",
// Very long names
"this_is_a_very_long_column_name_that_exceeds_the_maximum_allowed_length",
}
for _, eventName := range eventNames {
f.Add(columnType, fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"%s","properties":{"test":"value"}}`, eventName))
}

for _, columnName := range columnNames {
f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"test","properties":{"%s":"test_value"}}`, columnName))
}
// Add merge events
f.Add(columnType, `{"type":"merge","mergeProperties":[{"type":"email","value":"[email protected]"},{"type":"mobile","value":"+1-202-555-0146"}]}`)
f.Add(columnType, `{"type":"merge"}`)
f.Add(columnType, `{"type":"merge", "mergeProperties": "invalid"}`)

// Test event names with special cases
eventNames := []string{
"omega", "omega v2 ", "9mega", "mega&", "ome$ga",
"select", "drop", "create", "alter", "index",
"name with spaces", "name@with@special@chars",
"序列化", "テーブル", "таблица",
}
// Add identify events
f.Add(columnType, `{"type":"identify","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2}}`)

for _, eventName := range eventNames {
f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"%s","properties":{"test":"value"}}`, eventName))
// Add extract events
f.Add(columnType, `{"type":"extract","recordId":"recordID","event":"event","receivedAt":"2021-09-01T00:00:00.000Z","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
}

// Add merge events
f.Add(`{"type":"merge","mergeProperties":[{"type":"email","value":"[email protected]"},{"type":"mobile","value":"+1-202-555-0146"}]}`)
f.Add(`{"type":"merge"}`)
f.Add(`{"type":"merge", "mergeProperties": "invalid"}`)
f.Fuzz(func(t *testing.T, columnType, payload string) {
if !slices.Contains(columnTypes, columnType) {
return
}
if !gjson.Valid(payload) {
return
}

// Add identify events
f.Add(`{"type":"identify","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2}}`)
conf := config.New()
conf.Set("JobsDB.payloadColumnType", columnType)

// Add extract events
f.Add(`{"type":"extract","recordId":"recordID","event":"event","receivedAt":"2021-09-01T00:00:00.000Z","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
return f.testData
}

func Test_FuzzTestStore(t *testing.T) {
_ = startPostgres(t)
conf := config.New()
columnTypes := []string{"jsonb", "text", "bytea"}
for _, column := range columnTypes {
t.Run(fmt.Sprintf("Store with %s column type", column), func(t *testing.T) {
conf.Set("JobsDB.payloadColumnType", column)
jobDB := Handle{config: conf}
err := jobDB.Setup(ReadWrite, true, column+"_"+strings.ToLower(rand.String(5)))
require.NoError(t, err)
testPayloads := GenerateFuzzCorpus()
for i, payload := range testPayloads {
jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: []byte(payload),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: fmt.Sprintf("TEST_%d", i),
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
}}
err := jobDB.Store(context.Background(), jobs)
require.NoError(t, err)
}
jobDB := Handle{config: conf}
err := jobDB.Setup(ReadWrite, true, columnType+"_"+strings.ToLower(rand.String(5)))
require.NoError(t, err)
defer func() {
jobDB.TearDown()
})
}
}()

jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: []byte(payload),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: "TEST",
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
}}
err = jobDB.Store(context.Background(), jobs)
require.NoError(t, err)
})
}

// BenchmarkJobsdb takes time... keep waiting
Expand Down
4 changes: 3 additions & 1 deletion jobsdb/jobsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/ory/dockertest/v3"
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
rsRand "github.com/rudderlabs/rudder-go-kit/testhelper/rand"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -1550,7 +1552,7 @@ type testingT interface {
func startPostgres(t testingT) *postgres.Resource {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
postgresContainer, err := postgres.Setup(pool, t)
postgresContainer, err := postgres.Setup(pool, t, postgres.WithOptions("max_connections=1000"), postgres.WithShmSize(256*bytesize.MB))
require.NoError(t, err)
t.Setenv("LOG_LEVEL", "DEBUG")
t.Setenv("JOBS_DB_DB_NAME", postgresContainer.Database)
Expand Down
3 changes: 3 additions & 0 deletions jobsdb/testdata/fuzz/FuzzTestStore/1d4563ebe3d5804f
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
go test fuzz v1
string("bytea")
string("{\"type\":\"alias\",\"messageId\":\"messageId\",\"anonymousId\":\"anonymousId\",\"userId\":\"userId\",\"previousId\":\"previousId\",\"sentAt\":\"2021-09-01T00:00:00.000Z\",\"timestamp\":\"2021-09-01T00:00:00.000Z\",\"receivedAt\":\"2021-09-01T00:00:00.000Z\",\"originalTimestamp\":\"2021-09-01T00:00:00\x8c000Z\",\"channel\":\"web\",\"request_ip\":\"5.6.7.8\",\"context\":{\"traits\":{\"email\":\"[email protected]\",\"logins\":2},\"ip\":\"1.2.3.4\"}}")

0 comments on commit 2e4d391

Please sign in to comment.