diff --git a/controllers/node.go b/controllers/node.go index a1e0c9bd..3014d6c4 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -267,6 +267,10 @@ func authorize(nodesAllowed, networkCheck bool, authNetwork string, next http.Ha if nodesAllowed { // TODO --- should ensure that node is only operating on itself if _, _, _, err := logic.VerifyToken(authToken); err == nil { + + // this indicates request is from a node + // used for failover - if a getNode comes from node, this will trigger a metrics wipe + r.Header.Set("requestfrom", "node") next.ServeHTTP(w, r) return } @@ -452,6 +456,8 @@ func getNode(w http.ResponseWriter, r *http.Request) { // set header. w.Header().Set("Content-Type", "application/json") + nodeRequest := r.Header.Get("requestfrom") == "node" + var params = mux.Vars(r) nodeid := params["nodeid"] node, err := logic.GetNodeByID(nodeid) @@ -481,6 +487,12 @@ func getNode(w http.ResponseWriter, r *http.Request) { PeerIDs: peerUpdate.PeerIDs, } + if servercfg.Is_EE && nodeRequest { + if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil { + logger.Log(1, "failed to reset failover list during node config pull", node.Name, node.Network) + } + } + logger.Log(2, r.Header.Get("user"), "fetched node", params["nodeid"]) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(response) @@ -841,7 +853,13 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") nodeid := params["nodeid"] netid := params["network"] - node, err := logic.CreateIngressGateway(netid, nodeid) + type failoverData struct { + Failover bool `json:"failover"` + } + var failoverReqBody failoverData + json.NewDecoder(r.Body).Decode(&failoverReqBody) + + node, err := logic.CreateIngressGateway(netid, nodeid, failoverReqBody.Failover) if err != nil { logger.Log(0, r.Header.Get("user"), fmt.Sprintf("failed to create ingress gateway on node [%s] on network [%s]: %v", @@ -850,6 +868,12 @@ func 
createIngressGateway(w http.ResponseWriter, r *http.Request) { return } + if servercfg.Is_EE && failoverReqBody.Failover { + if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil { + logger.Log(1, "failed to reset failover list during failover create", node.Name, node.Network) + } + } + logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) @@ -873,7 +897,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { var params = mux.Vars(r) nodeid := params["nodeid"] netid := params["network"] - node, err := logic.DeleteIngressGateway(netid, nodeid) + node, wasFailover, err := logic.DeleteIngressGateway(netid, nodeid) if err != nil { logger.Log(0, r.Header.Get("user"), fmt.Sprintf("failed to delete ingress gateway on node [%s] on network [%s]: %v", @@ -882,6 +906,12 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { return } + if servercfg.Is_EE && wasFailover { + if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil { + logger.Log(1, "failed to reset failover list during failover create", node.Name, node.Network) + } + } + logger.Log(1, r.Header.Get("user"), "deleted ingress gateway", nodeid) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) @@ -965,6 +995,12 @@ func updateNode(w http.ResponseWriter, r *http.Request) { } } + if ifaceDelta && servercfg.Is_EE { + if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil { + logger.Log(0, "failed to reset failover lists during node update for node", node.Name, node.Network) + } + } + err = logic.UpdateNode(&node, &newNode) if err != nil { logger.Log(0, r.Header.Get("user"), @@ -1059,6 +1095,12 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { logger.Log(0, fmt.Sprintf("failed to send DynSec command [%v]: %v", event.Commands, err.Error())) } + if servercfg.Is_EE { + if err = 
logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil { + logger.Log(0, "failed to reset failover lists during node delete for node", node.Name, node.Network) + } + } + logic.ReturnSuccessResponse(w, r, nodeid+" deleted.") logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) runUpdates(&node, false) diff --git a/ee/initialize.go b/ee/initialize.go index a83ad354..c6dedb06 100644 --- a/ee/initialize.go +++ b/ee/initialize.go @@ -6,6 +6,7 @@ package ee import ( controller "github.com/gravitl/netmaker/controllers" "github.com/gravitl/netmaker/ee/ee_controllers" + eelogic "github.com/gravitl/netmaker/ee/logic" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" @@ -27,7 +28,11 @@ func InitEE() { } // == End License Handling == AddLicenseHooks() + resetFailover() }) + logic.EnterpriseFailoverFunc = eelogic.SetFailover + logic.EnterpriseResetFailoverFunc = eelogic.ResetFailover + logic.EnterpriseResetAllPeersFailovers = eelogic.WipeAffectedFailoversOnly } func setControllerLimits() { @@ -38,6 +43,18 @@ func setControllerLimits() { servercfg.Is_EE = true } +func resetFailover() { + nets, err := logic.GetNetworks() + if err == nil { + for _, net := range nets { + err = eelogic.ResetFailover(net.NetID) + if err != nil { + logger.Log(0, "failed to reset failover on network", net.NetID, ":", err.Error()) + } + } + } +} + func retrieveEELogo() string { return ` __ __ ______ ______ __ __ ______ __ __ ______ ______ diff --git a/ee/logic/failover.go b/ee/logic/failover.go new file mode 100644 index 00000000..a8930889 --- /dev/null +++ b/ee/logic/failover.go @@ -0,0 +1,121 @@ +package logic + +import ( + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" +) + +// SetFailover - finds a suitable failover candidate and sets it +func SetFailover(node *models.Node) error { + failoverNode := 
determineFailoverCandidate(node) + if failoverNode != nil { + return setFailoverNode(failoverNode, node) + } + return nil +} + +// ResetFailover - sets the failover node and wipes disconnected status +func ResetFailover(network string) error { + nodes, err := logic.GetNetworkNodes(network) + if err != nil { + return err + } + for _, node := range nodes { + err = SetFailover(&node) + if err != nil { + logger.Log(2, "error setting failover for node", node.Name, ":", err.Error()) + } + err = WipeFailover(node.ID) + if err != nil { + logger.Log(2, "error wiping failover for node", node.Name, ":", err.Error()) + } + } + return nil +} + +// determineFailoverCandidate - returns the connected failover node with the +// lowest latency to the given node, or nil if none qualifies +func determineFailoverCandidate(nodeToBeRelayed *models.Node) *models.Node { + + currentNetworkNodes, err := logic.GetNetworkNodes(nodeToBeRelayed.Network) + if err != nil { + return nil + } + + currentMetrics, err := logic.GetMetrics(nodeToBeRelayed.ID) + if err != nil || currentMetrics == nil || currentMetrics.Connectivity == nil { + return nil + } + + minLatency := int64(9223372036854775807) // max signed int64 value + var fastestCandidate *models.Node + for i := range currentNetworkNodes { + if currentNetworkNodes[i].ID == nodeToBeRelayed.ID { + continue + } + + if currentMetrics.Connectivity[currentNetworkNodes[i].ID].Connected && (currentNetworkNodes[i].Failover == "yes") { + if currentMetrics.Connectivity[currentNetworkNodes[i].ID].Latency < int64(minLatency) { + fastestCandidate = &currentNetworkNodes[i] + minLatency = currentMetrics.Connectivity[currentNetworkNodes[i].ID].Latency + } + } + } + + return fastestCandidate +} + +// setFailoverNode - changes node's failover node +func setFailoverNode(failoverNode, node *models.Node) error { + + node.FailoverNode = failoverNode.ID + nodeToUpdate, err := logic.GetNodeByID(node.ID) + if err != nil { + return err + } + if nodeToUpdate.FailoverNode == failoverNode.ID { + return nil + } +
return logic.UpdateNode(&nodeToUpdate, node) +} + +// WipeFailover - removes the failover peers of given node (ID) +func WipeFailover(nodeid string) error { + metrics, err := logic.GetMetrics(nodeid) + if err != nil { + return err + } + if metrics != nil { + metrics.FailoverPeers = make(map[string]string) + return logic.UpdateMetrics(nodeid, metrics) + } + return nil +} + +// WipeAffectedFailoversOnly - wipes failovers for nodes that have given node (ID) +// in their respective failover lists +func WipeAffectedFailoversOnly(nodeid, network string) error { + currentNetworkNodes, err := logic.GetNetworkNodes(network) + if err != nil { + return nil + } + WipeFailover(nodeid) + + for i := range currentNetworkNodes { + currNodeID := currentNetworkNodes[i].ID + if currNodeID == nodeid { + continue + } + currMetrics, err := logic.GetMetrics(currNodeID) + if err != nil || currMetrics == nil { + continue + } + if currMetrics.FailoverPeers != nil { + if len(currMetrics.FailoverPeers[nodeid]) > 0 { + WipeFailover(currNodeID) + } + } + } + return nil +} diff --git a/logic/gateway.go b/logic/gateway.go index 2837b24c..e54e976c 100644 --- a/logic/gateway.go +++ b/logic/gateway.go @@ -10,6 +10,7 @@ import ( "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" ) // CreateEgressGateway - creates an egress gateway @@ -172,7 +173,7 @@ func DeleteEgressGateway(network, nodeid string) (models.Node, error) { } // CreateIngressGateway - creates an ingress gateway -func CreateIngressGateway(netid string, nodeid string) (models.Node, error) { +func CreateIngressGateway(netid string, nodeid string, failover bool) (models.Node, error) { var postUpCmd, postDownCmd string node, err := GetNodeByID(nodeid) @@ -224,7 +225,9 @@ func CreateIngressGateway(netid string, nodeid string) (models.Node, error) { node.PostUp = postUpCmd node.PostDown = postDownCmd node.UDPHolePunch = "no" - + if failover 
&& servercfg.Is_EE { + node.Failover = "yes" + } data, err := json.Marshal(&node) if err != nil { return models.Node{}, err @@ -238,26 +241,27 @@ func CreateIngressGateway(netid string, nodeid string) (models.Node, error) { } // DeleteIngressGateway - deletes an ingress gateway -func DeleteIngressGateway(networkName string, nodeid string) (models.Node, error) { +func DeleteIngressGateway(networkName string, nodeid string) (models.Node, bool, error) { node, err := GetNodeByID(nodeid) if err != nil { - return models.Node{}, err + return models.Node{}, false, err } network, err := GetParentNetwork(networkName) if err != nil { - return models.Node{}, err + return models.Node{}, false, err } // delete ext clients belonging to ingress gateway if err = DeleteGatewayExtClients(node.ID, networkName); err != nil { - return models.Node{}, err + return models.Node{}, false, err } logger.Log(3, "deleting ingress gateway") - + wasFailover := node.Failover == "yes" node.UDPHolePunch = network.DefaultUDPHolePunch node.LastModified = time.Now().Unix() node.IsIngressGateway = "no" node.IngressGatewayRange = "" + node.Failover = "no" // default to removing postup and postdown node.PostUp = "" @@ -274,14 +278,14 @@ func DeleteIngressGateway(networkName string, nodeid string) (models.Node, error data, err := json.Marshal(&node) if err != nil { - return models.Node{}, err + return models.Node{}, false, err } err = database.Insert(node.ID, string(data), database.NODES_TABLE_NAME) if err != nil { - return models.Node{}, err + return models.Node{}, wasFailover, err } err = SetNetworkNodesLastModified(networkName) - return node, err + return node, wasFailover, err } // DeleteGatewayExtClients - deletes ext clients based on gateway (mac) of ingress node and network diff --git a/logic/metrics/metrics.go b/logic/metrics/metrics.go index fd9fbbc7..6a3372eb 100644 --- a/logic/metrics/metrics.go +++ b/logic/metrics/metrics.go @@ -68,11 +68,24 @@ func Collect(iface string, peerMap models.PeerMap) 
(*models.Metrics, error) { newMetric.Latency = 999 } else { pingStats := pinger.Statistics() - newMetric.Uptime = 1 - newMetric.Connected = true - newMetric.Latency = pingStats.AvgRtt.Milliseconds() + if pingStats.PacketsRecv > 0 { + newMetric.Uptime = 1 + newMetric.Connected = true + newMetric.Latency = pingStats.AvgRtt.Milliseconds() + } } } + + // check device peer to see if WG is working if ping failed + if !newMetric.Connected { + if currPeer.ReceiveBytes > 0 && + currPeer.TransmitBytes > 0 && + time.Now().Before(currPeer.LastHandshakeTime.Add(time.Minute<<1)) { + newMetric.Connected = true + newMetric.Uptime = 1 + } + } + newMetric.TotalTime = 1 metrics.Connectivity[id] = newMetric } diff --git a/logic/nodes.go b/logic/nodes.go index 7bb02e74..38851142 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -212,6 +212,7 @@ func DeleteNodeByID(node *models.Node, exterminate bool) error { if node.IsServer == "yes" { return removeLocalServer(node) } + return nil } @@ -250,6 +251,20 @@ func ValidateNode(node *models.Node, isUpdate bool) error { return err } +// IsFailoverPresent - checks if a node is marked as a failover in given network +func IsFailoverPresent(network string) bool { + netNodes, err := GetNetworkNodes(network) + if err != nil { + return false + } + for i := range netNodes { + if netNodes[i].Failover == "yes" { + return true + } + } + return false +} + // CreateNode - creates a node in database func CreateNode(node *models.Node) error { @@ -480,6 +495,7 @@ func SetNodeDefaults(node *models.Node) { node.SetDefaultIsHub() node.SetDefaultConnected() node.SetDefaultACL() + node.SetDefaultFailover() } // GetRecordKey - get record key diff --git a/logic/peers.go b/logic/peers.go index edfaff92..edd5024b 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -33,6 +33,16 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { } var peerMap = make(models.PeerMap) + var metrics *models.Metrics + if servercfg.Is_EE { + metrics, _ = 
GetMetrics(node.ID) + } + if metrics == nil { + metrics = &models.Metrics{} + } + if metrics.FailoverPeers == nil { + metrics.FailoverPeers = make(map[string]string) + } // udppeers = the peers parsed from the local interface // gives us correct port to reach udppeers, errN := database.GetPeers(node.Network) @@ -85,7 +95,10 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { if isP2S && peer.IsHub != "yes" { continue } - + if len(metrics.FailoverPeers[peer.ID]) > 0 && IsFailoverPresent(node.Network) { + logger.Log(2, "peer", peer.Name, peer.PrimaryAddress(), "was found to be in failover peers list for node", node.Name, node.PrimaryAddress()) + continue + } pubkey, err := wgtypes.ParseKey(peer.PublicKey) if err != nil { return models.PeerUpdate{}, err @@ -138,8 +151,8 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { return models.PeerUpdate{}, err } } - // set_allowedips - allowedips := GetAllowedIPs(node, &peer) + + allowedips := GetAllowedIPs(node, &peer, metrics) var keepalive time.Duration if node.PersistentKeepalive != 0 { // set_keepalive @@ -247,64 +260,9 @@ func getExtPeers(node *models.Node) ([]wgtypes.PeerConfig, []models.IDandAddr, e } // GetAllowedIPs - calculates the wireguard allowedip field for a peer of a node based on the peer and node settings -func GetAllowedIPs(node, peer *models.Node) []net.IPNet { +func GetAllowedIPs(node, peer *models.Node, metrics *models.Metrics) []net.IPNet { var allowedips []net.IPNet - - if peer.Address != "" { - var peeraddr = net.IPNet{ - IP: net.ParseIP(peer.Address), - Mask: net.CIDRMask(32, 32), - } - allowedips = append(allowedips, peeraddr) - } - - if peer.Address6 != "" { - var addr6 = net.IPNet{ - IP: net.ParseIP(peer.Address6), - Mask: net.CIDRMask(128, 128), - } - allowedips = append(allowedips, addr6) - } - // handle manually set peers - for _, allowedIp := range peer.AllowedIPs { - - // parsing as a CIDR first. 
If valid CIDR, append - if _, ipnet, err := net.ParseCIDR(allowedIp); err == nil { - nodeEndpointArr := strings.Split(node.Endpoint, ":") - if !ipnet.Contains(net.IP(nodeEndpointArr[0])) && ipnet.IP.String() != peer.Address { // don't need to add an allowed ip that already exists.. - allowedips = append(allowedips, *ipnet) - } - - } else { // parsing as an IP second. If valid IP, check if ipv4 or ipv6, then append - if iplib.Version(net.ParseIP(allowedIp)) == 4 && allowedIp != peer.Address { - ipnet := net.IPNet{ - IP: net.ParseIP(allowedIp), - Mask: net.CIDRMask(32, 32), - } - allowedips = append(allowedips, ipnet) - } else if iplib.Version(net.ParseIP(allowedIp)) == 6 && allowedIp != peer.Address6 { - ipnet := net.IPNet{ - IP: net.ParseIP(allowedIp), - Mask: net.CIDRMask(128, 128), - } - allowedips = append(allowedips, ipnet) - } - } - } - // handle egress gateway peers - if peer.IsEgressGateway == "yes" { - //hasGateway = true - egressIPs := getEgressIPs(node, peer) - // remove internet gateway if server - if node.IsServer == "yes" { - for i := len(egressIPs) - 1; i >= 0; i-- { - if egressIPs[i].String() == "0.0.0.0/0" || egressIPs[i].String() == "::/0" { - egressIPs = append(egressIPs[:i], egressIPs[i+1:]...) - } - } - } - allowedips = append(allowedips, egressIPs...) - } + allowedips = getNodeAllowedIPs(peer, node) // handle ingress gateway peers if peer.IsIngressGateway == "yes" { @@ -315,6 +273,27 @@ func GetAllowedIPs(node, peer *models.Node) []net.IPNet { for _, extPeer := range extPeers { allowedips = append(allowedips, extPeer.AllowedIPs...) 
} + // if node is a failover node, add allowed ips from nodes it is handling + if peer.Failover == "yes" && metrics.FailoverPeers != nil { + // traverse through nodes that need handling + logger.Log(3, "peer", peer.Name, "was found to be failover for", node.Name, "checking failover peers...") + for k := range metrics.FailoverPeers { + // if FailoverNode is me for this node, add allowedips + if metrics.FailoverPeers[k] == peer.ID { + // get original node so we can traverse the allowed ips + nodeToFailover, err := GetNodeByID(k) + if err == nil { + failoverNodeMetrics, err := GetMetrics(nodeToFailover.ID) + if err == nil && failoverNodeMetrics != nil { + if len(failoverNodeMetrics.NodeName) > 0 { + allowedips = append(allowedips, getNodeAllowedIPs(&nodeToFailover, peer)...) + logger.Log(0, "failing over node", nodeToFailover.Name, nodeToFailover.PrimaryAddress(), "to failover node", peer.Name) + } + } + } + } + } + } } // handle relay gateway peers if peer.IsRelay == "yes" { @@ -559,3 +538,64 @@ func getEgressIPs(node, peer *models.Node) []net.IPNet { } return allowedips } + +func getNodeAllowedIPs(peer, node *models.Node) []net.IPNet { + var allowedips = []net.IPNet{} + + if peer.Address != "" { + var peeraddr = net.IPNet{ + IP: net.ParseIP(peer.Address), + Mask: net.CIDRMask(32, 32), + } + allowedips = append(allowedips, peeraddr) + } + + if peer.Address6 != "" { + var addr6 = net.IPNet{ + IP: net.ParseIP(peer.Address6), + Mask: net.CIDRMask(128, 128), + } + allowedips = append(allowedips, addr6) + } + // handle manually set peers + for _, allowedIp := range peer.AllowedIPs { + + // parsing as a CIDR first. If valid CIDR, append + if _, ipnet, err := net.ParseCIDR(allowedIp); err == nil { + nodeEndpointArr := strings.Split(node.Endpoint, ":") + if !ipnet.Contains(net.IP(nodeEndpointArr[0])) && ipnet.IP.String() != peer.Address { // don't need to add an allowed ip that already exists.. 
+ allowedips = append(allowedips, *ipnet) + } + + } else { // parsing as an IP second. If valid IP, check if ipv4 or ipv6, then append + if iplib.Version(net.ParseIP(allowedIp)) == 4 && allowedIp != peer.Address { + ipnet := net.IPNet{ + IP: net.ParseIP(allowedIp), + Mask: net.CIDRMask(32, 32), + } + allowedips = append(allowedips, ipnet) + } else if iplib.Version(net.ParseIP(allowedIp)) == 6 && allowedIp != peer.Address6 { + ipnet := net.IPNet{ + IP: net.ParseIP(allowedIp), + Mask: net.CIDRMask(128, 128), + } + allowedips = append(allowedips, ipnet) + } + } + } + // handle egress gateway peers + if peer.IsEgressGateway == "yes" { + //hasGateway = true + egressIPs := getEgressIPs(node, peer) + // remove internet gateway if server + if node.IsServer == "yes" { + for i := len(egressIPs) - 1; i >= 0; i-- { + if egressIPs[i].String() == "0.0.0.0/0" || egressIPs[i].String() == "::/0" { + egressIPs = append(egressIPs[:i], egressIPs[i+1:]...) + } + } + } + allowedips = append(allowedips, egressIPs...) 
+ } + return allowedips +} diff --git a/logic/server.go b/logic/server.go index 6e892bfb..5ed2ddcd 100644 --- a/logic/server.go +++ b/logic/server.go @@ -18,7 +18,17 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) -var EnterpriseCheckFuncs []interface{} +// EnterpriseCheckFuncs - can be set to run functions for EE +var EnterpriseCheckFuncs []func() + +// EnterpriseFailoverFunc - interface to control failover funcs +var EnterpriseFailoverFunc func(node *models.Node) error + +// EnterpriseResetFailoverFunc - interface to control reset failover funcs +var EnterpriseResetFailoverFunc func(network string) error + +// EnterpriseResetAllPeersFailovers - resets all nodes that are considering a node to be failover worthy (inclusive) +var EnterpriseResetAllPeersFailovers func(nodeid, network string) error // == Join, Checkin, and Leave for Server == @@ -169,7 +179,7 @@ func ServerJoin(networkSettings *models.Network) (models.Node, error) { // EnterpriseCheck - Runs enterprise functions if presented func EnterpriseCheck() { for _, check := range EnterpriseCheckFuncs { - check.(func())() + check() } } diff --git a/models/metrics.go b/models/metrics.go index cb8e2b74..ab641377 100644 --- a/models/metrics.go +++ b/models/metrics.go @@ -4,11 +4,12 @@ import "time" // Metrics - metrics struct type Metrics struct { - Network string `json:"network" bson:"network" yaml:"network"` - NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"` - NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"` - IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"` - Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"` + Network string `json:"network" bson:"network" yaml:"network"` + NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"` + NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"` + IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" 
validate:"checkyesorno"` + Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"` + FailoverPeers map[string]string `json:"needsfailover" bson:"needsfailover" yaml:"needsfailover"` } // Metric - holds a metric for data between nodes diff --git a/models/node.go b/models/node.go index 25c01afc..c4641d35 100644 --- a/models/node.go +++ b/models/node.go @@ -82,6 +82,7 @@ type Node struct { EgressGatewayNatEnabled string `json:"egressgatewaynatenabled" bson:"egressgatewaynatenabled" yaml:"egressgatewaynatenabled"` EgressGatewayRequest EgressGatewayRequest `json:"egressgatewayrequest" bson:"egressgatewayrequest" yaml:"egressgatewayrequest"` RelayAddrs []string `json:"relayaddrs" bson:"relayaddrs" yaml:"relayaddrs"` + FailoverNode string `json:"failovernode" bson:"failovernode" yaml:"failovernode"` IngressGatewayRange string `json:"ingressgatewayrange" bson:"ingressgatewayrange" yaml:"ingressgatewayrange"` IngressGatewayRange6 string `json:"ingressgatewayrange6" bson:"ingressgatewayrange6" yaml:"ingressgatewayrange6"` // IsStatic - refers to if the Endpoint is set manually or dynamically @@ -104,6 +105,7 @@ type Node struct { // == PRO == DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"` OwnerID string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"` + Failover string `json:"failover" bson:"failover" yaml:"failover" validate:"checkyesorno"` } // NodesArray - used for node sorting @@ -297,6 +299,13 @@ func (node *Node) SetDefaultName() { } } +// Node.SetDefaultFailover - sets default value of failover status to no if not set +func (node *Node) SetDefaultFailover() { + if node.Failover == "" { + node.Failover = "no" + } +} + // Node.Fill - fills other node data into calling node data if not set on calling node func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftables present newNode.ID = currentNode.ID 
@@ -452,6 +461,10 @@ func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftable newNode.DefaultACL = currentNode.DefaultACL } + if newNode.Failover == "" { + newNode.Failover = currentNode.Failover + } + newNode.TrafficKeys = currentNode.TrafficKeys } diff --git a/mq/handlers.go b/mq/handlers.go index db4ec42a..9638043f 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -86,6 +86,12 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) { logger.Log(1, "error unmarshaling payload ", err.Error()) return } + ifaceDelta := logic.IfaceDelta(&currentNode, &newNode) + if servercfg.Is_EE && ifaceDelta { + if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil { + logger.Log(1, "failed to reset failover list during node update", currentNode.Name, currentNode.Network) + } + } newNode.SetLastCheckIn() if err := logic.UpdateNode(&currentNode, &newNode); err != nil { logger.Log(1, "error saving node", err.Error()) @@ -122,7 +128,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { return } - updateNodeMetrics(&currentNode, &newMetrics) + shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics) if err = logic.UpdateMetrics(id, &newMetrics); err != nil { logger.Log(1, "faield to update node metrics", id, currentNode.Name, err.Error()) @@ -135,6 +141,20 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { } } + if newMetrics.Connectivity != nil { + err := logic.EnterpriseFailoverFunc(&currentNode) + if err != nil { + logger.Log(0, "failed to failover for node", currentNode.Name, "on network", currentNode.Network, "-", err.Error()) + } + } + + if shouldUpdate { + logger.Log(2, "updating peers after node", currentNode.Name, currentNode.Network, "detected connectivity issues") + if err = PublishSinglePeerUpdate(&currentNode); err != nil { + logger.Log(0, "failed to publish update after failover peer change for node", currentNode.Name, currentNode.Network) + } + } + logger.Log(1, "updated node metrics", id, currentNode.Name) }()
} @@ -194,11 +214,17 @@ func updateNodePeers(currentNode *models.Node) { } } -func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) { +func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool { + if newMetrics.FailoverPeers == nil { + newMetrics.FailoverPeers = make(map[string]string) + } oldMetrics, err := logic.GetMetrics(currentNode.ID) if err != nil { logger.Log(1, "error finding old metrics for node", currentNode.ID, currentNode.Name) - return + return false + } + if oldMetrics.FailoverPeers == nil { + oldMetrics.FailoverPeers = make(map[string]string) } var attachedClients []models.ExtClient @@ -230,13 +256,40 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) { } else { currMetric.PercentUp = 100.0 * (float64(currMetric.Uptime) / float64(currMetric.TotalTime)) } - totalUpMinutes := currMetric.Uptime * 5 + totalUpMinutes := currMetric.Uptime * ncutils.CheckInInterval currMetric.ActualUptime = time.Duration(totalUpMinutes) * time.Minute delete(oldMetrics.Connectivity, k) // remove from old data newMetrics.Connectivity[k] = currMetric } + // add nodes that need failover + nodes, err := logic.GetNetworkNodes(currentNode.Network) + if err != nil { + logger.Log(0, "failed to retrieve nodes while updating metrics") + return false + } + for _, node := range nodes { + if !newMetrics.Connectivity[node.ID].Connected && + len(newMetrics.Connectivity[node.ID].NodeName) > 0 && + node.Connected == "yes" && + len(node.FailoverNode) > 0 && + node.Failover != "yes" { + newMetrics.FailoverPeers[node.ID] = node.FailoverNode + } + } + shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0 + for k, v := range oldMetrics.FailoverPeers { + if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 { + shouldUpdate = true + } + + if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 { + newMetrics.FailoverPeers[k] = v + } + } + for k := range oldMetrics.Connectivity { // 
cleanup any left over data, self healing delete(newMetrics.Connectivity, k) } + return shouldUpdate } diff --git a/mq/publishers.go b/mq/publishers.go index 3d002812..19511acd 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -33,23 +33,25 @@ func PublishPeerUpdate(newNode *models.Node, publishToSelf bool) error { //skip self continue } - peerUpdate, err := logic.GetPeerUpdate(&node) + err = PublishSinglePeerUpdate(&node) if err != nil { - logger.Log(1, "error getting peer update for node", node.ID, err.Error()) - continue - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - logger.Log(2, "error marshaling peer update for node", node.ID, err.Error()) - continue - } - if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { - logger.Log(1, "failed to publish peer update for node", node.ID) - } else { - logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network) + logger.Log(1, "failed to publish peer update to node", node.Name, "on network", node.Network, ":", err.Error()) } } - return nil + return err +} + +// PublishSinglePeerUpdate --- determines and publishes a peer update to one node +func PublishSinglePeerUpdate(node *models.Node) error { + peerUpdate, err := logic.GetPeerUpdate(node) + if err != nil { + return err + } + data, err := json.Marshal(&peerUpdate) + if err != nil { + return err + } + return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) } // PublishPeerUpdate --- publishes a peer update to all the peers of a node @@ -217,9 +219,7 @@ func collectServerMetrics(networks []models.Network) { logger.Log(2, "failed to push server metrics to exporter: ", err.Error()) } } - } - } } } diff --git a/netclient/functions/mqpublish.go b/netclient/functions/mqpublish.go index 46f641bc..722ba0a3 100644 --- a/netclient/functions/mqpublish.go +++ b/netclient/functions/mqpublish.go @@ -31,27 +31,21 @@ var metricsCache = new(sync.Map) func Checkin(ctx context.Context, 
wg *sync.WaitGroup) { logger.Log(2, "starting checkin goroutine") defer wg.Done() - currentRun := 0 - checkin(currentRun) - ticker := time.NewTicker(time.Second * 60) + checkin() + ticker := time.NewTicker(time.Minute * ncutils.CheckInInterval) defer ticker.Stop() for { select { case <-ctx.Done(): logger.Log(0, "checkin routine closed") return - //delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ?? case <-ticker.C: - currentRun++ - checkin(currentRun) - if currentRun >= 5 { - currentRun = 0 - } + checkin() } } } -func checkin(currentRun int) { +func checkin() { networks, _ := ncutils.GetSystemNetworks() logger.Log(3, "checkin with server(s) for all networks") for _, network := range networks { @@ -115,7 +109,7 @@ func checkin(currentRun int) { config.Write(&nodeCfg, nodeCfg.Network) } Hello(&nodeCfg) - if currentRun >= 5 && nodeCfg.Server.Is_EE { + if nodeCfg.Server.Is_EE { logger.Log(0, "collecting metrics for node", nodeCfg.Node.Name) publishMetrics(&nodeCfg) } diff --git a/netclient/ncutils/util.go b/netclient/ncutils/util.go index f1da648b..d7df17b0 100644 --- a/netclient/ncutils/util.go +++ b/netclient/ncutils/util.go @@ -7,6 +7,9 @@ import ( "github.com/gravitl/netmaker/logger" ) +// CheckInInterval - the interval for check-in time in units/minute +const CheckInInterval = 1 + // BackOff - back off any function while there is an error func BackOff(isExponential bool, maxTime int, f interface{}) (interface{}, error) { // maxTime seconds diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index f9c08aa0..0006ce3c 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -106,7 +106,7 @@ func GetServerInfo() models.ServerConfig { } cfg.Version = GetVersion() cfg.Server = GetServer() - cfg.Is_EE = GetServerConfig().IsEE == "yes" + cfg.Is_EE = Is_EE return cfg }