support k8s discovery

This commit is contained in:
Liujian
2026-01-20 17:23:35 +08:00
parent 3f15557a27
commit 46d4781312
9 changed files with 480 additions and 22 deletions
+2
View File
@@ -7,6 +7,7 @@ import (
"github.com/eolinker/apinto/drivers/certs"
"github.com/eolinker/apinto/drivers/discovery/consul"
"github.com/eolinker/apinto/drivers/discovery/eureka"
"github.com/eolinker/apinto/drivers/discovery/kubernetes"
"github.com/eolinker/apinto/drivers/discovery/nacos"
"github.com/eolinker/apinto/drivers/discovery/static"
gm_certs "github.com/eolinker/apinto/drivers/gm-certs"
@@ -56,6 +57,7 @@ func driverRegister(extenderRegister eosc.IExtenderDriverRegister) {
consul.Register(extenderRegister)
eureka.Register(extenderRegister)
polaris.Register(extenderRegister)
kubernetes.Register(extenderRegister)
// 应用
app.Register(extenderRegister)
+5
View File
@@ -108,6 +108,11 @@ func Profession() []*eosc.ProfessionConfig {
Name: "polaris",
Label: "北极星服务发现",
Desc: "北极星服务发现",
}, {
Id: "eolinker.com:apinto:discovery_kubernetes",
Name: "kubernetes",
Label: "kubernetes服务发现",
Desc: "kubernetes服务发现",
},
},
Mod: eosc.ProfessionConfig_Worker,
+205
View File
@@ -0,0 +1,205 @@
package kubernetes
import (
"context"
"fmt"
"github.com/eolinker/apinto/discovery"
"github.com/eolinker/eosc/log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"net/url"
)
type client struct {
client *kubernetes.Clientset
ctx context.Context
inner bool
name string
portName string
namespace string
}
func newClient(ctx context.Context, name string, cfg *AccessConfig) (*client, error) {
var restCfg *rest.Config
if cfg.Inner {
var err error
restCfg, err = rest.InClusterConfig()
if err != nil {
return nil, err
}
} else {
if len(cfg.Address) < 1 {
return nil, fmt.Errorf("address is nil")
}
u, err := url.Parse(cfg.Address[0])
if err != nil {
return nil, err
}
if u.Scheme != "http" && u.Scheme != "https" {
u.Scheme = "https"
}
restCfg = &rest.Config{
Host: u.String(),
Username: cfg.Username,
Password: cfg.Password,
BearerToken: cfg.BearerToken,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
}
}
c, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return nil, err
}
return &client{
client: c,
name: name,
ctx: ctx,
namespace: cfg.Namespace,
inner: cfg.Inner,
portName: cfg.PortName,
}, nil
}
// GetNodeList 从Client获取对应服务的节点列表
func (c *client) GetNodeList(serviceName string) ([]discovery.NodeInfo, error) {
if c.inner {
return c.getInternalAccess(serviceName)
}
return c.getExternalAccess(serviceName)
}
func (c *client) getInternalAccess(serviceName string) ([]discovery.NodeInfo, error) {
endpoints, err := c.client.CoreV1().Endpoints(c.namespace).Get(c.ctx, serviceName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("get endpoints fail. err: %w", err)
}
if len(endpoints.Subsets) == 0 {
return nil, fmt.Errorf("no available subset")
}
nodes := make([]discovery.NodeInfo, 0, 10)
for _, subset := range endpoints.Subsets {
for _, addr := range subset.Addresses {
port := -1
for _, p := range subset.Ports {
// 筛选逻辑:匹配名称或端口号
if p.Name == c.portName {
port = int(p.Port)
break
}
}
// 如果该 Pod 无匹配端口,提示
if len(subset.Ports) > 0 && port < 0 {
port = int(subset.Ports[0].Port)
}
if port < 0 {
log.Errorf("no available port, service: %s, subset: %s", serviceName, subset.String())
continue
}
log.DebugF("service: %s, subset: %s, port: %d", serviceName, subset.String(), port)
nodes = append(nodes, discovery.NodeInfo{
Ip: addr.IP,
Port: port,
})
}
}
return nodes, nil
}
func (c *client) getExternalAccess(serviceName string) ([]discovery.NodeInfo, error) {
// 获取 Nodes(用于 NodePort
availableNodes, err := c.client.CoreV1().Nodes().List(c.ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("get nodes fail. err: %w", err)
}
service, err := c.client.CoreV1().Services(c.namespace).Get(c.ctx, serviceName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("get service fail.service: %s, err: %w", serviceName, err)
}
switch service.Spec.Type {
case v1.ServiceTypeLoadBalancer:
if len(service.Status.LoadBalancer.Ingress) <= 0 {
return nil, fmt.Errorf("no available ingress")
}
nodes := make([]discovery.NodeInfo, 0, 10)
for _, ingress := range service.Status.LoadBalancer.Ingress {
ip := ingress.IP
if ip == "" {
ip = ingress.Hostname
}
port := 0
for _, p := range service.Spec.Ports {
// 筛选逻辑:匹配名称或端口号
if p.Name == c.portName {
port = int(p.Port)
break
}
}
if port == 0 {
log.Errorf("no available port for service: %s", serviceName)
continue
}
log.DebugF("service: %s, ip: %s, port: %d", serviceName, ip, port)
nodes = append(nodes, discovery.NodeInfo{
Ip: ip,
Port: port,
})
}
return nodes, nil
case v1.ServiceTypeNodePort:
port := 0
for _, p := range service.Spec.Ports {
if port == 0 {
if p.NodePort != 0 {
port = int(p.NodePort)
continue
}
}
// 筛选逻辑:匹配名称或端口号
if p.Name == c.portName {
if p.NodePort != 0 {
port = int(p.NodePort)
}
}
}
if port == 0 {
return nil, fmt.Errorf("no available port for service: %s", serviceName)
}
nodes := make([]discovery.NodeInfo, 0, 10)
for _, node := range availableNodes.Items {
for _, addr := range node.Status.Addresses {
switch addr.Type {
case v1.NodeInternalIP:
log.DebugF("node: %s, ip: %s, port: %d", node.Name, addr.Address, port)
nodes = append(nodes, discovery.NodeInfo{
Ip: addr.Address,
Port: port,
})
case v1.NodeExternalIP:
log.DebugF("node: %s, ip: %s, port: %d", node.Name, addr.Address, port)
nodes = append(nodes, discovery.NodeInfo{
Ip: addr.Address,
Port: port,
})
}
}
}
return nodes, nil
case v1.ServiceTypeClusterIP:
return nil, fmt.Errorf("the %s type is for internal access only. To enable external access, change the type to NodePort/LoadBalancer or use Ingress", service.Spec.Type)
default:
return nil, fmt.Errorf("unsupported service type: %s", service.Spec.Type)
}
}
func (c *client) Close() error {
return nil
}
@@ -0,0 +1,29 @@
package kubernetes
import (
"context"
"os"
"testing"
)
func TestClient(t *testing.T) {
cfg := &AccessConfig{
Address: []string{"https://172.18.189.43:6443"},
Namespace: "apinto",
Inner: false,
BearerToken: os.Getenv("KUBERNETES_TOKEN"),
Username: "",
Password: "",
}
c, err := newClient(context.Background(), "apinto-gateway-stateful", cfg)
if err != nil {
t.Fatal(err)
}
list, err := c.GetNodeList("apinto-gateway")
if err != nil {
t.Fatal(err)
}
for _, v := range list {
t.Log(v)
}
}
+17
View File
@@ -0,0 +1,17 @@
package kubernetes
// Config nacos驱动配置
type Config struct {
Config *AccessConfig `json:"config" label:"配置信息"`
}
// AccessConfig 接入地址配置
type AccessConfig struct {
Inner bool `json:"inner" label:"是否内部接入"`
Address []string `json:"address" label:"接入地址"`
Namespace string `json:"namespace" label:"命名空间"`
Username string `json:"username" label:"用户名"`
Password string `json:"password" label:"密码"`
BearerToken string `json:"breaker_token" label:"Bearer Token"`
PortName string `json:"port_name" label:"端口名称"`
}
+37
View File
@@ -0,0 +1,37 @@
package kubernetes
import (
"context"
"fmt"
"sync"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/apinto/discovery"
"github.com/eolinker/eosc"
)
const (
driverName = "kubernetes"
)
// Create 创建nacos驱动实例
func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) {
if cfg == nil || cfg.Config == nil {
return nil, fmt.Errorf("config is nil")
}
ctx, cancelFunc := context.WithCancel(context.Background())
c, err := newClient(ctx, name, cfg.Config)
if err != nil {
return nil, fmt.Errorf("create nacos client fail. err: %w", err)
}
return &executor{
WorkerBase: drivers.Worker(id, name),
ctx: ctx,
cancelFunc: cancelFunc,
client: c,
services: discovery.NewAppContainer(),
locker: sync.RWMutex{},
}, nil
}
+16
View File
@@ -0,0 +1,16 @@
package kubernetes
import (
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc"
)
var name = "discovery_kubernetes"
// Register 注册nacos驱动工厂
func Register(register eosc.IExtenderDriverRegister) {
register.RegisterExtenderDriver(name, NewFactory())
}
func NewFactory() eosc.IExtenderDriverFactory {
return drivers.NewFactory[Config](Create)
}
+126
View File
@@ -0,0 +1,126 @@
package kubernetes
import (
"context"
"fmt"
"sync"
"time"
"github.com/eolinker/apinto/discovery"
"github.com/eolinker/apinto/drivers"
"github.com/eolinker/eosc/utils/config"
"github.com/eolinker/eosc/log"
"github.com/eolinker/eosc"
)
var _ discovery.IDiscovery = (*executor)(nil)
var _ eosc.IWorkerDestroy = (*executor)(nil)
type executor struct {
drivers.WorkerBase
client *client
services discovery.IAppContainer
ctx context.Context
cancelFunc context.CancelFunc
locker sync.RWMutex
}
func (n *executor) Destroy() error {
n.cancelFunc()
if n.client != nil {
n.client = nil
}
return nil
}
// GetApp 获取服务发现中目标服务的app
func (n *executor) GetApp(serviceName string) (discovery.IApp, error) {
n.locker.RLock()
app, ok := n.services.GetApp(serviceName)
n.locker.RUnlock()
if ok {
return app.Agent(), nil
}
n.locker.Lock()
defer n.locker.Unlock()
app, ok = n.services.GetApp(serviceName)
if ok {
return app.Agent(), nil
}
ns, err := n.client.GetNodeList(serviceName)
if err != nil {
log.Warnf("%s get %s node list error: %v", driverName, serviceName, err)
}
app = n.services.Set(serviceName, ns)
return app.Agent(), nil
}
// CheckSkill 检查目标能力是否存在
func (n *executor) CheckSkill(skill string) bool {
return discovery.CheckSkill(skill)
}
// Start 开始服务发现
func (n *executor) Start() error {
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
EXIT:
for {
log.Debug("continue go")
select {
case <-n.ctx.Done():
break EXIT
case <-ticker.C:
{
log.Debug("continue kubernetes ticket")
//获取现有服务app的服务名名称列表,并从注册中心获取目标服务名的节点列表
keys := n.services.Keys()
for _, serviceName := range keys {
res, err := n.client.GetNodeList(serviceName)
if err != nil {
log.Warnf("executor %s:%v for service %s,err:%v", n.Name(), discovery.ErrDiscoveryDown, serviceName, err)
}
//更新目标服务的节点列表
n.services.Set(serviceName, res)
}
}
}
}
}()
return nil
}
// Reset 重置nacos实例配置
func (n *executor) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
cfg, ok := conf.(*Config)
if !ok {
return fmt.Errorf("need %s,now %s", config.TypeNameOf((*Config)(nil)), config.TypeNameOf(conf))
}
if cfg.Config == nil {
return fmt.Errorf("config is nil")
}
nClient, err := newClient(n.ctx, n.Name(), cfg.Config)
if err != nil {
return fmt.Errorf("create executor client fail. err: %w", err)
}
if n.client != nil {
n.client.Close()
}
n.client = nClient
return nil
}
// Stop 停止服务发现
func (n *executor) Stop() error {
n.cancelFunc()
return n.Destroy()
}
+43 -22
View File
@@ -1,8 +1,6 @@
module github.com/eolinker/apinto
go 1.24.0
toolchain go1.24.7
go 1.25.0
require (
github.com/IBM/sarama v1.45.2
@@ -14,8 +12,8 @@ require (
github.com/eolinker/eosc v0.21.4
github.com/fasthttp/websocket v1.5.0
github.com/fullstorydev/grpcurl v1.8.7
github.com/google/uuid v1.4.0
github.com/gorilla/websocket v1.4.2
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674
github.com/hashicorp/consul/api v1.9.1
github.com/influxdata/influxdb-client-go/v2 v2.12.1
github.com/jhump/protoreflect v1.16.0
@@ -30,16 +28,17 @@ require (
github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f
github.com/sashabaranov/go-openai v1.40.5
github.com/soheilhy/cmux v0.1.5
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
github.com/traefik/yaegi v0.16.1
github.com/urfave/cli/v2 v2.23.4
github.com/valyala/fasthttp v1.66.0
golang.org/x/crypto v0.42.0
golang.org/x/net v0.44.0
golang.org/x/oauth2 v0.14.0
golang.org/x/crypto v0.44.0
golang.org/x/net v0.47.0
golang.org/x/oauth2 v0.30.0
google.golang.org/api v0.149.0
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.34.2
google.golang.org/protobuf v1.36.8
k8s.io/client-go v0.35.0
)
require (
@@ -48,11 +47,12 @@ require (
github.com/tidwall/sjson v1.2.5
github.com/tjfoc/gmsm v1.4.1
github.com/xdg-go/scram v1.1.0
k8s.io/api v0.35.0
k8s.io/apimachinery v0.35.0
)
require (
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.1 // indirect
github.com/RoaringBitmap/roaring v0.7.1 // indirect
github.com/Workiva/go-datastructures v1.0.52 // indirect
@@ -65,11 +65,16 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/dubbogo/triple v1.1.8 // indirect
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.11.0 // indirect
@@ -77,6 +82,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
@@ -84,6 +90,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jinzhu/copier v0.3.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/k0kubun/pp v3.0.1+incompatible // indirect
github.com/knadh/koanf v1.4.1 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
@@ -94,14 +101,16 @@ require (
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
@@ -111,18 +120,30 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/metric v1.20.0 // indirect
golang.org/x/sync v0.17.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/term v0.37.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
)
require (
@@ -135,7 +156,7 @@ require (
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
@@ -147,7 +168,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4
github.com/google/btree v1.0.1 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
@@ -174,7 +195,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0 // indirect
@@ -200,9 +221,9 @@ require (
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0
golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.29.0
golang.org/x/time v0.1.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.31.0
golang.org/x/time v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect