Skip to content

Commit

Permalink
Fix export bug and more updates to checkpointing.
Browse files (browse the repository at this point in the history)
  • Loading branch information
gamolina committed Dec 17, 2024
1 parent 978119a commit a67b59b
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 36 deletions.
4 changes: 0 additions & 4 deletions Docker/kinesis_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ fi
if [ -n "$CHECKPOINT_TABLE" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --checkpoint-table=${CHECKPOINT_TABLE}"
if [ -n "$POST_CHECKPOINT_INIT_DELAY" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --post-checkpoint-init-delay=${POST_CHECKPOINT_INIT_DELAY}"
fi
fi
if [ -n "$LOG_LEVEL" ]
then
Expand Down
29 changes: 7 additions & 22 deletions quanta-kinesis-consumer-lib/q-kinesis-lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type Main struct {
AssumeRoleArn string
AssumeRoleArnRegion string
Deaggregate bool
PostCheckpointInitDelay int
Collate bool
ShardKey string
ProtoConfig string
Expand Down Expand Up @@ -180,32 +179,19 @@ func (m *Main) Init(customEndpoint string) (int, error) {
return 0, err
}
shardCount := len(shout.Shards)
initializedShardsInDB := false
foundCheckpointRecords := false
if db != nil && m.InitialPos != "TRIM_HORIZON" {
for _, v := range shout.Shards {
seq, _ := db.GetCheckpoint(*streamName, *v.ShardId)
if seq != "" && seq != "0" {
foundCheckpointRecords = true
continue
}
sequenceRange := *v.SequenceNumberRange
sequenceNumber := *sequenceRange.StartingSequenceNumber
if sequenceRange.EndingSequenceNumber != nil {
sequenceNumber = *sequenceRange.EndingSequenceNumber
}
if sequenceNumber == "" {
sequenceNumber = *sequenceRange.StartingSequenceNumber
}
err := db.SetCheckpoint(*streamName, *v.ShardId, sequenceNumber)
//u.Debugf("Initializing checkpoint for shard %s.%s, SEQ = %s", *streamName, *v.ShardId,
// sequenceNumber)
if err != nil {
return 0, fmt.Errorf("failed to set inital checkpoint, %v", err)
}
initializedShardsInDB = true
}
}
if initializedShardsInDB {
time.Sleep(time.Duration(m.PostCheckpointInitDelay) * time.Second)
if m.InitialPos == "LATEST" && foundCheckpointRecords {
u.Errorf("Checkpoint enabled and records exist. Shard iterator is 'LATEST' setting it to 'AFTER_SEQUENCE_NUMBER'.")
m.InitialPos = "AFTER_SEQUENCE_NUMBER"
}

m.metrics = cloudwatch.New(sess)
Expand Down Expand Up @@ -360,9 +346,8 @@ func (m *Main) MainProcessingLoop() error {
u.Warnf("Received Cancellation.")
}
if m.InitialPos == "TRIM_HORIZON" {
u.Error("can't re-initialize 'in-place' if set to TRIM_HORIZON, exiting")
// os.Exit(1)
return fmt.Errorf("can't re-initialize 'in-place' if set to TRIM_HORIZON")
u.Error("initial position was TRIM_HORIZON, re-initializing with set to LATEST ")
m.InitialPos = "LATEST"
}
u.Warnf("Re-initializing.")
var err error
Expand Down
6 changes: 0 additions & 6 deletions quanta-kinesis-consumer/quanta-kinesis-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func main() {
deaggregate := app.Flag("deaggregate", "Incoming payload records are aggregated.").Bool()
scanInterval := app.Flag("scan-interval", "Scan interval (milliseconds)").Default("1000").Int()
protoPath := app.Flag("proto-path", "Path to protobuf descriptor files root directory.").String()
postCheckpointInitDelay := app.Flag("post-checkpoint-init-delay", "Delay seconds after checkpoint table init.").Default("30").Int()
logLevel := app.Flag("log-level", "Log Level [ERROR, WARN, INFO, DEBUG]").Default("WARN").String()

kingpin.MustParse(app.Parse(os.Args[1:]))
Expand All @@ -66,7 +65,6 @@ func main() {
main.ScanInterval = *scanInterval
main.Port = int(*port)
main.ConsulAddr = *consul
main.PostCheckpointInitDelay = *postCheckpointInitDelay

log.Printf("Set Logging level to %v.", *logLevel)
log.Printf("Kinesis stream %v.", main.Stream)
Expand Down Expand Up @@ -95,10 +93,6 @@ func main() {
os.Exit(1)
}
log.Printf("DynamoDB checkpoint table name [%s]", main.CheckpointTable)
if main.InitialPos == "LATEST" {
u.Errorf("Checkpoint enabled. Shard iterator is 'LATEST' setting it to 'AFTER_SEQUENCE_NUMBER'.")
main.InitialPos = "AFTER_SEQUENCE_NUMBER"
}
}

if shardKey != nil {
Expand Down
12 changes: 9 additions & 3 deletions sink/s3sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,12 @@ func (s *S3ParquetSink) Open(ctx *plan.Context, bucketpath string, params map[st
// Create S3 service client
u.Infof("Parquet Sink: Opening Output S3 path s3://%s/%s", bucket, file)
s.outFile, err = pgs3.NewS3FileWriterWithClient(context.Background(), s.s3svc, bucket, file, nil, func(p *s3.PutObjectInput) {
p.SSEKMSKeyId = aws.String(s.sseKmsKeyID)
p.ServerSideEncryption = "aws:kms"
if s.sseKmsKeyID != "" {
p.SSEKMSKeyId = aws.String(s.sseKmsKeyID)
p.ServerSideEncryption = "aws:kms"
} else {
p.SSEKMSKeyId = nil
}
p.ACL = types.ObjectCannedACL(s.acl)
})

Expand All @@ -288,8 +292,10 @@ func (s *S3ParquetSink) Open(ctx *plan.Context, bucketpath string, params map[st
s.md[i] = fmt.Sprintf("name=%s, type=FLOAT", v.As)
case value.BoolType:
s.md[i] = fmt.Sprintf("name=%s, type=BOOLEAN", v.As)
case value.TimeType:
s.md[i] = fmt.Sprintf("name=%s, type=INT64, convertedtype=TIMESTAMP_MICROS", v.As)
default:
s.md[i] = fmt.Sprintf("name=%s, type=UTF8, encoding=PLAIN_DICTIONARY", v.As)
s.md[i] = fmt.Sprintf("name=%s, type=BYTE_ARRAY, convertedtype=utf8, encoding=PLAIN_DICTIONARY", v.As)
}
}

Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
quanta 0.9.15-rc-6
quanta 0.9.15-rc-7


0 comments on commit a67b59b

Please sign in to comment.