Skip to content

Commit

Permalink
update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
anicoll committed Dec 9, 2024
1 parent ab644a1 commit 4b734f3
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 22 deletions.
148 changes: 148 additions & 0 deletions docs/modules/components/pages/inputs/gcp_spanner_change_stream.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
= gcp_spanner_change_stream
:type: input
:status: beta
:categories: ["Services","GCP"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
// © 2024 Redpanda Data Inc.
component_type_dropdown::[]
Creates an input that consumes from a spanner change stream.
Introduced in version 3.43.0.
[tabs]
======
Common::
+
--
```yml
# Common config fields, showing default values
input:
label: ""
gcp_spanner_change_stream:
stream_dsn: projects/<project_id>/instances/<instance_id>/databases/<database_id> # No default (required)
stream_id: ""
partition_dsn: projects/<project_id>/instances/<instance_id>/databases/<database_id> # No default (optional)
partition_table: "" # No default (optional)
use_in_mememory_partition: false
```
--
Advanced::
+
--
```yml
# All config fields, showing default values
input:
label: ""
gcp_spanner_change_stream:
stream_dsn: projects/<project_id>/instances/<instance_id>/databases/<database_id> # No default (required)
stream_id: ""
start_time_epoch: 0
partition_dsn: projects/<project_id>/instances/<instance_id>/databases/<database_id> # No default (optional)
partition_table: "" # No default (optional)
use_in_mememory_partition: false
allowed_mod_types:
- INSERT
- UPDATE
- DELETE
```
--
======
== Fields
=== `stream_dsn`
Required field to use to connect to spanner for the change stream.
*Type*: `string`
```yml
# Examples
stream_dsn: projects/<project_id>/instances/<instance_id>/databases/<database_id>
```
=== `stream_id`
Required name of the change stream to track.
*Type*: `string`
*Default*: `""`
=== `start_time_epoch`
Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used.
*Type*: `int`
*Default*: `0`
=== `partition_dsn`
Field used to set the DSN for the metadata partition table, can be the same as stream_dsn.
*Type*: `string`
```yml
# Examples
partition_dsn: projects/<project_id>/instances/<instance_id>/databases/<database_id>
```
=== `partition_table`
Name of the table to create/use in spanner to track change stream partition metadata.
*Type*: `string`
=== `use_in_mememory_partition`
use an in memory partition table for tracking the partitions.
*Type*: `bool`
*Default*: `false`
=== `allowed_mod_types`
Mod types to allow through when reading the change stream, default all.
*Type*: `array`
*Default*: `["INSERT","UPDATE","DELETE"]`
36 changes: 16 additions & 20 deletions internal/impl/gcp/input_spanner_change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func newSpannerChangeStreamInputConfig() *service.ConfigSpec {
Version("3.43.0").
Categories("Services", "GCP").
Summary("Creates an input that consumes from a spanner change stream.").
Field(service.NewStringField("partition_dsn")).
Field(service.NewStringField("partition_table")).
Field(service.NewBoolField("use_in_mememory_partition").Description("use an in memory partition table for tracking the partitions").Default(false)).
Field(service.NewStringField("stream_dsn").Optional().Default("")).
Field(service.NewStringField("stream_id").Description("The name of the change stream to track").Default("")).
Field(service.NewIntField("start_time_epoch").Optional().Description("Microsecond accurate epoch timestamp to start reading from").Default(0)).
Field(service.NewStringListField("allowed_mod_types").Default([]string{"INSERT", "UPDATE", "DELETE"}))
Field(service.NewStringField("stream_dsn").Description("Required field to use to connect to spanner for the change stream.").Example("projects/<project_id>/instances/<instance_id>/databases/<database_id>")).
Field(service.NewStringField("stream_id").Description("Required name of the change stream to track.").Default("")).
Field(service.NewIntField("start_time_epoch").Advanced().Optional().Default(0).Description("Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used.")).
Field(service.NewStringField("partition_dsn").Optional().Description("Field used to set the DSN for the metadata partition table, can be the same as stream_dsn.").Example("projects/<project_id>/instances/<instance_id>/databases/<database_id>")).
Field(service.NewStringField("partition_table").Optional().Description("Name of the table to create/use in spanner to track change stream partition metadata.")).
Field(service.NewBoolField("use_in_mememory_partition").Description("use an in memory partition table for tracking the partitions.").Default(false)).
Field(service.NewStringListField("allowed_mod_types").Advanced().Description("Mod types to allow through when reading the change stream, default all.").Default([]string{"INSERT", "UPDATE", "DELETE"}))
}

func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out *spannerStreamInput, err error) {
Expand All @@ -58,23 +58,23 @@ func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out
log: log,
shutdownSig: shutdown.NewSignaller(),
}
out.partitionDSN, err = conf.FieldString("partition_dsn")
if err != nil {
if out.partitionDSN, err = conf.FieldString("partition_dsn"); err != nil {
return
}

out.partitionTable, err = conf.FieldString("partition_table")
if err != nil {
if out.partitionTable, err = conf.FieldString("partition_table"); err != nil {
return
}

out.streamDSN, err = conf.FieldString("stream_dsn")
if err != nil {
if out.streamDSN, err = conf.FieldString("stream_dsn"); err != nil {
return
}

out.streamID, err = conf.FieldString("stream_id")
if err != nil {
if out.streamID, err = conf.FieldString("stream_id"); err != nil {
return
}

if out.allowedModTypes, err = conf.FieldStringList("allowed_mod_types"); err != nil {
return
}

Expand All @@ -95,10 +95,6 @@ func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out
}(startTimeEpoch)
}

out.allowedModTypes, err = conf.FieldStringList("allowed_mod_types")
if err != nil {
return
}
if !useInMemPartition && slices.Contains([]string{out.partitionDSN, out.partitionTable, out.streamDSN, out.streamID}, "") {
return nil, errors.New("partition_dsn, partition_table, stream_dsn, and stream_id must be set")
} else if slices.Contains([]string{out.streamDSN, out.streamID}, "") {
Expand All @@ -110,7 +106,7 @@ func newSpannerStreamInput(conf *service.ParsedConfig, log *service.Logger) (out

func init() {
err := service.RegisterInput(
"spanner_change_stream", newSpannerChangeStreamInputConfig(),
"gcp_spanner_change_stream", newSpannerChangeStreamInputConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
return newSpannerStreamInput(conf, mgr.Logger())
})
Expand Down
7 changes: 5 additions & 2 deletions internal/impl/gcp/input_spanner_change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func TestGCPSpannerChangeStreamInput_Read(t *testing.T) {
proc, err := newSpannerStreamInput(parsed, nil)
require.NoError(t, err)

mockStreamReader := &mockStreamReader{}
proc.reader = mockStreamReader

dataChangeRecord := &model.DataChangeRecord{
CommitTimestamp: time.Now(),
RecordSequence: "0000001",
Expand Down Expand Up @@ -105,7 +108,7 @@ func TestGCPSpannerChangeStreamInput_Connect(t *testing.T) {
mockStreamReader := &mockStreamReader{}
proc.reader = mockStreamReader

mockStreamReader.On("Stream", ctx, mock.Anything).Once().Return(nil)
mockStreamReader.On("Stream", mock.AnythingOfType("*context.cancelCtx"), mock.Anything).Once().Return(nil)
defer mockStreamReader.AssertExpectations(t)

err = proc.Connect(ctx)
Expand Down Expand Up @@ -136,7 +139,7 @@ func TestGCPSpannerChangeStreamInput_Close(t *testing.T) {
defer mockStreamReader.AssertExpectations(t)
proc.reader = mockStreamReader

mockStreamReader.On("Stream", ctx, mock.Anything).Once().Return(nil)
mockStreamReader.On("Stream", mock.AnythingOfType("*context.cancelCtx"), mock.Anything).Once().Return(nil)

mockStreamReader.On("Close", mock.Anything).Once().Return(nil)

Expand Down
1 change: 1 addition & 0 deletions internal/plugins/info.csv
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ gcp_cloud_storage ,output ,GCP Cloud Storage ,3.43.0 ,certif
gcp_cloudtrace ,tracer ,GCP Cloud Trace ,4.2.0 ,certified ,n ,y ,y
gcp_pubsub ,input ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y
gcp_pubsub ,output ,GCP PubSub ,0.0.0 ,certified ,n ,y ,y
gcp_spanner_change_stream ,input ,gcp_spanner_change_stream ,0.0.0 ,community ,n ,n ,n
gcp_vertex_ai_chat ,processor ,GCP Vertex AI ,4.34.0 ,enterprise ,n ,y ,y
gcp_vertex_ai_embeddings ,processor ,gcp_vertex_ai_embeddings ,4.37.0 ,enterprise ,n ,y ,y
generate ,input ,generate ,3.40.0 ,certified ,n ,y ,y
Expand Down

0 comments on commit 4b734f3

Please sign in to comment.