From a311d04cb6305215e5ce78d9b64eb58d281be32f Mon Sep 17 00:00:00 2001 From: Atsushi Watanabe Date: Mon, 22 Mar 2021 15:57:04 +0900 Subject: [PATCH] Retry on PutMedia failure (#154) * Split marshalling and upload * Implement PutMedia retry --- kvsmockserver/kinesisvideoserver.go | 18 ++- provider.go | 68 ++++++++- provider_test.go | 211 ++++++++++++++++------------ 3 files changed, 198 insertions(+), 99 deletions(-) diff --git a/kvsmockserver/kinesisvideoserver.go b/kvsmockserver/kinesisvideoserver.go index cf72ec8..bf29cf3 100644 --- a/kvsmockserver/kinesisvideoserver.go +++ b/kvsmockserver/kinesisvideoserver.go @@ -33,6 +33,8 @@ type KinesisVideoServer struct { fragments map[uint64]FragmentTest blockTime time.Duration mu sync.Mutex + + putMediaHook func(uint64, *FragmentTest, http.ResponseWriter) bool } type KinesisVideoServerOption func(*KinesisVideoServer) @@ -43,6 +45,12 @@ func WithBlockTime(blockTime time.Duration) KinesisVideoServerOption { } } +func WithPutMediaHook(h func(uint64, *FragmentTest, http.ResponseWriter) bool) KinesisVideoServerOption { + return func(s *KinesisVideoServer) { + s.putMediaHook = h + } +} + func NewKinesisVideoServer(opts ...KinesisVideoServerOption) *KinesisVideoServer { s := &KinesisVideoServer{ fragments: make(map[uint64]FragmentTest), @@ -97,12 +105,18 @@ func (s *KinesisVideoServer) putMedia(w http.ResponseWriter, r *http.Request) { return } - s.mu.Lock() data.Segment.Cluster.Timecode += baseTimecode - s.fragments[data.Segment.Cluster.Timecode] = FragmentTest{ + fragment := FragmentTest{ Cluster: data.Segment.Cluster, Tags: data.Segment.Tags, } + if s.putMediaHook != nil { + if !s.putMediaHook(data.Segment.Cluster.Timecode, &fragment, w) { + return + } + } + s.mu.Lock() + s.fragments[data.Segment.Cluster.Timecode] = fragment s.mu.Unlock() fmt.Fprintf(w, diff --git a/provider.go b/provider.go index e313e0c..a3ab015 100644 --- a/provider.go +++ b/provider.go @@ -17,6 +17,7 @@ package kinesisvideomanager import ( "bufio" "bytes" + "context" "fmt" "io" "io/ioutil" @@ -50,6 +51,8 @@ type Provider struct { signer *v4.Signer cliConfig *client.Config tracks []TrackEntry + + bufferPool sync.Pool } func (c *Client) Provider(streamID StreamID, tracks []TrackEntry) (*Provider, error) { @@ -69,6 +72,11 @@ func (c *Client) Provider(streamID StreamID, tracks []TrackEntry) (*Provider, er signer: c.signer, cliConfig: c.cliConfig, tracks: tracks, + bufferPool: sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 1024)) + }, + }, }, nil } @@ -81,6 +89,8 @@ type PutMediaOptions struct { httpClient http.Client tags func() []SimpleTag onError func(error) + retryCount int + retryIntervalBase time.Duration } type PutMediaOption func(*PutMediaOptions) @@ -299,25 +309,61 @@ func (p *Provider) putMedia(baseTimecode chan uint64, ch chan ebml.Block, chTag }, } - r, w := io.Pipe() - chErr := make(chan error) + r, wOut := io.Pipe() + w := io.Writer(wOut) + var backup *bytes.Buffer + if opts.retryCount > 0 { + // Take copy of the fragment. + backup = p.bufferPool.Get().(*bytes.Buffer) + defer p.bufferPool.Put(backup) + backup.Reset() + w = io.MultiWriter(wOut, backup) + } + + ctx, cancel := context.WithCancel(context.Background()) + ctxErr := &errContext{Context: ctx} go func() { defer func() { - close(chErr) - w.CloseWithError(io.EOF) + cancel() + wOut.CloseWithError(io.EOF) }() buf := bufio.NewWriter(w) if err := ebml.Marshal(&data, buf); err != nil { - chErr <- err + ctxErr.err = err return } if err := buf.Flush(); err != nil { - chErr <- err + ctxErr.err = err return } }() + ret, err := p.putMediaRaw(ctxErr, r, opts) + if err != nil && opts.retryCount > 0 { + interval := opts.retryIntervalBase + for i := 0; i < opts.retryCount; i++ { + time.Sleep(interval) + + ret, err = p.putMediaRaw(ctxErr, bytes.NewReader(backup.Bytes()), opts) + if err == nil { + break + } + interval *= 2 + } + } + return ret, err +} +type errContext struct { + context.Context + err error +} + +func (c *errContext) Err() error { + return c.err +} + +func (p *Provider) putMediaRaw(ctx context.Context, r io.Reader, opts *PutMediaOptions) (io.ReadCloser, error) { req, err := http.NewRequest("POST", p.endpoint, r) if err != nil { return nil, err @@ -350,7 +396,8 @@ func (p *Provider) putMedia(baseTimecode chan uint64, ch chan ebml.Block, chTag } return nil, fmt.Errorf("%d: %s", res.StatusCode, string(body)) } - if err := <-chErr; err != nil { + <-ctx.Done() + if err := ctx.Err(); err != nil { return nil, err } return res.Body, nil @@ -407,3 +454,10 @@ func OnError(onError func(error)) PutMediaOption { p.onError = onError } } + +func WithPutMediaRetry(count int, intervalBase time.Duration) PutMediaOption { + return func(p *PutMediaOptions) { + p.retryCount = count + p.retryIntervalBase = intervalBase + } +} diff --git a/provider_test.go b/provider_test.go index 171e9ed..3f770ce 100644 --- a/provider_test.go +++ b/provider_test.go @@ -35,105 +35,136 @@ import ( var testData = [][]byte{{0x01, 0x02}} func TestProvider(t *testing.T) { - server := kvsm.NewKinesisVideoServer() - defer server.Close() - - pro := newProvider(t, server) - - ch := make(chan *kvm.BlockWithBaseTimecode) - timecodes := []uint64{ - 1000, - 9000, - 10000, - 10001, // switch to the next fragment here - 10002, + dropped := make(map[uint64]bool) + + testCases := map[string]struct { + mockServerOpts []kvsm.KinesisVideoServerOption + putMediaOpts []kvm.PutMediaOption + }{ + "NoError": {}, + "ErrorRetry": { + mockServerOpts: []kvsm.KinesisVideoServerOption{ + kvsm.WithPutMediaHook(func(timecode uint64, f *kvsm.FragmentTest, w http.ResponseWriter) bool { + if !dropped[timecode] { + dropped[timecode] = true + w.WriteHeader(500) + t.Logf("Error injected: timecode=%d", timecode) + return false + } + return true + }), + }, + putMediaOpts: []kvm.PutMediaOption{ + kvm.WithPutMediaRetry(2, 100*time.Millisecond), + }, + }, } - go func() { - defer close(ch) - for _, tc := range timecodes { - ch <- &kvm.BlockWithBaseTimecode{ - Timecode: tc, - Block: newBlock(0), - } - } - }() - chResp := make(chan kvm.FragmentEvent) - var response []kvm.FragmentEvent - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - go func() { - defer cancel() - for { - select { - case r, ok := <-chResp: - if !ok { - return + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + server := kvsm.NewKinesisVideoServer(testCase.mockServerOpts...) + defer server.Close() + + pro := newProvider(t, server) + + ch := make(chan *kvm.BlockWithBaseTimecode) + timecodes := []uint64{ + 1000, + 9000, + 10000, + 10001, // switch to the next fragment here + 10002, + } + go func() { + defer close(ch) + for _, tc := range timecodes { + ch <- &kvm.BlockWithBaseTimecode{ + Timecode: tc, + Block: newBlock(0), + } + } + }() + + chResp := make(chan kvm.FragmentEvent) + var response []kvm.FragmentEvent + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + go func() { + defer cancel() + for { + select { + case r, ok := <-chResp: + if !ok { + return + } + response = append(response, r) + } } - response = append(response, r) + }() + + startTimestamp := time.Now() + startTimestampInMillis := uint64(startTimestamp.UnixNano() / int64(time.Millisecond)) + cnt := 0 + var err error + opts := []kvm.PutMediaOption{ + kvm.WithFragmentTimecodeType(kvm.FragmentTimecodeTypeRelative), + kvm.WithProducerStartTimestamp(startTimestamp), + kvm.WithTags(func() []kvm.SimpleTag { + cnt++ + return []kvm.SimpleTag{ + {TagName: "TEST_TAG", TagString: fmt.Sprintf("%d", cnt)}, + } + }), + kvm.OnError(func(e error) { + err = e + }), } - } - }() - - startTimestamp := time.Now() - startTimestampInMillis := uint64(startTimestamp.UnixNano() / int64(time.Millisecond)) - cnt := 0 - var err error - opts := []kvm.PutMediaOption{ - kvm.WithFragmentTimecodeType(kvm.FragmentTimecodeTypeRelative), - kvm.WithProducerStartTimestamp(startTimestamp), - kvm.WithTags(func() []kvm.SimpleTag { - cnt++ - return []kvm.SimpleTag{ - {TagName: "TEST_TAG", TagString: fmt.Sprintf("%d", cnt)}, + opts = append(opts, testCase.putMediaOpts...) + pro.PutMedia(ch, chResp, opts...) + if err != nil { + t.Fatalf("Failed to run PutMedia: %v", err) } - }), - kvm.OnError(func(e error) { - err = e - }), - } - pro.PutMedia(ch, chResp, opts...) - if err != nil { - t.Fatalf("Failed to run PutMedia: %v", err) - } - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Fatalf("PutMedia timed out") - } + <-ctx.Done() + if ctx.Err() == context.DeadlineExceeded { + t.Fatalf("PutMedia timed out") + } - expected := []kvsm.FragmentTest{ - { - Cluster: kvsm.ClusterTest{ - Timecode: startTimestampInMillis + 1000, - SimpleBlock: []ebml.Block{newBlock(0), newBlock(8000), newBlock(9000)}, - }, - Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "1"}}), - }, - { - Cluster: kvsm.ClusterTest{ - Timecode: startTimestampInMillis + 10001, - SimpleBlock: []ebml.Block{newBlock(0), newBlock(1)}, - }, - Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "2"}}), - }, - } + expected := []kvsm.FragmentTest{ + { + Cluster: kvsm.ClusterTest{ + Timecode: startTimestampInMillis + 1000, + SimpleBlock: []ebml.Block{newBlock(0), newBlock(8000), newBlock(9000)}, + }, + Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "1"}}), + }, + { + Cluster: kvsm.ClusterTest{ + Timecode: startTimestampInMillis + 10001, + SimpleBlock: []ebml.Block{newBlock(0), newBlock(1)}, + }, + Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "2"}}), + }, + } - if n := len(response); n != len(expected) { - t.Fatalf("Response size expected to be %d but %d", len(expected), n) - } + if n := len(response); n != len(expected) { + t.Fatalf("Response size expected to be %d but %d", len(expected), n) + } - for _, fragment := range expected { - actual, ok := server.GetFragment(fragment.Cluster.Timecode) - if !ok { - t.Errorf("fragment %d not found", fragment.Cluster.Timecode) - continue - } - if !reflect.DeepEqual(fragment.Cluster, actual.Cluster) { - t.Errorf("Unexpected Cluster\n expected:%+v\n actual%+v", fragment.Cluster, actual.Cluster) - } - if !reflect.DeepEqual(fragment.Tags, actual.Tags) { - t.Errorf("Unexpected Tags\n expected:%+v\n actual%+v", fragment.Tags, actual.Tags) - } + for _, fragment := range expected { + actual, ok := server.GetFragment(fragment.Cluster.Timecode) + if !ok { + t.Errorf("fragment %d not found", fragment.Cluster.Timecode) + continue + } + if !reflect.DeepEqual(fragment.Cluster, actual.Cluster) { + t.Errorf("Unexpected Cluster\n expected:%+v\n actual%+v", fragment.Cluster, actual.Cluster) + } + if !reflect.DeepEqual(fragment.Tags, actual.Tags) { + t.Errorf("Unexpected Tags\n expected:%+v\n actual%+v", fragment.Tags, actual.Tags) + } + } + }) } }