fix: bugs in redis models

This commit is contained in:
singchia
2024-05-07 20:45:09 +08:00
parent 58c4b3834d
commit 9bd5c4ffcb
11 changed files with 53 additions and 35 deletions
@@ -62,6 +62,7 @@ func newclusterServiceEnd(addr string, opts ...ServiceOption) (*clusterServiceEn
serviceOption: &serviceOption{},
rpcs: map[string]geminio.RPC{},
topics: mapset.NewSet[string](),
edgefrontiers: mapmap.NewBiMap(),
acceptStreamCh: make(chan geminio.Stream, 128),
acceptMsgCh: make(chan geminio.Message, 128),
closed: make(chan struct{}),
@@ -74,6 +75,11 @@ func newclusterServiceEnd(addr string, opts ...ServiceOption) (*clusterServiceEn
if end.serviceOption.logger == nil {
end.serviceOption.logger = armlog.DefaultLog
}
err = end.update()
if err != nil {
return nil, err
}
go end.start()
return end, nil
}
@@ -157,6 +163,7 @@ FOUND:
end.logger.Errorf("new service end err: %s", err)
continue
}
end.logger.Debugf("new service end succeed, frontierID: %s, addr: %s", new.FrontierId, new.AdvertisedSbAddr)
// new frontier
prev, ok := end.frontiers.Swap(new.FrontierId, &frontierNend{
frontier: new,
@@ -205,6 +212,7 @@ func (end *clusterServiceEnd) lookup(edgeID uint64) (string, *serviceEnd, error)
if ok {
found.(*frontierNend).end.Close()
}
end.logger.Debugf("new service end succeed, addr: %s", frontier.AdvertisedSbAddr)
} else {
serviceEnd = fe.(*frontierNend).end
}
@@ -222,7 +230,7 @@ func (end *clusterServiceEnd) pickone() *serviceEnd {
}
func frontierEqual(a, b *clusterv1.Frontier) bool {
return a.AdvertisedSbAddr == b.AdvertisedEbAddr &&
return a.AdvertisedSbAddr == b.AdvertisedSbAddr &&
a.FrontierId == b.FrontierId
}
@@ -268,6 +276,7 @@ func (service *clusterServiceEnd) newServiceEnd(addr string) (*serviceEnd, error
goto ERR
}
}
return serviceEnd, nil
ERR:
serviceEnd.Close()
+4 -1
View File
@@ -287,5 +287,8 @@ func (end *serviceEnd) ListStreams() []geminio.Stream {
}
func (end *serviceEnd) Close() error {
return end.End.Close()
if end.End != nil {
return end.End.Close()
}
return nil
}
+1 -1
View File
@@ -33,7 +33,7 @@ type LabelData struct {
func main() {
methodSlice = []string{}
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:2432", "address to dial")
address := pflag.String("address", "127.0.0.1:30012", "address to dial")
loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error")
meta := pflag.String("meta", "test", "meta to set on connection")
methods := pflag.String("methods", "", "method name, support echo, calculate")
+11 -5
View File
@@ -72,7 +72,9 @@ func main() {
http.ListenAndServe("0.0.0.0:6062", nil)
}()
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:2431", "address to dial")
address := pflag.String("address", "127.0.0.1:30011", "address to dial")
frontlasAddress := pflag.String("frontlas_address", "127.0.0.1:30020", "frontlas address to dial, mutex with address")
frontlas := pflag.Bool("frontlas", false, "frontlas or frontier")
loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error")
serviceName := pflag.String("service", "foo", "service name")
topics := pflag.String("topics", "", "topics to receive message, empty means without consuming")
@@ -81,9 +83,6 @@ func main() {
stats := pflag.Bool("stats", false, "print statistics or not")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
// log
level, err := armlog.ParseLevel(*loglevel)
if err != nil {
@@ -99,7 +98,14 @@ func main() {
topicSlice = strings.Split(*topics, ",")
opt = append(opt, service.OptionServiceReceiveTopics(topicSlice))
}
srv, err = service.NewService(dialer, opt...)
if *frontlas {
srv, err = service.NewClusterService(*frontlasAddress, opt...)
} else {
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
srv, err = service.NewService(dialer, opt...)
}
if err != nil {
log.Println("new end err:", err)
return
+12 -12
View File
@@ -16,50 +16,50 @@ const (
)
type FrontierInstance struct {
FrontierID string `yaml:"frontier_id"`
FrontierID string `json:"frontier_id"`
// in k8s, it should be podIP:port
AdvertisedServiceboundAddr string `yaml:"advertised_servicebound_addr"`
AdvertisedServiceboundAddr string `json:"advertised_servicebound_addr"`
// in k8s, it should be NodeportIP:port
AdvertisedEdgeboundAddr string `yaml:"advertised_edgebound_addr"`
AdvertisedEdgeboundAddr string `json:"advertised_edgebound_addr"`
}
type FrontierStats struct {
FrontierID string `yaml:"frontier_id"`
EdgeCount int `yaml:"edge_count"`
ServiceCount int `yaml:"service_count"`
FrontierID string `json:"frontier_id"`
EdgeCount int `json:"edge_count"`
ServiceCount int `json:"service_count"`
}
// edge protocols
type EdgeOnline struct {
FrontierID string `yaml:"frontier_id"`
FrontierID string `json:"frontier_id"`
EdgeID uint64 `json:"edge_id"`
Addr string `json:"addr"`
}
type EdgeOffline struct {
FrontierID string `yaml:"frontier_id"`
FrontierID string `json:"frontier_id"`
EdgeID uint64 `json:"edge_id"`
}
type EdgeHeartbeat struct {
FrontierID string `yaml:"frontier_id"`
FrontierID string `json:"frontier_id"`
EdgeID uint64 `json:"edge_id"`
}
// service protocols
type ServiceOnline struct {
FrontierID string `yaml:"frontier_id"`
FrontierID string `json:"frontier_id"`
ServiceID uint64 `json:"service_id"`
Service string `json:"service"`
Addr string `json:"addr"`
}
type ServiceOffline struct {
FrontierID string `yaml:"frontier_id"`
FrontierID string `json:"frontier_id"`
ServiceID uint64 `json:"service_id"`
}
type ServiceHeartbeat struct {
FrontierID string `yaml:"frontier_id"`
FrontierID string `json:"frontier_id"`
ServiceID uint64 `json:"service_id"`
}
+2 -2
View File
@@ -42,6 +42,6 @@ func (offline *OnEdgeOffline) String() string {
// service -> frontier
// meta carried when service inited
type Meta struct {
Service string
Topics []string
Service string `json:"service"`
Topics []string `json:"topics"`
}
+1 -1
View File
@@ -31,7 +31,7 @@ func (fm *FrontierManager) ConnOnline(d delegate.ConnDescriber) error {
klog.Errorf("frontier manager conn online, json unmarshal err: %s", err)
return err
}
klog.V(1).Infof("frontier online, frontierID: %s, advertised_servicebound_addr: %s, advertised_edgebount_addr: %s",
klog.V(1).Infof("frontier online, frontierID: %s, advertised_servicebound_addr: %s, advertised_edgebound_addr: %s",
instance.FrontierID, instance.AdvertisedServiceboundAddr, instance.AdvertisedEdgeboundAddr)
set, err := fm.repo.SetFrontierAndAlive(instance.FrontierID, &repo.Frontier{
+1 -1
View File
@@ -139,7 +139,7 @@ func (dao *Dao) SetEdgeAndAlive(edgeID uint64, edge *Edge, expiration time.Durat
pipeliner := dao.rds.TxPipeline()
// edge
pipeliner.Set(context.TODO(), getEdgeKey(edgeID), edgeData, -1)
pipeliner.Set(context.TODO(), getEdgeKey(edgeID), edgeData, 24*time.Hour)
// alive
pipeliner.Set(context.TODO(), getAliveEdgeKey(edgeID), 1, expiration)
// frontier edge_count
+4 -4
View File
@@ -120,7 +120,7 @@ func (dao *Dao) SetService(serviceID uint64, service *Service) error {
klog.Errorf("dao set service, json marshal err: %s", err)
return err
}
_, err = dao.rds.Set(context.TODO(), getServiceKey(serviceID), data, -1).Result()
_, err = dao.rds.Set(context.TODO(), getServiceKey(serviceID), string(data), -1).Result()
if err != nil {
klog.Errorf("dao set service, set err: %s", err)
return err
@@ -136,12 +136,12 @@ func (dao *Dao) SetServiceAndAlive(serviceID uint64, service *Service, expiratio
}
pipeliner := dao.rds.TxPipeline()
// service
pipeliner.Set(context.TODO(), getServiceKey(serviceID), serviceData, -1)
// service, // TODO set expiration
pipeliner.Set(context.TODO(), getServiceKey(serviceID), serviceData, 24*time.Hour)
// alive
pipeliner.Set(context.TODO(), getAliveServiceKey(serviceID), 1, expiration)
// frontier service_count
pipeliner.HIncrBy(context.TODO(), getServiceKey(serviceID), "service_count", 1)
pipeliner.HIncrBy(context.TODO(), getFrontierKey(service.FrontierID), "service_count", 1)
_, err = pipeliner.Exec(context.TODO())
if err != nil {
+3 -3
View File
@@ -2,7 +2,7 @@ package repo
// key: edgeID; value: Edge
type Edge struct {
FrontierID string `yaml:"frontier_id"`
Addr string `yaml:"addr"`
UpdateTime int64 `yaml:"update_time"`
FrontierID string `json:"frontier_id"`
Addr string `json:"addr"`
UpdateTime int64 `json:"update_time"`
}
+4 -4
View File
@@ -2,8 +2,8 @@ package repo
// key: serviceID; value: Service
type Service struct {
Service string `yaml:"service"`
FrontierID string `yaml:"frontier_id"`
Addr string `yaml:"addr"`
UpdateTime int64 `yaml:"update_time"`
Service string `json:"service"`
FrontierID string `json:"frontier_id"`
Addr string `json:"addr"`
UpdateTime int64 `json:"update_time"`
}