Merge pull request #189 from practo/reduce-schema-registry-call-loader
Reduce schema registry calls by using cached calls
alok87 authored Apr 9, 2021
2 parents ff68c4c + 993eb53 commit a3230e3
Showing 9 changed files with 160 additions and 75 deletions.
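The thrust of the change: a schema registry lookup keyed by schema ID is immutable and therefore safe to cache indefinitely, while a "latest schema for a subject" lookup is not. The batcher now resolves the loader topic's value schema and the batcher topic's key schema once at startup and ships the key schema ID inside every load job, so the loader can resolve primary keys through an ID-based, cacheable call instead of asking the registry for the latest key schema on every load. A minimal sketch of that caching idea, with illustrative names (idCache, fetch) that are not the repo's actual pkg/schemaregistry code:

package schemacache

import "sync"

// idCache memoizes registry lookups by schema ID. A registered ID never maps
// to different schema text, so entries never need invalidation.
type idCache struct {
	mu    sync.RWMutex
	byID  map[int]string               // schema ID -> schema JSON
	fetch func(id int) (string, error) // the one real registry call per ID
}

func (c *idCache) Schema(id int) (string, error) {
	c.mu.RLock()
	s, ok := c.byID[id]
	c.mu.RUnlock()
	if ok {
		return s, nil // cache hit: no registry round trip
	}
	s, err := c.fetch(id) // miss: single registry call, then memoize
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.byID[id] = s
	c.mu.Unlock()
	return s, nil
}

Once the loader works from schema IDs (see job.go and load_processor.go below), every per-batch key lookup can be served by a cache of roughly this shape.
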
36 changes: 3 additions & 33 deletions pkg/kafka/producer.go
@@ -5,23 +5,20 @@ import (
"fmt"
"github.com/Shopify/sarama"
"github.com/linkedin/goavro/v2"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
"strings"
"time"
)

type AvroProducer struct {
producer sarama.SyncProducer
registry schemaregistry.SchemaRegistry
}

func NewAvroProducer(
brokers []string,
kafkaVersion string,
schemaRegistryURL string,
configTLS TLSConfig,
) (*AvroProducer, error) {
) (
*AvroProducer, error,
) {
version, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
return nil, fmt.Errorf("Error parsing Kafka version: %v\n", err)
@@ -52,36 +49,9 @@ func NewAvroProducer(

return &AvroProducer{
producer: producer,
registry: schemaregistry.NewRegistry(schemaRegistryURL),
}, nil
}

// CreateSchema creates schema if it does not exist
func (c *AvroProducer) CreateSchema(
topic string, scheme string) (int, bool, error) {

created := false

schemeStr := strings.ReplaceAll(scheme, "\n", "")
schemeStr = strings.ReplaceAll(schemeStr, " ", "")

schema, err := schemaregistry.GetLatestSchemaWithRetry(
c.registry, topic, false, 2,
)
if schema == nil || schema.Schema() != schemeStr {
klog.V(2).Infof("%s: Creating schema for the topic", topic)
schema, err = c.registry.CreateSchema(
topic, scheme, schemaregistry.Avro, false,
)
if err != nil {
return 0, false, err
}
created = true
}

return schema.ID(), created, nil
}

func (c *AvroProducer) Add(
topic string,
schema string,
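The deleted CreateSchema method does not disappear: judging by its new call site in pkg/redshiftbatcher/batch_processor.go, it moves into pkg/schemaregistry as a package-level CreateSchema that takes the registry client plus a key/value flag. Below is a hedged reconstruction from the removed method body; the actual code in pkg/schemaregistry may differ, and the key parameter is inferred from the call sites:

// Assumed relocation of the removed method into pkg/schemaregistry; the
// strings and klog imports, the SchemaRegistry interface and the Avro
// constant are the ones already used in the deleted code above.
func CreateSchema(
	registry SchemaRegistry, topic string, scheme string, key bool,
) (int, bool, error) {
	created := false

	// Normalize whitespace so the comparison with the registered schema is stable.
	schemeStr := strings.ReplaceAll(scheme, "\n", "")
	schemeStr = strings.ReplaceAll(schemeStr, " ", "")

	// As in the original, a failed lookup simply falls through to creation.
	schema, err := GetLatestSchemaWithRetry(registry, topic, key, 2)
	if schema == nil || schema.Schema() != schemeStr {
		klog.V(2).Infof("%s: Creating schema for the topic", topic)
		schema, err = registry.CreateSchema(topic, scheme, Avro, key)
		if err != nil {
			return 0, false, err
		}
		created = true
	}

	return schema.ID(), created, nil
}
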
39 changes: 34 additions & 5 deletions pkg/redshiftbatcher/batch_processor.go
@@ -12,6 +12,7 @@ import (
"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
loader "github.com/practo/tipoca-stream/redshiftsink/pkg/redshiftloader"
"github.com/practo/tipoca-stream/redshiftsink/pkg/s3sink"
"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
"github.com/practo/tipoca-stream/redshiftsink/pkg/serializer"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/debezium"
@@ -53,8 +54,12 @@ type batchProcessor struct {

maxConcurrency int

// loaderSchemaID informations for the loader topic
// loaderSchemaID stores the schema ID for the loader topic-value
loaderSchemaID int

// schemaIDKey stores the schema ID for the batcher topic-key
// loader would use these to fetch primaryKeys for the table
schemaIDKey int
}

func newBatchProcessor(
@@ -84,7 +89,6 @@ func newBatchProcessor(
signaler, err := kafka.NewAvroProducer(
strings.Split(kafkaConfig.Brokers, ","),
kafkaConfig.Version,
viper.GetString("schemaRegistryURL"),
kafkaConfig.TLSConfig,
)
if err != nil {
@@ -101,13 +105,35 @@ func newBatchProcessor(
)
}

loaderSchemaID, _, err := signaler.CreateSchema(
registry := schemaregistry.NewRegistry(viper.GetString("schemaRegistryURL"))
// creates the loader schema for value if not present
loaderSchemaID, _, err := schemaregistry.CreateSchema(
registry,
kafkaLoaderTopicPrefix+topic,
loader.JobAvroSchema,
false, // key is false means its for the value
)
if err != nil {
return nil, fmt.Errorf(
"Error creating schema for topic: %s, err: %v",
kafkaLoaderTopicPrefix+topic, err)
}
schemaKey, err := schemaregistry.GetLatestSchemaWithRetry(
registry,
topic,
true, // key is true means its for the key
2,
)
if err != nil {
return nil, fmt.Errorf(
"Error creating schema for topic: %s, err: %v", topic, err)
"Error fetching schema for topic-key for topic: %s, err: %v",
topic, err)
}
if schemaKey == nil {
return nil, fmt.Errorf(
"Error since schema came as nil for topic-key for topic: %s",
topic,
)
}

klog.V(2).Infof("%s: autoCommit: %v", topic, saramaConfig.AutoCommit)
@@ -128,6 +154,7 @@ func newBatchProcessor(
signaler: signaler,
maxConcurrency: maxConcurrency,
loaderSchemaID: loaderSchemaID,
schemaIDKey: schemaKey.ID(),
}, nil
}

@@ -228,7 +255,8 @@ func (b *batchProcessor) signalLoad(resp *response) error {
resp.endOffset,
",",
b.s3sink.GetKeyURI(resp.s3Key),
resp.batchSchemaID, // schema of upstream topic
resp.batchSchemaID, // schema of upstream topic's value
b.schemaIDKey, // schema of upstream topic's key
resp.maskSchema,
resp.skipMerge,
resp.bytesProcessed,
@@ -288,6 +316,7 @@ func (b *batchProcessor) processMessage(
r, err := b.schemaTransformer.TransformValue(
b.topic,
resp.batchSchemaID,
b.schemaIDKey,
resp.maskSchema,
)
if err != nil {
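With this, the batcher makes exactly two registry round trips per topic at startup: CreateSchema for the loader topic's value subject and GetLatestSchemaWithRetry for the batcher topic's key subject (the boolean flag selects the key subject versus the value subject). The retry helper itself is not shown in this diff; the sketch below is inferred from its call sites here and in the old producer.go, and both the GetLatestSchema method name on the registry client and the backoff policy are assumptions:

// Assumed shape of GetLatestSchemaWithRetry in pkg/schemaregistry. The *Schema
// return type with ID()/Schema() accessors matches how callers use the result;
// registry.GetLatestSchema is a hypothetical method name.
func GetLatestSchemaWithRetry(
	registry SchemaRegistry, topic string, key bool, attempts int,
) (*Schema, error) {
	var schema *Schema
	var err error
	for i := 0; i < attempts; i++ {
		schema, err = registry.GetLatestSchema(topic, key)
		if err == nil {
			return schema, nil
		}
		klog.V(2).Infof(
			"%s: schema fetch failed (attempt %d/%d), retrying, err: %v",
			topic, i+1, attempts, err,
		)
		time.Sleep(2 * time.Second) // fixed backoff, for the sketch only
	}
	return nil, err
}
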
22 changes: 19 additions & 3 deletions pkg/redshiftloader/job.go
@@ -16,27 +16,29 @@ var JobAvroSchema string = `{
{"name": "csvDialect", "type": "string"},
{"name": "s3Path", "type": "string"},
{"name": "schemaId", "type": "int"},
{"name": "schemaIdKey", "type": "int", "default": -1},
{"name": "maskSchema", "type": "string"},
{"name": "skipMerge", "type": "string", "default": ""},
{"name": "batchBytes", "type": "long", "default": 0}
]
}`

type Job struct {
UpstreamTopic string `json:"upstreamTopic"`
UpstreamTopic string `json:"upstreamTopic"` // batcher topic
StartOffset int64 `json:"startOffset"`
EndOffset int64 `json:"endOffset"`
CsvDialect string `json:"csvDialect"`
S3Path string `json:"s3Path"`
SchemaId int `json:"schemaId"` // schema id of debezium event
SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic)
SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic)
MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"`
SkipMerge bool `json:"skipMerge"` // to load using merge strategy or directy COPY
BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
}

func NewJob(
upstreamTopic string, startOffset int64, endOffset int64,
csvDialect string, s3Path string, schemaId int,
csvDialect string, s3Path string, schemaId int, schemaIdKey int,
maskSchema map[string]serializer.MaskInfo, skipMerge bool,
batchBytes int64) Job {

@@ -47,6 +49,7 @@ func NewJob(
CsvDialect: csvDialect,
S3Path: s3Path,
SchemaId: schemaId,
SchemaIdKey: schemaIdKey,
MaskSchema: maskSchema,
SkipMerge: skipMerge,
BatchBytes: batchBytes,
@@ -84,6 +87,14 @@ func StringMapToJob(data map[string]interface{}) Job {
} else if value, ok := v.(int); ok {
job.SchemaId = value
}
case "schemaIdKey":
if value, ok := v.(int32); ok {
job.SchemaIdKey = int(value)
} else if value, ok := v.(int); ok {
job.SchemaIdKey = value
} else {
job.SchemaIdKey = -1 // backward compatibility
}
case "skipMerge":
if value, ok := v.(string); ok {
if value == "true" {
@@ -105,7 +116,11 @@ func StringMapToJob(data map[string]interface{}) Job {
job.BatchBytes = 0
}
}
}

// backward compatibility
if job.SchemaIdKey == 0 {
job.SchemaIdKey = -1
}

return job
@@ -198,6 +213,7 @@ func (c Job) ToStringMap() map[string]interface{} {
"csvDialect": c.CsvDialect,
"s3Path": c.S3Path,
"schemaId": c.SchemaId,
"schemaIdKey": c.SchemaIdKey,
"skipMerge": skipMerge,
"maskSchema": ToSchemaString(c.MaskSchema),
"batchBytes": c.BatchBytes,
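Backward compatibility hinges on the -1 sentinel: the Avro field schemaIdKey defaults to -1, and StringMapToJob maps a missing or zero value to -1, so jobs written by an older batcher still decode; the loader (load_processor.go below) treats -1 as "no key schema ID" and falls back to the old TransformKey path. A hypothetical round trip illustrating that default; the topic name is made up and this snippet is not part of the repo's tests:

package main

import (
	"fmt"

	loader "github.com/practo/tipoca-stream/redshiftsink/pkg/redshiftloader"
)

func main() {
	// A loader-topic message written by an older batcher carries no "schemaIdKey".
	oldJob := map[string]interface{}{
		"upstreamTopic": "ts.inventory.customers", // hypothetical batcher topic
		"schemaId":      int32(1),
	}
	job := loader.StringMapToJob(oldJob)
	fmt.Println(job.SchemaIdKey) // -1: the loader keeps using TransformKey
}
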
1 change: 1 addition & 0 deletions pkg/redshiftloader/job_test.go
@@ -20,6 +20,7 @@ func TestToStringMap(t *testing.T) {
",",
"s3path",
1,
2,
maskSchema,
false,
10,
27 changes: 18 additions & 9 deletions pkg/redshiftloader/load_processor.go
@@ -91,15 +91,15 @@ func newLoadProcessor(
partition int32,
saramaConfig kafka.SaramaConfig,
redshifter *redshift.Redshift,
) serializer.MessageBatchSyncProcessor {
) (serializer.MessageBatchSyncProcessor, error) {
sink, err := s3sink.NewS3Sink(
viper.GetString("s3sink.accessKeyId"),
viper.GetString("s3sink.secretAccessKey"),
viper.GetString("s3sink.region"),
viper.GetString("s3sink.bucket"),
)
if err != nil {
klog.Fatalf("Error creating s3 client: %v\n", err)
return nil, fmt.Errorf("Error creating s3 client: %v\n", err)
}

klog.V(3).Infof("%s: auto-commit: %v", topic, saramaConfig.AutoCommit)
@@ -119,7 +119,7 @@ func newLoadProcessor(
targetTable: nil,
tableSuffix: viper.GetString("redshift.tableSuffix"),
redshiftStats: viper.GetBool("redshift.stats"),
}
}, nil
}

func (b *loadProcessor) ctxCancelled(ctx context.Context) error {
@@ -425,8 +425,11 @@ func (b *loadProcessor) merge(ctx context.Context) error {
// batch messages.
// this also intializes b.stagingTable
func (b *loadProcessor) createStagingTable(
ctx context.Context, schemaId int, inputTable redshift.Table) error {

ctx context.Context,
schemaId int,
schemaIdKey int,
inputTable redshift.Table,
) error {
b.stagingTable = redshift.NewTable(inputTable)
b.stagingTable.Name = b.stagingTable.Name + "_staged"

@@ -449,8 +452,12 @@ func (b *loadProcessor) createStagingTable(
return fmt.Errorf("Error dropping staging table: %v\n", err)
}

primaryKeys, err := b.schemaTransformer.TransformKey(
b.upstreamTopic)
var primaryKeys []string
if schemaIdKey == -1 || schemaIdKey == 0 { // Deprecated as below is expensive and does not use cache
primaryKeys, err = b.schemaTransformer.TransformKey(b.upstreamTopic)
} else { // below is the new faster way to get primary keys
primaryKeys, err = b.schemaTransformer.PrimaryKeys(schemaIdKey)
}
if err != nil {
return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
}
@@ -622,8 +629,8 @@ func (b *loadProcessor) processBatch(
}

var inputTable redshift.Table
var schemaId int
var err error
var schemaId, schemaIdKey int
b.stagingTable = nil
b.targetTable = nil
b.upstreamTopic = ""
@@ -637,6 +644,7 @@
default:
job := StringMapToJob(message.Value.(map[string]interface{}))
schemaId = job.SchemaId
schemaIdKey = job.SchemaIdKey
b.batchEndOffset = message.Offset
bytesProcessed += job.BatchBytes

@@ -651,6 +659,7 @@
resp, err := b.schemaTransformer.TransformValue(
b.upstreamTopic,
schemaId,
schemaIdKey,
job.MaskSchema,
)
if err != nil {
@@ -699,7 +708,7 @@

// load
klog.V(2).Infof("%s, load staging\n", b.topic)
err = b.createStagingTable(ctx, schemaId, inputTable)
err = b.createStagingTable(ctx, schemaId, schemaIdKey, inputTable)
if err != nil {
return bytesProcessed, err
}
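The primary-key lookup is where the registry traffic used to come from: TransformKey(upstreamTopic) asked the registry for the latest key schema on every load, whereas PrimaryKeys(schemaIdKey) can resolve the immutable schema ID through a cached call. That resolution plausibly ends by reading the key schema's field names, since a Debezium key schema's fields are the table's primary-key columns; the sketch below is a hypothetical stand-in for that last step, not the repo's transformer/debezium implementation:

package main

import (
	"encoding/json"
	"fmt"
)

type avroField struct {
	Name string `json:"name"`
}

type avroRecord struct {
	Fields []avroField `json:"fields"`
}

// primaryKeysFromKeySchema parses an Avro key schema (already fetched from the
// registry by its immutable ID, hence cacheable) and returns its field names.
func primaryKeysFromKeySchema(keySchemaJSON string) ([]string, error) {
	var rec avroRecord
	if err := json.Unmarshal([]byte(keySchemaJSON), &rec); err != nil {
		return nil, fmt.Errorf("error parsing key schema: %v", err)
	}
	keys := make([]string, 0, len(rec.Fields))
	for _, f := range rec.Fields {
		keys = append(keys, f.Name)
	}
	return keys, nil
}

func main() {
	keySchema := `{"type":"record","name":"Key","fields":[{"name":"id","type":"int"}]}`
	keys, _ := primaryKeysFromKeySchema(keySchema)
	fmt.Println(keys) // [id]
}
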
7 changes: 6 additions & 1 deletion pkg/redshiftloader/loader_handler.go
@@ -117,13 +117,18 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,

var lastSchemaId *int
var err error
processor := newLoadProcessor(
processor, err := newLoadProcessor(
h.consumerGroupID,
claim.Topic(),
claim.Partition(),
h.saramaConfig,
h.redshifter,
)
if err != nil {
return fmt.Errorf(
"Error making the load processor for topic: %s, err: %v",
claim.Topic(), err)
}
maxBufSize := h.maxSize
if h.maxBytesPerBatch != nil {
maxBufSize = serializer.DefaultMessageBufferSize