From 40295f876d88b2145d20838c78277ac163b16c15 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Wed, 26 Jun 2024 09:03:05 +0800 Subject: [PATCH] refactor rtsp PushSession, new func WithSdpLogicContext and Start instead of Push --- .../pullrtmp2pushrtsp/pullrtmp2pushrtsp.go | 2 +- .../pullrtsp2pushrtsp/pullrtsp2pushrtsp.go | 4 +- pkg/rtsp/client_push_session.go | 119 ++++++++++-------- 3 files changed, 73 insertions(+), 52 deletions(-) diff --git a/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go b/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go index 9f1cd7f6..a4f11e74 100644 --- a/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go +++ b/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go @@ -39,7 +39,7 @@ func main() { func(sdpCtx sdp.LogicContext) { // remuxer完成前期工作,生成sdp并开始push nazalog.Info("start push.") - err := pushSession.Push(outRtspUrl, sdpCtx) + err := pushSession.WithSdpLogicContext(sdpCtx).Start(outRtspUrl) nazalog.Assert(nil, err) nazalog.Info("push succ.") diff --git a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go index d14ff5e1..640e8bcb 100644 --- a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go +++ b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go @@ -75,8 +75,8 @@ func (r *RtspTunnel) Start() error { r.pushSession = rtsp.NewPushSession(func(option *rtsp.PushSessionOption) { option.PushTimeoutMs = 10000 option.OverTcp = r.pushOverTcp - }) - if err := r.pushSession.Push(r.pushUrl, sdpCtx); err != nil { + }).WithSdpLogicContext(sdpCtx) + if err := r.pushSession.Start(r.pushUrl); err != nil { nazalog.Errorf("[%s] start push failed. err=%+v, url=%s", r.uniqueKey, err, r.pushUrl) return err } diff --git a/pkg/rtsp/client_push_session.go b/pkg/rtsp/client_push_session.go index 309265f6..763aff06 100644 --- a/pkg/rtsp/client_push_session.go +++ b/pkg/rtsp/client_push_session.go @@ -32,6 +32,8 @@ type PushSession struct { cmdSession *ClientCommandSession baseOutSession *BaseOutSession + sdpCtx *sdp.LogicContext + disposeOnce sync.Once waitChan chan error } @@ -58,57 +60,24 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { return s } -// Push 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTSP Record response),或者发生错误 -func (session *PushSession) Push(rawUrl string, sdpCtx sdp.LogicContext) error { - Log.Debugf("[%s] push. url=%s", session.UniqueKey(), rawUrl) - session.cmdSession.InitWithSdp(sdpCtx) - session.baseOutSession.InitWithSdp(sdpCtx) - if err := session.cmdSession.Do(rawUrl); err != nil { - _ = session.dispose(err) - return err - } - - go func() { - var cmdSessionDisposed, baseInSessionDisposed bool - var retErr error - var retErrFlag bool - LOOP: - for { - var err error - select { - case err = <-session.cmdSession.WaitChan(): - if err != nil { - _ = session.baseOutSession.Dispose() - } - if cmdSessionDisposed { - Log.Errorf("[%s] cmd session disposed already.", session.UniqueKey()) - } - cmdSessionDisposed = true - case err = <-session.baseOutSession.WaitChan(): - // err是nil时,表示是被PullSession::Dispose主动销毁,那么cmdSession也会被销毁,就不需要我们再调用cmdSession.Dispose了 - if err != nil { - _ = session.cmdSession.Dispose() - } - if baseInSessionDisposed { - Log.Errorf("[%s] base in session disposed already.", session.UniqueKey()) - } - baseInSessionDisposed = true - } // select loop - - // 第一个错误作为返回值 - if !retErrFlag { - retErr = err - retErrFlag = true - } - if cmdSessionDisposed && baseInSessionDisposed { - break LOOP - } - } // for loop +func (session *PushSession) WithSdpLogicContext(sdpCtx sdp.LogicContext) *PushSession { + session.sdpCtx = &sdp.LogicContext{} + *session.sdpCtx = sdpCtx + return session +} - session.waitChan <- retErr - }() +// Start 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTSP Record response),或者发生错误 +func (session *PushSession) Start(rawUrl string) error { + if session.sdpCtx == nil { + Log.Errorf("[%s] sdp logic context not set.", session) + return base.ErrRtsp + } + return session.push(rawUrl) +} - return nil +// Push deprecated. use WithSdpLogicContext and Start instead. +func (session *PushSession) Push(rawUrl string, sdpCtx sdp.LogicContext) error { + return session.WithSdpLogicContext(sdpCtx).Start(rawUrl) } func (session *PushSession) WriteRtpPacket(packet rtprtcp.RtpPacket) error { @@ -228,6 +197,58 @@ func (session *PushSession) WriteInterleavedPacket(packet []byte, channel int) e // --------------------------------------------------------------------------------------------------------------------- +func (session *PushSession) push(rawUrl string) error { + Log.Debugf("[%s] push. url=%s", session.UniqueKey(), rawUrl) + session.cmdSession.InitWithSdp(*session.sdpCtx) + session.baseOutSession.InitWithSdp(*session.sdpCtx) + if err := session.cmdSession.Do(rawUrl); err != nil { + _ = session.dispose(err) + return err + } + + go func() { + var cmdSessionDisposed, baseInSessionDisposed bool + var retErr error + var retErrFlag bool + LOOP: + for { + var err error + select { + case err = <-session.cmdSession.WaitChan(): + if err != nil { + _ = session.baseOutSession.Dispose() + } + if cmdSessionDisposed { + Log.Errorf("[%s] cmd session disposed already.", session.UniqueKey()) + } + cmdSessionDisposed = true + case err = <-session.baseOutSession.WaitChan(): + // err是nil时,表示是被PullSession::Dispose主动销毁,那么cmdSession也会被销毁,就不需要我们再调用cmdSession.Dispose了 + if err != nil { + _ = session.cmdSession.Dispose() + } + if baseInSessionDisposed { + Log.Errorf("[%s] base in session disposed already.", session.UniqueKey()) + } + baseInSessionDisposed = true + } // select loop + + // 第一个错误作为返回值 + if !retErrFlag { + retErr = err + retErrFlag = true + } + if cmdSessionDisposed && baseInSessionDisposed { + break LOOP + } + } // for loop + + session.waitChan <- retErr + }() + + return nil +} + func (session *PushSession) dispose(err error) error { var retErr error session.disposeOnce.Do(func() {