Skip to content

Commit

Permalink
add key_in_value check
Browse files Browse the repository at this point in the history
  • Loading branch information
aerfrei committed Dec 19, 2024
1 parent b73397e commit 7b3f4e1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 30 deletions.
19 changes: 11 additions & 8 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ type ChangefeedOption struct {

func newChangefeedOption() ChangefeedOption {
cfo := ChangefeedOption{
FullTableName: true, //rand.Intn(2) < 1,
KeyInValue: true, //rand.Intn(2) < 1,
Format: "cloud", // fix
FullTableName: rand.Intn(2) < 1,
KeyInValue: rand.Intn(2) < 1,
Format: "json",
}
//if rand.Intn(2) < 1 {
// cfo.Format = "parquet"
//}

// TODO: work out why parquet and key_in_value are not compatible here
// error is "this sink is incompatible with format=parquet"
// but before we were apparenly applying this parquet format without issue
if !cfo.KeyInValue && rand.Intn(2) < 1 {
cfo.Format = "parquet"
}

return cfo
}

Expand Down Expand Up @@ -232,7 +237,6 @@ func RunNemesis(
if cfo.KeyInValue {
options = options + ", key_in_value"
}
fmt.Println("running with these options:", options)
foo, err := f.Feed(fmt.Sprintf(
`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff%s`, options,
))
Expand Down Expand Up @@ -826,7 +830,6 @@ func noteFeedMessage(a fsm.Args) error {
}
for {
m, err := ns.f.Next()
fmt.Println("feed message", m)
if err != nil {
return err
} else if m == nil {
Expand Down
20 changes: 10 additions & 10 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,17 @@ func (v *beforeAfterValidator) NoteRow(
}

if v.keyInValue {
fmt.Println("we have key in value")
keyInValueJSON, err := valueJSON.FetchValKey("key")
fmt.Println(keyInValueJSON, err)
// if err != nil {
// return err
// }
// comparison, err := keyJSON.Compare(keyInValueJSON)
// fmt.Println(keyInValueJSON.String(), keyJSON.String(), comparison, err)
// //if keyJSON != keyInValueJSON {
// // v.failures = append(v.failures, fmt.Sprintf("key %s does not match expected value %s", key, keyInValueJSON))
// //}
if err != nil {
return err
}
keyInValueString := keyInValueJSON.String()
keyString := keyJSON.String()
if keyInValueString != keyString {
v.failures = append(v.failures, fmt.Sprintf(
"key in value %s does not match expected key value %s",
keyInValueString, keyString))
}
}

afterJSON, err := valueJSON.FetchValKey("after")
Expand Down
43 changes: 31 additions & 12 deletions pkg/ccl/changefeedccl/cdctest/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func TestOrderValidator(t *testing.T) {
})
}

var standardChangefeedOptions = ChangefeedOption{
FullTableName: false,
KeyInValue: false,
Format: "json",
}

func TestBeforeAfterValidator(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -134,17 +140,30 @@ func TestBeforeAfterValidator(t *testing.T) {
}

t.Run(`empty`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
assertValidatorFailures(t, v)
})
t.Run(`fullTableName`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, true)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{
FullTableName: true,
KeyInValue: false,
Format: "json",
})
require.NoError(t, err)
assertValidatorFailures(t, v)
})
t.Run(`key_in_value`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{
FullTableName: false,
KeyInValue: true,
Format: "json",
})
require.NoError(t, err)
assertValidatorFailures(t, v)
})
t.Run(`during initial`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
// "before" is ignored if missing.
noteRow(t, v, `p`, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1], `foo`)
Expand All @@ -159,7 +178,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`)
})
t.Run(`missing before`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" should have been provided.
Expand All @@ -170,7 +189,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 [1]`)
})
t.Run(`incorrect before`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" provided with wrong value.
Expand All @@ -181,7 +200,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [5 10]`)
})
t.Run(`unnecessary before`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" provided but should not have been.
Expand All @@ -192,7 +211,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 1]`)
})
t.Run(`missing after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "after" should have been provided.
Expand All @@ -203,7 +222,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 [1]`)
})
t.Run(`incorrect after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "after" provided with wrong value.
Expand All @@ -214,7 +233,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 5]`)
})
t.Run(`unnecessary after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "after" provided but should not have been.
Expand All @@ -225,7 +244,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`)
})
t.Run(`incorrect before and after`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
// "before" and "after" both provided with wrong value.
Expand All @@ -239,7 +258,7 @@ func TestBeforeAfterValidator(t *testing.T) {
`' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 4]`)
})
t.Run(`correct`, func(t *testing.T) {
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
noteResolved(t, v, `p`, ts[0])
noteRow(t, v, `p`, `[1]`, `{}`, ts[0], `foo`)
Expand Down Expand Up @@ -278,7 +297,7 @@ func TestBeforeAfterValidatorForGeometry(t *testing.T) {
t.Fatal(err)
}
}
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false)
v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions)
require.NoError(t, err)
assertValidatorFailures(t, v)
noteRow(t, v, `p`, `[1]`, `{"after": {"k":1, "geom":{"coordinates": [1,2], "type": "Point"}}}`, ts[0], `foo`)
Expand Down

0 comments on commit 7b3f4e1

Please sign in to comment.