Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

record-tester: added access control testing #220

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func main() {
testMP4 := fs.Bool("mp4", false, "Download MP4 of recording")
testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test")
testVod := fs.Bool("vod", false, "Check VOD workflow")
testAccessControl := fs.Bool("access-control", false, "Test access control")
testRecording := fs.Bool("recording", true, "Test recordings")
signingKey := fs.String("signing-key", "", "Signing key for access control")
publicKey := fs.String("public-key", "", "Public key for access control")
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
discordUserName := fs.String("discord-user-name", "", "User name to use when sending messages to Discord")
Expand Down Expand Up @@ -216,6 +220,10 @@ func main() {
UseHTTP: *useHttp,
TestMP4: *testMP4,
TestStreamHealth: *testStreamHealth,
TestAccessControl: *testAccessControl,
TestRecording: *testRecording,
SigningKey: *signingKey,
PublicKey: *publicKey,
}
vtOpts := vodtester.VodTesterOptions{
API: lapi,
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/golang/glog v1.0.0
github.com/gosuri/uilive v0.0.3 // indirect
github.com/gosuri/uiprogress v0.0.1
github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515
github.com/livepeer/go-api-client v0.3.2-0.20221111121231-26895f4f5e47
github.com/livepeer/go-livepeer v0.5.31
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
github.com/livepeer/leaderboard-serverless v1.0.0
Expand Down Expand Up @@ -53,6 +53,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/mock v1.5.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down Expand Up @@ -710,8 +712,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515 h1:3UvLoSvntPi0Z/yW6zskPmZZwA+lnm0pQVIvG/uBnrE=
github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.3.2-0.20221111121231-26895f4f5e47 h1:+uRySA5kZDErpf/hOEFz7iQoJqdoxuWnZTAl8i3cruw=
github.com/livepeer/go-api-client v0.3.2-0.20221111121231-26895f4f5e47/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw=
github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
Expand Down
132 changes: 94 additions & 38 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package recordtester
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"

"github.com/golang-jwt/jwt/v4"
"github.com/golang/glog"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/joy4/format/mp4"
Expand Down Expand Up @@ -42,6 +44,10 @@ type (
UseHTTP bool
TestMP4 bool
TestStreamHealth bool
TestAccessControl bool
TestRecording bool
SigningKey string
PublicKey string
}

recordTester struct {
Expand All @@ -55,6 +61,10 @@ type (
useHTTP bool
mp4 bool
streamHealth bool
accessControl bool
testRecording bool
signingKey string
publicKey string

// mutable fields
streamID string
Expand All @@ -77,6 +87,10 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions) IRecordTest
useHTTP: opts.UseHTTP,
mp4: opts.TestMP4,
streamHealth: opts.TestStreamHealth,
accessControl: opts.TestAccessControl,
testRecording: opts.TestRecording,
signingKey: opts.SigningKey,
publicKey: opts.PublicKey,
}
return rt
}
Expand Down Expand Up @@ -123,7 +137,20 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *api.Stream
for {
stream, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId})
streamOptions := api.CreateStreamReq{Name: streamName, Record: rt.testRecording, RecordObjectStoreId: rt.recordObjectStoreId}

if rt.accessControl {
streamOptions.PlaybackPolicy = api.PlaybackPolicy{
Type: "jwt",
}
glog.Infof("Creating stream with access control")
}

if rt.testRecording {
glog.Infof("Creating stream with recording enabled")
}

stream, err = rt.lapi.CreateStream(streamOptions)
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand All @@ -138,6 +165,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
apiTry = 0
rt.streamID = stream.ID
rt.stream = stream
defer rt.lapi.DeleteStream(stream.ID)
messenger.SendMessage(fmt.Sprintf(":information_source: Created stream id=%s", stream.ID))
// createdAPIStreams = append(createdAPIStreams, stream.ID)
glog.V(model.VERBOSE).Infof("Created Livepeer stream id=%s streamKey=%s playbackId=%s name=%s", stream.ID, stream.StreamKey, stream.PlaybackID, streamName)
Expand All @@ -156,6 +184,19 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
mediaURL := fmt.Sprintf("%s/%s/index.m3u8", ingest.Playback, stream.PlaybackID)
glog.V(model.SHORT).Infof("RTMP: %s", rtmpURL)
glog.V(model.SHORT).Infof("MEDIA: %s", mediaURL)

if rt.accessControl && (rt.signingKey != "" && rt.publicKey != "") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having accessControl and not the signing key should be an explicit fatal error. We should not ignore the setting because another required option was not provided, it is better to just fail the whole test.

Suggested change
if rt.accessControl && (rt.signingKey != "" && rt.publicKey != "") {
if rt.accessControl {
if rt.signingKey == "" || rt.publicKey == "" {
msg := "Signing key and public key are required for access control test"
glog.Fatal(msg)
return 1, errors.New(msg) // (Fatal log above already exits the process but you know just in case, leave it clear here that we end with an error)
}

token, err := rt.signJwt(stream)
if err != nil {
return 1, err
}
mediaURL = fmt.Sprintf("%s?jwt=%s", mediaURL, token)
glog.V(model.VERBOSE).Infof("URL with access control for stream id=%s playbackId=%s name=%s mediaURL=%s", stream.ID, stream.PlaybackID, streamName, mediaURL)
} else {
glog.Warningf("No access control for stream id=%s playbackId=%s name=%s mediaURL=%s", stream.ID, stream.PlaybackID, streamName, mediaURL)
return 2, nil
}
Comment on lines +195 to +198
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to log this as a warning or returning. Executing a test without access control is perfectly acceptable. And we already log above when we use access control so no need to log when we don't.

Suggested change
} else {
glog.Warningf("No access control for stream id=%s playbackId=%s name=%s mediaURL=%s", stream.ID, stream.PlaybackID, streamName, mediaURL)
return 2, nil
}
}


if rt.useHTTP {
sterr := rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream)
if sterr != nil {
Expand Down Expand Up @@ -280,51 +321,41 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
return 0, err
}

sess = sessions[0]
statusShould := livepeer.RecordingStatusReady
if rt.useForceURL {
statusShould = livepeer.RecordingStatusWaiting
}
if sess.RecordingStatus != statusShould {
err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould)
return 240, err
// exit(250, fileName, *fileArg, err)
}
if sess.RecordingURL == "" {
err := fmt.Errorf("recording URL should appear by now")
return 249, err
// exit(249, fileName, *fileArg, err)
}
glog.Infof("recordingURL=%s downloading now", sess.RecordingURL)
if rt.testRecording {
sess = sessions[0]
statusShould := livepeer.RecordingStatusReady
if rt.useForceURL {
statusShould = livepeer.RecordingStatusWaiting
}
if sess.RecordingStatus != statusShould {
err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould)
return 240, err
}
if sess.RecordingURL == "" {
err := fmt.Errorf("recording URL should appear by now")
return 249, err
}
glog.Infof("recordingURL=%s downloading now", sess.RecordingURL)
if err = rt.isCancelled(); err != nil {
return 0, err
}
if rt.mp4 {
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0)
if err != nil {
return es, err
}
}

// started := time.Now()
// downloader := testers.NewM3utester2(gctx, sess.RecordingURL, false, false, false, false, 5*time.Second, nil)
// <-downloader.Done()
// glog.Infof(`Pulling stopped after %s`, time.Since(started))
// exit(55, fileName, *fileArg, err)
glog.Info("Done Record Test")
es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0)

// lapi.DeleteStream(stream.ID)
// exit(0, fileName, *fileArg, err)
if err = rt.isCancelled(); err != nil {
return 0, err
}
if rt.mp4 {
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0)
if err != nil {
return es, err
}
}

es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0)
if es == 0 {
rt.lapi.DeleteStream(stream.ID)
// exit(0, fileName, *fileArg, err)
}
glog.Info("Done Record Test")

// uploader := testers.NewRtmpStreamer(gctx, rtmpURL)
// uploader.StartUpload(fileName, rtmpURL, -1, 30*time.Second)
return es, err
return 0, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We know err is nil here so we can be explicit. Just so one doesn't start wondering here where this err could be instead of nil (from what I checked, it can't)

Suggested change
return 0, err
return 0, nil

}

func (rt *recordTester) getIngestInfo() (*api.Ingest, error) {
Expand Down Expand Up @@ -389,6 +420,31 @@ func (rt *recordTester) isCancelled() error {
return nil
}

func (rt *recordTester) signJwt(stream *api.Stream) (string, error) {
expiration := time.Now().Add(time.Minute * 5).Unix()
unsignedToken := jwt.NewWithClaims(jwt.SigningMethodES256, jwt.MapClaims{
"sub": stream.PlaybackID,
"pub": rt.publicKey,
"exp": expiration,
})

decodedPrivateKey, _ := base64.StdEncoding.DecodeString(rt.signingKey)

pk, err := jwt.ParseECPrivateKeyFromPEM(decodedPrivateKey)

if err != nil {
glog.Errorf("Unable to parse provided signing key for access control signingKey=%s", rt.signingKey)
}

token, err := unsignedToken.SignedString(pk)

if err != nil {
glog.Errorf("Unable to sign JWT with provided private key for access control signingKey=%s", rt.signingKey)
}
Comment on lines +435 to +443
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to return all these errors. Cannot just log.

Suggested change
if err != nil {
glog.Errorf("Unable to parse provided signing key for access control signingKey=%s", rt.signingKey)
}
token, err := unsignedToken.SignedString(pk)
if err != nil {
glog.Errorf("Unable to sign JWT with provided private key for access control signingKey=%s", rt.signingKey)
}
if err != nil {
glog.Errorf("Unable to parse provided signing key for access control signingKey=%s", rt.signingKey)
return "", err
}
token, err := unsignedToken.SignedString(pk)
if err != nil {
glog.Errorf("Unable to sign JWT with provided private key for access control signingKey=%s", rt.signingKey)
return "", err
}


return token, nil
}

func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) {
es := 0
started := time.Now()
Expand Down