refactor: detect connection status in user daemon every 5 seconds (#765)

* refactor: detect connection status in user daemon every 5 seconds

* refactor: health check once immediately

* fix: check status after proxy
This commit is contained in:
naison
2026-03-23 22:51:24 +08:00
committed by GitHub
parent b3fc703baa
commit ed8a2e2c2d
7 changed files with 74 additions and 25 deletions
+3
View File
@@ -4,6 +4,7 @@ import (
"context"
"io"
"os"
"time"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
@@ -214,6 +215,8 @@ func (svr *Server) redirectConnectToSudoDaemon(req *rpc.ConnectRequest, resp rpc
if resp.Context().Err() != nil {
return resp.Context().Err()
}
connect.HealthCheckOnce(sshCtx, time.Second*5)
go connect.HealthPeriod(sshCtx, time.Second*5)
svr.connections = append(svr.connections, connect)
svr.currentConnectionID = connectionID
return resp.Send(&rpc.ConnectResponse{
+2
View File
@@ -4,6 +4,7 @@ import (
"context"
"io"
"os"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@@ -129,6 +130,7 @@ func (svr *Server) Proxy(resp rpc.Daemon_ProxyServer) (err error) {
return err
}
svr.currentConnectionID = connectionID
options.HealthCheckOnce(cancel, time.Second*5)
return nil
}
+8 -15
View File
@@ -5,9 +5,7 @@ import (
"strconv"
"strings"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
@@ -37,7 +35,7 @@ func (svr *Server) Status(ctx context.Context, req *rpc.StatusRequest) (*rpc.Sta
defer wg.Done()
result := genStatus(options)
var err error
result.ProxyList, result.SyncList, err = gen(ctx, options, options.Sync)
result.ProxyList, result.SyncList, err = gen(options, options.Sync)
if err != nil {
plog.G(context.Background()).Errorf("Error generating status: %v", err)
result.Status = StatusUnhealthy
@@ -62,7 +60,7 @@ func (svr *Server) Status(ctx context.Context, req *rpc.StatusRequest) (*rpc.Sta
options := svr.connections[i]
result := genStatus(options)
var err error
result.ProxyList, result.SyncList, err = gen(ctx, options, options.Sync)
result.ProxyList, result.SyncList, err = gen(options, options.Sync)
if err != nil {
plog.G(context.Background()).Errorf("Error generating status: %v", err)
result.Status = StatusUnhealthy
@@ -93,19 +91,13 @@ func genStatus(connect *handler.ConnectOptions) *rpc.Status {
return &info
}
func gen(ctx context.Context, connect *handler.ConnectOptions, sync *handler.SyncOptions) ([]*rpc.Proxy, []*rpc.Sync, error) {
timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
func gen(connect *handler.ConnectOptions, sync *handler.SyncOptions) ([]*rpc.Proxy, []*rpc.Sync, error) {
var proxyList []*rpc.Proxy
if connect != nil && connect.GetClientset() != nil {
mapInterface := connect.GetClientset().CoreV1().ConfigMaps(connect.Namespace)
configMap, err := mapInterface.Get(timeoutCtx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
status := connect.HealthStatus()
if configMap := status.ConfigMap(); configMap != nil {
var v = make([]*controlplane.Virtual, 0)
if str, ok := configMap.Data[config.KeyEnvoy]; ok {
if err = yaml.Unmarshal([]byte(str), &v); err != nil {
if err := yaml.Unmarshal([]byte(str), &v); err != nil {
return nil, nil, err
}
}
@@ -169,7 +161,8 @@ func gen(ctx context.Context, connect *handler.ConnectOptions, sync *handler.Syn
})
}
}
return proxyList, syncList, nil
return proxyList, syncList, status.LastError()
}
func useSecondPort(m map[int32]string) map[int32]int32 {
+3 -4
View File
@@ -76,10 +76,6 @@ func (c *ConnectOptions) Cleanup(logCtx context.Context) {
}
}
} else {
if c.cancel != nil {
c.cancel()
}
for _, function := range c.getRolloutFunc() {
if function != nil {
if err := function(); err != nil {
@@ -91,6 +87,9 @@ func (c *ConnectOptions) Cleanup(logCtx context.Context) {
plog.G(logCtx).Debugf("Clearing DNS settings")
c.dnsConfig.CancelDNS()
}
if c.cancel != nil {
c.cancel()
}
}
})
}
+3 -1
View File
@@ -69,7 +69,8 @@ type ConnectOptions struct {
Lock *sync.Mutex
Image string
ImagePullSecretName string
Request *rpc.ConnectRequest `json:"Request,omitempty"`
// for reload from ~/.kubevpn/daemon/db
Request *rpc.ConnectRequest `json:"Request,omitempty"`
ctx context.Context
cancel context.CancelFunc
@@ -91,6 +92,7 @@ type ConnectOptions struct {
once sync.Once
tunName string
proxyWorkloads ProxyList
healthStatus HealthStatus
Sync *SyncOptions
}
+6 -5
View File
@@ -223,7 +223,7 @@ func (u *sshUt) healthChecker(t *testing.T, endpoint string, header map[string]s
req.Header.Add(k, v)
}
client := &http.Client{Timeout: time.Second * 1}
client := &http.Client{Timeout: time.Second * 2}
err = retry.OnError(
wait.Backoff{Duration: time.Second, Factor: 1, Jitter: 0, Steps: 120},
func(err error) bool { return err != nil },
@@ -231,14 +231,14 @@ func (u *sshUt) healthChecker(t *testing.T, endpoint string, header map[string]s
var resp *http.Response
resp, err = client.Do(req)
if err != nil {
t.Logf("%s failed to do health check endpoint: %s: %v", time.Now().Format(time.DateTime), endpoint, err)
t.Logf("%s failed to do health check endpoint %s with header %s: %v", time.Now().Format(time.DateTime), endpoint, header, err)
return err
}
if resp.StatusCode != 200 {
if resp.Body != nil {
defer resp.Body.Close()
all, _ := io.ReadAll(resp.Body)
return fmt.Errorf("status code is %s, conetent: %v", resp.Status, string(all))
return fmt.Errorf("status code is %s, content: %v", resp.Status, string(all))
}
return fmt.Errorf("status code is %s", resp.Status)
}
@@ -257,8 +257,9 @@ func (u *sshUt) healthChecker(t *testing.T, endpoint string, header map[string]s
},
)
if err != nil {
t.Log(fmt.Sprintf("%s:%d", file, line), err)
u.kubectl(t)
t.Fatal(fmt.Sprintf("%s:%d", file, line), err)
t.FailNow()
}
}
@@ -867,7 +868,7 @@ func (u *sshUt) kubectl(t *testing.T) {
cmdGetSvc := exec.Command("kubectl", "get", "services", "-o", "wide", "-A")
cmdDescribePod := exec.Command("kubectl", "describe", "pods", "-A")
cmdDescribeSvc := exec.Command("kubectl", "describe", "services", "-A")
for _, cmd := range []*exec.Cmd{cmdGetPod, cmdDescribePod, cmdGetSvc, cmdDescribeSvc} {
for _, cmd := range []*exec.Cmd{cmdGetPod, cmdGetSvc, cmdDescribePod, cmdDescribeSvc} {
t.Logf("exec: %v", cmd.Args)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
+49
View File
@@ -0,0 +1,49 @@
package handler
import (
"context"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
// HealthStatus holds the most recent result of a connection health probe:
// the last error observed (nil when the last check succeeded) and the
// traffic-manager ConfigMap fetched on the last successful check.
//
// NOTE(review): these fields are written by HealthCheckOnce (including from
// the background goroutine started via HealthPeriod) and read through the
// HealthStatus() accessor without any lock — confirm all callers tolerate
// that race, or guard the struct with a mutex.
type HealthStatus struct {
	lastErr error
	cm      *corev1.ConfigMap
}
// ConfigMap returns the traffic-manager ConfigMap captured by the most
// recent successful health check, or nil if no check has succeeded yet.
func (h *HealthStatus) ConfigMap() *corev1.ConfigMap {
	return h.cm
}
// LastError returns the error recorded by the most recent health check,
// or nil if that check succeeded.
func (h *HealthStatus) LastError() error {
	return h.lastErr
}
// HealthPeriod runs HealthCheckOnce once every duration until ctx is
// canceled. It is intended to be started in its own goroutine (see the
// `go connect.HealthPeriod(...)` call site in the connect path).
//
// A ticker with a select on ctx.Done() is used instead of a
// time.Sleep loop so that cancellation stops the loop immediately
// rather than after up to a full period, and so no final check is
// issued against an already-canceled context (which would clobber the
// recorded health error with context.Canceled during shutdown).
func (c *ConnectOptions) HealthPeriod(ctx context.Context, duration time.Duration) {
	ticker := time.NewTicker(duration)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// duration doubles as the per-check API timeout, matching
			// the other HealthCheckOnce call sites.
			c.HealthCheckOnce(ctx, duration)
		}
	}
}
// HealthCheckOnce probes connectivity a single time by fetching the
// traffic-manager ConfigMap from the cluster, bounding the API call with
// the given timeout. On success it clears lastErr and caches the
// ConfigMap; on failure it records the error and leaves the previously
// cached ConfigMap in place, so status reporting can still show the last
// known configuration alongside the error.
//
// NOTE(review): the writes to c.healthStatus are not synchronized with
// readers (HealthStatus()); confirm the race is acceptable or add a lock.
// NOTE(review): assumes c.GetClientset() is non-nil — callers invoke this
// only after a connection is established; verify that holds for all paths.
func (c *ConnectOptions) HealthCheckOnce(ctx context.Context, timeout time.Duration) {
	timeoutCtx, cancelFunc := context.WithTimeout(ctx, timeout)
	defer cancelFunc()
	mapInterface := c.GetClientset().CoreV1().ConfigMaps(c.Namespace)
	configMap, err := mapInterface.Get(timeoutCtx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
	if err != nil {
		c.healthStatus.lastErr = err
		return
	}
	c.healthStatus.lastErr = nil
	c.healthStatus.cm = configMap
}
// HealthStatus returns a shallow copy of the current health snapshot.
// The copy shares the cached *corev1.ConfigMap pointer with the
// internal state, so callers must treat the returned ConfigMap as
// read-only.
//
// NOTE(review): this read is not synchronized with concurrent writers
// (HealthCheckOnce) — confirm the race is acceptable for status output.
func (c *ConnectOptions) HealthStatus() HealthStatus {
	return c.healthStatus
}