diff --git a/docs/modules/components/pages/inputs/gcp_spanner_change_stream.adoc b/docs/modules/components/pages/inputs/gcp_spanner_change_stream.adoc new file mode 100644 index 0000000000..43a69aa036 --- /dev/null +++ b/docs/modules/components/pages/inputs/gcp_spanner_change_stream.adoc @@ -0,0 +1,148 @@ += gcp_spanner_change_stream +:type: input +:status: beta +:categories: ["Services","GCP"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + +// © 2024 Redpanda Data Inc. + + +component_type_dropdown::[] + + +Creates an input that consumes from a spanner change stream. + +Introduced in version 3.43.0. + + +[tabs] +====== +Common:: ++ +-- + +```yml +# Common config fields, showing default values +input: + label: "" + gcp_spanner_change_stream: + stream_dsn: projects//instances//databases/ # No default (required) + stream_id: "" + partition_dsn: projects//instances//databases/ # No default (optional) + partition_table: "" # No default (optional) + use_in_mememory_partition: false +``` + +-- +Advanced:: ++ +-- + +```yml +# All config fields, showing default values +input: + label: "" + gcp_spanner_change_stream: + stream_dsn: projects//instances//databases/ # No default (required) + stream_id: "" + start_time_epoch: 0 + partition_dsn: projects//instances//databases/ # No default (optional) + partition_table: "" # No default (optional) + use_in_mememory_partition: false + allowed_mod_types: + - INSERT + - UPDATE + - DELETE +``` + +-- +====== + +== Fields + +=== `stream_dsn` + +Required field to use to connect to spanner for the change stream. + + +*Type*: `string` + + +```yml +# Examples + +stream_dsn: projects//instances//databases/ +``` + +=== `stream_id` + +Required name of the change stream to track. + + +*Type*: `string` + +*Default*: `""` + +=== `start_time_epoch` + +Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used. + + +*Type*: `int` + +*Default*: `0` + +=== `partition_dsn` + +Field used to set the DSN for the metadata partition table, can be the same as stream_dsn. + + +*Type*: `string` + + +```yml +# Examples + +partition_dsn: projects//instances//databases/ +``` + +=== `partition_table` + +Name of the table to create/use in spanner to track change stream partition metadata. + + +*Type*: `string` + + +=== `use_in_mememory_partition` + +use an in memory partition table for tracking the partitions. + + +*Type*: `bool` + +*Default*: `false` + +=== `allowed_mod_types` + +Mod types to allow through when reading the change stream, default all. + + +*Type*: `array` + +*Default*: `["INSERT","UPDATE","DELETE"]` + + diff --git a/internal/impl/gcp/input_spanner_change_stream.go b/internal/impl/gcp/input_spanner_change_stream.go index 375b289030..acaa47c617 100644 --- a/internal/impl/gcp/input_spanner_change_stream.go +++ b/internal/impl/gcp/input_spanner_change_stream.go @@ -41,13 +41,13 @@ func newSpannerChangeStreamInputConfig() *service.ConfigSpec { Version("3.43.0"). Categories("Services", "GCP"). Summary("Creates an input that consumes from a spanner change stream."). - Field(service.NewStringField("partition_dsn")). - Field(service.NewStringField("partition_table")). - Field(service.NewBoolField("use_in_mememory_partition").Description("use an in memory partition table for tracking the partitions").Default(false)). - Field(service.NewStringField("stream_dsn").Optional().Default("")). - Field(service.NewStringField("stream_id").Description("The name of the change stream to track").Default("")). - Field(service.NewIntField("start_time_epoch").Optional().Description("Microsecond accurate epoch timestamp to start reading from").Default(0)). - Field(service.NewStringListField("allowed_mod_types").Default([]string{"INSERT", "UPDATE", "DELETE"})) + Field(service.NewStringField("stream_dsn").Description("Required field to use to connect to spanner for the change stream.").Example("projects//instances//databases/")). + Field(service.NewStringField("stream_id").Description("Required name of the change stream to track.").Default("")). + Field(service.NewIntField("start_time_epoch").Advanced().Optional().Default(0).Description("Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used.")). + Field(service.NewStringField("partition_dsn").Optional().Description("Field used to set the DSN for the metadata partition table, can be the same as stream_dsn.").Example("projects//instances//databases/")). + Field(service.NewStringField("partition_table").Optional().Description("Name of the table to create/use in spanner to track change stream partition metadata.")). + Field(service.NewBoolField("use_in_mememory_partition").Description("use an in memory partition table for tracking the partitions.").Default(false)). + Field(service.NewStringListField("allowed_mod_types").Advanced().Description("Mod types to allow through when reading the change stream, default all.").Default([]string{"INSERT", "UPDATE", "DELETE"})) } func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out *spannerStreamInput, err error) { @@ -58,23 +58,23 @@ func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out log: log, shutdownSig: shutdown.NewSignaller(), } - out.partitionDSN, err = conf.FieldString("partition_dsn") - if err != nil { + if out.partitionDSN, err = conf.FieldString("partition_dsn"); err != nil { return } - out.partitionTable, err = conf.FieldString("partition_table") - if err != nil { + if out.partitionTable, err = conf.FieldString("partition_table"); err != nil { return } - out.streamDSN, err = conf.FieldString("stream_dsn") - if err != nil { + if out.streamDSN, err = conf.FieldString("stream_dsn"); err != nil { return } - out.streamID, err = conf.FieldString("stream_id") - if err != nil { + if out.streamID, err = conf.FieldString("stream_id"); err != nil { + return + } + + if out.allowedModTypes, err = conf.FieldStringList("allowed_mod_types"); err != nil { return } @@ -95,10 +95,6 @@ func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out }(startTimeEpoch) } - out.allowedModTypes, err = conf.FieldStringList("allowed_mod_types") - if err != nil { - return - } if !useInMemPartition && slices.Contains([]string{out.partitionDSN, out.partitionTable, out.streamDSN, out.streamID}, "") { return nil, errors.New("partition_dsn, partition_table, stream_dsn, and stream_id must be set") } else if slices.Contains([]string{out.streamDSN, out.streamID}, "") { @@ -110,7 +106,7 @@ func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out func init() { err := service.RegisterInput( - "spanner_change_stream", newSpannerChangeStreamInputConfig(), + "gcp_spanner_change_stream", newSpannerChangeStreamInputConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { return newSpannerStreamInput(conf, mgr.Logger()) }) diff --git a/internal/impl/gcp/input_spanner_change_stream_test.go b/internal/impl/gcp/input_spanner_change_stream_test.go index 1f3091969b..bbb6d4a759 100644 --- a/internal/impl/gcp/input_spanner_change_stream_test.go +++ b/internal/impl/gcp/input_spanner_change_stream_test.go @@ -48,6 +48,9 @@ func TestGCPSpannerChangeStreamInput_Read(t *testing.T) { proc, err := newSpannerStreamInput(parsed, nil) require.NoError(t, err) + mockStreamReader := &mockStreamReader{} + proc.reader = mockStreamReader + dataChangeRecord := &model.DataChangeRecord{ CommitTimestamp: time.Now(), RecordSequence: "0000001", @@ -105,7 +108,7 @@ func TestGCPSpannerChangeStreamInput_Connect(t *testing.T) { mockStreamReader := &mockStreamReader{} proc.reader = mockStreamReader - mockStreamReader.On("Stream", ctx, mock.Anything).Once().Return(nil) + mockStreamReader.On("Stream", mock.AnythingOfType("*context.cancelCtx"), mock.Anything).Once().Return(nil) defer mockStreamReader.AssertExpectations(t) err = proc.Connect(ctx) @@ -136,7 +139,7 @@ func TestGCPSpannerChangeStreamInput_Close(t *testing.T) { defer mockStreamReader.AssertExpectations(t) proc.reader = mockStreamReader - mockStreamReader.On("Stream", ctx, mock.Anything).Once().Return(nil) + mockStreamReader.On("Stream", mock.AnythingOfType("*context.cancelCtx"), mock.Anything).Once().Return(nil) mockStreamReader.On("Close", mock.Anything).Once().Return(nil) diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv index 397eb732c5..c57c787274 100644 --- a/internal/plugins/info.csv +++ b/internal/plugins/info.csv @@ -84,6 +84,7 @@ gcp_cloud_storage ,output ,GCP Cloud Storage ,3.43.0 ,certif gcp_cloudtrace ,tracer ,GCP Cloud Trace ,4.2.0 ,certified ,n ,y ,y gcp_pubsub ,input ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y gcp_pubsub ,output ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y +gcp_spanner_change_stream ,input ,gcp_spanner_change_stream ,0.0.0 ,community ,n ,n ,n gcp_vertex_ai_chat ,processor ,GCP Vertex AI ,4.34.0 ,enterprise ,n ,y ,y gcp_vertex_ai_embeddings ,processor ,gcp_vertex_ai_embeddings ,4.37.0 ,enterprise ,n ,y ,y generate ,input ,generate ,3.40.0 ,certified ,n ,y ,y