Files
rtsp-simple-server/internal/servers/rtsp/server.go
T
Alessandro Ros d4c6f95291 dump unencrypted TLS sessions (#5624)
when dumpPackets is true, embed TLS master keys into the dump, in a
format which is natively compatible with Wireshark.
2026-04-04 14:46:43 +02:00

535 lines
13 KiB
Go

// Package rtsp contains a RTSP server.
package rtsp
import (
"context"
"crypto/tls"
"errors"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"
"github.com/bluenviron/gortsplib/v5"
"github.com/bluenviron/gortsplib/v5/pkg/auth"
"github.com/bluenviron/gortsplib/v5/pkg/base"
"github.com/bluenviron/gortsplib/v5/pkg/liberrors"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/certloader"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/packetdumper"
)
// ErrConnNotFound is returned when a connection is not found.
var ErrConnNotFound = errors.New("connection not found")
// ErrSessionNotFound is returned when a session is not found.
var ErrSessionNotFound = errors.New("session not found")
func interfaceIsEmpty(i any) bool {
return reflect.ValueOf(i).Kind() != reflect.Pointer || reflect.ValueOf(i).IsNil()
}
func printAddresses(srv *gortsplib.Server) string {
var ret []string
tmp := srv.RTSPAddress
if srv.TLSConfig == nil {
tmp += " (TCP/RTSP)"
} else {
tmp += " (TCP/RTSPS)"
}
ret = append(ret, tmp)
if srv.UDPRTPAddress != "" {
tmp = srv.UDPRTPAddress
if srv.TLSConfig == nil {
tmp += " (UDP/RTP)"
} else {
tmp += " (UDP/SRTP)"
}
ret = append(ret, tmp)
}
if srv.UDPRTCPAddress != "" {
tmp = srv.UDPRTCPAddress
if srv.TLSConfig == nil {
tmp += " (UDP/RTCP)"
} else {
tmp += " (UDP/SRTCP)"
}
ret = append(ret, tmp)
}
return strings.Join(ret, ", ")
}
type serverMetrics interface {
SetRTSPSServer(defs.APIRTSPServer)
SetRTSPServer(defs.APIRTSPServer)
}
type serverPathManager interface {
FindPathConf(req defs.PathFindPathConfReq) (*defs.PathFindPathConfRes, error)
Describe(req defs.PathDescribeReq) defs.PathDescribeRes
AddPublisher(_ defs.PathAddPublisherReq) (*defs.PathAddPublisherRes, error)
AddReader(_ defs.PathAddReaderReq) (*defs.PathAddReaderRes, error)
}
type serverParent interface {
logger.Writer
}
// Server is a RTSP server.
type Server struct {
Address string
AuthMethods []auth.VerifyMethod
DumpPackets bool
UDPReadBufferSize uint
ReadTimeout conf.Duration
WriteTimeout conf.Duration
WriteQueueSize int
RTSPTransports conf.RTSPTransports
RTPAddress string
RTCPAddress string
MulticastIPRange string
MulticastRTPPort int
MulticastRTCPPort int
Encryption bool
ServerCert string
ServerKey string
RTSPAddress string
Transports conf.RTSPTransports
RunOnConnect string
RunOnConnectRestart bool
RunOnDisconnect string
ExternalCmdPool *externalcmd.Pool
Metrics serverMetrics
PathManager serverPathManager
Parent serverParent
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
srv *gortsplib.Server
mutex sync.RWMutex
conns map[*gortsplib.ServerConn]*conn
sessions map[*gortsplib.ServerSession]*session
loader *certloader.CertLoader
}
// Initialize initializes the server.
func (s *Server) Initialize() error {
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
s.conns = make(map[*gortsplib.ServerConn]*conn)
s.sessions = make(map[*gortsplib.ServerSession]*session)
s.srv = &gortsplib.Server{
Handler: s,
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
UDPReadBufferSize: int(s.UDPReadBufferSize),
WriteQueueSize: s.WriteQueueSize,
RTSPAddress: s.Address,
AuthMethods: s.AuthMethods,
}
if _, ok := s.RTSPTransports[gortsplib.ProtocolUDP]; ok {
s.srv.UDPRTPAddress = s.RTPAddress
s.srv.UDPRTCPAddress = s.RTCPAddress
}
if _, ok := s.RTSPTransports[gortsplib.ProtocolUDPMulticast]; ok {
s.srv.MulticastIPRange = s.MulticastIPRange
s.srv.MulticastRTPPort = s.MulticastRTPPort
s.srv.MulticastRTCPPort = s.MulticastRTCPPort
}
if s.Encryption {
s.loader = &certloader.CertLoader{
CertPath: s.ServerCert,
KeyPath: s.ServerKey,
Parent: s.Parent,
}
err := s.loader.Initialize()
if err != nil {
return err
}
s.srv.TLSConfig = &tls.Config{GetCertificate: s.loader.GetCertificate()}
}
if s.DumpPackets {
var proto string
if s.Encryption {
proto = "rtsps"
} else {
proto = "rtsp"
}
s.srv.Listen = (&packetdumper.Listen{
Prefix: proto + "_server_conn",
}).Do
s.srv.ListenPacket = (&packetdumper.ListenPacket{
Prefix: proto + "_server_packet_conn",
}).Do
s.srv.TLSListen = (&packetdumper.TLSListen{
Listen: s.srv.Listen,
}).Do
}
err := s.srv.Start()
if err != nil {
return err
}
s.Log(logger.Info, "listener opened on %s", printAddresses(s.srv))
s.wg.Add(1)
go s.run()
if !interfaceIsEmpty(s.Metrics) {
if s.Encryption {
s.Metrics.SetRTSPSServer(s)
} else {
s.Metrics.SetRTSPServer(s)
}
}
return nil
}
// Log implements logger.Writer.
func (s *Server) Log(level logger.Level, format string, args ...any) {
label := func() string {
if s.Encryption {
return "RTSPS"
}
return "RTSP"
}()
s.Parent.Log(level, "[%s] "+format, append([]any{label}, args...)...)
}
// Close closes the server.
func (s *Server) Close() {
s.Log(logger.Info, "listener is closing")
if !interfaceIsEmpty(s.Metrics) {
if s.Encryption {
s.Metrics.SetRTSPSServer(nil)
} else {
s.Metrics.SetRTSPServer(nil)
}
}
s.ctxCancel()
s.wg.Wait()
if s.loader != nil {
s.loader.Close()
}
}
func (s *Server) run() {
defer s.wg.Done()
serverErr := make(chan error)
go func() {
serverErr <- s.srv.Wait()
}()
outer:
select {
case err := <-serverErr:
s.Log(logger.Error, "%s", err)
break outer
case <-s.ctx.Done():
s.srv.Close()
<-serverErr
break outer
}
s.ctxCancel()
}
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen.
func (s *Server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
c := &conn{
encryption: s.Encryption,
rtspAddress: s.RTSPAddress,
authMethods: s.AuthMethods,
readTimeout: s.ReadTimeout,
runOnConnect: s.RunOnConnect,
runOnConnectRestart: s.RunOnConnectRestart,
runOnDisconnect: s.RunOnDisconnect,
externalCmdPool: s.ExternalCmdPool,
pathManager: s.PathManager,
rconn: ctx.Conn,
rserver: s.srv,
parent: s,
}
c.initialize()
s.mutex.Lock()
s.conns[ctx.Conn] = c
s.mutex.Unlock()
ctx.Conn.SetUserData(c)
}
// OnConnClose implements gortsplib.ServerHandlerOnConnClose.
func (s *Server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
s.mutex.Lock()
c := s.conns[ctx.Conn]
delete(s.conns, ctx.Conn)
s.mutex.Unlock()
c.onClose(ctx.Error)
}
// OnRequest implements gortsplib.ServerHandlerOnRequest.
func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) {
c := sc.UserData().(*conn)
c.onRequest(req)
}
// OnResponse implements gortsplib.ServerHandlerOnResponse.
func (s *Server) OnResponse(sc *gortsplib.ServerConn, res *base.Response) {
c := sc.UserData().(*conn)
c.OnResponse(res)
}
// OnSessionOpen implements gortsplib.ServerHandlerOnSessionOpen.
func (s *Server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
se := &session{
encryption: s.Encryption,
transports: s.Transports,
rsession: ctx.Session,
rconn: ctx.Conn,
rserver: s.srv,
externalCmdPool: s.ExternalCmdPool,
pathManager: s.PathManager,
parent: s,
}
se.initialize()
s.mutex.Lock()
s.sessions[ctx.Session] = se
s.mutex.Unlock()
ctx.Session.SetUserData(se)
}
// OnSessionClose implements gortsplib.ServerHandlerOnSessionClose.
func (s *Server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
s.mutex.Lock()
se := s.sessions[ctx.Session]
delete(s.sessions, ctx.Session)
s.mutex.Unlock()
if se != nil {
se.onClose(ctx.Error)
}
}
// OnDescribe implements gortsplib.ServerHandlerOnDescribe.
func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
) (*base.Response, *gortsplib.ServerStream, error) {
c := ctx.Conn.UserData().(*conn)
return c.onDescribe(ctx)
}
// OnAnnounce implements gortsplib.ServerHandlerOnAnnounce.
func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
c := ctx.Conn.UserData().(*conn)
se := ctx.Session.UserData().(*session)
return se.onAnnounce(c, ctx)
}
// OnSetup implements gortsplib.ServerHandlerOnSetup.
func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
c := ctx.Conn.UserData().(*conn)
se := ctx.Session.UserData().(*session)
return se.onSetup(c, ctx)
}
// OnPlay implements gortsplib.ServerHandlerOnPlay.
func (s *Server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
se := ctx.Session.UserData().(*session)
return se.onPlay(ctx)
}
// OnRecord implements gortsplib.ServerHandlerOnRecord.
func (s *Server) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
se := ctx.Session.UserData().(*session)
return se.onRecord(ctx)
}
// OnPause implements gortsplib.ServerHandlerOnPause.
func (s *Server) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
se := ctx.Session.UserData().(*session)
return se.onPause(ctx)
}
// OnPacketsLost implements gortsplib.ServerHandlerOnPacketsLost.
func (s *Server) OnPacketsLost(ctx *gortsplib.ServerHandlerOnPacketsLostCtx) {
se := ctx.Session.UserData().(*session)
se.onPacketsLost(ctx)
}
// OnDecodeError implements gortsplib.ServerHandlerOnDecodeError.
func (s *Server) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) {
se := ctx.Session.UserData().(*session)
se.onDecodeError(ctx)
}
// OnStreamWriteError implements gortsplib.ServerHandlerOnStreamWriteError.
func (s *Server) OnStreamWriteError(ctx *gortsplib.ServerHandlerOnStreamWriteErrorCtx) {
se := ctx.Session.UserData().(*session)
se.onStreamWriteError(ctx)
}
func (s *Server) findConnByUUID(uuid uuid.UUID) *conn {
for _, c := range s.conns {
if c.uuid == uuid {
return c
}
}
return nil
}
func (s *Server) findSessionByUUID(uuid uuid.UUID) (*gortsplib.ServerSession, *session) {
for key, sx := range s.sessions {
if sx.uuid == uuid {
return key, sx
}
}
return nil, nil
}
func (s *Server) getConnByRConnUnsafe(rconn *gortsplib.ServerConn) *conn {
return s.conns[rconn]
}
func (s *Server) getSessionByRSessionUnsafe(rsession *gortsplib.ServerSession) *session {
return s.sessions[rsession]
}
// APIConnsList is called by api and metrics.
func (s *Server) APIConnsList() (*defs.APIRTSPConnsList, error) {
select {
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
data := &defs.APIRTSPConnsList{
Items: []defs.APIRTSPConn{},
}
for _, c := range s.conns {
data.Items = append(data.Items, *c.apiItem())
}
sort.Slice(data.Items, func(i, j int) bool {
return data.Items[i].Created.Before(data.Items[j].Created)
})
return data, nil
}
// APIConnsGet is called by api.
func (s *Server) APIConnsGet(uuid uuid.UUID) (*defs.APIRTSPConn, error) {
select {
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
conn := s.findConnByUUID(uuid)
if conn == nil {
return nil, ErrConnNotFound
}
return conn.apiItem(), nil
}
// APISessionsList is called by api and metrics.
func (s *Server) APISessionsList() (*defs.APIRTSPSessionList, error) {
select {
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
data := &defs.APIRTSPSessionList{
Items: []defs.APIRTSPSession{},
}
for _, s := range s.sessions {
data.Items = append(data.Items, *s.apiItem())
}
sort.Slice(data.Items, func(i, j int) bool {
return data.Items[i].Created.Before(data.Items[j].Created)
})
return data, nil
}
// APISessionsGet is called by api.
func (s *Server) APISessionsGet(uuid uuid.UUID) (*defs.APIRTSPSession, error) {
select {
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
_, sx := s.findSessionByUUID(uuid)
if sx == nil {
return nil, ErrSessionNotFound
}
return sx.apiItem(), nil
}
// APISessionsKick is called by api.
func (s *Server) APISessionsKick(uuid uuid.UUID) error {
select {
case <-s.ctx.Done():
return fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
key, sx := s.findSessionByUUID(uuid)
if sx == nil {
return ErrSessionNotFound
}
sx.Close()
delete(s.sessions, key)
sx.onClose(liberrors.ErrServerTerminated{})
return nil
}