Files
lkm/hls/hls_stream.go
2025-07-27 15:05:37 +08:00

320 lines
9.2 KiB
Go

package hls
import (
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg"
"os"
"path/filepath"
"strconv"
"unsafe"
)
type TransStream struct {
stream.BaseTransStream
muxer *mpeg.TSMuxer
ctx struct {
segmentSeq int // 切片序号
writeBuffer []byte // ts流的缓冲区, 由TSMuxer使用. 减少用户态和内核态交互,以及磁盘IO频率
writeBufferSize int // 已缓存TS流大小
url string // @See TransStream.tsUrl
path string // ts切片位于磁盘中的绝对路径
file *os.File // ts切片文件句柄
}
M3U8Writer stream.M3U8Writer
m3u8Name string // m3u8文件名
m3u8File *os.File // m3u8文件句柄
dir string // m3u8文件父目录
tsUrl string // m3u8中每个url的前缀, 默认为空, 为了支持绝对路径访问:http://xxx/xxx/xxx.ts
tsFormat string // ts文件名格式
duration int // 切片时长, 单位秒
playlistLength int // 最大切片文件个数
PlaylistFormat *string // 位于内存中的m3u8播放列表,每个sink都引用指针地址.
PlaylistFormatPtr []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
}
func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
// 创建一下个切片
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
var newSegment bool
if (!t.HasVideo() || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
// 保存当前切片文件
if t.ctx.file != nil {
err := t.flushSegment(false)
if err != nil {
return nil, -1, false, err
}
}
// 创建新的切片
if err := t.createSegment(); err != nil {
return nil, -1, false, err
}
newSegment = true
}
duration := packet.GetDuration(90000)
dts := t.Tracks[index].Dts
pts := t.Tracks[index].Pts
t.Tracks[index].Dts += duration
t.Tracks[index].Pts = t.Tracks[index].Dts + packet.GetPtsDtsDelta(90000)
data := packet.Data
if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet)
}
// 写入ts切片
length := len(data)
capacity := cap(t.ctx.writeBuffer)
for i := 0; i < length; {
if capacity-t.ctx.writeBufferSize < mpeg.TsPacketSize {
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
t.ctx.writeBufferSize = 0
}
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
i += t.muxer.Input(bytes, index, data[i:], length, dts, pts, packet.Key, i == 0)
t.ctx.writeBufferSize += mpeg.TsPacketSize
}
// 缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿.
if newSegment && t.M3U8Writer.Size() > 1 {
return t.PlaylistFormatPtr, -1, true, nil
}
return nil, -1, true, nil
}
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
var err error
var trackIndex int
if utils.AVMediaTypeVideo == track.Stream.MediaType {
data := track.Stream.CodecParameters.AnnexBExtraData()
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
} else {
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
}
return trackIndex, err
}
func (t *TransStream) WriteHeader() error {
return t.createSegment()
}
// 写入新的TS切片,更新M3U8
func (t *TransStream) flushSegment(end bool) error {
// 写入剩余TS包
if t.ctx.writeBufferSize > 0 {
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
t.ctx.writeBufferSize = 0
}
if err := t.ctx.file.Close(); err != nil {
return err
}
// 删除多余的ts切片文件
if t.M3U8Writer.Size() >= t.playlistLength {
_ = os.Remove(t.M3U8Writer.Get(0).Path)
}
// 更新m3u8列表
duration := float32(t.muxer.Duration()) / 90000
t.M3U8Writer.AddSegment(duration, t.ctx.url, t.ctx.segmentSeq, t.ctx.path)
m3u8Txt := t.M3U8Writer.String()
//if end {
// m3u8Txt += "#EXT-X-ENDLIST"
//}
*t.PlaylistFormat = m3u8Txt
// 写入最新的m3u8到文件
if t.m3u8File != nil {
if _, err := t.m3u8File.Seek(0, 0); err != nil {
return err
} else if err = t.m3u8File.Truncate(0); err != nil {
return err
} else if _, err = t.m3u8File.Write([]byte(m3u8Txt)); err != nil {
return err
}
}
return nil
}
// 创建一个新的ts切片
func (t *TransStream) createSegment() error {
t.muxer.Reset()
var tsFile *os.File
startSeq := t.ctx.segmentSeq + 1
for {
tsName := fmt.Sprintf(t.tsFormat, startSeq)
// ts文件
t.ctx.path = fmt.Sprintf("%s/%s", t.dir, tsName)
// m3u8列表中切片的url
t.ctx.url = fmt.Sprintf("%s%s", t.tsUrl, tsName)
file, err := os.OpenFile(t.ctx.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err == nil {
tsFile = file
break
}
log.Sugar.Errorf("创建ts切片文件失败 err:%s path:%s", err.Error(), t.ctx.path)
if os.IsPermission(err) || os.IsTimeout(err) || os.IsNotExist(err) {
return err
}
// 继续创建TS文件, 认为是文件名冲突, 并且文件已经被打开.
startSeq++
}
t.ctx.segmentSeq = startSeq
t.ctx.file = tsFile
n, err := t.muxer.WriteHeader(t.ctx.writeBuffer)
if err != nil {
return err
}
t.ctx.writeBufferSize = n
return nil
}
func (t *TransStream) Close() ([]stream.TransStreamSegment, error) {
var err error
if t.ctx.file != nil {
err = t.flushSegment(true)
err = t.ctx.file.Close()
t.ctx.file = nil
}
if t.muxer != nil {
t.muxer.Close()
t.muxer = nil
}
if t.m3u8File != nil {
err = t.m3u8File.Close()
t.m3u8File = nil
}
return nil, err
}
func stringPtrToBytes(ptr *string) []byte {
ptrAddr := uintptr(unsafe.Pointer(ptr))
return (*[unsafe.Sizeof(ptr)]byte)(unsafe.Pointer(&ptrAddr))[:]
}
func bytesToStringPtr(b []byte) *string {
ptrAddr := *(*uintptr)(unsafe.Pointer(&b[0]))
return (*string)(unsafe.Pointer(ptrAddr))
}
func DeleteOldSegments(id string) {
var index int
for ; ; index++ {
path := stream.AppConfig.Hls.TSPath(id, strconv.Itoa(index))
fileInfo, err := os.Stat(path)
if err != nil && os.IsNotExist(err) {
break
} else if fileInfo.IsDir() {
continue
}
_ = os.Remove(path)
}
}
// NewTransStream 创建HLS传输流
// @Params dir m3u8的文件夹目录
// @Params m3u8Name m3u8文件名
// @Params tsFormat ts文件格式, 例如: %d.ts
// @Params tsUrl m3u8中ts切片的url前缀
// @Params parentDir 保存切片的绝对路径. mu38和ts切片放在同一目录下, 目录地址使用parentDir+urlPrefix
// @Params segmentDuration 单个切片时长
// @Params playlistLength 缓存多少个切片
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int, seq int, playlistFormat *string, writer stream.M3U8Writer) (stream.TransStream, error) {
// 创建文件夹
m3u8Path := fmt.Sprintf("%s/%s", dir, m3u8Name)
if err := os.MkdirAll(filepath.Dir(m3u8Path), 0666); err != nil {
log.Sugar.Errorf("创建HLS目录失败 err: %s path: %s", err.Error(), m3u8Path)
return nil, err
}
// 创建m3u8文件
file, err := os.OpenFile(m3u8Path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
log.Sugar.Errorf("创建m3u8文件失败 err: %s path: %s", err.Error(), m3u8Path)
//return nil, err
}
transStream := &TransStream{
m3u8Name: m3u8Name,
tsFormat: tsFormat,
tsUrl: tsUrl,
dir: dir,
duration: segmentDuration,
playlistLength: playlistLength,
}
if writer != nil {
transStream.M3U8Writer = writer
} else {
transStream.M3U8Writer = stream.NewM3U8Writer(playlistLength)
}
if playlistFormat != nil {
transStream.PlaylistFormat = playlistFormat
} else {
transStream.PlaylistFormat = new(string)
}
playlistFormatPtrCounter := collections.NewReferenceCounter[[]byte](stringPtrToBytes(transStream.PlaylistFormat))
transStream.PlaylistFormatPtr = append(transStream.PlaylistFormatPtr, playlistFormatPtrCounter)
// 创建TS封装器
muxer := mpeg.NewTSMuxer()
// 初始化ts封装上下文对象
transStream.ctx.segmentSeq = seq
transStream.ctx.writeBuffer = make([]byte, 1024*1024)
transStream.muxer = muxer
transStream.m3u8File = file
return transStream, nil
}
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
id := source.GetID()
var writer stream.M3U8Writer
var playlistFormat *string
startSeq := -1
endInfo := source.GetTransStreamPublisher().GetStreamEndInfo()
if endInfo != nil && endInfo.M3U8Writer != nil {
writer = endInfo.M3U8Writer
playlistFormat = endInfo.PlaylistFormat
startSeq = writer.Get(writer.Size() - 1).Sequence
}
// 删除旧的m3u8文件
//_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
// 删除旧的切片文件
//go DeleteOldSegments(id)
return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength, startSeq, playlistFormat, writer)
}