Kinesis sdk panicking #2752

Open
2 tasks done
davidwin93 opened this issue Aug 17, 2024 · 4 comments
Assignees
Labels
bug This issue is a bug. needs-reproduction This issue needs reproduction. p2 This is a standard priority issue

Comments

@davidwin93

Acknowledgements

Describe the bug

A goroutine panics with a "slice bounds out of range" runtime error while reading from a Kinesis stream.

Expected Behavior

A library should not panic: we can't recover from it, so the entire application crashes.

Current Behavior

panic: runtime error: slice bounds out of range [31319:23127]

goroutine 937 [running]:
compress/flate.(*decompressor).Read(0xc00054c008, {0xc0009ce000, 0x37e00, 0xc000498700?})
/Users/david/.asdf/installs/golang/1.22.0/go/src/compress/flate/inflate.go:339 +0x1e8
compress/gzip.(*Reader).Read(0xc000314588, {0xc0009ce000, 0x37e00, 0x37e00})
/Users/david/.asdf/installs/golang/1.22.0/go/src/compress/gzip/gunzip.go:252 +0xa2
net/http.(*gzipReader).Read(0xc000637160, {0xc0009ce000, 0x37e00, 0x37e00})
/Users/david/.asdf/installs/golang/1.22.0/go/src/net/http/transport.go:2896 +0x195
github.com/aws/aws-sdk-go-v2/aws/transport/http.(*timeoutReadCloser).Read.func1()
/Users/david/.asdf/installs/golang/1.22.0/packages/pkg/mod/github.com/aws/[email protected]/aws/transport/http/timeout_read_closer.go:48 +0x43
created by github.com/aws/aws-sdk-go-v2/aws/transport/http.(*timeoutReadCloser).Read in goroutine 49
/Users/david/.asdf/installs/golang/1.22.0/packages/pkg/mod/github.com/aws/[email protected]/aws/transport/http/timeout_read_closer.go:47 +0xfb

Reproduction Steps

Call GetRecords on a busy stream over a slow internet connection? I'm not sure how to trigger this directly, since it only happens when the data stream I'm reading from is under higher load.

Possible Solution

No response

Additional Information/Context

Same as this issue that was closed for being stale.

AWS Go SDK V2 Module Versions Used

github.com/aws/aws-sdk-go-v2 v1.30.4
github.com/aws/aws-sdk-go-v2/config v1.27.28
github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.4

Compiler and Version used

go1.22.0 darwin/amd64

Operating System and version

OSX 14

@davidwin93 davidwin93 added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Aug 17, 2024
@davidwin93 davidwin93 changed the title (short issue description) Kinesis sdk panicking Aug 17, 2024
@RanVaknin RanVaknin self-assigned this Aug 20, 2024
@RanVaknin RanVaknin added investigating This issue is being investigated and/or work is in progress to resolve the issue. and removed needs-triage This issue or PR still needs to be triaged. labels Aug 21, 2024
@RanVaknin
Contributor

Hi @davidwin93 ,

Like with the other issue you linked, the reproduction steps here are vague, so it's difficult for us to root cause this. From the stack trace, the error seems to be with gzip itself rather than the SDK, and this seems to manifest under specific networking conditions that are not straightforward to reproduce.

I gave reproducing this a fair shake, but my reproduction did not yield the reported panic.

In my reproduction I have the following setup:

  1. A Kinesis stream with 5 shards, pre-populated with 5000 records.
  2. A producer function that writes 500 records into the stream every 3 seconds.
  3. A consumer function that reads concurrently from all shards.

Consumer

// Imports needed by consume and produce (both live in the same main.go).
import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

func consume() {
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}

	client := kinesis.NewFromConfig(cfg)
	streamName := "foo-stream"

	listShardsOutput, err := client.ListShards(context.TODO(), &kinesis.ListShardsInput{
		StreamName: &streamName,
	})
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	// Shared across the per-shard goroutines, so it must be updated atomically.
	var recordCount atomic.Int64

	// Start one reader goroutine per shard.
	for _, shard := range listShardsOutput.Shards {
		wg.Add(1)
		go func(shardId string) {
			defer wg.Done()

			// Start at the oldest record still retained in the shard.
			shardIteratorOutput, err := client.GetShardIterator(context.TODO(), &kinesis.GetShardIteratorInput{
				StreamName:        &streamName,
				ShardId:           &shardId,
				ShardIteratorType: types.ShardIteratorTypeTrimHorizon,
			})
			if err != nil {
				log.Printf("failed to get shard iterator for shard %s, %v", shardId, err)
				return
			}

			shardIterator := shardIteratorOutput.ShardIterator
			if shardIterator == nil || *shardIterator == "" {
				log.Printf("No initial shard iterator for shard %s", shardId)
				return
			}

			for {
				if shardIterator == nil || *shardIterator == "" {
					log.Printf("Shard %s, no more records or iterator expired", shardId)
					break
				}

				getRecordsOutput, err := client.GetRecords(context.TODO(), &kinesis.GetRecordsInput{
					ShardIterator: shardIterator,
				})
				if err != nil {
					log.Printf("failed to get records from shard %s, %v", shardId, err)
					break
				}

				log.Printf("Shard %s, fetched %d records", shardId, len(getRecordsOutput.Records))
				for _, record := range getRecordsOutput.Records {
					fmt.Printf("Shard %s, Consumed: %s\n", shardId, string(record.Data))
					recordCount.Add(1)
				}

				// Continue from where this batch ended; back off when the shard is idle.
				shardIterator = getRecordsOutput.NextShardIterator
				if len(getRecordsOutput.Records) == 0 {
					log.Printf("Shard %s, no new records, sleeping for a bit...", shardId)
					time.Sleep(1 * time.Second)
				}
			}
		}(aws.ToString(shard.ShardId))
	}

	wg.Wait()
	log.Printf("consumed %d records in total", recordCount.Load())
}

Producer

func produce() {
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
	if err != nil {
		panic(err)
	}

	client := kinesis.NewFromConfig(cfg)
	for {
		var wg sync.WaitGroup

		for i := 0; i < 500; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				data := fmt.Sprintf("record-%d-%d", i, rand.Intn(100))
				_, err := client.PutRecord(context.TODO(), &kinesis.PutRecordInput{
					StreamName:   aws.String("foo-stream"),
					PartitionKey: aws.String(fmt.Sprintf("partition-%d", i%10)),
					Data:         []byte(data),
				})
				if err != nil {
					log.Printf("failed to put record, %v", err)
					return
				}
				log.Printf("Produced: %s", data)
			}(i)
		}

		wg.Wait()
		time.Sleep(3 * time.Second)
	}
}

I'm running this from a Docker container using golang:1.20-alpine as the image.
I'm also using the tc (traffic control) tool to simulate a slower connection:

$ tc qdisc add dev eth0 root tbf rate 30mbit burst 32kbit latency 50ms

And then running my code with the race flag:

$ go run -race main.go

I let this app run for about 45 minutes and did not see any panics or errors.

If you have any pointers on how to tweak this example to trigger the reported behavior, I can take another look.

Thanks,
Ran~

@RanVaknin RanVaknin added response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. p2 This is a standard priority issue and removed investigating This issue is being investigated and/or work is in progress to resolve the issue. labels Aug 21, 2024

github-actions bot commented Sep 1, 2024

This issue has not received a response in 1 week. If you want to keep this issue open, please just leave a comment below and auto-close will be canceled.

@github-actions github-actions bot added the closing-soon This issue will automatically close in 4 days unless further comments are made. label Sep 1, 2024
@davidwin93
Author

Hey Ran, thanks for looking into this. I'm able to recreate this by pushing real-time logs from CloudFront to Kinesis and attaching a consumer on a "limited" internet connection. In this case the connection is stable at around 300Mb/sec. Running this in our EKS cluster, I don't see any issues, which makes sense given the higher networking performance.

The reason I raised this issue is that I don't expect a library to panic; I would expect an error to be returned from my GetRecords call instead.
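
To illustrate the behavior I'd hope for, here is a minimal sketch of a reader wrapper that turns a panic into an error. The types panicReader and recoverReader are hypothetical and this is not SDK code; it only shows the general technique.

```go
package main

import (
	"fmt"
	"io"
)

// panicReader is a hypothetical reader that always panics,
// standing in for a decoder that hits a runtime error.
type panicReader struct{}

func (panicReader) Read(p []byte) (int, error) {
	panic("simulated slice bounds panic")
}

// recoverReader converts a panic raised by the wrapped Read into an error.
type recoverReader struct {
	r io.Reader
}

func (rr recoverReader) Read(p []byte) (n int, err error) {
	defer func() {
		if v := recover(); v != nil {
			n, err = 0, fmt.Errorf("read panicked: %v", v)
		}
	}()
	return rr.r.Read(p)
}

func main() {
	_, err := recoverReader{r: panicReader{}}.Read(make([]byte, 8))
	fmt.Println(err)
}
```

Note that recover only works in the goroutine that panicked. In the trace above the panic happens in a goroutine started inside the SDK, so a wrapper like this in my own code could not catch it; the recover would have to live inside that goroutine.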

@github-actions github-actions bot removed closing-soon This issue will automatically close in 4 days unless further comments are made. response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. labels Sep 4, 2024
@lucix-aws
Contributor

To be clear, a panic in the stdlib is a panic. There's nothing we can do once that happens. The best we can do is ensure we're not setting up the scenario in which that panic occurs. Whether or not we're actively doing that remains to be seen.

@lucix-aws lucix-aws added needs-review This issue or pull request needs review from a core team member. needs-reproduction This issue needs reproduction. and removed needs-review This issue or pull request needs review from a core team member. labels Sep 6, 2024

3 participants