Skip to content

Commit

Permalink
Add PutMediaOption to set logger (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Jul 7, 2022
1 parent b719956 commit 531727c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 8 deletions.
25 changes: 17 additions & 8 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type PutMediaOptions struct {
retryCount int
retryIntervalBase time.Duration
fragmentHeadDumpLen int
logger LoggerIF
}

type PutMediaOption func(*PutMediaOptions)
Expand Down Expand Up @@ -154,15 +155,17 @@ func (c *connection) numBlock() int {
}

func (p *Provider) PutMedia(ch chan *BlockWithBaseTimecode, chResp chan FragmentEvent, opts ...PutMediaOption) {
options := &PutMediaOptions{
var options *PutMediaOptions
options = &PutMediaOptions{
title: "kinesisvideomanager.Provider",
fragmentTimecodeType: FragmentTimecodeTypeRelative,
producerStartTimestamp: "0",
connectionTimeout: 15 * time.Second,
onError: func(err error) { Logger().Error(err) },
onError: func(err error) { options.logger.Error(err) },
httpClient: http.Client{
Timeout: 15 * time.Second,
},
logger: Logger(),
}
for _, o := range opts {
o(options)
Expand Down Expand Up @@ -205,7 +208,7 @@ func (p *Provider) PutMedia(ch chan *BlockWithBaseTimecode, chResp chan Fragment
if lastAbsTime != 0 {
diff := int64(absTime - lastAbsTime)
if diff < 0 || diff > math.MaxInt16 {
Logger().Warnf(
options.logger.Warnf(
`Invalid timecode: { StreamID: "%s", Timecode: %d, last: %d, diff: %d }`,
p.streamID, bt.AbsTimecode(), lastAbsTime, diff,
)
Expand All @@ -214,12 +217,12 @@ func (p *Provider) PutMedia(ch chan *BlockWithBaseTimecode, chResp chan Fragment
}

if conn == nil || (nextConn == nil && int16(absTime-conn.baseTimecode) > 8000) {
Logger().Debugf(`Prepare next connection: { StreamID: "%s" }`, p.streamID)
options.logger.Debugf(`Prepare next connection: { StreamID: "%s" }`, p.streamID)
nextConn = newConnection()
chConnection <- nextConn
}
if conn == nil || int16(absTime-conn.baseTimecode) > 9000 {
Logger().Debugf(`Switch to next connection: { StreamID: "%s", AbsTime: %d }`, p.streamID, absTime)
options.logger.Debugf(`Switch to next connection: { StreamID: "%s", AbsTime: %d }`, p.streamID, absTime)
if conn != nil {
conn.close()
}
Expand All @@ -236,11 +239,11 @@ func (p *Provider) PutMedia(ch chan *BlockWithBaseTimecode, chResp chan Fragment
conn.countBlock()
lastAbsTime = absTime
case <-timeout:
Logger().Warnf(`Sending block timed out, clean connections: { StreamID: "%s" }`, p.streamID)
options.logger.Warnf(`Sending block timed out, clean connections: { StreamID: "%s" }`, p.streamID)
cleanConnections()
}
case <-timeout:
Logger().Warnf(`Receiving block timed out, clean connections: { StreamID: "%s" }`, p.streamID)
options.logger.Warnf(`Receiving block timed out, clean connections: { StreamID: "%s" }`, p.streamID)
cleanConnections()
}
}
Expand Down Expand Up @@ -390,7 +393,7 @@ func (p *Provider) putMedia(conn *connection, chResp chan FragmentEvent, opts *P
for i := 0; i < opts.retryCount; i++ {
time.Sleep(interval)

Logger().Infof(
opts.logger.Infof(
`Retrying PutMedia: { StreamID: "%s", RetryCount: %d, Err: %s }`,
p.streamID, i,
string(regexAmzCredHeader.ReplaceAll([]byte(strconv.Quote(err.Error())), []byte("X-Amz-$1=***"))),
Expand Down Expand Up @@ -532,3 +535,9 @@ func WithPutMediaRetry(count int, intervalBase time.Duration) PutMediaOption {
p.retryIntervalBase = intervalBase
}
}

func WithPutMediaLogger(logger LoggerIF) PutMediaOption {
return func(p *PutMediaOptions) {
p.logger = logger
}
}
57 changes: 57 additions & 0 deletions provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,49 @@ func TestProvider_WithHttpClient(t *testing.T) {
}
}

func TestProvider_WithPutMediaLogger(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()

server := kvsm.NewKinesisVideoServer()
defer server.Close()

pro := newProvider(t, server)

ch := make(chan *kvm.BlockWithBaseTimecode)
wg.Add(1)
go func() {
defer func() {
close(ch)
wg.Done()
}()
ch <- &kvm.BlockWithBaseTimecode{
Timecode: 1000,
Block: newBlock(0),
}
ch <- &kvm.BlockWithBaseTimecode{
Timecode: 1000,
Block: newBlock(-100),
}
}()

chResp := make(chan kvm.FragmentEvent)
wg.Add(1)
go func() {
defer wg.Done()
for range chResp {
}
}()

var logger dummyWarnfLogger
pro.PutMedia(ch, chResp, kvm.WithPutMediaLogger(&logger), kvm.OnError(func(error) {}))

expected := `Invalid timecode: { StreamID: "test-stream", Timecode: 900, last: 1000, diff: -100 }`
if expected != logger.lastErr {
t.Errorf("Expected log: '%s', got: '%s'", expected, logger.lastErr)
}
}

func newProvider(t *testing.T, server *kvsm.KinesisVideoServer) *kvm.Provider {
cfg := &aws.Config{
Credentials: credentials.NewStaticCredentials("key", "secret", "token"),
Expand Down Expand Up @@ -507,6 +550,20 @@ func newBlock(timecode int16) ebml.Block {
Data: testData,
}
}

func newTags(tags []kvm.SimpleTag) kvsm.TagsTest {
return kvsm.TagsTest{Tag: []kvm.Tag{{SimpleTag: tags}}}
}

type dummyWarnfLogger struct {
kvm.LoggerIF

lastErr string
}

func (l *dummyWarnfLogger) Warnf(format string, args ...interface{}) {
l.lastErr = fmt.Sprintf(format, args...)
}

func (l *dummyWarnfLogger) Debugf(format string, args ...interface{}) {
}

0 comments on commit 531727c

Please sign in to comment.