Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor thumnailer code #22

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions pkg/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Glimesh/waveguide/pkg/orchestrator"
"github.com/Glimesh/waveguide/pkg/service"
"github.com/Glimesh/waveguide/pkg/types"
"github.com/pion/rtp"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -152,11 +153,8 @@ func (ctrl *Control) StartStream(channelID types.ChannelID) (*Stream, error) {

go ctrl.setupHeartbeat(channelID)

// Really gross, I'm sorry.
whepEndpoint := fmt.Sprintf("%s/whep/endpoint", ctrl.HTTPServerURL())
go func() {
err := stream.thumbnailer(ctx, whepEndpoint)
if err != nil {
if err := stream.Ingest(ctx); err != nil { //nolint not shadowed
stream.log.Error(err)
ctrl.StopStream(channelID)
}
Expand Down Expand Up @@ -340,21 +338,21 @@ func (ctrl *Control) sendThumbnail(channelID types.ChannelID) (err error) {

func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.CancelFunc) (*Stream, error) {
stream := &Stream{
log: ctrl.log.WithField("channel_id", channelID),
ChannelID: channelID,

log: ctrl.log.WithField("channel_id", channelID),
whepURI: ctrl.HTTPServerURL() + "/whep/endpoint/" + channelID.String(),
authenticated: true,

cancelFunc: cancelFunc,
authenticated: true,
mediaStarted: false,
ChannelID: channelID,
keyframer: NewKeyframer(),
rtpIngest: make(chan *rtp.Packet),
stopHeartbeat: make(chan struct{}, 1),
stopThumbnailer: make(chan struct{}, 1),
// 10 keyframes in 5 seconds is probably a bit extreme
lastThumbnail: make(chan []byte, 10),
startTime: time.Now().Unix(),
totalAudioPackets: 0,
totalVideoPackets: 0,
clientVendorName: "",
clientVendorVersion: "",
lastThumbnail: make(chan []byte, 10),

startTime: time.Now().Unix(),
}

if _, exists := ctrl.streams[channelID]; exists {
Expand Down
5 changes: 1 addition & 4 deletions pkg/control/keyframer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (kf *Keyframer) Reset() {
kf.packets = make(map[uint16][]byte)
}

func (kf *Keyframer) WriteRTP(p *rtp.Packet) []byte {
func (kf *Keyframer) KeyFrame(p *rtp.Packet) []byte {
// fmt.Printf("frameStarted=%t\n", kf.frameStarted)
// Frame has started, but timestamps don't match, continue
if kf.frameStarted && kf.timestamp != p.Timestamp {
Expand Down Expand Up @@ -79,10 +79,7 @@ func (kf *Keyframer) WriteRTP(p *rtp.Packet) []byte {
kf.lastFullKeyframe = newFrame

return newFrame
} else {
// fmt.Println("No marker, no end")
}
// fmt.Println(p)

return nil
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/control/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/Glimesh/waveguide/pkg/types"

"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
)
Expand All @@ -23,17 +24,20 @@ type Stream struct {

// authenticated is set after the stream has successfully authed with a remote service
authenticated bool

whepURI string

// mediaStarted is set after media bytes have come in from the client
mediaStarted bool
hasSomeAudio bool
hasSomeVideo bool

stopHeartbeat chan struct{}

keyframer *Keyframer
rtpIngest chan *rtp.Packet
lastThumbnail chan []byte
// channel used to signal thumbnailer to stop
stopThumbnailer chan struct{}

lastThumbnail chan []byte
stopHeartbeat chan struct{}

ChannelID types.ChannelID
StreamID types.StreamID
Expand Down Expand Up @@ -90,11 +94,10 @@ func (s *Stream) ReportMetadata(metadatas ...Metadata) error {
func (s *Stream) Stop() {
s.log.Infof("stopping stream")

s.cancelFunc()
s.stopHeartbeat <- struct{}{} // not being used anywhere, is it really needed?

s.stopThumbnailer <- struct{}{}
s.log.Debug("sent stop thumbnailer signal")

s.cancelFunc()
s.log.Debug("canceled stream ctx")
}
160 changes: 97 additions & 63 deletions pkg/control/thumbnailer.go
Original file line number Diff line number Diff line change
@@ -1,117 +1,151 @@
package control

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"

"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/h264writer"
)

// Note: This type of functionality will be common in Waveguide
// However we should not do it like this :D
func (s *Stream) thumbnailer(ctx context.Context, whepEndpoint string) error {
log := s.log.WithField("app", "thumbnailer")
type header struct {
key string
value string
}

func (s *Stream) Ingest(ctx context.Context) error {
logger := s.log.WithField("app", "ingest")
done := make(chan struct{}, 1)
go s.startIngestor(done)

log.Info("Started Thumbnailer")
// Create a new PeerConnection
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{}) //nolint exhaustive struct
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) //nolint exhaustive struct
if err != nil {
return err
}
defer peerConnection.Close()
defer pc.Close()

peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
kfer := NewKeyframer()
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
codec := track.Codec()

if codec.MimeType == "video/H264" {
for {
select {
case <-ctx.Done():
log.Debug("received ctx cancel signal")
if ctx.Err() != nil {
return
}

p, _, readErr := track.ReadRTP()
if readErr != nil {
continue
}

select {
case s.rtpIngest <- p:
default:
// Read RTP Packets in a loop
p, _, readErr := track.ReadRTP()
if readErr != nil {
// Don't kill the thumbnailer after one weird RTP packet
continue
}

keyframe := kfer.WriteRTP(p)
if keyframe != nil {
// fmt.Printf("!!! PEER KEYFRAME !!! %s\n\n", kfer)
// saveImage(int(p.SequenceNumber), keyframe)
// os.WriteFile(fmt.Sprintf("%d-peer.h264", p.SequenceNumber), keyframe, 0666)
s.lastThumbnail <- keyframe
kfer.Reset()
}
}
}
}
})

url := fmt.Sprintf("%s/%d", whepEndpoint, s.ChannelID)
req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte{}))
if err != nil {
if err := s.setupPeerConnection(pc); err != nil {
return err
}
req.Header.Set("Accept", "application/sdp")

<-ctx.Done()
logger.Debug("received ctx done signal")
done <- struct{}{}
close(s.rtpIngest)

return nil
}

func doHTTPRequest(uri, method string, body io.Reader, headers ...header) (*http.Response, error) {
req, err := http.NewRequest(method, uri, body)
if err != nil {
return nil, err
}

for _, header := range headers {
req.Header.Set(header.key, header.value)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}

type Option struct {
VideoWriter *h264writer.H264Writer
}

func (s *Stream) setupPeerConnection(pc *webrtc.PeerConnection) error {
sdpHeader := header{"Accept", "application/sdp"}
resp, err := doHTTPRequest(
s.whepURI,
http.MethodPost,
strings.NewReader(""),
sdpHeader,
)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)

offer, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(body),
}); err != nil {
if err := pc.SetRemoteDescription(
webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(offer),
}); err != nil {
return err
}

answer, err := peerConnection.CreateAnswer(nil)
answerSDP, err := pc.CreateAnswer(nil)
if err != nil {
return err
}

// Create channel that is blocked until ICE Gathering is complete
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)

if err := peerConnection.SetLocalDescription(answer); err != nil {
gatherComplete := webrtc.GatheringCompletePromise(pc)
if err := pc.SetLocalDescription(answerSDP); err != nil {
return err
}

// Block until ICE Gathering is complete, disabling trickle ICE
// we do this because we only can exchange one signaling message
// in a production application you should exchange ICE Candidates via OnICECandidate
<-gatherComplete

answerSdp := peerConnection.LocalDescription().SDP
req2, err := http.NewRequest("POST", resp.Header.Get("location"), bytes.NewBufferString(answerSdp))
if err != nil {
return err
}
req2.Header.Set("Accept", "application/sdp")
_, err = http.DefaultClient.Do(req2)
answer := pc.LocalDescription().SDP
_, err = doHTTPRequest( //nolint response is ignored
resp.Header.Get("location"),
http.MethodPost,
strings.NewReader(answer),
sdpHeader,
)
if err != nil {
return err
}

select {
case <-ctx.Done():
log.Debug("received ctx done signal")
case <-s.stopThumbnailer:
log.Debug("received kill peersnap signal")
}
log.Info("ending thumbnailer")

return nil
}

func (s *Stream) startIngestor(done <-chan struct{}) {
LOOP:
for {
select {
case p := <-s.rtpIngest:
keyframe := s.keyframer.KeyFrame(p)
if keyframe != nil {
s.lastThumbnail <- keyframe
s.keyframer.Reset()
}
case <-done:
break LOOP
}
}
s.log.Debug("ending rtp ingestor")
}