From 283529136ebf2ac62e5eb1c41b038018a26d02b2 Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Thu, 8 Apr 2021 17:01:49 +0530
Subject: [PATCH 1/4] Reduce schema registry calls by using cached calls

Add support for using cached schema lookups instead of fetching the
latest KEY schema of the topic on every call.

The batcher computes the ID of the key schema and passes it to the
loader via the loader Kafka job. The loader then retrieves the schema
by that ID, which is cached, so there is no longer a need to query the
schema registry every time.

This reduces the number of schema registry calls.
---
 pkg/kafka/producer.go                  | 36 ++------------
 pkg/redshiftbatcher/batch_processor.go | 39 ++++++++++++++--
 pkg/redshiftloader/job.go              | 18 +++++--
 pkg/redshiftloader/job_test.go         |  1 +
 pkg/redshiftloader/load_processor.go   | 34 +++++++++-----
 pkg/redshiftloader/loader_handler.go   |  7 ++-
 pkg/schemaregistry/schemaregistry.go   | 65 +++++++++++++++++++-------
 pkg/transformer/debezium/schema.go     | 30 ++++++++++--
 pkg/transformer/transformer.go         | 15 ++++--
 9 files changed, 166 insertions(+), 79 deletions(-)

diff --git a/pkg/kafka/producer.go b/pkg/kafka/producer.go
index 9c66cd935..8ca836a7b 100644
--- a/pkg/kafka/producer.go
+++ b/pkg/kafka/producer.go
@@ -5,23 +5,20 @@ import (
 	"fmt"
 	"github.com/Shopify/sarama"
 	"github.com/linkedin/goavro/v2"
-	"github.com/practo/klog/v2"
-	"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
-	"strings"
 	"time"
 )
 
 type AvroProducer struct {
 	producer sarama.SyncProducer
-	registry schemaregistry.SchemaRegistry
 }
 
 func NewAvroProducer(
 	brokers []string,
 	kafkaVersion string,
-	schemaRegistryURL string,
 	configTLS TLSConfig,
-) (*AvroProducer, error) {
+) (
+	*AvroProducer, error,
+) {
 	version, err := sarama.ParseKafkaVersion(kafkaVersion)
 	if err != nil {
 		return nil, fmt.Errorf("Error parsing Kafka version: %v\n", err)
@@ -52,36 +49,9 @@ func NewAvroProducer(
 
 	return &AvroProducer{
 		producer: producer,
-		registry: schemaregistry.NewRegistry(schemaRegistryURL),
 	}, nil
 }
 
-// CreateSchema creates schema if it does not exist
-func (c *AvroProducer) CreateSchema(
-	topic string, scheme string) (int, bool, error) {
-
-	created := false
-
-	schemeStr := strings.ReplaceAll(scheme, "\n", "")
-	schemeStr = strings.ReplaceAll(schemeStr, " ", "")
-
-	schema, err := schemaregistry.GetLatestSchemaWithRetry(
-		c.registry, topic, false, 2,
-	)
-	if schema == nil || schema.Schema() != schemeStr {
-		klog.V(2).Infof("%s: Creating schema for the topic", topic)
-		schema, err = c.registry.CreateSchema(
-			topic, scheme, schemaregistry.Avro, false,
-		)
-		if err != nil {
-			return 0, false, err
-		}
-		created = true
-	}
-
-	return schema.ID(), created, nil
-}
-
 func (c *AvroProducer) Add(
 	topic string,
 	schema string,
diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go
index 184a5176c..910ee8157 100644
--- a/pkg/redshiftbatcher/batch_processor.go
+++ b/pkg/redshiftbatcher/batch_processor.go
@@ -12,6 +12,7 @@ import (
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
 	loader "github.com/practo/tipoca-stream/redshiftsink/pkg/redshiftloader"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/s3sink"
+	"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/serializer"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/debezium"
@@ -53,8 +54,12 @@ type batchProcessor struct {
 
 	maxConcurrency int
 
-	// loaderSchemaID informations for the loader topic
+	// loaderSchemaID stores the schema ID for the loader topic-value
 	loaderSchemaID int
+
+	// schemaIDKey stores the schema ID for the batcher topic-key.
+	// The loader uses this to fetch the primary keys for the table.
+	schemaIDKey int
 }
 
 func newBatchProcessor(
@@ -84,7 +89,6 @@ func newBatchProcessor(
 	signaler, err := kafka.NewAvroProducer(
 		strings.Split(kafkaConfig.Brokers, ","),
 		kafkaConfig.Version,
-		viper.GetString("schemaRegistryURL"),
 		kafkaConfig.TLSConfig,
 	)
 	if err != nil {
@@ -101,13 +105,35 @@ func newBatchProcessor(
 		)
 	}
 
-	loaderSchemaID, _, err := signaler.CreateSchema(
+	registry := schemaregistry.NewRegistry(viper.GetString("schemaRegistryURL"))
+	// creates the loader schema for value if not present
+	loaderSchemaID, _, err := schemaregistry.CreateSchema(
+		registry,
 		kafkaLoaderTopicPrefix+topic,
 		loader.JobAvroSchema,
+		false, // key is false means it's for the value
+	)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"Error creating schema for topic: %s, err: %v",
+			kafkaLoaderTopicPrefix+topic, err)
+	}
+	schemaKey, err := schemaregistry.GetLatestSchemaWithRetry(
+		registry,
+		topic,
+		true, // key is true means it's for the key
+		2,
 	)
 	if err != nil {
 		return nil, fmt.Errorf(
-			"Error creating schema for topic: %s, err: %v", topic, err)
+			"Error fetching schema for topic-key for topic: %s, err: %v",
+			topic, err)
+	}
+	if schemaKey == nil {
+		return nil, fmt.Errorf(
+			"Error since schema came as nil for topic-key for topic: %s",
+			topic,
+		)
 	}
 
 	klog.V(2).Infof("%s: autoCommit: %v", topic, saramaConfig.AutoCommit)
@@ -128,6 +154,7 @@ func newBatchProcessor(
 		signaler:       signaler,
 		maxConcurrency: maxConcurrency,
 		loaderSchemaID: loaderSchemaID,
+		schemaIDKey:    schemaKey.ID(),
 	}, nil
 }
 
@@ -228,7 +255,8 @@ func (b *batchProcessor) signalLoad(resp *response) error {
 		resp.endOffset,
 		",",
 		b.s3sink.GetKeyURI(resp.s3Key),
-		resp.batchSchemaID, // schema of upstream topic
+		resp.batchSchemaID, // schema of upstream topic's value
+		b.schemaIDKey,      // schema of upstream topic's key
 		resp.maskSchema,
 		resp.skipMerge,
 		resp.bytesProcessed,
@@ -288,6 +316,7 @@ func (b *batchProcessor) processMessage(
 	r, err := b.schemaTransformer.TransformValue(
 		b.topic,
 		resp.batchSchemaID,
+		b.schemaIDKey,
 		resp.maskSchema,
 	)
 	if err != nil {
diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go
index b763539f1..133208987 100644
--- a/pkg/redshiftloader/job.go
+++ b/pkg/redshiftloader/job.go
@@ -16,6 +16,7 @@ var JobAvroSchema string = `{
 	{"name": "csvDialect", "type": "string"},
 	{"name": "s3Path", "type": "string"},
 	{"name": "schemaId", "type": "int"},
+	{"name": "schemaIdKey", "type": "int"},
 	{"name": "maskSchema", "type": "string"},
 	{"name": "skipMerge", "type": "string", "default": ""},
 	{"name": "batchBytes", "type": "long", "default": 0}
@@ -23,12 +24,13 @@
 }`
 
 type Job struct {
-	UpstreamTopic string                         `json:"upstreamTopic"`
+	UpstreamTopic string                         `json:"upstreamTopic"` // batcher topic
 	StartOffset   int64                          `json:"startOffset"`
 	EndOffset     int64                          `json:"endOffset"`
 	CsvDialect    string                         `json:"csvDialect"`
 	S3Path        string                         `json:"s3Path"`
-	SchemaId      int                            `json:"schemaId"` // schema id of debezium event
+	SchemaId      int                            `json:"schemaId"`    // schema id of the debezium event value for the upstream topic (batcher topic)
+	SchemaIdKey   int                            `json:"schemaIdKey"` // schema id of the debezium event key for the upstream topic (batcher topic)
 	MaskSchema    map[string]serializer.MaskInfo `json:"maskSchema"`
 	SkipMerge     bool                           `json:"skipMerge"` // to load using merge strategy or directy COPY
 	BatchBytes    int64                          `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
 }
 
 func NewJob(
 	upstreamTopic string, startOffset int64, endOffset int64,
-	csvDialect string, s3Path string, schemaId int,
+	csvDialect string, s3Path string, schemaId int, schemaIdKey int,
 	maskSchema map[string]serializer.MaskInfo, skipMerge bool,
 	batchBytes int64) Job {
@@ -47,6 +49,7 @@ func NewJob(
 		CsvDialect:  csvDialect,
 		S3Path:      s3Path,
 		SchemaId:    schemaId,
+		SchemaIdKey: schemaIdKey,
 		MaskSchema:  maskSchema,
 		SkipMerge:   skipMerge,
 		BatchBytes:  batchBytes,
@@ -84,6 +87,14 @@ func StringMapToJob(data map[string]interface{}) Job {
 			} else if value, ok := v.(int); ok {
 				job.SchemaId = value
 			}
+		case "schemaIdKey":
+			if value, ok := v.(int32); ok {
+				job.SchemaIdKey = int(value)
+			} else if value, ok := v.(int); ok {
+				job.SchemaIdKey = value
+			} else {
+				job.SchemaIdKey = -1 // backward compatibility
+			}
 		case "skipMerge":
 			if value, ok := v.(string); ok {
 				if value == "true" {
@@ -198,6 +209,7 @@ func (c Job) ToStringMap() map[string]interface{} {
 		"csvDialect":  c.CsvDialect,
 		"s3Path":      c.S3Path,
 		"schemaId":    c.SchemaId,
+		"schemaIdKey": c.SchemaIdKey,
 		"skipMerge":   skipMerge,
 		"maskSchema":  ToSchemaString(c.MaskSchema),
 		"batchBytes":  c.BatchBytes,
diff --git a/pkg/redshiftloader/job_test.go b/pkg/redshiftloader/job_test.go
index f13568d90..092a498d2 100644
--- a/pkg/redshiftloader/job_test.go
+++ b/pkg/redshiftloader/job_test.go
@@ -20,6 +20,7 @@ func TestToStringMap(t *testing.T) {
 		",",
 		"s3path",
 		1,
+		2,
 		maskSchema,
 		false,
 		10,
diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go
index bbefa6d96..422c8eb0f 100644
--- a/pkg/redshiftloader/load_processor.go
+++ b/pkg/redshiftloader/load_processor.go
@@ -91,7 +91,7 @@ func newLoadProcessor(
 	partition int32,
 	saramaConfig kafka.SaramaConfig,
 	redshifter *redshift.Redshift,
-) serializer.MessageBatchSyncProcessor {
+) (serializer.MessageBatchSyncProcessor, error) {
 	sink, err := s3sink.NewS3Sink(
 		viper.GetString("s3sink.accessKeyId"),
 		viper.GetString("s3sink.secretAccessKey"),
 		viper.GetString("s3sink.region"),
 		viper.GetString("s3sink.bucket"),
 	)
 	if err != nil {
-		klog.Fatalf("Error creating s3 client: %v\n", err)
+		return nil, fmt.Errorf("Error creating s3 client: %v\n", err)
 	}
 
 	klog.V(3).Infof("%s: auto-commit: %v", topic, saramaConfig.AutoCommit)
@@ -119,7 +119,7 @@ func newLoadProcessor(
 		targetTable:   nil,
 		tableSuffix:   viper.GetString("redshift.tableSuffix"),
 		redshiftStats: viper.GetBool("redshift.stats"),
-	}
+	}, nil
 }
 
 func (b *loadProcessor) ctxCancelled(ctx context.Context) error {
@@ -425,8 +425,11 @@ func (b *loadProcessor) merge(ctx context.Context) error {
 // batch messages.
 // this also intializes b.stagingTable
 func (b *loadProcessor) createStagingTable(
-	ctx context.Context, schemaId int, inputTable redshift.Table) error {
-
+	ctx context.Context,
+	schemaId int,
+	schemaIdKey int,
+	inputTable redshift.Table,
+) error {
 	b.stagingTable = redshift.NewTable(inputTable)
 	b.stagingTable.Name = b.stagingTable.Name + "_staged"
 
@@ -449,10 +452,17 @@ func (b *loadProcessor) createStagingTable(
 		return fmt.Errorf("Error dropping staging table: %v\n", err)
 	}
 
-	primaryKeys, err := b.schemaTransformer.TransformKey(
-		b.upstreamTopic)
-	if err != nil {
-		return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
+	var primaryKeys []string
+	if schemaIdKey == -1 {
+		primaryKeys, err = b.schemaTransformer.PrimaryKeys(schemaIdKey)
+		if err != nil {
+			return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
+		}
+	} else { // Deprecated as below is expensive and does not use cache
+		primaryKeys, err = b.schemaTransformer.TransformKey(b.upstreamTopic)
+		if err != nil {
+			return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
+		}
 	}
 	b.primaryKeys = primaryKeys
 
@@ -622,8 +632,8 @@ func (b *loadProcessor) processBatch(
 	}
 
 	var inputTable redshift.Table
-	var schemaId int
 	var err error
+	var schemaId, schemaIdKey int
 	b.stagingTable = nil
 	b.targetTable = nil
 	b.upstreamTopic = ""
@@ -637,6 +647,7 @@ func (b *loadProcessor) processBatch(
 		default:
 			job := StringMapToJob(message.Value.(map[string]interface{}))
 			schemaId = job.SchemaId
+			schemaIdKey = job.SchemaIdKey
 			b.batchEndOffset = message.Offset
 			bytesProcessed += job.BatchBytes
 
@@ -651,6 +662,7 @@ func (b *loadProcessor) processBatch(
 			resp, err := b.schemaTransformer.TransformValue(
 				b.upstreamTopic,
 				schemaId,
+				schemaIdKey,
 				job.MaskSchema,
 			)
 			if err != nil {
@@ -699,7 +711,7 @@ func (b *loadProcessor) processBatch(
 
 	// load
 	klog.V(2).Infof("%s, load staging\n", b.topic)
-	err = b.createStagingTable(ctx, schemaId, inputTable)
+	err = b.createStagingTable(ctx, schemaId, schemaIdKey, inputTable)
 	if err != nil {
 		return bytesProcessed, err
 	}
diff --git a/pkg/redshiftloader/loader_handler.go b/pkg/redshiftloader/loader_handler.go
index d1444b705..77b5859c0 100644
--- a/pkg/redshiftloader/loader_handler.go
+++ b/pkg/redshiftloader/loader_handler.go
@@ -117,13 +117,18 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 	var lastSchemaId *int
 	var err error
 
-	processor := newLoadProcessor(
+	processor, err := newLoadProcessor(
 		h.consumerGroupID,
 		claim.Topic(),
 		claim.Partition(),
 		h.saramaConfig,
 		h.redshifter,
 	)
+	if err != nil {
+		return fmt.Errorf(
+			"Error making the load processor for topic: %s, err: %v",
+			claim.Topic(), err)
+	}
 	maxBufSize := h.maxSize
 	if h.maxBytesPerBatch != nil {
 		maxBufSize = serializer.DefaultMessageBufferSize
diff --git a/pkg/schemaregistry/schemaregistry.go b/pkg/schemaregistry/schemaregistry.go
index 071fe1231..ccad44530 100644
--- a/pkg/schemaregistry/schemaregistry.go
+++ b/pkg/schemaregistry/schemaregistry.go
@@ -6,6 +6,7 @@ import (
 	"github.com/practo/klog/v2"
 	"github.com/riferrei/srclient"
 	"math/rand"
+	"strings"
 	"time"
 )
 
@@ -53,10 +54,29 @@ func NewRegistry(url string) SchemaRegistry {
 	}
 }
 
+func toSchema(cSchema *srclient.Schema) *Schema {
+	return &Schema{
+		id:      cSchema.ID(),
+		schema:  cSchema.Schema(),
+		version: cSchema.Version(),
+		codec:   cSchema.Codec(),
+	}
+}
+
+func tocSchemaType(schemaType SchemaType) srclient.SchemaType {
+	switch schemaType {
+	case Avro:
+		return srclient.Avro
+	}
+
+	return ""
+}
+
 type cSchemaRegistry struct {
 	client *srclient.SchemaRegistryClient
 }
 
+// GetSchema returns the cached response on a cache hit
 func (c *cSchemaRegistry) GetSchema(schemaID int) (*Schema, error) {
 	cSchema, err := c.client.GetSchema(schemaID)
 	if err != nil {
@@ -66,6 +86,7 @@ func (c *cSchemaRegistry) GetSchema(schemaID int) (*Schema, error) {
 	return toSchema(cSchema), nil
 }
 
+// GetLatestSchema always makes a call to the registry every time
 func (c *cSchemaRegistry) GetLatestSchema(
 	subject string, key bool) (*Schema, error) {
 	cSchema, err := c.client.GetLatestSchema(subject, key)
@@ -76,6 +97,7 @@ func (c *cSchemaRegistry) GetLatestSchema(
 	return toSchema(cSchema), nil
 }
 
+// CreateSchema creates the schema in the registry if it is not present
 func (c *cSchemaRegistry) CreateSchema(
 	subject string, schema string,
 	schemaType SchemaType, key bool) (*Schema, error) {
@@ -89,24 +111,7 @@ func (c *cSchemaRegistry) CreateSchema(
 	return toSchema(cSchema), nil
 }
 
-func toSchema(cSchema *srclient.Schema) *Schema {
-	return &Schema{
-		id:      cSchema.ID(),
-		schema:  cSchema.Schema(),
-		version: cSchema.Version(),
-		codec:   cSchema.Codec(),
-	}
-}
-
-func tocSchemaType(schemaType SchemaType) srclient.SchemaType {
-	switch schemaType {
-	case Avro:
-		return srclient.Avro
-	}
-
-	return ""
-}
-
+// GetSchemaWithRetry gets the schema from the registry; it returns a cached response when available
 func GetSchemaWithRetry(
 	registry SchemaRegistry,
 	schemaId int,
@@ -132,6 +137,7 @@ func GetSchemaWithRetry(
 	}
 }
 
+// GetLatestSchemaWithRetry gets the schema from the registry every time
 func GetLatestSchemaWithRetry(
 	registry SchemaRegistry,
 	topic string,
@@ -157,3 +163,26 @@ func GetLatestSchemaWithRetry(
 		time.Sleep(time.Duration(sleepFor) * time.Second)
 	}
 }
+
+// CreateSchema creates the schema for the key or the value of the topic
+func CreateSchema(
+	registry SchemaRegistry,
+	topic string,
+	scheme string,
+	key bool,
+) (int, bool, error) {
+	created := false
+	schemeStr := strings.ReplaceAll(scheme, "\n", "")
+	schemeStr = strings.ReplaceAll(schemeStr, " ", "")
+	schema, err := GetLatestSchemaWithRetry(registry, topic, key, 2)
+	if schema == nil || schema.Schema() != schemeStr {
+		klog.V(2).Infof("%s: Creating schema for the topic", topic)
+		schema, err = registry.CreateSchema(topic, scheme, Avro, key)
+		if err != nil {
+			return 0, false, err
+		}
+		created = true
+	}
+
+	return schema.ID(), created, nil
+}
diff --git a/pkg/transformer/debezium/schema.go b/pkg/transformer/debezium/schema.go
index bab7ae15c..91198d0d6 100644
--- a/pkg/transformer/debezium/schema.go
+++ b/pkg/transformer/debezium/schema.go
@@ -239,6 +239,8 @@ type schemaTransformer struct {
 	registry schemaregistry.SchemaRegistry
 }
 
+// TransformKey is deprecated as it makes expensive GetLatestSchemaWithRetry calls.
+// Use PrimaryKeys instead.
 func (c *schemaTransformer) TransformKey(topic string) ([]string, error) {
 	s, err := schemaregistry.GetLatestSchemaWithRetry(c.registry, topic, true, 10)
 	if err != nil {
@@ -248,6 +250,15 @@ func (c *schemaTransformer) TransformKey(topic string) ([]string, error) {
 	return c.transformSchemaKey(s.Schema())
 }
 
+func (c *schemaTransformer) PrimaryKeys(schemaID int) ([]string, error) {
+	s, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaID, 3)
+	if err != nil {
+		return []string{}, err
+	}
+
+	return c.transformSchemaKey(s.Schema())
+}
+
 func (c *schemaTransformer) transformSchemaKey(
 	schema string) ([]string, error) {
 
@@ -294,7 +305,10 @@ func isPrimaryKey(columnName string, primaryKeys []string) bool {
 	return false
 }
 
-func (c *schemaTransformer) TransformValue(topic string, schemaId int,
+func (c *schemaTransformer) TransformValue(
+	topic string,
+	schemaId int,
+	schemaIdKey int,
 	maskSchema map[string]serializer.MaskInfo) (interface{}, error) {
 
 	s, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaId, 10)
@@ -302,9 +316,17 @@ func (c *schemaTransformer) TransformValue(topic string, schemaId int,
 		return nil, err
 	}
 
-	primaryKeys, err := c.TransformKey(topic)
-	if err != nil {
-		return nil, err
+	var primaryKeys []string
+	if schemaIdKey != -1 {
+		primaryKeys, err = c.PrimaryKeys(schemaIdKey)
+		if err != nil {
+			return nil, err
+		}
+	} else { // Deprecated as below is expensive and does not use cache
+		primaryKeys, err = c.TransformKey(topic)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	return c.transformSchemaValue(
diff --git a/pkg/transformer/transformer.go b/pkg/transformer/transformer.go
index 9077132a5..a42d6ee34 100644
--- a/pkg/transformer/transformer.go
+++ b/pkg/transformer/transformer.go
@@ -21,13 +21,20 @@ type MessageTransformer interface {
 }
 
 type SchemaTransformer interface {
-	// TransformKey transforms the topic schema into name of the primary
-	// key and its type.
-	TransformKey(topic string) ([]string, error)
+	// PrimaryKeys returns the list of primary keys for the schema
+	PrimaryKeys(schemaID int) ([]string, error)
 	// Transform value transforms the schemaId for various use cases.
 	// it uses maskSchema to change the type of the schema datatypes if required
-	TransformValue(topic string, schemaId int,
+	TransformValue(
+		topic string,
+		schemaId int,
+		schemaIdKey int,
 		maskSchema map[string]serializer.MaskInfo) (interface{}, error)
+
+	// Deprecated:
+	// TransformKey transforms the topic schema into name of the primary
+	// key and its type.
+	TransformKey(topic string) ([]string, error)
 }
 
 // ParseTopic breaks down the topic string into server, database, table

From b3b4cb1dc2c896488e20878245a0280650ce0e7e Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Fri, 9 Apr 2021 13:10:24 +0530
Subject: [PATCH 2/4] Review fixes

---
 pkg/redshiftloader/job.go            |  4 ++++
 pkg/redshiftloader/load_processor.go | 15 ++++++---------
 pkg/transformer/debezium/schema.go   | 15 ++++++---------
 3 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go
index 133208987..77445a7e4 100644
--- a/pkg/redshiftloader/job.go
+++ b/pkg/redshiftloader/job.go
@@ -116,7 +116,11 @@ func StringMapToJob(data map[string]interface{}) Job {
 			job.BatchBytes = 0
 		}
 	}
+	}
+
+	// backward compatibility
+	if job.SchemaIdKey == 0 {
+		job.SchemaIdKey = -1
 	}
 
 	return job
diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go
index 422c8eb0f..3b882c47f 100644
--- a/pkg/redshiftloader/load_processor.go
+++ b/pkg/redshiftloader/load_processor.go
@@ -453,16 +453,13 @@ func (b *loadProcessor) createStagingTable(
 	}
 
 	var primaryKeys []string
-	if schemaIdKey == -1 {
-		primaryKeys, err = b.schemaTransformer.PrimaryKeys(schemaIdKey)
-		if err != nil {
-			return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
-		}
-	} else { // Deprecated as below is expensive and does not use cache
+	if schemaIdKey == -1 || schemaIdKey == 0 { // Deprecated as below is expensive and does not use cache
 		primaryKeys, err = b.schemaTransformer.TransformKey(b.upstreamTopic)
-		if err != nil {
-			return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
-		}
+	} else { // below is the new faster way to get primary keys
+		primaryKeys, err = b.schemaTransformer.PrimaryKeys(schemaIdKey)
+	}
+	if err != nil {
+		return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
 	}
 	b.primaryKeys = primaryKeys
 
diff --git a/pkg/transformer/debezium/schema.go b/pkg/transformer/debezium/schema.go
index 91198d0d6..3fbf23267 100644
--- a/pkg/transformer/debezium/schema.go
+++ b/pkg/transformer/debezium/schema.go
@@ -317,16 +317,13 @@ func (c *schemaTransformer) TransformValue(
 	}
 
 	var primaryKeys []string
-	if schemaIdKey != -1 {
-		primaryKeys, err = c.PrimaryKeys(schemaIdKey)
-		if err != nil {
-			return nil, err
-		}
-	} else { // Deprecated as below is expensive and does not use cache
+	if schemaIdKey == -1 || schemaIdKey == 0 { // Deprecated as below is expensive and does not use cache
 		primaryKeys, err = c.TransformKey(topic)
-		if err != nil {
-			return nil, err
-		}
+	} else { // below is the new faster way to get primary keys
+		primaryKeys, err = c.PrimaryKeys(schemaIdKey)
+	}
+	if err != nil {
+		return nil, err
 	}
 
 	return c.transformSchemaValue(

From e7349fc213c8ee5c71d5b29e35fbd3b901aeeda5 Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Fri, 9 Apr 2021 13:22:22 +0530
Subject: [PATCH 3/4] Added default for backward compatibility of schema

---
 pkg/redshiftloader/job.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go
index 77445a7e4..bc508c086 100644
--- a/pkg/redshiftloader/job.go
+++ b/pkg/redshiftloader/job.go
@@ -16,7 +16,7 @@ var JobAvroSchema string = `{
 	{"name": "csvDialect", "type": "string"},
 	{"name": "s3Path", "type": "string"},
 	{"name": "schemaId", "type": "int"},
-	{"name": "schemaIdKey", "type": "int"},
+	{"name": "schemaIdKey", "type": "int", default: -1},
 	{"name": "maskSchema", "type": "string"},
 	{"name": "skipMerge", "type": "string", "default": ""},
 	{"name": "batchBytes", "type": "long", "default": 0}

From 993eb53b4ff9eb59ed08e7c9784878f5566bbd22 Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Fri, 9 Apr 2021 13:27:32 +0530
Subject: [PATCH 4/4] Default should be in ""

---
 pkg/redshiftloader/job.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go
index bc508c086..c0afe7809 100644
--- a/pkg/redshiftloader/job.go
+++ b/pkg/redshiftloader/job.go
@@ -16,7 +16,7 @@ var JobAvroSchema string = `{
 	{"name": "csvDialect", "type": "string"},
 	{"name": "s3Path", "type": "string"},
 	{"name": "schemaId", "type": "int"},
-	{"name": "schemaIdKey", "type": "int", default: -1},
+	{"name": "schemaIdKey", "type": "int", "default": -1},
 	{"name": "maskSchema", "type": "string"},
 	{"name": "skipMerge", "type": "string", "default": ""},
 	{"name": "batchBytes", "type": "long", "default": 0}
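
Note: the sketch below is not part of the patches; it is a condensed, illustrative view of the lookup pattern the series introduces, using only the pkg/schemaregistry calls shown above. The helper name keySchema and its wiring are assumptions for the example.

package example

import (
	"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
)

// keySchema resolves the Debezium key schema for a batcher topic.
func keySchema(
	registry schemaregistry.SchemaRegistry,
	topic string,
	schemaIdKey int,
) (*schemaregistry.Schema, error) {
	// Jobs produced before this change carry no schemaIdKey (it decodes to 0
	// and is normalised to -1), so fall back to the old per-batch lookup that
	// queries the registry every time.
	if schemaIdKey == -1 || schemaIdKey == 0 {
		return schemaregistry.GetLatestSchemaWithRetry(registry, topic, true, 2)
	}
	// Lookups by schema ID are served from srclient's cache after the first
	// call, so repeated batches for the same table add no registry traffic.
	return schemaregistry.GetSchemaWithRetry(registry, schemaIdKey, 3)
}

On the batcher side the ID itself comes from a single GetLatestSchemaWithRetry(registry, topic, true, 2) call at processor start-up, as in newBatchProcessor above, and is carried to the loader in the Job's schemaIdKey field.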