Skip to content

Commit

Permalink
Retry on PutMedia failure (#154)
Browse files Browse the repository at this point in the history
* Split marshalling and upload
* Implement PutMedia retry
  • Loading branch information
at-wat authored Mar 22, 2021
1 parent c672b5b commit a311d04
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 99 deletions.
18 changes: 16 additions & 2 deletions kvsmockserver/kinesisvideoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type KinesisVideoServer struct {
fragments map[uint64]FragmentTest
blockTime time.Duration
mu sync.Mutex

putMediaHook func(uint64, *FragmentTest, http.ResponseWriter) bool
}

type KinesisVideoServerOption func(*KinesisVideoServer)
Expand All @@ -43,6 +45,12 @@ func WithBlockTime(blockTime time.Duration) KinesisVideoServerOption {
}
}

func WithPutMediaHook(h func(uint64, *FragmentTest, http.ResponseWriter) bool) KinesisVideoServerOption {
return func(s *KinesisVideoServer) {
s.putMediaHook = h
}
}

func NewKinesisVideoServer(opts ...KinesisVideoServerOption) *KinesisVideoServer {
s := &KinesisVideoServer{
fragments: make(map[uint64]FragmentTest),
Expand Down Expand Up @@ -97,12 +105,18 @@ func (s *KinesisVideoServer) putMedia(w http.ResponseWriter, r *http.Request) {
return
}

s.mu.Lock()
data.Segment.Cluster.Timecode += baseTimecode
s.fragments[data.Segment.Cluster.Timecode] = FragmentTest{
fragment := FragmentTest{
Cluster: data.Segment.Cluster,
Tags: data.Segment.Tags,
}
if s.putMediaHook != nil {
if !s.putMediaHook(data.Segment.Cluster.Timecode, &fragment, w) {
return
}
}
s.mu.Lock()
s.fragments[data.Segment.Cluster.Timecode] = fragment
s.mu.Unlock()

fmt.Fprintf(w,
Expand Down
68 changes: 61 additions & 7 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kinesisvideomanager
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -50,6 +51,8 @@ type Provider struct {
signer *v4.Signer
cliConfig *client.Config
tracks []TrackEntry

bufferPool sync.Pool
}

func (c *Client) Provider(streamID StreamID, tracks []TrackEntry) (*Provider, error) {
Expand All @@ -69,6 +72,11 @@ func (c *Client) Provider(streamID StreamID, tracks []TrackEntry) (*Provider, er
signer: c.signer,
cliConfig: c.cliConfig,
tracks: tracks,
bufferPool: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 1024))
},
},
}, nil
}

Expand All @@ -81,6 +89,8 @@ type PutMediaOptions struct {
httpClient http.Client
tags func() []SimpleTag
onError func(error)
retryCount int
retryIntervalBase time.Duration
}

type PutMediaOption func(*PutMediaOptions)
Expand Down Expand Up @@ -299,25 +309,61 @@ func (p *Provider) putMedia(baseTimecode chan uint64, ch chan ebml.Block, chTag
},
}

r, w := io.Pipe()
chErr := make(chan error)
r, wOut := io.Pipe()
w := io.Writer(wOut)
var backup *bytes.Buffer
if opts.retryCount > 0 {
// Take copy of the fragment.
backup = p.bufferPool.Get().(*bytes.Buffer)
defer p.bufferPool.Put(backup)
backup.Reset()
w = io.MultiWriter(wOut, backup)
}

ctx, cancel := context.WithCancel(context.Background())
ctxErr := &errContext{Context: ctx}
go func() {
defer func() {
close(chErr)
w.CloseWithError(io.EOF)
cancel()
wOut.CloseWithError(io.EOF)
}()

buf := bufio.NewWriter(w)
if err := ebml.Marshal(&data, buf); err != nil {
chErr <- err
ctxErr.err = err
return
}
if err := buf.Flush(); err != nil {
chErr <- err
ctxErr.err = err
return
}
}()
ret, err := p.putMediaRaw(ctxErr, r, opts)
if err != nil && opts.retryCount > 0 {
interval := opts.retryIntervalBase
for i := 0; i < opts.retryCount; i++ {
time.Sleep(interval)

ret, err = p.putMediaRaw(ctxErr, bytes.NewReader(backup.Bytes()), opts)
if err == nil {
break
}
interval *= 2
}
}
return ret, err
}

type errContext struct {
context.Context
err error
}

func (c *errContext) Err() error {
return c.err
}

func (p *Provider) putMediaRaw(ctx context.Context, r io.Reader, opts *PutMediaOptions) (io.ReadCloser, error) {
req, err := http.NewRequest("POST", p.endpoint, r)
if err != nil {
return nil, err
Expand Down Expand Up @@ -350,7 +396,8 @@ func (p *Provider) putMedia(baseTimecode chan uint64, ch chan ebml.Block, chTag
}
return nil, fmt.Errorf("%d: %s", res.StatusCode, string(body))
}
if err := <-chErr; err != nil {
<-ctx.Done()
if err := ctx.Err(); err != nil {
return nil, err
}
return res.Body, nil
Expand Down Expand Up @@ -407,3 +454,10 @@ func OnError(onError func(error)) PutMediaOption {
p.onError = onError
}
}

func WithPutMediaRetry(count int, intervalBase time.Duration) PutMediaOption {
return func(p *PutMediaOptions) {
p.retryCount = count
p.retryIntervalBase = intervalBase
}
}
211 changes: 121 additions & 90 deletions provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,105 +35,136 @@ import (
var testData = [][]byte{{0x01, 0x02}}

func TestProvider(t *testing.T) {
server := kvsm.NewKinesisVideoServer()
defer server.Close()

pro := newProvider(t, server)

ch := make(chan *kvm.BlockWithBaseTimecode)
timecodes := []uint64{
1000,
9000,
10000,
10001, // switch to the next fragment here
10002,
dropped := make(map[uint64]bool)

testCases := map[string]struct {
mockServerOpts []kvsm.KinesisVideoServerOption
putMediaOpts []kvm.PutMediaOption
}{
"NoError": {},
"ErrorRetry": {
mockServerOpts: []kvsm.KinesisVideoServerOption{
kvsm.WithPutMediaHook(func(timecode uint64, f *kvsm.FragmentTest, w http.ResponseWriter) bool {
if !dropped[timecode] {
dropped[timecode] = true
w.WriteHeader(500)
t.Logf("Error injected: timecode=%d", timecode)
return false
}
return true
}),
},
putMediaOpts: []kvm.PutMediaOption{
kvm.WithPutMediaRetry(2, 100*time.Millisecond),
},
},
}
go func() {
defer close(ch)
for _, tc := range timecodes {
ch <- &kvm.BlockWithBaseTimecode{
Timecode: tc,
Block: newBlock(0),
}
}
}()

chResp := make(chan kvm.FragmentEvent)
var response []kvm.FragmentEvent
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
go func() {
defer cancel()
for {
select {
case r, ok := <-chResp:
if !ok {
return
for name, testCase := range testCases {
testCase := testCase
t.Run(name, func(t *testing.T) {
server := kvsm.NewKinesisVideoServer(testCase.mockServerOpts...)
defer server.Close()

pro := newProvider(t, server)

ch := make(chan *kvm.BlockWithBaseTimecode)
timecodes := []uint64{
1000,
9000,
10000,
10001, // switch to the next fragment here
10002,
}
go func() {
defer close(ch)
for _, tc := range timecodes {
ch <- &kvm.BlockWithBaseTimecode{
Timecode: tc,
Block: newBlock(0),
}
}
}()

chResp := make(chan kvm.FragmentEvent)
var response []kvm.FragmentEvent
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
go func() {
defer cancel()
for {
select {
case r, ok := <-chResp:
if !ok {
return
}
response = append(response, r)
}
}
response = append(response, r)
}()

startTimestamp := time.Now()
startTimestampInMillis := uint64(startTimestamp.UnixNano() / int64(time.Millisecond))
cnt := 0
var err error
opts := []kvm.PutMediaOption{
kvm.WithFragmentTimecodeType(kvm.FragmentTimecodeTypeRelative),
kvm.WithProducerStartTimestamp(startTimestamp),
kvm.WithTags(func() []kvm.SimpleTag {
cnt++
return []kvm.SimpleTag{
{TagName: "TEST_TAG", TagString: fmt.Sprintf("%d", cnt)},
}
}),
kvm.OnError(func(e error) {
err = e
}),
}
}
}()

startTimestamp := time.Now()
startTimestampInMillis := uint64(startTimestamp.UnixNano() / int64(time.Millisecond))
cnt := 0
var err error
opts := []kvm.PutMediaOption{
kvm.WithFragmentTimecodeType(kvm.FragmentTimecodeTypeRelative),
kvm.WithProducerStartTimestamp(startTimestamp),
kvm.WithTags(func() []kvm.SimpleTag {
cnt++
return []kvm.SimpleTag{
{TagName: "TEST_TAG", TagString: fmt.Sprintf("%d", cnt)},
opts = append(opts, testCase.putMediaOpts...)
pro.PutMedia(ch, chResp, opts...)
if err != nil {
t.Fatalf("Failed to run PutMedia: %v", err)
}
}),
kvm.OnError(func(e error) {
err = e
}),
}
pro.PutMedia(ch, chResp, opts...)
if err != nil {
t.Fatalf("Failed to run PutMedia: %v", err)
}

<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("PutMedia timed out")
}
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("PutMedia timed out")
}

expected := []kvsm.FragmentTest{
{
Cluster: kvsm.ClusterTest{
Timecode: startTimestampInMillis + 1000,
SimpleBlock: []ebml.Block{newBlock(0), newBlock(8000), newBlock(9000)},
},
Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "1"}}),
},
{
Cluster: kvsm.ClusterTest{
Timecode: startTimestampInMillis + 10001,
SimpleBlock: []ebml.Block{newBlock(0), newBlock(1)},
},
Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "2"}}),
},
}
expected := []kvsm.FragmentTest{
{
Cluster: kvsm.ClusterTest{
Timecode: startTimestampInMillis + 1000,
SimpleBlock: []ebml.Block{newBlock(0), newBlock(8000), newBlock(9000)},
},
Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "1"}}),
},
{
Cluster: kvsm.ClusterTest{
Timecode: startTimestampInMillis + 10001,
SimpleBlock: []ebml.Block{newBlock(0), newBlock(1)},
},
Tags: newTags([]kvm.SimpleTag{{TagName: "TEST_TAG", TagString: "2"}}),
},
}

if n := len(response); n != len(expected) {
t.Fatalf("Response size expected to be %d but %d", len(expected), n)
}
if n := len(response); n != len(expected) {
t.Fatalf("Response size expected to be %d but %d", len(expected), n)
}

for _, fragment := range expected {
actual, ok := server.GetFragment(fragment.Cluster.Timecode)
if !ok {
t.Errorf("fragment %d not found", fragment.Cluster.Timecode)
continue
}
if !reflect.DeepEqual(fragment.Cluster, actual.Cluster) {
t.Errorf("Unexpected Cluster\n expected:%+v\n actual%+v", fragment.Cluster, actual.Cluster)
}
if !reflect.DeepEqual(fragment.Tags, actual.Tags) {
t.Errorf("Unexpected Tags\n expected:%+v\n actual%+v", fragment.Tags, actual.Tags)
}
for _, fragment := range expected {
actual, ok := server.GetFragment(fragment.Cluster.Timecode)
if !ok {
t.Errorf("fragment %d not found", fragment.Cluster.Timecode)
continue
}
if !reflect.DeepEqual(fragment.Cluster, actual.Cluster) {
t.Errorf("Unexpected Cluster\n expected:%+v\n actual%+v", fragment.Cluster, actual.Cluster)
}
if !reflect.DeepEqual(fragment.Tags, actual.Tags) {
t.Errorf("Unexpected Tags\n expected:%+v\n actual%+v", fragment.Tags, actual.Tags)
}
}
})
}
}

Expand Down

0 comments on commit a311d04

Please sign in to comment.