diff --git a/.gitignore b/.gitignore index 9bd28491..93f79906 100644 --- a/.gitignore +++ b/.gitignore @@ -18,5 +18,7 @@ # Build artifacts bin +/.bin/ +/kubevpn *.DS_Store diff --git a/README.md b/README.md index 43f804e0..984690e0 100644 --- a/README.md +++ b/README.md @@ -247,6 +247,36 @@ as `productpage` ***Disclaimer:*** This only works on the namespace where kubevpn-traffic-manager is deployed. Otherwise, use [Domain resolve](./README.md#domain-resolve) +### Local outbound proxy for nested VPNs + +If your operating system routes are controlled by another VPN client, route-based access may not work reliably even though `kubevpn connect` succeeds. In that case, use a local outbound proxy instead of depending on cluster CIDR routing. + +Start a standalone local SOCKS5 proxy: + +```shell +➜ ~ kubevpn proxy-out --listen-socks 127.0.0.1:1080 +Proxy hint: export ALL_PROXY=socks5h://127.0.0.1:1080 +Proxy scope: TCP traffic only. Use socks5h if you want proxy-side cluster DNS resolution. +``` + +Then access cluster Services, Pod IPs, or Service ClusterIPs through the proxy: + +```shell +➜ ~ curl --proxy socks5h://127.0.0.1:1080 http://productpage.default.svc.cluster.local:9080 +➜ ~ curl --proxy socks5h://127.0.0.1:1080 http://172.21.10.49:9080 +``` + +Or start the managed SOCKS5 proxy as part of `connect`: + +```shell +➜ ~ kubevpn connect --socks +Started managed local SOCKS5 proxy for connection 03dc50feb8c3 on 127.0.0.1:1080 +Proxy hint: export ALL_PROXY=socks5h://127.0.0.1:1080 +Proxy scope: TCP traffic only. Use socks5h if you want proxy-side cluster DNS resolution. +``` + +Use `kubevpn disconnect ` or `kubevpn quit` to stop the managed proxy. In this version, outbound proxy mode supports TCP traffic only. + ### Connect to multiple kubernetes cluster network already connected cluster `ccijorbccotmqodvr189g` @@ -756,4 +786,4 @@ If you want to debug this project on local PC. Please follow the steps bellow: [![JetBrains logo.](https://resources.jetbrains.com/storage/products/company/brand/logos/jetbrains.svg)](https://jb.gg/OpenSourceSupport) -### [Donate](https://kubevpn.dev/docs/donate) \ No newline at end of file +### [Donate](https://kubevpn.dev/docs/donate) diff --git a/README_ZH.md b/README_ZH.md index 69be1402..fc779cb8 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -234,6 +234,36 @@ reviews ClusterIP 172.21.8.24 9080/TCP 可以看到直接使用 service name 的方式,可以正常访问到集群资源。 +### 嵌套 VPN 场景下的本地出站代理 + +如果本机路由被另外一个 VPN 客户端接管,那么即使 `kubevpn connect` 成功,基于路由的访问也可能不稳定。这种情况下可以改用本地出站代理,而不是依赖集群 CIDR 路由。 + +启动一个独立的本地 SOCKS5 代理: + +```shell +➜ ~ kubevpn proxy-out --listen-socks 127.0.0.1:1080 +Proxy hint: export ALL_PROXY=socks5h://127.0.0.1:1080 +Proxy scope: TCP traffic only. Use socks5h if you want proxy-side cluster DNS resolution. +``` + +然后通过代理访问集群中的 Service、Pod IP 或 Service ClusterIP: + +```shell +➜ ~ curl --proxy socks5h://127.0.0.1:1080 http://productpage.default.svc.cluster.local:9080 +➜ ~ curl --proxy socks5h://127.0.0.1:1080 http://172.21.10.49:9080 +``` + +也可以在 `connect` 时自动启动托管的 SOCKS5 代理: + +```shell +➜ ~ kubevpn connect --socks +Started managed local SOCKS5 proxy for connection 03dc50feb8c3 on 127.0.0.1:1080 +Proxy hint: export ALL_PROXY=socks5h://127.0.0.1:1080 +Proxy scope: TCP traffic only. Use socks5h if you want proxy-side cluster DNS resolution. +``` + +可以通过 `kubevpn disconnect ` 或 `kubevpn quit` 停止这个托管代理。当前版本只支持 TCP 代理流量。 + ### 链接到多集群网络 可以看到已经链接到了一个集群 `ccijorbccotmqodvr189g` diff --git a/cmd/kubevpn/cmds/connect.go b/cmd/kubevpn/cmds/connect.go index abeefa92..a3c47250 100644 --- a/cmd/kubevpn/cmds/connect.go +++ b/cmd/kubevpn/cmds/connect.go @@ -2,7 +2,9 @@ package cmds import ( "context" + "errors" "fmt" + "io" "os" log "github.com/sirupsen/logrus" @@ -29,6 +31,8 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { var extraRoute = &handler.ExtraRouteInfo{} var sshConf = &pkgssh.SshConfig{} var transferImage, foreground bool + var enableSocks bool + var socksListen string var imagePullSecretName string var managerNamespace string cmd := &cobra.Command{ @@ -46,6 +50,10 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { # Connect to k8s cluster network kubevpn connect + # Connect and start a managed local SOCKS5 proxy for nested VPN cases + kubevpn connect --socks + curl --proxy socks5h://127.0.0.1:1080 http://productpage.default.svc.cluster.local:9080 + # Connect to api-server behind of bastion host or ssh jump host kubevpn connect --ssh-addr 192.168.1.100:22 --ssh-username root --ssh-keyfile ~/.ssh/ssh.pem @@ -115,13 +123,22 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { if err != nil { return err } - err = util.PrintGRPCStream[rpc.ConnectResponse](cmd.Context(), resp) + connectionID, err := printConnectGRPCStream(cmd.Context(), resp, os.Stdout) if err != nil { if status.Code(err) == codes.Canceled { return nil } return err } + if enableSocks { + if connectionID == "" { + return fmt.Errorf("connected but missing connection ID for socks proxy startup") + } + if err := startManagedSocksProxy(connectionID, bytes, ns, socksListen); err != nil { + return err + } + printManagedSocksStarted(os.Stdout, connectionID, socksListen) + } if !foreground { _, _ = fmt.Fprintln(os.Stdout, config.Slogan) } else { @@ -137,6 +154,8 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { } handler.AddCommonFlags(cmd.Flags(), &transferImage, &imagePullSecretName) cmd.Flags().BoolVar(&foreground, "foreground", false, "Hang up") + cmd.Flags().BoolVar(&enableSocks, "socks", false, "Start a managed local SOCKS5 proxy after connect for nested VPN or route-conflict cases") + cmd.Flags().StringVar(&socksListen, "socks-listen", "127.0.0.1:1080", "Listen address for the managed local SOCKS5 proxy") cmd.Flags().StringVar(&managerNamespace, "manager-namespace", "", "The namespace where the traffic manager is to be found. Only works in cluster mode (install kubevpn server by helm)") handler.AddExtraRoute(cmd.Flags(), extraRoute) @@ -144,7 +163,35 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { return cmd } +func printConnectGRPCStream(ctx context.Context, clientStream grpc.ClientStream, out io.Writer) (string, error) { + go func() { + if ctx != nil { + <-ctx.Done() + _ = clientStream.SendMsg(&rpc.Cancel{}) + } + }() + + var connectionID string + for { + resp := new(rpc.ConnectResponse) + err := clientStream.RecvMsg(resp) + if errors.Is(err, io.EOF) { + return connectionID, nil + } + if err != nil { + return connectionID, err + } + if resp.ConnectionID != "" { + connectionID = resp.ConnectionID + } + if out != nil && resp.GetMessage() != "" { + _, _ = fmt.Fprint(out, resp.GetMessage()) + } + } +} + func disconnect(cli rpc.DaemonClient, bytes []byte, ns string, sshConf *pkgssh.SshConfig) error { + connectionID, _ := connectionIDFromKubeconfigBytes(bytes, ns) resp, err := cli.Disconnect(context.Background()) if err != nil { plog.G(context.Background()).Errorf("Disconnect error: %v", err) @@ -166,5 +213,24 @@ func disconnect(cli rpc.DaemonClient, bytes []byte, ns string, sshConf *pkgssh.S } return err } + if connectionID != "" { + _ = stopManagedProxy(connectionID) + printManagedSocksStopped(os.Stdout, connectionID) + } return nil } + +func connectionIDFromKubeconfigBytes(bytes []byte, ns string) (string, error) { + file, err := util.ConvertToTempKubeconfigFile(bytes, "") + if err != nil { + return "", err + } + defer os.Remove(file) + + factory := util.InitFactoryByPath(file, ns) + connect := &handler.ConnectOptions{} + if err := connect.InitClient(factory); err != nil { + return "", err + } + return util.GetConnectionID(context.Background(), connect.GetClientset().CoreV1().Namespaces(), connect.Namespace) +} diff --git a/cmd/kubevpn/cmds/connect_test.go b/cmd/kubevpn/cmds/connect_test.go new file mode 100644 index 00000000..400fd15a --- /dev/null +++ b/cmd/kubevpn/cmds/connect_test.go @@ -0,0 +1,97 @@ +package cmds + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "google.golang.org/grpc/metadata" + + "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" +) + +type fakeConnectClientStream struct { + ctx context.Context + responses []*rpc.ConnectResponse + recvIndex int + cancelSent int + waitOnDone bool +} + +func (f *fakeConnectClientStream) Header() (metadata.MD, error) { return nil, nil } +func (f *fakeConnectClientStream) Trailer() metadata.MD { return nil } +func (f *fakeConnectClientStream) CloseSend() error { return nil } +func (f *fakeConnectClientStream) Context() context.Context { + if f.ctx != nil { + return f.ctx + } + return context.Background() +} +func (f *fakeConnectClientStream) SendMsg(m interface{}) error { + if _, ok := m.(*rpc.Cancel); ok { + f.cancelSent++ + } + return nil +} +func (f *fakeConnectClientStream) RecvMsg(m interface{}) error { + if f.recvIndex >= len(f.responses) { + if f.waitOnDone && f.ctx != nil { + <-f.ctx.Done() + time.Sleep(10 * time.Millisecond) + } + return io.EOF + } + resp, ok := m.(*rpc.ConnectResponse) + if !ok { + return io.ErrUnexpectedEOF + } + *resp = *f.responses[f.recvIndex] + f.recvIndex++ + return nil +} + +func TestPrintConnectGRPCStreamReturnsConnectionID(t *testing.T) { + stream := &fakeConnectClientStream{ + responses: []*rpc.ConnectResponse{ + {Message: "Starting connect\n"}, + {ConnectionID: "conn-123", Message: "Connected tunnel\n"}, + }, + } + var out bytes.Buffer + + connectionID, err := printConnectGRPCStream(context.Background(), stream, &out) + if err != nil { + t.Fatal(err) + } + if connectionID != "conn-123" { + t.Fatalf("connectionID = %q, want %q", connectionID, "conn-123") + } + if got := out.String(); got != "Starting connect\nConnected tunnel\n" { + t.Fatalf("unexpected output %q", got) + } +} + +func TestPrintConnectGRPCStreamSendsCancelOnContextDone(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + stream := &fakeConnectClientStream{ctx: ctx, waitOnDone: true} + + done := make(chan struct{}) + go func() { + defer close(done) + _, _ = printConnectGRPCStream(ctx, stream, io.Discard) + }() + + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("printConnectGRPCStream did not finish after cancel") + } + + if stream.cancelSent == 0 { + t.Fatal("expected cancel message to be sent to the grpc stream") + } +} diff --git a/cmd/kubevpn/cmds/disconnect.go b/cmd/kubevpn/cmds/disconnect.go index 4aa44251..b05b2888 100644 --- a/cmd/kubevpn/cmds/disconnect.go +++ b/cmd/kubevpn/cmds/disconnect.go @@ -76,6 +76,13 @@ func CmdDisconnect(f cmdutil.Factory) *cobra.Command { } return err } + if all { + _ = stopAllManagedProxies() + _, _ = fmt.Fprintln(os.Stdout, "Stopped all managed local SOCKS proxies") + } else if id != "" { + _ = stopManagedProxy(id) + printManagedSocksStopped(os.Stdout, id) + } _, _ = fmt.Fprint(os.Stdout, "Disconnect completed") return nil }, diff --git a/cmd/kubevpn/cmds/proxy_out.go b/cmd/kubevpn/cmds/proxy_out.go new file mode 100644 index 00000000..99503cb3 --- /dev/null +++ b/cmd/kubevpn/cmds/proxy_out.go @@ -0,0 +1,124 @@ +package cmds + +import ( + "fmt" + "io" + "os" + + "github.com/spf13/cobra" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + "k8s.io/kubectl/pkg/util/i18n" + "k8s.io/kubectl/pkg/util/templates" + + "github.com/wencaiwulue/kubevpn/v2/pkg/localproxy" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" +) + +func CmdProxyOut(f cmdutil.Factory) *cobra.Command { + var socksListen string + var httpConnectListen string + + cmd := &cobra.Command{ + Use: "proxy-out", + Short: i18n.T("Expose cluster workloads through a local outbound proxy"), + Long: templates.LongDesc(i18n.T(` +Expose cluster workloads through a local outbound proxy. + +This command is designed for nested VPN cases where OS routes are owned by +another VPN client. Instead of depending on cluster CIDR routing, it resolves +cluster service hostnames and forwards TCP traffic through the Kubernetes API. +Use SOCKS5 with socks5h when you want the proxy to resolve cluster DNS names. +`)), + Example: templates.Examples(i18n.T(` + # Start a local SOCKS5 proxy and access a cluster Service through socks5h + kubevpn proxy-out --listen-socks 127.0.0.1:1080 + curl --proxy socks5h://127.0.0.1:1080 http://productpage.default.svc.cluster.local:9080 + + # Start both SOCKS5 and HTTP CONNECT listeners for TCP traffic + kubevpn proxy-out --listen-socks 127.0.0.1:1080 --listen-http-connect 127.0.0.1:3128 + + # Access a Pod IP or Service ClusterIP through the proxy + curl --proxy socks5h://127.0.0.1:1080 http://172.21.10.49:9080 + `)), + RunE: func(cmd *cobra.Command, args []string) error { + plog.InitLoggerForClient() + + if err := validateProxyOutListeners(socksListen, httpConnectListen); err != nil { + return err + } + + restConfig, err := f.ToRESTConfig() + if err != nil { + return err + } + clusterAPI, clientset, err := localproxy.NewClusterAPI(restConfig) + if err != nil { + return err + } + ns, _, err := f.ToRawKubeConfigLoader().Namespace() + if err != nil { + return err + } + + connector := &localproxy.ClusterConnector{ + Client: clusterAPI, + Forwarder: localproxy.NewPodDialer(restConfig, clientset), + RESTConfig: restConfig, + DefaultNamespace: ns, + } + server := &localproxy.Server{ + Connector: connector, + SOCKSListenAddr: socksListen, + HTTPConnectListen: httpConnectListen, + Stdout: os.Stdout, + Stderr: os.Stderr, + } + + if ip := localproxy.FirstNonLoopbackIPv4(); ip != "" { + _, _ = fmt.Fprintf(os.Stdout, "Local host IPv4 detected: %s\n", ip) + } + printProxyOutHints(os.Stdout, socksListen, httpConnectListen) + return server.ListenAndServe(cmd.Context()) + }, + } + + cmd.Flags().StringVar(&socksListen, "listen-socks", "127.0.0.1:1080", "Local SOCKS5 listen address") + cmd.Flags().StringVar(&httpConnectListen, "listen-http-connect", "", "Local HTTP CONNECT listen address") + return cmd +} + +func validateProxyOutListeners(socksListen, httpConnectListen string) error { + if socksListen == "" && httpConnectListen == "" { + return fmt.Errorf("at least one of --listen-socks or --listen-http-connect must be set") + } + return nil +} + +func printManagedSocksStarted(out io.Writer, connectionID, socksListen string) { + if out == nil { + return + } + _, _ = fmt.Fprintf(out, "Started managed local SOCKS5 proxy for connection %s on %s\n", connectionID, socksListen) + printProxyOutHints(out, socksListen, "") +} + +func printManagedSocksStopped(out io.Writer, connectionID string) { + if out == nil || connectionID == "" { + return + } + _, _ = fmt.Fprintf(out, "Stopped managed local SOCKS5 proxy for connection %s\n", connectionID) +} + +func printProxyOutHints(out io.Writer, socksListen, httpConnectListen string) { + if out == nil { + return + } + if socksListen != "" { + _, _ = fmt.Fprintf(out, "Proxy hint: export ALL_PROXY=socks5h://%s\n", socksListen) + } + if httpConnectListen != "" { + _, _ = fmt.Fprintf(out, "Proxy hint: export HTTP_PROXY=http://%s\n", httpConnectListen) + _, _ = fmt.Fprintf(out, "Proxy hint: export HTTPS_PROXY=http://%s\n", httpConnectListen) + } + _, _ = fmt.Fprintln(out, "Proxy scope: TCP traffic only. Use socks5h if you want proxy-side cluster DNS resolution.") +} diff --git a/cmd/kubevpn/cmds/proxy_out_manager.go b/cmd/kubevpn/cmds/proxy_out_manager.go new file mode 100644 index 00000000..23d7f9d2 --- /dev/null +++ b/cmd/kubevpn/cmds/proxy_out_manager.go @@ -0,0 +1,179 @@ +package cmds + +import ( + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "time" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" + "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +type managedProxyInfo struct { + ConnectionID string `json:"connectionID"` + PID int `json:"pid"` + SocksAddress string `json:"socksAddress"` + KubeconfigPath string `json:"kubeconfigPath"` + Namespace string `json:"namespace"` + LogPath string `json:"logPath"` + StartedAt time.Time `json:"startedAt"` +} + +var execProxyOutCommand = exec.Command +var resolveCurrentExecutable = os.Executable +var proxyOutStateDirFunc = func() string { + return filepath.Join(filepath.Dir(config.GetDBPath()), "proxy-out") +} +var proxyOutLogPathFunc = func(connectionID string) string { + return filepath.Join(filepath.Dir(config.GetDaemonLogPath(false)), fmt.Sprintf("proxy-out-%s.log", connectionID)) +} + +func proxyOutStateDir() string { + return proxyOutStateDirFunc() +} + +func proxyOutStatePath(connectionID string) string { + return filepath.Join(proxyOutStateDir(), connectionID+".json") +} + +func proxyOutLogPath(connectionID string) string { + return proxyOutLogPathFunc(connectionID) +} + +func ensureProxyOutStateDir() error { + return os.MkdirAll(proxyOutStateDir(), 0755) +} + +func startManagedSocksProxy(connectionID string, kubeconfigBytes []byte, namespace, listenAddr string) error { + if connectionID == "" { + return fmt.Errorf("connection ID is required") + } + if listenAddr == "" { + listenAddr = "127.0.0.1:1080" + } + if err := ensureProxyOutStateDir(); err != nil { + return err + } + _ = stopManagedProxy(connectionID) + + kubeconfigPath := filepath.Join(config.GetTempPath(), fmt.Sprintf("proxy-out-%s.kubeconfig", connectionID)) + if _, err := util.ConvertToTempKubeconfigFile(kubeconfigBytes, kubeconfigPath); err != nil { + return err + } + + logPath := proxyOutLogPath(connectionID) + if err := os.MkdirAll(filepath.Dir(logPath), 0755); err != nil { + _ = os.Remove(kubeconfigPath) + return err + } + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + _ = os.Remove(kubeconfigPath) + return err + } + defer logFile.Close() + + exe, err := resolveCurrentExecutable() + if err != nil { + return err + } + + args := []string{ + "--kubeconfig", kubeconfigPath, + "--namespace", namespace, + "proxy-out", + "--listen-socks", listenAddr, + } + cmd := execProxyOutCommand(exe, args...) + cmd.Stdout = logFile + cmd.Stderr = logFile + cmd.Stdin = nil + cmd.Env = os.Environ() + detachProcess(cmd) + + if err := cmd.Start(); err != nil { + _ = os.Remove(kubeconfigPath) + return err + } + + info := managedProxyInfo{ + ConnectionID: connectionID, + PID: cmd.Process.Pid, + SocksAddress: listenAddr, + KubeconfigPath: kubeconfigPath, + Namespace: namespace, + LogPath: logPath, + StartedAt: time.Now(), + } + if err := writeManagedProxyInfo(info); err != nil { + _ = cmd.Process.Kill() + _ = os.Remove(kubeconfigPath) + return err + } + return nil +} + +func writeManagedProxyInfo(info managedProxyInfo) error { + if err := ensureProxyOutStateDir(); err != nil { + return err + } + data, err := json.Marshal(info) + if err != nil { + return err + } + return os.WriteFile(proxyOutStatePath(info.ConnectionID), data, 0644) +} + +func readManagedProxyInfo(connectionID string) (*managedProxyInfo, error) { + data, err := os.ReadFile(proxyOutStatePath(connectionID)) + if err != nil { + return nil, err + } + var info managedProxyInfo + if err := json.Unmarshal(data, &info); err != nil { + return nil, err + } + return &info, nil +} + +func stopManagedProxy(connectionID string) error { + info, err := readManagedProxyInfo(connectionID) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + if info.PID > 0 { + process, err := os.FindProcess(info.PID) + if err == nil { + _ = process.Kill() + } + } + if info.KubeconfigPath != "" { + _ = os.Remove(info.KubeconfigPath) + } + _ = os.Remove(proxyOutStatePath(connectionID)) + return nil +} + +func stopAllManagedProxies() error { + entries, err := os.ReadDir(proxyOutStateDir()) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + for _, entry := range entries { + if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" { + continue + } + connectionID := entry.Name()[:len(entry.Name())-len(".json")] + _ = stopManagedProxy(connectionID) + } + return nil +} diff --git a/cmd/kubevpn/cmds/proxy_out_manager_test.go b/cmd/kubevpn/cmds/proxy_out_manager_test.go new file mode 100644 index 00000000..00acdb20 --- /dev/null +++ b/cmd/kubevpn/cmds/proxy_out_manager_test.go @@ -0,0 +1,155 @@ +//go:build !windows + +package cmds + +import ( + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +func TestManagedProxyInfoRoundTrip(t *testing.T) { + tempDir := t.TempDir() + oldStateDir := proxyOutStateDirFunc + oldLogPath := proxyOutLogPathFunc + proxyOutStateDirFunc = func() string { return tempDir } + proxyOutLogPathFunc = func(connectionID string) string { return filepath.Join(tempDir, connectionID+".log") } + defer func() { + proxyOutStateDirFunc = oldStateDir + proxyOutLogPathFunc = oldLogPath + }() + + info := managedProxyInfo{ + ConnectionID: "abc123", + PID: 12345, + SocksAddress: "127.0.0.1:1080", + Namespace: "kubevpn", + } + if err := writeManagedProxyInfo(info); err != nil { + t.Fatal(err) + } + got, err := readManagedProxyInfo("abc123") + if err != nil { + t.Fatal(err) + } + if got.ConnectionID != info.ConnectionID || got.PID != info.PID || got.SocksAddress != info.SocksAddress { + t.Fatalf("unexpected managed proxy info: %#v", got) + } +} + +func TestStopManagedProxyRemovesStateAndKubeconfig(t *testing.T) { + tempDir := t.TempDir() + oldStateDir := proxyOutStateDirFunc + oldLogPath := proxyOutLogPathFunc + proxyOutStateDirFunc = func() string { return tempDir } + proxyOutLogPathFunc = func(connectionID string) string { return filepath.Join(tempDir, connectionID+".log") } + defer func() { + proxyOutStateDirFunc = oldStateDir + proxyOutLogPathFunc = oldLogPath + }() + + kubeconfigPath := filepath.Join(tempDir, "proxy.kubeconfig") + if _, err := util.ConvertToTempKubeconfigFile([]byte(`{"apiVersion":"v1","kind":"Config","clusters":[],"contexts":[],"users":[]}`), kubeconfigPath); err != nil { + t.Fatal(err) + } + if err := writeManagedProxyInfo(managedProxyInfo{ + ConnectionID: "conn1", + PID: 999999, + KubeconfigPath: kubeconfigPath, + }); err != nil { + t.Fatal(err) + } + + if err := stopManagedProxy("conn1"); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(proxyOutStatePath("conn1")); !os.IsNotExist(err) { + t.Fatalf("expected state file removed, got err=%v", err) + } + if _, err := os.Stat(kubeconfigPath); !os.IsNotExist(err) { + t.Fatalf("expected kubeconfig removed, got err=%v", err) + } +} + +func TestStartManagedSocksProxyWritesState(t *testing.T) { + tempDir := t.TempDir() + oldStateDir := proxyOutStateDirFunc + oldLogPath := proxyOutLogPathFunc + oldExec := execProxyOutCommand + oldResolve := resolveCurrentExecutable + proxyOutStateDirFunc = func() string { return tempDir } + proxyOutLogPathFunc = func(connectionID string) string { return filepath.Join(tempDir, connectionID+".log") } + resolveCurrentExecutable = func() (string, error) { return "/bin/sh", nil } + execProxyOutCommand = func(name string, args ...string) *exec.Cmd { + return exec.Command("/bin/sh", "-c", "sleep 30") + } + defer func() { + proxyOutStateDirFunc = oldStateDir + proxyOutLogPathFunc = oldLogPath + execProxyOutCommand = oldExec + resolveCurrentExecutable = oldResolve + }() + + if err := startManagedSocksProxy("conn2", []byte(`{"apiVersion":"v1","kind":"Config","clusters":[],"contexts":[],"users":[]}`), "kubevpn", "127.0.0.1:1089"); err != nil { + t.Fatal(err) + } + info, err := readManagedProxyInfo("conn2") + if err != nil { + t.Fatal(err) + } + if info.PID <= 0 || info.SocksAddress != "127.0.0.1:1089" { + t.Fatalf("unexpected managed proxy info: %#v", info) + } + _ = stopManagedProxy("conn2") +} + +func TestStartManagedSocksProxyReplacesPreviousState(t *testing.T) { + tempDir := t.TempDir() + oldStateDir := proxyOutStateDirFunc + oldLogPath := proxyOutLogPathFunc + oldExec := execProxyOutCommand + oldResolve := resolveCurrentExecutable + proxyOutStateDirFunc = func() string { return tempDir } + proxyOutLogPathFunc = func(connectionID string) string { return filepath.Join(tempDir, connectionID+".log") } + resolveCurrentExecutable = func() (string, error) { return "/bin/sh", nil } + execProxyOutCommand = func(name string, args ...string) *exec.Cmd { + return exec.Command("/bin/sh", "-c", "sleep 30") + } + defer func() { + proxyOutStateDirFunc = oldStateDir + proxyOutLogPathFunc = oldLogPath + execProxyOutCommand = oldExec + resolveCurrentExecutable = oldResolve + }() + + staleKubeconfig := filepath.Join(tempDir, "stale.kubeconfig") + if _, err := util.ConvertToTempKubeconfigFile([]byte(`{"apiVersion":"v1","kind":"Config","clusters":[],"contexts":[],"users":[]}`), staleKubeconfig); err != nil { + t.Fatal(err) + } + if err := writeManagedProxyInfo(managedProxyInfo{ + ConnectionID: "conn-restart", + PID: 999999, + KubeconfigPath: staleKubeconfig, + }); err != nil { + t.Fatal(err) + } + + if err := startManagedSocksProxy("conn-restart", []byte(`{"apiVersion":"v1","kind":"Config","clusters":[],"contexts":[],"users":[]}`), "kubevpn", "127.0.0.1:1090"); err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(staleKubeconfig); !os.IsNotExist(err) { + t.Fatalf("expected stale kubeconfig removed, got err=%v", err) + } + info, err := readManagedProxyInfo("conn-restart") + if err != nil { + t.Fatal(err) + } + if info.PID <= 0 || info.SocksAddress != "127.0.0.1:1090" { + t.Fatalf("unexpected managed proxy info after restart: %#v", info) + } + _ = stopManagedProxy("conn-restart") +} diff --git a/cmd/kubevpn/cmds/proxy_out_manager_unix.go b/cmd/kubevpn/cmds/proxy_out_manager_unix.go new file mode 100644 index 00000000..0660524d --- /dev/null +++ b/cmd/kubevpn/cmds/proxy_out_manager_unix.go @@ -0,0 +1,12 @@ +//go:build !windows + +package cmds + +import ( + "os/exec" + "syscall" +) + +func detachProcess(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} +} diff --git a/cmd/kubevpn/cmds/proxy_out_manager_windows.go b/cmd/kubevpn/cmds/proxy_out_manager_windows.go new file mode 100644 index 00000000..c1462f8a --- /dev/null +++ b/cmd/kubevpn/cmds/proxy_out_manager_windows.go @@ -0,0 +1,7 @@ +//go:build windows + +package cmds + +import "os/exec" + +func detachProcess(cmd *exec.Cmd) {} diff --git a/cmd/kubevpn/cmds/proxy_out_test.go b/cmd/kubevpn/cmds/proxy_out_test.go new file mode 100644 index 00000000..0a4649bd --- /dev/null +++ b/cmd/kubevpn/cmds/proxy_out_test.go @@ -0,0 +1,37 @@ +package cmds + +import ( + "bytes" + "strings" + "testing" +) + +func TestValidateProxyOutListeners(t *testing.T) { + if err := validateProxyOutListeners("", ""); err == nil { + t.Fatal("expected validation error when no listeners are configured") + } + if err := validateProxyOutListeners("127.0.0.1:1080", ""); err != nil { + t.Fatalf("unexpected error for socks listener: %v", err) + } + if err := validateProxyOutListeners("", "127.0.0.1:3128"); err != nil { + t.Fatalf("unexpected error for http connect listener: %v", err) + } +} + +func TestPrintProxyOutHints(t *testing.T) { + var out bytes.Buffer + printProxyOutHints(&out, "127.0.0.1:1080", "127.0.0.1:3128") + + got := out.String() + for _, want := range []string{ + "ALL_PROXY=socks5h://127.0.0.1:1080", + "HTTP_PROXY=http://127.0.0.1:3128", + "HTTPS_PROXY=http://127.0.0.1:3128", + "TCP traffic only", + "socks5h", + } { + if !strings.Contains(got, want) { + t.Fatalf("expected output to contain %q, got %q", want, got) + } + } +} diff --git a/cmd/kubevpn/cmds/quit.go b/cmd/kubevpn/cmds/quit.go index c9f4fdf8..055877d5 100644 --- a/cmd/kubevpn/cmds/quit.go +++ b/cmd/kubevpn/cmds/quit.go @@ -31,6 +31,8 @@ func CmdQuit(f cmdutil.Factory) *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { _ = quit(cmd.Context(), true) _ = quit(cmd.Context(), false) + _ = stopAllManagedProxies() + _, _ = fmt.Fprintln(os.Stdout, "Stopped all managed local SOCKS proxies") util.CleanExtensionLib() _, _ = fmt.Fprint(os.Stdout, "Exited") return nil diff --git a/cmd/kubevpn/cmds/root.go b/cmd/kubevpn/cmds/root.go index f4ff4309..28c40b62 100644 --- a/cmd/kubevpn/cmds/root.go +++ b/cmd/kubevpn/cmds/root.go @@ -57,6 +57,7 @@ func NewKubeVPNCommand() *cobra.Command { CmdConnect(factory), CmdDisconnect(factory), CmdProxy(factory), + CmdProxyOut(factory), CmdLeave(factory), CmdSync(factory), CmdUnsync(factory), diff --git a/docs/en/LocalProxyDesign.md b/docs/en/LocalProxyDesign.md new file mode 100644 index 00000000..694944b3 --- /dev/null +++ b/docs/en/LocalProxyDesign.md @@ -0,0 +1,136 @@ +# Local Proxy Design For Nested VPNs + +## Summary + +`proxy-out` adds a local outbound proxy mode for environments where `kubevpn connect` succeeds but operating-system route ownership is controlled by another VPN client. Instead of relying on cluster CIDR routing, client applications send TCP traffic to a local proxy, and kubevpn forwards that traffic into the cluster through the Kubernetes API. + +This version ships: + +- a standalone `kubevpn proxy-out` command +- a managed SOCKS5 mode via `kubevpn connect --socks` +- SOCKS5 and optional HTTP CONNECT listeners +- remote resolution of cluster Service names when clients use `socks5h` +- support for Service ClusterIP and Pod IP TCP targets + +This version does not ship: + +- PAC files +- a DNS listener +- transparent interception +- `kubevpn status` integration for proxy listener state + +## Problem + +In nested VPN setups, another VPN client may own the host route table or DNS path. That breaks kubevpn's normal route-based access even when the tunnel itself is healthy. The most common symptoms are: + +- cluster IP traffic leaves through the outer tunnel instead of kubevpn +- split DNS settings do not resolve cluster names end to end +- local applications cannot reliably reach Services or Pod IPs without manual proxy support + +## Implemented Design + +### Data path + +For outbound proxy mode, the user-facing API is a local listener: + +```text +local app + -> SOCKS5 or HTTP CONNECT + -> kubevpn proxy-out server + -> Kubernetes API port-forward to a resolved Pod endpoint + -> cluster destination +``` + +### Name and target resolution + +- Service hostnames such as `svc`, `svc.ns`, `svc.ns.svc.cluster.local` are resolved inside kubevpn +- Service ClusterIP targets are mapped back to a Service, then to a ready endpoint Pod +- Pod IP targets dial the matching running Pod directly +- short names use the active kubeconfig namespace as the default namespace + +### Lifecycle + +- `kubevpn proxy-out` runs as a foreground local proxy process +- `kubevpn connect --socks` starts a managed background SOCKS5 proxy bound to the connection ID +- `kubevpn disconnect `, `kubevpn disconnect --all`, and `kubevpn quit` stop managed proxies +- managed state is stored under the kubevpn local state directory; transient kubeconfig files are deleted on stop +- proxy logs are intentionally retained for troubleshooting + +## User Experience + +### Standalone mode + +```bash +kubevpn proxy-out --listen-socks 127.0.0.1:1080 +curl --proxy socks5h://127.0.0.1:1080 http://productpage.default.svc.cluster.local:9080 +``` + +### Managed mode + +```bash +kubevpn connect --socks +curl --proxy socks5h://127.0.0.1:1080 http://productpage.default.svc.cluster.local:9080 +``` + +### Notes + +- default SOCKS5 listen address is `127.0.0.1:1080` +- HTTP CONNECT is optional and disabled by default +- proxy mode handles TCP traffic only in this version +- use `socks5h` when you want proxy-side cluster DNS resolution + +## Implementation Notes + +- Command entrypoints live in [/Users/tvorogme/projects/kubevpn/cmd/kubevpn/cmds/proxy_out.go](/Users/tvorogme/projects/kubevpn/cmd/kubevpn/cmds/proxy_out.go) and [/Users/tvorogme/projects/kubevpn/cmd/kubevpn/cmds/connect.go](/Users/tvorogme/projects/kubevpn/cmd/kubevpn/cmds/connect.go) +- Managed proxy state handling lives in [/Users/tvorogme/projects/kubevpn/cmd/kubevpn/cmds/proxy_out_manager.go](/Users/tvorogme/projects/kubevpn/cmd/kubevpn/cmds/proxy_out_manager.go) +- Local proxy implementation lives in [/Users/tvorogme/projects/kubevpn/pkg/localproxy](/Users/tvorogme/projects/kubevpn/pkg/localproxy) + +## Validation + +Recommended validation scenarios: + +1. Start `kubevpn proxy-out --listen-socks 127.0.0.1:1080` +2. Access a Service hostname through `socks5h` +3. Access a Service ClusterIP through `socks5h` +4. Start `kubevpn connect --socks` +5. Confirm `kubevpn disconnect` or `kubevpn quit` removes the managed proxy state and kubeconfig file + +### Milestone 1 + +- design doc +- CLI scaffolding +- daemon action scaffolding +- in-memory SOCKS5 server using current connection state + +### Milestone 2 + +- real resolver/dialer integration +- status output +- documentation and examples + +### Milestone 3 + +- HTTP CONNECT +- optional PAC file generation +- optional DNS helper listener + +## Why This Is The Right Feature + +Today kubevpn is strongest when it can own system routing. Nested VPN environments break that assumption. + +A local explicit proxy: + +- avoids fighting other VPN route owners +- gives users an app-level integration surface +- reuses kubevpn's existing transport and remote DNS +- solves the exact class of failures seen with `sing-box` packet tunnel on macOS + +## Next Implementation Slice + +The smallest useful next coding step is: + +1. add `kubevpn proxy-out` command and flags +2. add daemon action to attach proxy mode to an existing connection +3. implement TCP-only SOCKS5 CONNECT +4. implement hostname resolution through kubevpn remote DNS +5. verify `curl --proxy socks5h://127.0.0.1:1080` against a cluster service diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index f3ff0f25..8e712eab 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -1218,7 +1218,9 @@ func healthCheckPortForward(ctx context.Context, cancelFunc context.CancelFunc, } var healthChecker = func() error { - conn, err := net.Dial("tcp", fmt.Sprintf(":%s", localGvisorUDPPort)) + // Use loopback explicitly so the health check keeps working even when the + // default route is owned by another tunnel. + conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", localGvisorUDPPort)) if err != nil { return err } diff --git a/pkg/localproxy/cluster.go b/pkg/localproxy/cluster.go new file mode 100644 index 00000000..0403a9c0 --- /dev/null +++ b/pkg/localproxy/cluster.go @@ -0,0 +1,271 @@ +package localproxy + +import ( + "context" + "fmt" + "net" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type ClusterConnector struct { + Client ClusterAPI + Forwarder PodDialer + RESTConfig *rest.Config + DefaultNamespace string +} + +type resolvedTarget struct { + PodName string + Namespace string + PodPort int32 +} + +type ClusterAPI interface { + GetService(ctx context.Context, namespace, name string) (*corev1.Service, error) + GetEndpoints(ctx context.Context, namespace, name string) (*corev1.Endpoints, error) + ListServices(ctx context.Context) (*corev1.ServiceList, error) + ListPodsByIP(ctx context.Context, ip string) (*corev1.PodList, error) +} + +type PodDialer interface { + DialPod(ctx context.Context, namespace, podName string, port int32) (net.Conn, error) +} + +type kubeClusterAPI struct { + clientset kubernetes.Interface +} + +func NewClusterAPI(config *rest.Config) (ClusterAPI, *kubernetes.Clientset, error) { + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, nil, err + } + return &kubeClusterAPI{clientset: clientset}, clientset, nil +} + +func (k *kubeClusterAPI) GetService(ctx context.Context, namespace, name string) (*corev1.Service, error) { + return k.clientset.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) +} + +func (k *kubeClusterAPI) GetEndpoints(ctx context.Context, namespace, name string) (*corev1.Endpoints, error) { + return k.clientset.CoreV1().Endpoints(namespace).Get(ctx, name, metav1.GetOptions{}) +} + +func (k *kubeClusterAPI) ListServices(ctx context.Context) (*corev1.ServiceList, error) { + return k.clientset.CoreV1().Services("").List(ctx, metav1.ListOptions{}) +} + +func (k *kubeClusterAPI) ListPodsByIP(ctx context.Context, ip string) (*corev1.PodList, error) { + return k.clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("status.podIP", ip).String(), + }) +} + +func (c *ClusterConnector) Connect(ctx context.Context, host string, port int) (net.Conn, error) { + target, err := c.resolveTarget(ctx, host, port) + if err != nil { + return nil, err + } + if c.Forwarder == nil { + return nil, fmt.Errorf("pod dialer is required") + } + return c.Forwarder.DialPod(ctx, target.Namespace, target.PodName, target.PodPort) +} + +func (c *ClusterConnector) resolveTarget(ctx context.Context, host string, port int) (*resolvedTarget, error) { + if ip := net.ParseIP(host); ip != nil { + if target, err := c.resolveServiceIP(ctx, host, port); err == nil { + return target, nil + } + return c.resolvePodIP(ctx, host, port) + } + return c.resolveServiceHost(ctx, host, port) +} + +func (c *ClusterConnector) resolveServiceHost(ctx context.Context, host string, port int) (*resolvedTarget, error) { + svcName, ns, ok := parseServiceHost(host, c.DefaultNamespace) + if !ok { + return nil, fmt.Errorf("unsupported cluster host %q", host) + } + svc, err := c.Client.GetService(ctx, ns, svcName) + if err != nil { + return nil, err + } + return c.resolveServiceToPod(ctx, svc, port) +} + +func (c *ClusterConnector) resolveServiceIP(ctx context.Context, host string, port int) (*resolvedTarget, error) { + services, err := c.Client.ListServices(ctx) + if err != nil { + return nil, err + } + for i := range services.Items { + if services.Items[i].Spec.ClusterIP == host { + return c.resolveServiceToPod(ctx, &services.Items[i], port) + } + } + return nil, fmt.Errorf("no service found for cluster ip %s", host) +} + +func (c *ClusterConnector) resolveServiceToPod(ctx context.Context, svc *corev1.Service, port int) (*resolvedTarget, error) { + if svc == nil { + return nil, fmt.Errorf("service is required") + } + var servicePort *corev1.ServicePort + for i := range svc.Spec.Ports { + if int(svc.Spec.Ports[i].Port) == port { + servicePort = &svc.Spec.Ports[i] + break + } + } + if servicePort == nil { + if len(svc.Spec.Ports) == 1 { + servicePort = &svc.Spec.Ports[0] + } else { + return nil, fmt.Errorf("service %s/%s has no port %d", svc.Namespace, svc.Name, port) + } + } + + endpoints, err := c.Client.GetEndpoints(ctx, svc.Namespace, svc.Name) + if err != nil { + return nil, err + } + var endpointPort *corev1.EndpointPort + var podName, podNS, podIP string + for _, subset := range endpoints.Subsets { + for i := range subset.Ports { + portRef := &subset.Ports[i] + if endpointPortMatches(servicePort, portRef, port) { + endpointPort = portRef + break + } + } + if endpointPort == nil && len(subset.Ports) == 1 { + endpointPort = &subset.Ports[0] + } + if endpointPort == nil { + continue + } + for _, addr := range subset.Addresses { + podIP = addr.IP + if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" { + podName = addr.TargetRef.Name + podNS = addr.TargetRef.Namespace + break + } + } + if podName != "" || podIP != "" { + break + } + } + if endpointPort == nil { + return nil, fmt.Errorf("service %s/%s has no ready endpoints for port %d", svc.Namespace, svc.Name, port) + } + if podName == "" { + if podIP == "" { + return nil, fmt.Errorf("service %s/%s has endpoints without pod references", svc.Namespace, svc.Name) + } + target, err := c.resolvePodIP(ctx, podIP, int(endpointPort.Port)) + if err != nil { + return nil, err + } + target.PodPort = endpointPort.Port + return target, nil + } + if podNS == "" { + podNS = svc.Namespace + } + return &resolvedTarget{ + PodName: podName, + Namespace: podNS, + PodPort: endpointPort.Port, + }, nil +} + +func endpointPortMatches(servicePort *corev1.ServicePort, endpointPort *corev1.EndpointPort, requestedPort int) bool { + if servicePort == nil || endpointPort == nil { + return false + } + if servicePort.Name != "" && endpointPort.Name != "" && servicePort.Name == endpointPort.Name { + return true + } + if endpointPort.Port == int32(requestedPort) { + return true + } + if servicePort.TargetPort.IntValue() > 0 && int32(servicePort.TargetPort.IntValue()) == endpointPort.Port { + return true + } + return false +} + +func (c *ClusterConnector) resolvePodIP(ctx context.Context, host string, port int) (*resolvedTarget, error) { + pods, err := c.Client.ListPodsByIP(ctx, host) + if err != nil { + return nil, err + } + for i := range pods.Items { + pod := &pods.Items[i] + if pod.Status.Phase != corev1.PodRunning || pod.DeletionTimestamp != nil { + continue + } + return &resolvedTarget{ + PodName: pod.Name, + Namespace: pod.Namespace, + PodPort: int32(port), + }, nil + } + return nil, fmt.Errorf("no running pod found for ip %s", host) +} + +func parseServiceHost(host, defaultNamespace string) (service string, namespace string, ok bool) { + host = strings.TrimSuffix(host, ".") + parts := strings.Split(host, ".") + switch { + case len(parts) >= 5 && parts[2] == "svc" && parts[3] == "cluster" && parts[4] == "local": + return parts[0], parts[1], true + case len(parts) >= 3 && parts[2] == "svc": + return parts[0], parts[1], true + case len(parts) == 2: + return parts[0], parts[1], true + case len(parts) == 1 && defaultNamespace != "": + return parts[0], defaultNamespace, true + default: + return "", "", false + } +} + +func FirstNonLoopbackIPv4() string { + ifaces, err := net.Interfaces() + if err != nil { + return "" + } + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + continue + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() || ip.To4() == nil { + continue + } + return ip.String() + } + } + return "" +} diff --git a/pkg/localproxy/httpconnect.go b/pkg/localproxy/httpconnect.go new file mode 100644 index 00000000..035d6445 --- /dev/null +++ b/pkg/localproxy/httpconnect.go @@ -0,0 +1,60 @@ +package localproxy + +import ( + "bufio" + "context" + "fmt" + "net" + "net/http" + "strconv" +) + +func ServeHTTPConnect(ctx context.Context, ln net.Listener, connector Connector) error { + for { + conn, err := ln.Accept() + if err != nil { + select { + case <-ctx.Done(): + return nil + default: + return err + } + } + go handleHTTPConnectConn(ctx, conn, connector) + } +} + +func handleHTTPConnectConn(ctx context.Context, conn net.Conn, connector Connector) { + defer conn.Close() + + reader := bufio.NewReader(conn) + req, err := http.ReadRequest(reader) + if err != nil { + return + } + if req.Method != http.MethodConnect { + _, _ = fmt.Fprint(conn, "HTTP/1.1 405 Method Not Allowed\r\nConnection: close\r\n\r\n") + return + } + + host, portStr, err := net.SplitHostPort(req.Host) + if err != nil { + _, _ = fmt.Fprint(conn, "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n") + return + } + port, err := strconv.Atoi(portStr) + if err != nil { + _, _ = fmt.Fprint(conn, "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n") + return + } + + remote, err := connector.Connect(ctx, host, port) + if err != nil { + _, _ = fmt.Fprint(conn, "HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n") + return + } + defer remote.Close() + + _, _ = fmt.Fprint(conn, "HTTP/1.1 200 Connection Established\r\n\r\n") + relayConns(conn, remote) +} diff --git a/pkg/localproxy/portforward.go b/pkg/localproxy/portforward.go new file mode 100644 index 00000000..586850c8 --- /dev/null +++ b/pkg/localproxy/portforward.go @@ -0,0 +1,87 @@ +package localproxy + +import ( + "context" + "fmt" + "io" + "net" + "strconv" + "sync" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +type kubePortForwarder struct { + config *rest.Config + restClient *rest.RESTClient +} + +func NewPodDialer(config *rest.Config, clientset *kubernetes.Clientset) PodDialer { + restClient, _ := clientset.CoreV1().RESTClient().(*rest.RESTClient) + return &kubePortForwarder{ + config: config, + restClient: restClient, + } +} + +func (k *kubePortForwarder) DialPod(ctx context.Context, namespace, podName string, port int32) (net.Conn, error) { + if k.restClient == nil { + return nil, fmt.Errorf("pod port-forward requires REST client") + } + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + localPort := listener.Addr().(*net.TCPAddr).Port + _ = listener.Close() + + stopChan := make(chan struct{}) + readyChan := make(chan struct{}) + errCh := make(chan error, 1) + go func() { + errCh <- util.PortForwardPod( + k.config, + k.restClient, + podName, + namespace, + []string{fmt.Sprintf("%d:%d", localPort, port)}, + readyChan, + stopChan, + io.Discard, + io.Discard, + ) + }() + + select { + case <-readyChan: + case err := <-errCh: + return nil, err + case <-ctx.Done(): + close(stopChan) + return nil, ctx.Err() + } + + dialer := net.Dialer{} + conn, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(localPort))) + if err != nil { + close(stopChan) + return nil, err + } + return &forwardedConn{Conn: conn, stop: stopChan}, nil +} + +type forwardedConn struct { + net.Conn + stop chan struct{} + once sync.Once +} + +func (c *forwardedConn) Close() error { + c.once.Do(func() { + close(c.stop) + }) + return c.Conn.Close() +} diff --git a/pkg/localproxy/relay.go b/pkg/localproxy/relay.go new file mode 100644 index 00000000..1d2b2e6e --- /dev/null +++ b/pkg/localproxy/relay.go @@ -0,0 +1,23 @@ +package localproxy + +import ( + "io" + "net" + "sync" +) + +func relayConns(left, right net.Conn) { + var wg sync.WaitGroup + copyHalf := func(dst, src net.Conn) { + defer wg.Done() + _, _ = io.Copy(dst, src) + if tcp, ok := dst.(*net.TCPConn); ok { + _ = tcp.CloseWrite() + } + } + + wg.Add(2) + go copyHalf(left, right) + go copyHalf(right, left) + wg.Wait() +} diff --git a/pkg/localproxy/server.go b/pkg/localproxy/server.go new file mode 100644 index 00000000..b1ece36d --- /dev/null +++ b/pkg/localproxy/server.go @@ -0,0 +1,103 @@ +package localproxy + +import ( + "context" + "fmt" + "io" + "net" + "sync" +) + +type Connector interface { + Connect(ctx context.Context, host string, port int) (net.Conn, error) +} + +type Server struct { + Connector Connector + SOCKSListenAddr string + HTTPConnectListen string + Stdout io.Writer + Stderr io.Writer +} + +func (s *Server) ListenAndServe(ctx context.Context) error { + if s.Connector == nil { + return fmt.Errorf("connector is required") + } + + var listeners []net.Listener + closeAll := func() { + for _, ln := range listeners { + _ = ln.Close() + } + } + defer closeAll() + + type serveFn func(context.Context, net.Listener, Connector) error + var entries []struct { + name string + addr string + serve serveFn + } + if s.SOCKSListenAddr != "" { + entries = append(entries, struct { + name string + addr string + serve serveFn + }{name: "SOCKS5", addr: s.SOCKSListenAddr, serve: ServeSOCKS5}) + } + if s.HTTPConnectListen != "" { + entries = append(entries, struct { + name string + addr string + serve serveFn + }{name: "HTTP CONNECT", addr: s.HTTPConnectListen, serve: ServeHTTPConnect}) + } + + errCh := make(chan error, len(entries)) + var wg sync.WaitGroup + for _, entry := range entries { + ln, err := net.Listen("tcp", entry.addr) + if err != nil { + return err + } + listeners = append(listeners, ln) + if s.Stdout != nil { + _, _ = fmt.Fprintf(s.Stdout, "%s proxy listening on %s\n", entry.name, ln.Addr().String()) + } + wg.Add(1) + go func(serve serveFn, ln net.Listener) { + defer wg.Done() + if err := serve(ctx, ln, s.Connector); err != nil && !isListenerClosedError(err) { + errCh <- err + } + }(entry.serve, ln) + } + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + closeAll() + <-done + return nil + case <-done: + return nil + } +} + +func isListenerClosedError(err error) bool { + if err == nil { + return false + } + if oe, ok := err.(*net.OpError); ok && oe.Err != nil { + return oe.Err.Error() == "use of closed network connection" + } + return err.Error() == "use of closed network connection" +} diff --git a/pkg/localproxy/server_test.go b/pkg/localproxy/server_test.go new file mode 100644 index 00000000..1086d87b --- /dev/null +++ b/pkg/localproxy/server_test.go @@ -0,0 +1,435 @@ +package localproxy + +import ( + "bufio" + "context" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync" + "testing" + "time" + + xproxy "golang.org/x/net/proxy" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +type fakeConnector struct { + addr string + + mu sync.Mutex + hosts []string + ports []int +} + +func (f *fakeConnector) Connect(ctx context.Context, host string, port int) (net.Conn, error) { + f.mu.Lock() + f.hosts = append(f.hosts, host) + f.ports = append(f.ports, port) + f.mu.Unlock() + + var d net.Dialer + return d.DialContext(ctx, "tcp", f.addr) +} + +func (f *fakeConnector) lastTarget() (string, int) { + f.mu.Lock() + defer f.mu.Unlock() + if len(f.hosts) == 0 { + return "", 0 + } + return f.hosts[len(f.hosts)-1], f.ports[len(f.ports)-1] +} + +type fakeClusterAPI struct { + services map[string]*corev1.Service + endpoints map[string]*corev1.Endpoints + pods []*corev1.Pod +} + +func (f *fakeClusterAPI) GetService(_ context.Context, namespace, name string) (*corev1.Service, error) { + svc, ok := f.services[namespace+"/"+name] + if !ok { + return nil, fmt.Errorf("service %s/%s not found", namespace, name) + } + return svc, nil +} + +func (f *fakeClusterAPI) GetEndpoints(_ context.Context, namespace, name string) (*corev1.Endpoints, error) { + ep, ok := f.endpoints[namespace+"/"+name] + if !ok { + return nil, fmt.Errorf("endpoints %s/%s not found", namespace, name) + } + return ep, nil +} + +func (f *fakeClusterAPI) ListServices(_ context.Context) (*corev1.ServiceList, error) { + list := &corev1.ServiceList{} + for _, svc := range f.services { + list.Items = append(list.Items, *svc) + } + return list, nil +} + +func (f *fakeClusterAPI) ListPodsByIP(_ context.Context, ip string) (*corev1.PodList, error) { + list := &corev1.PodList{} + for _, pod := range f.pods { + if pod.Status.PodIP == ip { + list.Items = append(list.Items, *pod) + } + } + return list, nil +} + +func TestParseServiceHost(t *testing.T) { + tests := []struct { + host string + defaultNS string + wantSvc string + wantNS string + wantOK bool + }{ + {host: "jupyter.utils.svc.cluster.local", defaultNS: "default", wantSvc: "jupyter", wantNS: "utils", wantOK: true}, + {host: "jupyter.utils.svc", defaultNS: "default", wantSvc: "jupyter", wantNS: "utils", wantOK: true}, + {host: "jupyter.utils", defaultNS: "default", wantSvc: "jupyter", wantNS: "utils", wantOK: true}, + {host: "jupyter", defaultNS: "utils", wantSvc: "jupyter", wantNS: "utils", wantOK: true}, + {host: "api.example.com", defaultNS: "utils", wantOK: false}, + } + for _, tt := range tests { + t.Run(tt.host, func(t *testing.T) { + gotSvc, gotNS, gotOK := parseServiceHost(tt.host, tt.defaultNS) + if gotSvc != tt.wantSvc || gotNS != tt.wantNS || gotOK != tt.wantOK { + t.Fatalf("parseServiceHost(%q) = (%q, %q, %v), want (%q, %q, %v)", tt.host, gotSvc, gotNS, gotOK, tt.wantSvc, tt.wantNS, tt.wantOK) + } + }) + } +} + +func TestResolveServiceToPod(t *testing.T) { + api := &fakeClusterAPI{ + services: map[string]*corev1.Service{ + "utils/jupyter": { + ObjectMeta: metav1.ObjectMeta{Name: "jupyter", Namespace: "utils"}, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.103.245.38", + Ports: []corev1.ServicePort{{Name: "http", Port: 8086}}, + }, + }, + }, + endpoints: map[string]*corev1.Endpoints{ + "utils/jupyter": { + ObjectMeta: metav1.ObjectMeta{Name: "jupyter", Namespace: "utils"}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.1.243", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "jupyter-0", + Namespace: "utils", + }, + }}, + Ports: []corev1.EndpointPort{{Name: "http", Port: 8086}}, + }}, + }, + }, + } + + connector := &ClusterConnector{ + Client: api, + DefaultNamespace: "default", + } + + target, err := connector.resolveServiceHost(context.Background(), "jupyter.utils.svc.cluster.local", 8086) + if err != nil { + t.Fatal(err) + } + if target.PodName != "jupyter-0" || target.Namespace != "utils" || target.PodPort != 8086 { + t.Fatalf("unexpected target: %#v", target) + } +} + +func TestResolveTargetByClusterIP(t *testing.T) { + api := &fakeClusterAPI{ + services: map[string]*corev1.Service{ + "utils/jupyter": { + ObjectMeta: metav1.ObjectMeta{Name: "jupyter", Namespace: "utils"}, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.103.245.38", + Ports: []corev1.ServicePort{{ + Name: "http", + Port: 8086, + TargetPort: intstr.FromInt(18086), + }}, + }, + }, + }, + endpoints: map[string]*corev1.Endpoints{ + "utils/jupyter": { + ObjectMeta: metav1.ObjectMeta{Name: "jupyter", Namespace: "utils"}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: "10.0.1.243", + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "jupyter-0", + Namespace: "utils", + }, + }}, + Ports: []corev1.EndpointPort{{Name: "http", Port: 18086}}, + }}, + }, + }, + } + + connector := &ClusterConnector{Client: api, DefaultNamespace: "default"} + target, err := connector.resolveTarget(context.Background(), "10.103.245.38", 8086) + if err != nil { + t.Fatal(err) + } + if target.PodName != "jupyter-0" || target.Namespace != "utils" || target.PodPort != 18086 { + t.Fatalf("unexpected clusterIP target: %#v", target) + } +} + +func TestResolveTargetByPodIP(t *testing.T) { + api := &fakeClusterAPI{ + pods: []*corev1.Pod{{ + ObjectMeta: metav1.ObjectMeta{Name: "jupyter-0", Namespace: "utils"}, + Status: corev1.PodStatus{Phase: corev1.PodRunning, PodIP: "10.0.1.243"}, + }}, + } + + connector := &ClusterConnector{Client: api, DefaultNamespace: "default"} + target, err := connector.resolveTarget(context.Background(), "10.0.1.243", 8086) + if err != nil { + t.Fatal(err) + } + if target.PodName != "jupyter-0" || target.Namespace != "utils" || target.PodPort != 8086 { + t.Fatalf("unexpected podIP target: %#v", target) + } +} + +func TestResolveTargetUnsupportedHost(t *testing.T) { + connector := &ClusterConnector{Client: &fakeClusterAPI{}, DefaultNamespace: "default"} + if _, err := connector.resolveTarget(context.Background(), "api.example.com", 443); err == nil { + t.Fatal("expected unsupported host resolution to fail") + } +} + +func TestResolveServiceToPodNoReadyEndpoints(t *testing.T) { + api := &fakeClusterAPI{ + services: map[string]*corev1.Service{ + "utils/jupyter": { + ObjectMeta: metav1.ObjectMeta{Name: "jupyter", Namespace: "utils"}, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.103.245.38", + Ports: []corev1.ServicePort{{Name: "http", Port: 8086}}, + }, + }, + }, + endpoints: map[string]*corev1.Endpoints{ + "utils/jupyter": { + ObjectMeta: metav1.ObjectMeta{Name: "jupyter", Namespace: "utils"}, + }, + }, + } + + connector := &ClusterConnector{Client: api, DefaultNamespace: "default"} + if _, err := connector.resolveServiceHost(context.Background(), "jupyter.utils.svc.cluster.local", 8086); err == nil { + t.Fatal("expected resolution to fail when service has no ready endpoints") + } +} + +func TestSOCKS5ProxyPreservesHostname(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/tree/notebooks" { + http.NotFound(w, r) + return + } + _, _ = io.WriteString(w, "ok") + })) + defer backend.Close() + + connector := &fakeConnector{addr: strings.TrimPrefix(backend.URL, "http://")} + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go func() { + _ = ServeSOCKS5(ctx, ln, connector) + }() + + dialer, err := xproxy.SOCKS5("tcp", ln.Addr().String(), nil, xproxy.Direct) + if err != nil { + t.Fatal(err) + } + httpClient := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.Dial(network, addr) + }, + }, + } + + resp, err := httpClient.Get("http://jupyter.utils:8086/tree/notebooks") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if string(body) != "ok" { + t.Fatalf("unexpected body %q", string(body)) + } + host, port := connector.lastTarget() + if host != "jupyter.utils" || port != 8086 { + t.Fatalf("proxy connected to %s:%d, want jupyter.utils:8086", host, port) + } +} + +func TestHTTPConnectProxyPreservesHostname(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/tree/notebooks" { + http.NotFound(w, r) + return + } + _, _ = io.WriteString(w, "ok") + })) + defer backend.Close() + + connector := &fakeConnector{addr: strings.TrimPrefix(backend.URL, "http://")} + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go func() { + _ = ServeHTTPConnect(ctx, ln, connector) + }() + + conn, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + _, err = fmt.Fprintf(conn, "CONNECT jupyter.utils:8086 HTTP/1.1\r\nHost: jupyter.utils:8086\r\n\r\n") + if err != nil { + t.Fatal(err) + } + + reader := bufio.NewReader(conn) + resp, err := http.ReadResponse(reader, nil) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected connect status %s", resp.Status) + } + + _, err = fmt.Fprintf(conn, "GET /tree/notebooks HTTP/1.1\r\nHost: jupyter.utils:8086\r\nConnection: close\r\n\r\n") + if err != nil { + t.Fatal(err) + } + httpResp, err := http.ReadResponse(reader, nil) + if err != nil { + t.Fatal(err) + } + defer httpResp.Body.Close() + body, err := io.ReadAll(httpResp.Body) + if err != nil { + t.Fatal(err) + } + if string(body) != "ok" { + t.Fatalf("unexpected body %q", string(body)) + } + host, port := connector.lastTarget() + if host != "jupyter.utils" || port != 8086 { + t.Fatalf("proxy connected to %s:%d, want jupyter.utils:8086", host, port) + } +} + +func TestProxyOutE2ECase(t *testing.T) { + if testing.Short() { + t.Skip("skipping e2e in short mode") + } + if strings.TrimSpace(getenv("KUBEVPN_PROXY_OUT_E2E", "")) == "" { + t.Skip("set KUBEVPN_PROXY_OUT_E2E=1 to run real cluster proxy-out test") + } + + host := getenv("KUBEVPN_PROXY_OUT_E2E_HOST", "jupyter.utils") + path := getenv("KUBEVPN_PROXY_OUT_E2E_PATH", "/tree/notebooks") + port := getenv("KUBEVPN_PROXY_OUT_E2E_PORT", "8086") + + config, err := restConfigFromDefault() + if err != nil { + t.Fatal(err) + } + clusterAPI, clientset, err := NewClusterAPI(config) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + connector := &ClusterConnector{ + Client: clusterAPI, + Forwarder: NewPodDialer(config, clientset), + RESTConfig: config, + DefaultNamespace: "default", + } + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + go func() { + _ = ServeSOCKS5(ctx, ln, connector) + }() + + dialer, err := xproxy.SOCKS5("tcp", ln.Addr().String(), nil, xproxy.Direct) + if err != nil { + t.Fatal(err) + } + httpClient := &http.Client{ + Timeout: 15 * time.Second, + Transport: &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.Dial(network, addr) + }, + }, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + + targetURL := (&url.URL{Scheme: "http", Host: net.JoinHostPort(host, port), Path: path}).String() + resp, err := httpClient.Get(targetURL) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusFound && resp.StatusCode != http.StatusUnauthorized { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("unexpected status %s body=%s", resp.Status, string(body)) + } +} diff --git a/pkg/localproxy/socks5.go b/pkg/localproxy/socks5.go new file mode 100644 index 00000000..192c6fa7 --- /dev/null +++ b/pkg/localproxy/socks5.go @@ -0,0 +1,132 @@ +package localproxy + +import ( + "bufio" + "context" + "encoding/binary" + "fmt" + "io" + "net" +) + +const ( + socks5Version = 5 + socks5AuthNone = 0 + socks5CmdConnect = 1 + socks5AddrTypeIPv4 = 1 + socks5AddrTypeFQDN = 3 + socks5AddrTypeIPv6 = 4 + socks5ReplySuccess = 0 + socks5ReplyGeneral = 1 + socks5ReplyNotAllow = 2 + socks5ReplyUnsup = 7 +) + +func ServeSOCKS5(ctx context.Context, ln net.Listener, connector Connector) error { + for { + conn, err := ln.Accept() + if err != nil { + select { + case <-ctx.Done(): + return nil + default: + return err + } + } + go handleSOCKS5Conn(ctx, conn, connector) + } +} + +func handleSOCKS5Conn(ctx context.Context, conn net.Conn, connector Connector) { + defer conn.Close() + + reader := bufio.NewReader(conn) + if err := socks5Handshake(reader, conn); err != nil { + return + } + host, port, err := socks5ReadRequest(reader) + if err != nil { + _ = writeSOCKS5Reply(conn, socks5ReplyGeneral) + return + } + + remote, err := connector.Connect(ctx, host, port) + if err != nil { + _ = writeSOCKS5Reply(conn, socks5ReplyGeneral) + return + } + defer remote.Close() + + if err := writeSOCKS5Reply(conn, socks5ReplySuccess); err != nil { + return + } + relayConns(conn, remote) +} + +func socks5Handshake(reader *bufio.Reader, conn net.Conn) error { + header := make([]byte, 2) + if _, err := io.ReadFull(reader, header); err != nil { + return err + } + if header[0] != socks5Version { + return fmt.Errorf("unsupported socks version %d", header[0]) + } + methods := make([]byte, int(header[1])) + if _, err := io.ReadFull(reader, methods); err != nil { + return err + } + _, err := conn.Write([]byte{socks5Version, socks5AuthNone}) + return err +} + +func socks5ReadRequest(reader *bufio.Reader) (string, int, error) { + header := make([]byte, 4) + if _, err := io.ReadFull(reader, header); err != nil { + return "", 0, err + } + if header[0] != socks5Version { + return "", 0, fmt.Errorf("unsupported socks request version %d", header[0]) + } + if header[1] != socks5CmdConnect { + return "", 0, fmt.Errorf("unsupported socks command %d", header[1]) + } + + var host string + switch header[3] { + case socks5AddrTypeIPv4: + addr := make([]byte, 4) + if _, err := io.ReadFull(reader, addr); err != nil { + return "", 0, err + } + host = net.IP(addr).String() + case socks5AddrTypeIPv6: + addr := make([]byte, 16) + if _, err := io.ReadFull(reader, addr); err != nil { + return "", 0, err + } + host = net.IP(addr).String() + case socks5AddrTypeFQDN: + length, err := reader.ReadByte() + if err != nil { + return "", 0, err + } + addr := make([]byte, int(length)) + if _, err := io.ReadFull(reader, addr); err != nil { + return "", 0, err + } + host = string(addr) + default: + return "", 0, fmt.Errorf("unsupported socks address type %d", header[3]) + } + + portBuf := make([]byte, 2) + if _, err := io.ReadFull(reader, portBuf); err != nil { + return "", 0, err + } + return host, int(binary.BigEndian.Uint16(portBuf)), nil +} + +func writeSOCKS5Reply(conn net.Conn, rep byte) error { + _, err := conn.Write([]byte{socks5Version, rep, 0, socks5AddrTypeIPv4, 0, 0, 0, 0, 0, 0}) + return err +} diff --git a/pkg/localproxy/test_helpers_test.go b/pkg/localproxy/test_helpers_test.go new file mode 100644 index 00000000..3e9f76ea --- /dev/null +++ b/pkg/localproxy/test_helpers_test.go @@ -0,0 +1,21 @@ +package localproxy + +import ( + "os" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func getenv(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func restConfigFromDefault() (*rest.Config, error) { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + configOverrides := &clientcmd.ConfigOverrides{} + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides).ClientConfig() +} diff --git a/pkg/util/pod.go b/pkg/util/pod.go index 1843c69f..c94e3bf4 100644 --- a/pkg/util/pod.go +++ b/pkg/util/pod.go @@ -10,6 +10,7 @@ import ( "os" "strconv" "strings" + "sync" "text/tabwriter" "time" @@ -19,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/httpstream" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericiooptions" @@ -145,6 +147,7 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na return err } + suppressExpectedPortForwardCloseErrors() defer forwarder.Close() var errChan = make(chan error, 1) @@ -160,6 +163,25 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na } } +var portForwardErrorHandlerOnce sync.Once + +func suppressExpectedPortForwardCloseErrors() { + portForwardErrorHandlerOnce.Do(func() { + prev := append([]utilruntime.ErrorHandler(nil), utilruntime.ErrorHandlers...) + utilruntime.ErrorHandlers = []utilruntime.ErrorHandler{ + func(ctx context.Context, err error, msg string, keysAndValues ...interface{}) { + if err != nil && strings.Contains(strings.ToLower(err.Error()), "error closing listener: close tcp") && + strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { + return + } + for _, handler := range prev { + handler(ctx, err, msg, keysAndValues...) + } + }, + } + }) +} + func Shell(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, ns string, cmd []string) (string, error) { stdin, _, _ := term.StdStreams() buf := bytes.NewBuffer(nil)