mirror of
https://github.com/lkmio/lkm.git
synced 2026-04-22 16:17:05 +08:00
320 lines
9.2 KiB
Go
320 lines
9.2 KiB
Go
package hls
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/lkmio/avformat"
|
|
"github.com/lkmio/avformat/collections"
|
|
"github.com/lkmio/avformat/utils"
|
|
"github.com/lkmio/lkm/log"
|
|
"github.com/lkmio/lkm/stream"
|
|
"github.com/lkmio/mpeg"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"unsafe"
|
|
)
|
|
|
|
type TransStream struct {
|
|
stream.BaseTransStream
|
|
muxer *mpeg.TSMuxer
|
|
ctx struct {
|
|
segmentSeq int // 切片序号
|
|
writeBuffer []byte // ts流的缓冲区, 由TSMuxer使用. 减少用户态和内核态交互,以及磁盘IO频率
|
|
writeBufferSize int // 已缓存TS流大小
|
|
url string // @See TransStream.tsUrl
|
|
path string // ts切片位于磁盘中的绝对路径
|
|
file *os.File // ts切片文件句柄
|
|
}
|
|
|
|
M3U8Writer stream.M3U8Writer
|
|
m3u8Name string // m3u8文件名
|
|
m3u8File *os.File // m3u8文件句柄
|
|
dir string // m3u8文件父目录
|
|
tsUrl string // m3u8中每个url的前缀, 默认为空, 为了支持绝对路径访问:http://xxx/xxx/xxx.ts
|
|
tsFormat string // ts文件名格式
|
|
duration int // 切片时长, 单位秒
|
|
playlistLength int // 最大切片文件个数
|
|
|
|
PlaylistFormat *string // 位于内存中的m3u8播放列表,每个sink都引用指针地址.
|
|
PlaylistFormatPtr []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
|
|
}
|
|
|
|
func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
|
|
// 创建一下个切片
|
|
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
|
|
var newSegment bool
|
|
if (!t.HasVideo() || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
|
|
// 保存当前切片文件
|
|
if t.ctx.file != nil {
|
|
err := t.flushSegment(false)
|
|
if err != nil {
|
|
return nil, -1, false, err
|
|
}
|
|
}
|
|
|
|
// 创建新的切片
|
|
if err := t.createSegment(); err != nil {
|
|
return nil, -1, false, err
|
|
}
|
|
|
|
newSegment = true
|
|
}
|
|
|
|
duration := packet.GetDuration(90000)
|
|
dts := t.Tracks[index].Dts
|
|
pts := t.Tracks[index].Pts
|
|
t.Tracks[index].Dts += duration
|
|
t.Tracks[index].Pts = t.Tracks[index].Dts + packet.GetPtsDtsDelta(90000)
|
|
|
|
data := packet.Data
|
|
if utils.AVMediaTypeVideo == packet.MediaType {
|
|
data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet)
|
|
}
|
|
|
|
// 写入ts切片
|
|
length := len(data)
|
|
capacity := cap(t.ctx.writeBuffer)
|
|
for i := 0; i < length; {
|
|
if capacity-t.ctx.writeBufferSize < mpeg.TsPacketSize {
|
|
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
|
|
t.ctx.writeBufferSize = 0
|
|
}
|
|
|
|
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
|
|
i += t.muxer.Input(bytes, index, data[i:], length, dts, pts, packet.Key, i == 0)
|
|
t.ctx.writeBufferSize += mpeg.TsPacketSize
|
|
}
|
|
|
|
// 缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿.
|
|
if newSegment && t.M3U8Writer.Size() > 1 {
|
|
return t.PlaylistFormatPtr, -1, true, nil
|
|
}
|
|
|
|
return nil, -1, true, nil
|
|
}
|
|
|
|
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
|
|
var err error
|
|
var trackIndex int
|
|
if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
|
data := track.Stream.CodecParameters.AnnexBExtraData()
|
|
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
|
|
} else {
|
|
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
|
|
}
|
|
|
|
return trackIndex, err
|
|
}
|
|
|
|
func (t *TransStream) WriteHeader() error {
|
|
return t.createSegment()
|
|
}
|
|
|
|
// 写入新的TS切片,更新M3U8
|
|
func (t *TransStream) flushSegment(end bool) error {
|
|
// 写入剩余TS包
|
|
if t.ctx.writeBufferSize > 0 {
|
|
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
|
|
t.ctx.writeBufferSize = 0
|
|
}
|
|
|
|
if err := t.ctx.file.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 删除多余的ts切片文件
|
|
if t.M3U8Writer.Size() >= t.playlistLength {
|
|
_ = os.Remove(t.M3U8Writer.Get(0).Path)
|
|
}
|
|
|
|
// 更新m3u8列表
|
|
duration := float32(t.muxer.Duration()) / 90000
|
|
t.M3U8Writer.AddSegment(duration, t.ctx.url, t.ctx.segmentSeq, t.ctx.path)
|
|
m3u8Txt := t.M3U8Writer.String()
|
|
|
|
//if end {
|
|
// m3u8Txt += "#EXT-X-ENDLIST"
|
|
//}
|
|
|
|
*t.PlaylistFormat = m3u8Txt
|
|
|
|
// 写入最新的m3u8到文件
|
|
if t.m3u8File != nil {
|
|
if _, err := t.m3u8File.Seek(0, 0); err != nil {
|
|
return err
|
|
} else if err = t.m3u8File.Truncate(0); err != nil {
|
|
return err
|
|
} else if _, err = t.m3u8File.Write([]byte(m3u8Txt)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// 创建一个新的ts切片
|
|
func (t *TransStream) createSegment() error {
|
|
t.muxer.Reset()
|
|
|
|
var tsFile *os.File
|
|
startSeq := t.ctx.segmentSeq + 1
|
|
for {
|
|
tsName := fmt.Sprintf(t.tsFormat, startSeq)
|
|
// ts文件
|
|
t.ctx.path = fmt.Sprintf("%s/%s", t.dir, tsName)
|
|
// m3u8列表中切片的url
|
|
t.ctx.url = fmt.Sprintf("%s%s", t.tsUrl, tsName)
|
|
|
|
file, err := os.OpenFile(t.ctx.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
|
|
if err == nil {
|
|
tsFile = file
|
|
break
|
|
}
|
|
|
|
log.Sugar.Errorf("创建ts切片文件失败 err:%s path:%s", err.Error(), t.ctx.path)
|
|
if os.IsPermission(err) || os.IsTimeout(err) || os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
|
|
// 继续创建TS文件, 认为是文件名冲突, 并且文件已经被打开.
|
|
startSeq++
|
|
}
|
|
|
|
t.ctx.segmentSeq = startSeq
|
|
t.ctx.file = tsFile
|
|
n, err := t.muxer.WriteHeader(t.ctx.writeBuffer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.ctx.writeBufferSize = n
|
|
return nil
|
|
}
|
|
|
|
func (t *TransStream) Close() ([]stream.TransStreamSegment, error) {
|
|
var err error
|
|
|
|
if t.ctx.file != nil {
|
|
err = t.flushSegment(true)
|
|
err = t.ctx.file.Close()
|
|
t.ctx.file = nil
|
|
}
|
|
|
|
if t.muxer != nil {
|
|
t.muxer.Close()
|
|
t.muxer = nil
|
|
}
|
|
|
|
if t.m3u8File != nil {
|
|
err = t.m3u8File.Close()
|
|
t.m3u8File = nil
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// stringPtrToBytes encodes the address held in ptr as a byte slice whose
// length is the size of a pointer. The bytes are the in-memory representation
// of the address; the string itself is not copied.
func stringPtrToBytes(ptr *string) []byte {
	addr := uintptr(unsafe.Pointer(ptr))
	return unsafe.Slice((*byte)(unsafe.Pointer(&addr)), unsafe.Sizeof(addr))
}
|
|
|
|
func bytesToStringPtr(b []byte) *string {
|
|
ptrAddr := *(*uintptr)(unsafe.Pointer(&b[0]))
|
|
return (*string)(unsafe.Pointer(ptrAddr))
|
|
}
|
|
|
|
func DeleteOldSegments(id string) {
|
|
var index int
|
|
for ; ; index++ {
|
|
path := stream.AppConfig.Hls.TSPath(id, strconv.Itoa(index))
|
|
fileInfo, err := os.Stat(path)
|
|
if err != nil && os.IsNotExist(err) {
|
|
break
|
|
} else if fileInfo.IsDir() {
|
|
continue
|
|
}
|
|
|
|
_ = os.Remove(path)
|
|
}
|
|
}
|
|
|
|
// NewTransStream 创建HLS传输流
|
|
// @Params dir m3u8的文件夹目录
|
|
// @Params m3u8Name m3u8文件名
|
|
// @Params tsFormat ts文件格式, 例如: %d.ts
|
|
// @Params tsUrl m3u8中ts切片的url前缀
|
|
// @Params parentDir 保存切片的绝对路径. mu38和ts切片放在同一目录下, 目录地址使用parentDir+urlPrefix
|
|
// @Params segmentDuration 单个切片时长
|
|
// @Params playlistLength 缓存多少个切片
|
|
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int, seq int, playlistFormat *string, writer stream.M3U8Writer) (stream.TransStream, error) {
|
|
// 创建文件夹
|
|
m3u8Path := fmt.Sprintf("%s/%s", dir, m3u8Name)
|
|
if err := os.MkdirAll(filepath.Dir(m3u8Path), 0666); err != nil {
|
|
log.Sugar.Errorf("创建HLS目录失败 err: %s path: %s", err.Error(), m3u8Path)
|
|
return nil, err
|
|
}
|
|
|
|
// 创建m3u8文件
|
|
file, err := os.OpenFile(m3u8Path, os.O_CREATE|os.O_RDWR, 0666)
|
|
if err != nil {
|
|
log.Sugar.Errorf("创建m3u8文件失败 err: %s path: %s", err.Error(), m3u8Path)
|
|
//return nil, err
|
|
}
|
|
|
|
transStream := &TransStream{
|
|
m3u8Name: m3u8Name,
|
|
tsFormat: tsFormat,
|
|
tsUrl: tsUrl,
|
|
dir: dir,
|
|
duration: segmentDuration,
|
|
playlistLength: playlistLength,
|
|
}
|
|
|
|
if writer != nil {
|
|
transStream.M3U8Writer = writer
|
|
} else {
|
|
transStream.M3U8Writer = stream.NewM3U8Writer(playlistLength)
|
|
}
|
|
|
|
if playlistFormat != nil {
|
|
transStream.PlaylistFormat = playlistFormat
|
|
} else {
|
|
transStream.PlaylistFormat = new(string)
|
|
}
|
|
|
|
playlistFormatPtrCounter := collections.NewReferenceCounter[[]byte](stringPtrToBytes(transStream.PlaylistFormat))
|
|
transStream.PlaylistFormatPtr = append(transStream.PlaylistFormatPtr, playlistFormatPtrCounter)
|
|
// 创建TS封装器
|
|
muxer := mpeg.NewTSMuxer()
|
|
|
|
// 初始化ts封装上下文对象
|
|
transStream.ctx.segmentSeq = seq
|
|
transStream.ctx.writeBuffer = make([]byte, 1024*1024)
|
|
|
|
transStream.muxer = muxer
|
|
transStream.m3u8File = file
|
|
return transStream, nil
|
|
}
|
|
|
|
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
|
|
id := source.GetID()
|
|
|
|
var writer stream.M3U8Writer
|
|
var playlistFormat *string
|
|
startSeq := -1
|
|
|
|
endInfo := source.GetTransStreamPublisher().GetStreamEndInfo()
|
|
if endInfo != nil && endInfo.M3U8Writer != nil {
|
|
writer = endInfo.M3U8Writer
|
|
playlistFormat = endInfo.PlaylistFormat
|
|
startSeq = writer.Get(writer.Size() - 1).Sequence
|
|
}
|
|
|
|
// 删除旧的m3u8文件
|
|
//_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
|
|
// 删除旧的切片文件
|
|
//go DeleteOldSegments(id)
|
|
|
|
return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength, startSeq, playlistFormat, writer)
|
|
}
|