添加日志模块

This commit is contained in:
yangjiechina
2024-04-05 21:59:56 +08:00
parent f4f7d7aab1
commit 0376ccf604
8 changed files with 348 additions and 10 deletions
+11
View File
@@ -5,12 +5,20 @@ require github.com/yangjiechina/avformat v0.0.0
require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pion/webrtc/v3 v3.2.29
github.com/sirupsen/logrus v1.9.3
github.com/x-cray/logrus-prefixed-formatter v0.5.2
go.uber.org/zap v1.27.0
)
require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/dtls/v2 v2.2.7 // indirect
github.com/pion/ice/v2 v2.3.13 // indirect
@@ -28,9 +36,12 @@ require (
github.com/pion/turn/v2 v2.1.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+137
View File
@@ -0,0 +1,137 @@
package log
import (
"fmt"
"github.com/natefinch/lumberjack"
"github.com/yangjiechina/avformat/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"os"
"strings"
)
var (
Sugar *zap.SugaredLogger
)
func InitLogger(leve zapcore.LevelEnabler,
name string, maxSize, maxBackup, maxAge int, compress bool) {
utils.Assert(Sugar == nil)
var sinks []zapcore.Core
writeSyncer := getLogWriter(name, maxSize, maxBackup, maxAge, compress)
encoder := getEncoder()
fileCore := zapcore.NewCore(encoder, writeSyncer, leve)
sinks = append(sinks, fileCore)
//打印到控制台
sinks = append(sinks, zapcore.NewCore(encoder, zapcore.AddSync(os.Stdout), leve))
core := zapcore.NewTee(sinks...)
logger := zap.New(core, zap.AddCaller())
Sugar = logger.Sugar()
}
func getEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
return zapcore.NewConsoleEncoder(encoderConfig)
}
// 配置日志保存规则
// @name 日志文件名, 可包含路径
// @maxSize 单个日志文件最大大小(M)
// @maxBackup 日志文件最多生成多少个
// @maxAge 日志文件最多保存多少天
func getLogWriter(name string, maxSize, maxBackup, maxAge int, compress bool) zapcore.WriteSyncer {
lumberJackLogger := &lumberjack.Logger{
Filename: name,
MaxSize: maxSize,
MaxBackups: maxBackup,
MaxAge: maxAge,
Compress: compress,
}
return zapcore.AddSync(lumberJackLogger)
}
// Logger interface used as base logger throughout the library.
type Logger interface {
Print(args ...interface{})
Printf(format string, args ...interface{})
Trace(args ...interface{})
Tracef(format string, args ...interface{})
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Info(args ...interface{})
Infof(format string, args ...interface{})
Warn(args ...interface{})
Warnf(format string, args ...interface{})
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Panic(args ...interface{})
Panicf(format string, args ...interface{})
WithPrefix(prefix string) Logger
Prefix() string
WithFields(fields Fields) Logger
Fields() Fields
SetLevel(level Level)
}
type Loggable interface {
Log() Logger
}
type Fields map[string]interface{}
func (fields Fields) String() string {
str := make([]string, 0)
for k, v := range fields {
str = append(str, fmt.Sprintf("%s=%+v", k, v))
}
return strings.Join(str, " ")
}
func (fields Fields) WithFields(newFields Fields) Fields {
allFields := make(Fields)
for k, v := range fields {
allFields[k] = v
}
for k, v := range newFields {
allFields[k] = v
}
return allFields
}
func AddFieldsFrom(logger Logger, values ...interface{}) Logger {
for _, value := range values {
switch v := value.(type) {
case Logger:
logger = logger.WithFields(v.Fields())
case Loggable:
logger = logger.WithFields(v.Log().Fields())
case interface{ Fields() Fields }:
logger = logger.WithFields(v.Fields())
}
}
return logger
}
+148
View File
@@ -0,0 +1,148 @@
package log
import (
"github.com/sirupsen/logrus"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
)
type LogrusLogger struct {
log logrus.Ext1FieldLogger
prefix string
fields Fields
}
// Level type
type Level uint32
// These are the different logging levels. You can set the logging level to log
// on your instance of logger, obtained with `logrus.New()`.
const (
// PanicLevel level, highest level of severity. Logs and then calls panic with the
// message passed to Debug, Info, ...
PanicLevel Level = iota
// FatalLevel level. Logs and then calls `logger.Exit(1)`. It will exit even if the
// logging level is set to Panic.
FatalLevel
// ErrorLevel level. Logs. Used for errors that should definitely be noted.
// Commonly used for hooks to send errors to an error tracking service.
ErrorLevel
// WarnLevel level. Non-critical entries that deserve eyes.
WarnLevel
// InfoLevel level. General operational entries about what's going on inside the
// application.
InfoLevel
// DebugLevel level. Usually only enabled when debugging. Very verbose logging.
DebugLevel
// TraceLevel level. Designates finer-grained informational events than the Debug.
TraceLevel
)
func NewLogrusLogger(logrus logrus.Ext1FieldLogger, prefix string, fields Fields) *LogrusLogger {
return &LogrusLogger{
log: logrus,
prefix: prefix,
fields: fields,
}
}
func NewDefaultLogrusLogger() *LogrusLogger {
logger := logrus.New()
logger.Formatter = &prefixed.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05.000",
}
return NewLogrusLogger(logger, "main", nil)
}
func (l *LogrusLogger) Print(args ...interface{}) {
l.prepareEntry().Print(args...)
}
func (l *LogrusLogger) Printf(format string, args ...interface{}) {
l.prepareEntry().Printf(format, args...)
}
func (l *LogrusLogger) Trace(args ...interface{}) {
l.prepareEntry().Trace(args...)
}
func (l *LogrusLogger) Tracef(format string, args ...interface{}) {
l.prepareEntry().Tracef(format, args...)
}
func (l *LogrusLogger) Debug(args ...interface{}) {
l.prepareEntry().Debug(args...)
}
func (l *LogrusLogger) Debugf(format string, args ...interface{}) {
l.prepareEntry().Debugf(format, args...)
}
func (l *LogrusLogger) Info(args ...interface{}) {
l.prepareEntry().Info(args...)
}
func (l *LogrusLogger) Infof(format string, args ...interface{}) {
l.prepareEntry().Infof(format, args...)
}
func (l *LogrusLogger) Warn(args ...interface{}) {
l.prepareEntry().Warn(args...)
}
func (l *LogrusLogger) Warnf(format string, args ...interface{}) {
l.prepareEntry().Warnf(format, args...)
}
func (l *LogrusLogger) Error(args ...interface{}) {
l.prepareEntry().Error(args...)
}
func (l *LogrusLogger) Errorf(format string, args ...interface{}) {
l.prepareEntry().Errorf(format, args...)
}
func (l *LogrusLogger) Fatal(args ...interface{}) {
l.prepareEntry().Fatal(args...)
}
func (l *LogrusLogger) Fatalf(format string, args ...interface{}) {
l.prepareEntry().Fatalf(format, args...)
}
func (l *LogrusLogger) Panic(args ...interface{}) {
l.prepareEntry().Panic(args...)
}
func (l *LogrusLogger) Panicf(format string, args ...interface{}) {
l.prepareEntry().Panicf(format, args...)
}
func (l *LogrusLogger) WithPrefix(prefix string) Logger {
return NewLogrusLogger(l.log, prefix, l.Fields())
}
func (l *LogrusLogger) Prefix() string {
return l.prefix
}
func (l *LogrusLogger) WithFields(fields Fields) Logger {
return NewLogrusLogger(l.log, l.Prefix(), l.Fields().WithFields(fields))
}
func (l *LogrusLogger) Fields() Fields {
return l.fields
}
func (l *LogrusLogger) prepareEntry() *logrus.Entry {
return l.log.
WithFields(logrus.Fields(l.Fields())).
WithField("prefix", l.Prefix())
}
func (l *LogrusLogger) SetLevel(level Level) {
if ll, ok := l.log.(*logrus.Logger); ok {
ll.SetLevel(logrus.Level(level))
}
}
+5
View File
@@ -3,8 +3,10 @@ package main
import (
"github.com/yangjiechina/live-server/flv"
"github.com/yangjiechina/live-server/hls"
"github.com/yangjiechina/live-server/log"
"github.com/yangjiechina/live-server/rtc"
"github.com/yangjiechina/live-server/rtsp"
"go.uber.org/zap/zapcore"
"net"
"net/http"
@@ -52,6 +54,9 @@ func init() {
}
func main() {
//初始化日志
log.InitLogger(zapcore.DebugLevel, "./logs/lkm.log", 10, 100, 7, false)
stream.AppConfig.GOPCache = true
stream.AppConfig.MergeWriteLatency = 350
+10 -2
View File
@@ -1,6 +1,7 @@
package rtmp
import (
"github.com/yangjiechina/live-server/log"
"net"
"github.com/yangjiechina/avformat/transport"
@@ -37,10 +38,12 @@ func (s *serverImpl) Start(addr net.Addr) error {
}
func (s *serverImpl) Close() {
panic("implement me")
}
func (s *serverImpl) OnConnected(conn net.Conn) {
log.Sugar.Debugf("rtmp连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
t.Data = NewSession(conn)
}
@@ -50,12 +53,17 @@ func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
err := t.Data.(*sessionImpl).Input(conn, data)
if err != nil {
log.Sugar.Errorf("处理rtmp包失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String())
_ = conn.Close()
println("处理rtmp包发生错误:" + err.Error())
t.Data.(*sessionImpl).Close()
t.Data = nil
}
}
func (s *serverImpl) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("rtmp断开连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
t.Data.(*sessionImpl).Close()
t.Data = nil
+18 -5
View File
@@ -3,6 +3,7 @@ package rtmp
import (
"github.com/yangjiechina/avformat/librtmp"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/log"
"github.com/yangjiechina/live-server/stream"
"net"
)
@@ -26,14 +27,16 @@ func NewSession(conn net.Conn) Session {
type sessionImpl struct {
//解析rtmp协议栈
stack *librtmp.Stack
//publisher/sink
handle interface{}
//publisher/sink, 在publish或play成功后赋值
handle interface{}
isPublisher bool
conn net.Conn
conn net.Conn
}
func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookState) {
log.Sugar.Infof("rtmp onpublish app:%s stream:%s conn:%s", app, stream_, s.conn.RemoteAddr().String())
sourceId := app + "_" + stream_
source := NewPublisher(sourceId, s.stack, s.conn)
s.stack.SetOnPublishHandler(source)
@@ -53,9 +56,11 @@ func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookSta
func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState) {
sourceId := app + "_" + stream_
//拉流事件Sink统一处理
sink := NewSink(stream.GenerateSinkId(s.conn.RemoteAddr()), sourceId, s.conn)
log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.Id(), s.conn.RemoteAddr().String())
sink.(*stream.SinkImpl).Play(sink, func() {
s.handle = sink
response <- utils.HookStateOK
@@ -75,6 +80,14 @@ func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
}
func (s *sessionImpl) Close() {
log.Sugar.Debugf("释放rtmp session conn:%s", s.conn.RemoteAddr().String())
//释放协议栈
if s.stack != nil {
s.stack.Close()
}
//还没到publish/play
if s.handle == nil {
return
}
+4 -1
View File
@@ -3,6 +3,7 @@ package stream
import (
"bytes"
"encoding/json"
"fmt"
"github.com/yangjiechina/avformat/utils"
"net/http"
"time"
@@ -81,8 +82,10 @@ func (h *hookSessionImpl) send(url string, body interface{}, success func(respon
request.Header.Set("Content-Type", "application/json")
response, err := client.Do(request)
if err != nil || response.StatusCode != http.StatusOK {
if err != nil {
failure(response, err)
} else if response.StatusCode != http.StatusOK {
failure(response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status))
} else {
success(response)
}
+15 -2
View File
@@ -1,8 +1,8 @@
package stream
import (
"fmt"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/log"
"net"
"net/http"
)
@@ -26,6 +26,8 @@ type ISink interface {
Protocol() Protocol
ProtocolStr() string
State() SessionState
SetState(state SessionState) bool
@@ -123,6 +125,10 @@ func (s *SinkImpl) Protocol() Protocol {
return s.Protocol_
}
func (s *SinkImpl) ProtocolStr() string {
return streamTypeToStr(s.Protocol_)
}
func (s *SinkImpl) State() SessionState {
return s.State_
}
@@ -182,10 +188,13 @@ func (s *SinkImpl) Play(sink ISink, success func(), failure func(state utils.Hoo
f := func() {
source := SourceManager.Find(sink.SourceId())
if source == nil {
fmt.Printf("添加到等待队列 sink:%s", sink.Id())
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.ProtocolStr(), sink.Id(), sink.SourceId())
sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(sink.SourceId(), sink)
} else {
log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", sink.ProtocolStr(), sink.Id(), sink.SourceId())
source.AddEvent(SourceEventPlay, sink)
}
}
@@ -200,10 +209,14 @@ func (s *SinkImpl) Play(sink ISink, success func(), failure func(state utils.Hoo
f()
success()
}, func(response *http.Response, err error) {
log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), sink.ProtocolStr(), sink.Id(), sink.SourceId())
failure(utils.HookStateFailure)
})
if err != nil {
log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), sink.ProtocolStr(), sink.Id(), sink.SourceId())
failure(utils.HookStateFailure)
return
}