mirror of
https://github.com/eolinker/apinto
synced 2026-04-22 16:07:04 +08:00
139 lines
3.2 KiB
Go
139 lines
3.2 KiB
Go
package monitor_manager
|
|
|
|
import (
|
|
"os"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/eolinker/apinto/entries/monitor-entry"
|
|
"github.com/eolinker/apinto/scope-manager"
|
|
"github.com/eolinker/eosc"
|
|
"github.com/eolinker/eosc/common/bean"
|
|
"github.com/eolinker/eosc/log"
|
|
)
|
|
|
|
var _ IManager = (*MonitorManager)(nil)
|
|
|
|
type IManager interface {
|
|
SetProxyOutput(id string, proxy scope_manager.IProxyOutput[monitor_entry.IOutput])
|
|
ConcurrencyAdd(apiID string, count int32)
|
|
RemoveCurrencyAPI(apiID string)
|
|
Output(id string, ps []monitor_entry.IPoint)
|
|
}
|
|
|
|
var monitorManager = NewMonitorManager()
|
|
|
|
func init() {
|
|
bean.Injection(&monitorManager)
|
|
}
|
|
|
|
type MonitorManager struct {
|
|
outputs eosc.Untyped[string, scope_manager.IProxyOutput[monitor_entry.IOutput]]
|
|
concurrentApis eosc.Untyped[string, *concurrency]
|
|
pointChan chan point
|
|
}
|
|
|
|
func (o *MonitorManager) RemoveCurrencyAPI(apiID string) {
|
|
v, ok := o.concurrentApis.Del(apiID)
|
|
if ok {
|
|
now := time.Now()
|
|
tags := map[string]string{
|
|
"api": apiID,
|
|
"cluster": os.Getenv("cluster_id"),
|
|
"node": os.Getenv("node_id"),
|
|
}
|
|
fields := map[string]interface{}{
|
|
"value": v.Get(),
|
|
}
|
|
p := monitor_entry.NewPoint("node", tags, fields, now)
|
|
for _, v := range o.outputs.List() {
|
|
o.proxyOutput(v, []monitor_entry.IPoint{p})
|
|
}
|
|
}
|
|
}
|
|
|
|
func NewMonitorManager() IManager {
|
|
o := &MonitorManager{
|
|
outputs: eosc.BuildUntyped[string, scope_manager.IProxyOutput[monitor_entry.IOutput]](),
|
|
concurrentApis: eosc.BuildUntyped[string, *concurrency](),
|
|
pointChan: make(chan point, 100),
|
|
}
|
|
go o.doLoop()
|
|
return o
|
|
}
|
|
|
|
type point struct {
|
|
id string
|
|
points []monitor_entry.IPoint
|
|
}
|
|
|
|
func (o *MonitorManager) SetProxyOutput(id string, proxy scope_manager.IProxyOutput[monitor_entry.IOutput]) {
|
|
o.outputs.Set(id, proxy)
|
|
}
|
|
|
|
func (o *MonitorManager) ConcurrencyAdd(id string, count int32) {
|
|
v, has := o.concurrentApis.Get(id)
|
|
if !has {
|
|
v = &concurrency{count: 0}
|
|
o.concurrentApis.Set(id, v)
|
|
}
|
|
v.Add(count)
|
|
}
|
|
|
|
func (o *MonitorManager) Output(id string, ps []monitor_entry.IPoint) {
|
|
o.pointChan <- point{
|
|
id: id,
|
|
points: ps,
|
|
}
|
|
}
|
|
|
|
func (o *MonitorManager) doLoop() {
|
|
ticket := time.NewTicker(1 * time.Second)
|
|
defer ticket.Stop()
|
|
for {
|
|
select {
|
|
case p, ok := <-o.pointChan:
|
|
if !ok {
|
|
return
|
|
}
|
|
v, has := o.outputs.Get(p.id)
|
|
if !has {
|
|
continue
|
|
}
|
|
o.proxyOutput(v, p.points)
|
|
case <-ticket.C:
|
|
ticket.Reset(1 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *MonitorManager) proxyOutput(v scope_manager.IProxyOutput[monitor_entry.IOutput], ps []monitor_entry.IPoint) {
|
|
for _, proxy := range v.List() {
|
|
out, ok := proxy.(monitor_entry.IOutput)
|
|
if !ok {
|
|
log.Error("error output type: ", reflect.TypeOf(proxy))
|
|
continue
|
|
}
|
|
out.Output(ps...)
|
|
}
|
|
}
|
|
|
|
func (o *MonitorManager) genNodePoints() []monitor_entry.IPoint {
|
|
now := time.Now()
|
|
cluster := os.Getenv("cluster_id")
|
|
node := os.Getenv("node_id")
|
|
points := make([]monitor_entry.IPoint, 0, o.concurrentApis.Count())
|
|
for key, value := range o.concurrentApis.All() {
|
|
tags := map[string]string{
|
|
"api": key,
|
|
"cluster": cluster,
|
|
"node": node,
|
|
}
|
|
fields := map[string]interface{}{
|
|
"value": value.Get(),
|
|
}
|
|
points = append(points, monitor_entry.NewPoint("node", tags, fields, now))
|
|
}
|
|
return points
|
|
}
|