优化非健康检查时,节点被down之后无法恢复的问题

This commit is contained in:
huangmengzhu
2023-03-28 18:40:24 +08:00
parent 2907d73861
commit c94153dd28
5 changed files with 35 additions and 13 deletions
+21 -4
View File
@@ -38,9 +38,27 @@ type IAppContainer interface {
}
type appContainer struct {
lock sync.RWMutex
nodes eosc.Untyped[string, INode]
apps map[string]*_AppAgent
lock sync.RWMutex
nodes eosc.Untyped[string, INode]
apps map[string]*_AppAgent
isHealthCheck int32
}
func (ac *appContainer) status(status NodeStatus) NodeStatus {
if atomic.LoadInt32(&ac.isHealthCheck) > 0 {
return status
}
return Running
}
func (ac *appContainer) SetHealthCheck(isHealthCheck bool) {
if isHealthCheck {
atomic.StoreInt32(&ac.isHealthCheck, 1)
} else {
atomic.StoreInt32(&ac.isHealthCheck, 0)
}
}
func NewAppContainer() IAppContainer {
@@ -78,7 +96,6 @@ func (ac *appContainer) Set(name string, infos []NodeInfo) IAppAgent {
ns := ac.create(infos)
ac.lock.RLock()
app, has := ac.apps[name]
ac.lock.RUnlock()
if has {
+10 -6
View File
@@ -30,13 +30,17 @@ type INode interface {
Down()
Leave()
}
type _INodeStatusCheck interface {
status(status NodeStatus) NodeStatus
}
type _BaseNode struct {
id string
ip string
port int
status NodeStatus
lastTime atomic.Pointer[time.Time]
status NodeStatus
lastTime atomic.Pointer[time.Time]
statusChecker _INodeStatusCheck
}
func (n *_BaseNode) Last() time.Time {
@@ -50,8 +54,8 @@ func (n *_BaseNode) Last() time.Time {
}
func newBaseNode(ip string, port int) *_BaseNode {
return &_BaseNode{ip: ip, port: port, status: Running}
func newBaseNode(ip string, port int, statusChecker _INodeStatusCheck) *_BaseNode {
return &_BaseNode{ip: ip, port: port, status: Running, statusChecker: statusChecker}
}
func (n *_BaseNode) ID() string {
@@ -67,7 +71,8 @@ func (n *_BaseNode) Port() int {
}
func (n *_BaseNode) Status() eocontext.NodeStatus {
return n.status
return n.statusChecker.status(n.status)
}
// Addr 返回节点地址
@@ -97,7 +102,6 @@ func (n *_BaseNode) Leave() {
type Attrs = eocontext.Attrs
type Node struct {
INode
label Attrs
}
+2 -1
View File
@@ -11,6 +11,7 @@ var (
type INodes interface {
Get(ip string, port int) INode
All() []INode
SetHealthCheck(isHealthCheck bool)
}
func (ac *appContainer) Get(ip string, port int) INode {
@@ -28,7 +29,7 @@ func (ac *appContainer) Get(ip string, port int) INode {
return node
}
ac.nodes.Set(id, newBaseNode(ip, port))
ac.nodes.Set(id, newBaseNode(ip, port, ac))
node, _ = ac.nodes.Get(id)
return node
}
+1 -1
View File
@@ -18,7 +18,7 @@ func TestConsulGetNodes(t *testing.T) {
newConsul := &consul{
clients: clients,
services: discovery.NewAppContainer("http"),
services: discovery.NewAppContainer(),
locker: sync.RWMutex{},
context: nil,
cancelFunc: nil,
+1 -1
View File
@@ -26,7 +26,7 @@ func NewHeathCheckHandler(nodes discovery.INodes, cfg *Config) *HeathCheckHandle
func (s *HeathCheckHandler) reset(cfg *Config) error {
s.healthOn = cfg.HealthOn
s.nodes.SetHealthCheck(s.healthOn)
if !cfg.HealthOn {
checker := s.checker
if checker != nil {