Skip to content

Commit

Permalink
specgen: re-enable unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Nov 28, 2024
1 parent b4bd386 commit ec9fe4d
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 564 deletions.
13 changes: 5 additions & 8 deletions destination_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (d *destinationWithBatch) Open(ctx context.Context) error {
return nil
}

// setBatchEnabled stores the boolean in the context. If the context already
// contains the key it will update the boolean under that key and return the
// setBatchConfig stores a DestinationWithBatch instance in the context. If the context already
// contains the key it will update the DestinationWithBatch under that key and return the
// same context, otherwise it will return a new context with the stored value.
// This is used to signal to destinationPluginAdapter if the Destination is
// wrapped into DestinationWithBatchConfig middleware.
Expand All @@ -174,7 +174,7 @@ func (*destinationWithBatch) setBatchConfig(ctx context.Context, cfg Destination
if ok {
*ctxCfg = cfg
} else {
ctx = context.WithValue(ctx, ctxKeyBatchConfig{}, cfg)
ctx = context.WithValue(ctx, ctxKeyBatchConfig{}, &cfg)
}
return ctx
}
Expand Down Expand Up @@ -425,15 +425,12 @@ type destinationWithSchemaExtraction struct {
Destination
config *DestinationWithSchemaExtraction

payloadEnabled bool
keyEnabled bool

payloadWarnOnce sync.Once
keyWarnOnce sync.Once
}

func (d *destinationWithSchemaExtraction) Write(ctx context.Context, records []opencdc.Record) (int, error) {
if d.keyEnabled {
if *d.config.KeyEnabled {
for i := range records {
if err := d.decodeKey(ctx, &records[i]); err != nil {
if len(records) > 0 {
Expand All @@ -443,7 +440,7 @@ func (d *destinationWithSchemaExtraction) Write(ctx context.Context, records []o
}
}
}
if d.payloadEnabled {
if *d.config.PayloadEnabled {
for i := range records {
if err := d.decodePayload(ctx, &records[i]); err != nil {
if len(records) > 0 {
Expand Down
Loading

0 comments on commit ec9fe4d

Please sign in to comment.