Skip to content

Commit

Permalink
refactor rtsp PushSession, new func WithSdpLogicContext and Start ins…
Browse files Browse the repository at this point in the history
…tead of Push
  • Loading branch information
q191201771 committed Jun 26, 2024
1 parent 3ad5736 commit 40295f8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 52 deletions.
2 changes: 1 addition & 1 deletion app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
func(sdpCtx sdp.LogicContext) {
// remuxer完成前期工作,生成sdp并开始push
nazalog.Info("start push.")
err := pushSession.Push(outRtspUrl, sdpCtx)
err := pushSession.WithSdpLogicContext(sdpCtx).Start(outRtspUrl)
nazalog.Assert(nil, err)
nazalog.Info("push succ.")

Expand Down
4 changes: 2 additions & 2 deletions app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func (r *RtspTunnel) Start() error {
r.pushSession = rtsp.NewPushSession(func(option *rtsp.PushSessionOption) {
option.PushTimeoutMs = 10000
option.OverTcp = r.pushOverTcp
})
if err := r.pushSession.Push(r.pushUrl, sdpCtx); err != nil {
}).WithSdpLogicContext(sdpCtx)
if err := r.pushSession.Start(r.pushUrl); err != nil {
nazalog.Errorf("[%s] start push failed. err=%+v, url=%s", r.uniqueKey, err, r.pushUrl)
return err
}
Expand Down
119 changes: 70 additions & 49 deletions pkg/rtsp/client_push_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type PushSession struct {
cmdSession *ClientCommandSession
baseOutSession *BaseOutSession

sdpCtx *sdp.LogicContext

disposeOnce sync.Once
waitChan chan error
}
Expand All @@ -58,57 +60,24 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession {
return s
}

// Push 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTSP Record response),或者发生错误
func (session *PushSession) Push(rawUrl string, sdpCtx sdp.LogicContext) error {
Log.Debugf("[%s] push. url=%s", session.UniqueKey(), rawUrl)
session.cmdSession.InitWithSdp(sdpCtx)
session.baseOutSession.InitWithSdp(sdpCtx)
if err := session.cmdSession.Do(rawUrl); err != nil {
_ = session.dispose(err)
return err
}

go func() {
var cmdSessionDisposed, baseInSessionDisposed bool
var retErr error
var retErrFlag bool
LOOP:
for {
var err error
select {
case err = <-session.cmdSession.WaitChan():
if err != nil {
_ = session.baseOutSession.Dispose()
}
if cmdSessionDisposed {
Log.Errorf("[%s] cmd session disposed already.", session.UniqueKey())
}
cmdSessionDisposed = true
case err = <-session.baseOutSession.WaitChan():
// err是nil时,表示是被PullSession::Dispose主动销毁,那么cmdSession也会被销毁,就不需要我们再调用cmdSession.Dispose了
if err != nil {
_ = session.cmdSession.Dispose()
}
if baseInSessionDisposed {
Log.Errorf("[%s] base in session disposed already.", session.UniqueKey())
}
baseInSessionDisposed = true
} // select loop

// 第一个错误作为返回值
if !retErrFlag {
retErr = err
retErrFlag = true
}
if cmdSessionDisposed && baseInSessionDisposed {
break LOOP
}
} // for loop
func (session *PushSession) WithSdpLogicContext(sdpCtx sdp.LogicContext) *PushSession {
session.sdpCtx = &sdp.LogicContext{}
*session.sdpCtx = sdpCtx
return session
}

session.waitChan <- retErr
}()
// Start 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTSP Record response),或者发生错误
func (session *PushSession) Start(rawUrl string) error {
if session.sdpCtx == nil {
Log.Errorf("[%s] sdp logic context not set.", session)
return base.ErrRtsp
}
return session.push(rawUrl)
}

return nil
// Push deprecated. use WithSdpLogicContext and Start instead.
func (session *PushSession) Push(rawUrl string, sdpCtx sdp.LogicContext) error {
return session.WithSdpLogicContext(sdpCtx).Start(rawUrl)
}

func (session *PushSession) WriteRtpPacket(packet rtprtcp.RtpPacket) error {
Expand Down Expand Up @@ -228,6 +197,58 @@ func (session *PushSession) WriteInterleavedPacket(packet []byte, channel int) e

// ---------------------------------------------------------------------------------------------------------------------

func (session *PushSession) push(rawUrl string) error {
Log.Debugf("[%s] push. url=%s", session.UniqueKey(), rawUrl)
session.cmdSession.InitWithSdp(*session.sdpCtx)
session.baseOutSession.InitWithSdp(*session.sdpCtx)
if err := session.cmdSession.Do(rawUrl); err != nil {
_ = session.dispose(err)
return err
}

go func() {
var cmdSessionDisposed, baseInSessionDisposed bool
var retErr error
var retErrFlag bool
LOOP:
for {
var err error
select {
case err = <-session.cmdSession.WaitChan():
if err != nil {
_ = session.baseOutSession.Dispose()
}
if cmdSessionDisposed {
Log.Errorf("[%s] cmd session disposed already.", session.UniqueKey())
}
cmdSessionDisposed = true
case err = <-session.baseOutSession.WaitChan():
// err是nil时,表示是被PullSession::Dispose主动销毁,那么cmdSession也会被销毁,就不需要我们再调用cmdSession.Dispose了
if err != nil {
_ = session.cmdSession.Dispose()
}
if baseInSessionDisposed {
Log.Errorf("[%s] base in session disposed already.", session.UniqueKey())
}
baseInSessionDisposed = true
} // select loop

// 第一个错误作为返回值
if !retErrFlag {
retErr = err
retErrFlag = true
}
if cmdSessionDisposed && baseInSessionDisposed {
break LOOP
}
} // for loop

session.waitChan <- retErr
}()

return nil
}

func (session *PushSession) dispose(err error) error {
var retErr error
session.disposeOnce.Do(func() {
Expand Down

0 comments on commit 40295f8

Please sign in to comment.