diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 29b618b38b5f..bfd74dd364b6 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -27,13 +27,18 @@ type ChangefeedOption struct { func newChangefeedOption() ChangefeedOption { cfo := ChangefeedOption{ - FullTableName: true, //rand.Intn(2) < 1, - KeyInValue: true, //rand.Intn(2) < 1, - Format: "cloud", // fix + FullTableName: rand.Intn(2) < 1, + KeyInValue: rand.Intn(2) < 1, + Format: "json", } - //if rand.Intn(2) < 1 { - // cfo.Format = "parquet" - //} + + // TODO: work out why parquet and key_in_value are not compatible here + // error is "this sink is incompatible with format=parquet" + // but before we were apparenly applying this parquet format without issue + if !cfo.KeyInValue && rand.Intn(2) < 1 { + cfo.Format = "parquet" + } + return cfo } @@ -232,7 +237,6 @@ func RunNemesis( if cfo.KeyInValue { options = options + ", key_in_value" } - fmt.Println("running with these options:", options) foo, err := f.Feed(fmt.Sprintf( `CREATE CHANGEFEED FOR foo WITH updated, resolved, diff%s`, options, )) @@ -826,7 +830,6 @@ func noteFeedMessage(a fsm.Args) error { } for { m, err := ns.f.Next() - fmt.Println("feed message", m) if err != nil { return err } else if m == nil { diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 56921ebbdab6..4f9891a79256 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -253,17 +253,17 @@ func (v *beforeAfterValidator) NoteRow( } if v.keyInValue { - fmt.Println("we have key in value") keyInValueJSON, err := valueJSON.FetchValKey("key") - fmt.Println(keyInValueJSON, err) - // if err != nil { - // return err - // } - // comparison, err := keyJSON.Compare(keyInValueJSON) - // fmt.Println(keyInValueJSON.String(), keyJSON.String(), comparison, err) - // //if keyJSON != keyInValueJSON { - // // v.failures = append(v.failures, fmt.Sprintf("key %s does not match expected value %s", key, keyInValueJSON)) - // //} + if err != nil { + return err + } + keyInValueString := keyInValueJSON.String() + keyString := keyJSON.String() + if keyInValueString != keyString { + v.failures = append(v.failures, fmt.Sprintf( + "key in value %s does not match expected key value %s", + keyInValueString, keyString)) + } } afterJSON, err := valueJSON.FetchValKey("after") diff --git a/pkg/ccl/changefeedccl/cdctest/validator_test.go b/pkg/ccl/changefeedccl/cdctest/validator_test.go index d45fd79b5cfe..ae8fa8ba85ad 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator_test.go +++ b/pkg/ccl/changefeedccl/cdctest/validator_test.go @@ -99,6 +99,12 @@ func TestOrderValidator(t *testing.T) { }) } +var standardChangefeedOptions = ChangefeedOption{ + FullTableName: false, + KeyInValue: false, + Format: "json", +} + func TestBeforeAfterValidator(t *testing.T) { defer leaktest.AfterTest(t)() @@ -134,17 +140,30 @@ func TestBeforeAfterValidator(t *testing.T) { } t.Run(`empty`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) assertValidatorFailures(t, v) }) t.Run(`fullTableName`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, true) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{ + FullTableName: true, + KeyInValue: false, + Format: "json", + }) + require.NoError(t, err) + assertValidatorFailures(t, v) + }) + t.Run(`key_in_value`, func(t *testing.T) { + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, ChangefeedOption{ + FullTableName: false, + KeyInValue: true, + Format: "json", + }) require.NoError(t, err) assertValidatorFailures(t, v) }) t.Run(`during initial`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) // "before" is ignored if missing. noteRow(t, v, `p`, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1], `foo`) @@ -159,7 +178,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`) }) t.Run(`missing before`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" should have been provided. @@ -170,7 +189,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 [1]`) }) t.Run(`incorrect before`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" provided with wrong value. @@ -181,7 +200,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [5 10]`) }) t.Run(`unnecessary before`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" provided but should not have been. @@ -192,7 +211,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 1]`) }) t.Run(`missing after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "after" should have been provided. @@ -203,7 +222,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 [1]`) }) t.Run(`incorrect after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "after" provided with wrong value. @@ -214,7 +233,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 5]`) }) t.Run(`unnecessary after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "after" provided but should not have been. @@ -225,7 +244,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 3]`) }) t.Run(`incorrect before and after`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // "before" and "after" both provided with wrong value. @@ -239,7 +258,7 @@ func TestBeforeAfterValidator(t *testing.T) { `' WHERE to_json(k)::TEXT = $1 AND to_json(v)::TEXT = $2 [1 4]`) }) t.Run(`correct`, func(t *testing.T) { - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) noteRow(t, v, `p`, `[1]`, `{}`, ts[0], `foo`) @@ -278,7 +297,7 @@ func TestBeforeAfterValidatorForGeometry(t *testing.T) { t.Fatal(err) } } - v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, false) + v, err := NewBeforeAfterValidator(sqlDBRaw, `foo`, standardChangefeedOptions) require.NoError(t, err) assertValidatorFailures(t, v) noteRow(t, v, `p`, `[1]`, `{"after": {"k":1, "geom":{"coordinates": [1,2], "type": "Point"}}}`, ts[0], `foo`)