Skip to content

Commit

Permalink
Add PutMedia connection event callbacks (#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Jun 19, 2023
1 parent b1232b0 commit 65e7d87
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 21 deletions.
29 changes: 28 additions & 1 deletion provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,16 @@ type PutMediaOptions struct {
connectionTimeout time.Duration
httpClient http.Client
tags func() []SimpleTag
onError func(error)
retryCount int
retryIntervalBase time.Duration
fragmentHeadDumpLen int
lenBlockBuffer int
lenResponseBuffer int
logger LoggerIF

onError func(error)
onNewConn func()
onSwitchConn func(uint64)
}

type PutMediaOption func(*PutMediaOptions)
Expand Down Expand Up @@ -280,13 +283,19 @@ func (p *Provider) PutMedia(opts ...PutMediaOption) (BlockWriter, error) {
return nil
}
prepareNextConn := func() {
if options.onNewConn != nil {
options.onNewConn()
}
nextConn = newConnection(options)
select {
case chConnection <- nextConn:
case <-closed:
}
}
switchToNextConn := func(startTime uint64) {
if options.onSwitchConn != nil {
options.onSwitchConn(startTime)
}
if conn != nil {
conn.close()
}
Expand Down Expand Up @@ -685,6 +694,24 @@ func OnError(onError func(error)) PutMediaOption {
}
}

// OnPutMediaNewConn registers a func that will be called before
// creating a new PutMedia API connection.
// Media stream processing is blocked until the func returns.
func OnPutMediaNewConn(onNewConn func()) PutMediaOption {
return func(p *PutMediaOptions) {
p.onNewConn = onNewConn
}
}

// OnPutMediaSwitchConn registers a func that will be called before
// switching a PutMedia API connection.
// Media stream processing is blocked until the func returns.
func OnPutMediaSwitchConn(onSwitchConn func(timecode uint64)) PutMediaOption {
return func(p *PutMediaOptions) {
p.onSwitchConn = onSwitchConn
}
}

func WithPutMediaRetry(count int, intervalBase time.Duration) PutMediaOption {
return func(p *PutMediaOptions) {
p.retryCount = count
Expand Down
73 changes: 53 additions & 20 deletions provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,18 @@ func TestProvider(t *testing.T) {
}

testCases := map[string]struct {
mockServerOpts func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption
putMediaOpts []kvm.PutMediaOption
expected []kvsm.FragmentTest
errCheck func(*testing.T, int, error) bool
mockServerOpts func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption
putMediaOpts []kvm.PutMediaOption
expected []kvsm.FragmentTest
expectedNewConnCnt int
expectedSwitchConnCnt int
errCheck func(*testing.T, int, error) bool
}{
"NoError": {
mockServerOpts: func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption { return nil },
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
mockServerOpts: func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption { return nil },
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
},
"HTTPErrorRetry": {
mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption {
Expand All @@ -100,8 +104,10 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
},
"DelayedHTTPErrorRetry": {
mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption {
Expand All @@ -123,8 +129,10 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
},
"KinesisErrorRetry": {
mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption {
Expand All @@ -144,8 +152,10 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1, expected2, expected2},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1, expected2, expected2},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
},
"KinesisFailDumpShort": {
mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption {
Expand All @@ -164,7 +174,9 @@ func TestProvider(t *testing.T) {
kvm.WithFragmentHeadDumpLen(17),
kvm.WithSegmentUID([]byte{0x00, 0x01, 0x02, 0x03}),
},
expected: []kvsm.FragmentTest{},
expected: []kvsm.FragmentTest{},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
errCheck: func(t *testing.T, cnt int, err error) bool {
if err == nil {
t.Error("Expected error")
Expand Down Expand Up @@ -208,7 +220,9 @@ func TestProvider(t *testing.T) {
kvm.WithFragmentHeadDumpLen(512),
kvm.WithSegmentUID([]byte{0x00, 0x01, 0x02, 0x03}),
},
expected: []kvsm.FragmentTest{},
expected: []kvsm.FragmentTest{},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
errCheck: func(t *testing.T, cnt int, err error) bool {
if err == nil {
t.Error("Expected error")
Expand Down Expand Up @@ -278,8 +292,10 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1, expected2, expected2},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1, expected2, expected2},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
},
"DisconnectRetry": {
mockServerOpts: func(t *testing.T, _ map[uint64]bool, disconnected *bool, disconnect func()) []kvsm.KinesisVideoServerOption {
Expand All @@ -297,8 +313,10 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
},
"DelayedDisconnectRetry": {
mockServerOpts: func(t *testing.T, _ map[uint64]bool, disconnected *bool, disconnect func()) []kvsm.KinesisVideoServerOption {
Expand All @@ -317,8 +335,10 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1, expected2},
expectedNewConnCnt: 3,
expectedSwitchConnCnt: 3,
},
}

Expand Down Expand Up @@ -353,6 +373,7 @@ func TestProvider(t *testing.T) {

var cntErr, cntTag uint32
var skipBelow uint32
var cntNewConn, cntSwitchConn uint32
opts := []kvm.PutMediaOption{
kvm.WithFragmentTimecodeType(kvm.FragmentTimecodeTypeRelative),
kvm.WithProducerStartTimestamp(startTimestamp),
Expand All @@ -370,6 +391,12 @@ func TestProvider(t *testing.T) {
}
}
}),
kvm.OnPutMediaNewConn(func() {
atomic.AddUint32(&cntNewConn, 1)
}),
kvm.OnPutMediaSwitchConn(func(uint64) {
atomic.AddUint32(&cntSwitchConn, 1)
}),
}
opts = append(opts, testCase.putMediaOpts...)
w, err := pro.PutMedia(opts...)
Expand Down Expand Up @@ -425,6 +452,12 @@ func TestProvider(t *testing.T) {
)
}

if int(cntNewConn) != testCase.expectedNewConnCnt {
t.Errorf("Expected count of new connection: %d, got: %d", testCase.expectedNewConnCnt, cntNewConn)
}
if int(cntSwitchConn) != testCase.expectedSwitchConnCnt {
t.Errorf("Expected count of switch connection: %d, got: %d", testCase.expectedSwitchConnCnt, cntSwitchConn)
}
for _, fragment := range testCase.expected {
actual, ok := server.GetFragment(fragment.Cluster.Timecode)
if !ok {
Expand Down

0 comments on commit 65e7d87

Please sign in to comment.