merge conflicts resolved

This commit is contained in:
Abhishek Kondur
2022-09-30 20:15:17 +05:30
15 changed files with 441 additions and 114 deletions
+44 -2
View File
@@ -267,6 +267,10 @@ func authorize(nodesAllowed, networkCheck bool, authNetwork string, next http.Ha
if nodesAllowed {
// TODO --- should ensure that node is only operating on itself
if _, _, _, err := logic.VerifyToken(authToken); err == nil {
// this indicates request is from a node
// used for failover - if a getNode comes from node, this will trigger a metrics wipe
r.Header.Set("requestfrom", "node")
next.ServeHTTP(w, r)
return
}
@@ -452,6 +456,8 @@ func getNode(w http.ResponseWriter, r *http.Request) {
// set header.
w.Header().Set("Content-Type", "application/json")
nodeRequest := r.Header.Get("requestfrom") == "node"
var params = mux.Vars(r)
nodeid := params["nodeid"]
node, err := logic.GetNodeByID(nodeid)
@@ -481,6 +487,12 @@ func getNode(w http.ResponseWriter, r *http.Request) {
PeerIDs: peerUpdate.PeerIDs,
}
if servercfg.Is_EE && nodeRequest {
if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil {
logger.Log(1, "failed to reset failover list during node config pull", node.Name, node.Network)
}
}
logger.Log(2, r.Header.Get("user"), "fetched node", params["nodeid"])
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
@@ -841,7 +853,13 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
nodeid := params["nodeid"]
netid := params["network"]
node, err := logic.CreateIngressGateway(netid, nodeid)
type failoverData struct {
Failover bool `json:"failover"`
}
var failoverReqBody failoverData
json.NewDecoder(r.Body).Decode(&failoverReqBody)
node, err := logic.CreateIngressGateway(netid, nodeid, failoverReqBody.Failover)
if err != nil {
logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("failed to create ingress gateway on node [%s] on network [%s]: %v",
@@ -850,6 +868,12 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
return
}
if servercfg.Is_EE && failoverReqBody.Failover {
if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil {
logger.Log(1, "failed to reset failover list during failover create", node.Name, node.Network)
}
}
logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node)
@@ -873,7 +897,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
var params = mux.Vars(r)
nodeid := params["nodeid"]
netid := params["network"]
node, err := logic.DeleteIngressGateway(netid, nodeid)
node, wasFailover, err := logic.DeleteIngressGateway(netid, nodeid)
if err != nil {
logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("failed to delete ingress gateway on node [%s] on network [%s]: %v",
@@ -882,6 +906,12 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
return
}
if servercfg.Is_EE && wasFailover {
if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil {
logger.Log(1, "failed to reset failover list during failover create", node.Name, node.Network)
}
}
logger.Log(1, r.Header.Get("user"), "deleted ingress gateway", nodeid)
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node)
@@ -965,6 +995,12 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
}
}
if ifaceDelta && servercfg.Is_EE {
if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil {
logger.Log(0, "failed to reset failover lists during node update for node", node.Name, node.Network)
}
}
err = logic.UpdateNode(&node, &newNode)
if err != nil {
logger.Log(0, r.Header.Get("user"),
@@ -1059,6 +1095,12 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
logger.Log(0, fmt.Sprintf("failed to send DynSec command [%v]: %v",
event.Commands, err.Error()))
}
if servercfg.Is_EE {
if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil {
logger.Log(0, "failed to reset failover lists during node delete for node", node.Name, node.Network)
}
}
logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
runUpdates(&node, false)
+17
View File
@@ -6,6 +6,7 @@ package ee
import (
controller "github.com/gravitl/netmaker/controllers"
"github.com/gravitl/netmaker/ee/ee_controllers"
eelogic "github.com/gravitl/netmaker/ee/logic"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
@@ -27,7 +28,11 @@ func InitEE() {
}
// == End License Handling ==
AddLicenseHooks()
resetFailover()
})
logic.EnterpriseFailoverFunc = eelogic.SetFailover
logic.EnterpriseResetFailoverFunc = eelogic.ResetFailover
logic.EnterpriseResetAllPeersFailovers = eelogic.WipeAffectedFailoversOnly
}
func setControllerLimits() {
@@ -38,6 +43,18 @@ func setControllerLimits() {
servercfg.Is_EE = true
}
func resetFailover() {
nets, err := logic.GetNetworks()
if err == nil {
for _, net := range nets {
err = eelogic.ResetFailover(net.NetID)
if err != nil {
logger.Log(0, "failed to reset failover on network", net.NetID, ":", err.Error())
}
}
}
}
func retrieveEELogo() string {
return `
__ __ ______ ______ __ __ ______ __ __ ______ ______
+121
View File
@@ -0,0 +1,121 @@
package logic
import (
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
)
// SetFailover - finds a suitable failover candidate and sets it
func SetFailover(node *models.Node) error {
failoverNode := determineFailoverCandidate(node)
if failoverNode != nil {
return setFailoverNode(failoverNode, node)
}
return nil
}
// ResetFailover - sets the failover node and wipes disconnected status
func ResetFailover(network string) error {
nodes, err := logic.GetNetworkNodes(network)
if err != nil {
return err
}
for _, node := range nodes {
err = SetFailover(&node)
if err != nil {
logger.Log(2, "error setting failover for node", node.Name, ":", err.Error())
}
err = WipeFailover(node.ID)
if err != nil {
logger.Log(2, "error wiping failover for node", node.Name, ":", err.Error())
}
}
return nil
}
// determineFailoverCandidate - returns a list of nodes that
// are suitable for relaying a given node
func determineFailoverCandidate(nodeToBeRelayed *models.Node) *models.Node {
currentNetworkNodes, err := logic.GetNetworkNodes(nodeToBeRelayed.Network)
if err != nil {
return nil
}
currentMetrics, err := logic.GetMetrics(nodeToBeRelayed.ID)
if err != nil || currentMetrics == nil || currentMetrics.Connectivity == nil {
return nil
}
minLatency := int64(9223372036854775807) // max signed int64 value
var fastestCandidate *models.Node
for i := range currentNetworkNodes {
if currentNetworkNodes[i].ID == nodeToBeRelayed.ID {
continue
}
if currentMetrics.Connectivity[currentNetworkNodes[i].ID].Connected && (currentNetworkNodes[i].Failover == "yes") {
if currentMetrics.Connectivity[currentNetworkNodes[i].ID].Latency < int64(minLatency) {
fastestCandidate = &currentNetworkNodes[i]
minLatency = currentMetrics.Connectivity[currentNetworkNodes[i].ID].Latency
}
}
}
return fastestCandidate
}
// setFailoverNode - changes node's failover node
func setFailoverNode(failoverNode, node *models.Node) error {
node.FailoverNode = failoverNode.ID
nodeToUpdate, err := logic.GetNodeByID(node.ID)
if err != nil {
return err
}
if nodeToUpdate.FailoverNode == failoverNode.ID {
return nil
}
return logic.UpdateNode(&nodeToUpdate, node)
}
// WipeFailover - removes the failover peers of given node (ID)
func WipeFailover(nodeid string) error {
metrics, err := logic.GetMetrics(nodeid)
if err != nil {
return err
}
if metrics != nil {
metrics.FailoverPeers = make(map[string]string)
return logic.UpdateMetrics(nodeid, metrics)
}
return nil
}
// WipeAffectedFailoversOnly - wipes failovers for nodes that have given node (ID)
// in their respective failover lists
func WipeAffectedFailoversOnly(nodeid, network string) error {
currentNetworkNodes, err := logic.GetNetworkNodes(network)
if err != nil {
return nil
}
WipeFailover(nodeid)
for i := range currentNetworkNodes {
currNodeID := currentNetworkNodes[i].ID
if currNodeID == nodeid {
continue
}
currMetrics, err := logic.GetMetrics(currNodeID)
if err != nil || currMetrics == nil {
continue
}
if currMetrics.FailoverPeers != nil {
if len(currMetrics.FailoverPeers[nodeid]) > 0 {
WipeFailover(currNodeID)
}
}
}
return nil
}
+14 -10
View File
@@ -10,6 +10,7 @@ import (
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
)
// CreateEgressGateway - creates an egress gateway
@@ -172,7 +173,7 @@ func DeleteEgressGateway(network, nodeid string) (models.Node, error) {
}
// CreateIngressGateway - creates an ingress gateway
func CreateIngressGateway(netid string, nodeid string) (models.Node, error) {
func CreateIngressGateway(netid string, nodeid string, failover bool) (models.Node, error) {
var postUpCmd, postDownCmd string
node, err := GetNodeByID(nodeid)
@@ -224,7 +225,9 @@ func CreateIngressGateway(netid string, nodeid string) (models.Node, error) {
node.PostUp = postUpCmd
node.PostDown = postDownCmd
node.UDPHolePunch = "no"
if failover && servercfg.Is_EE {
node.Failover = "yes"
}
data, err := json.Marshal(&node)
if err != nil {
return models.Node{}, err
@@ -238,26 +241,27 @@ func CreateIngressGateway(netid string, nodeid string) (models.Node, error) {
}
// DeleteIngressGateway - deletes an ingress gateway
func DeleteIngressGateway(networkName string, nodeid string) (models.Node, error) {
func DeleteIngressGateway(networkName string, nodeid string) (models.Node, bool, error) {
node, err := GetNodeByID(nodeid)
if err != nil {
return models.Node{}, err
return models.Node{}, false, err
}
network, err := GetParentNetwork(networkName)
if err != nil {
return models.Node{}, err
return models.Node{}, false, err
}
// delete ext clients belonging to ingress gateway
if err = DeleteGatewayExtClients(node.ID, networkName); err != nil {
return models.Node{}, err
return models.Node{}, false, err
}
logger.Log(3, "deleting ingress gateway")
wasFailover := node.Failover == "yes"
node.UDPHolePunch = network.DefaultUDPHolePunch
node.LastModified = time.Now().Unix()
node.IsIngressGateway = "no"
node.IngressGatewayRange = ""
node.Failover = "no"
// default to removing postup and postdown
node.PostUp = ""
@@ -274,14 +278,14 @@ func DeleteIngressGateway(networkName string, nodeid string) (models.Node, error
data, err := json.Marshal(&node)
if err != nil {
return models.Node{}, err
return models.Node{}, false, err
}
err = database.Insert(node.ID, string(data), database.NODES_TABLE_NAME)
if err != nil {
return models.Node{}, err
return models.Node{}, wasFailover, err
}
err = SetNetworkNodesLastModified(networkName)
return node, err
return node, wasFailover, err
}
// DeleteGatewayExtClients - deletes ext clients based on gateway (mac) of ingress node and network
+16 -3
View File
@@ -68,11 +68,24 @@ func Collect(iface string, peerMap models.PeerMap) (*models.Metrics, error) {
newMetric.Latency = 999
} else {
pingStats := pinger.Statistics()
newMetric.Uptime = 1
newMetric.Connected = true
newMetric.Latency = pingStats.AvgRtt.Milliseconds()
if pingStats.PacketsRecv > 0 {
newMetric.Uptime = 1
newMetric.Connected = true
newMetric.Latency = pingStats.AvgRtt.Milliseconds()
}
}
}
// check device peer to see if WG is working if ping failed
if !newMetric.Connected {
if currPeer.ReceiveBytes > 0 &&
currPeer.TransmitBytes > 0 &&
time.Now().Before(currPeer.LastHandshakeTime.Add(time.Minute<<1)) {
newMetric.Connected = true
newMetric.Uptime = 1
}
}
newMetric.TotalTime = 1
metrics.Connectivity[id] = newMetric
}
+16
View File
@@ -212,6 +212,7 @@ func DeleteNodeByID(node *models.Node, exterminate bool) error {
if node.IsServer == "yes" {
return removeLocalServer(node)
}
return nil
}
@@ -250,6 +251,20 @@ func ValidateNode(node *models.Node, isUpdate bool) error {
return err
}
// IsFailoverPresent - checks if a node is marked as a failover in given network
func IsFailoverPresent(network string) bool {
netNodes, err := GetNetworkNodes(network)
if err != nil {
return false
}
for i := range netNodes {
if netNodes[i].Failover == "yes" {
return true
}
}
return false
}
// CreateNode - creates a node in database
func CreateNode(node *models.Node) error {
@@ -480,6 +495,7 @@ func SetNodeDefaults(node *models.Node) {
node.SetDefaultIsHub()
node.SetDefaultConnected()
node.SetDefaultACL()
node.SetDefaultFailover()
}
// GetRecordKey - get record key
+100 -60
View File
@@ -33,6 +33,16 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
}
var peerMap = make(models.PeerMap)
var metrics *models.Metrics
if servercfg.Is_EE {
metrics, _ = GetMetrics(node.ID)
}
if metrics == nil {
metrics = &models.Metrics{}
}
if metrics.FailoverPeers == nil {
metrics.FailoverPeers = make(map[string]string)
}
// udppeers = the peers parsed from the local interface
// gives us correct port to reach
udppeers, errN := database.GetPeers(node.Network)
@@ -85,7 +95,10 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
if isP2S && peer.IsHub != "yes" {
continue
}
if len(metrics.FailoverPeers[peer.ID]) > 0 && IsFailoverPresent(node.Network) {
logger.Log(2, "peer", peer.Name, peer.PrimaryAddress(), "was found to be in failover peers list for node", node.Name, node.PrimaryAddress())
continue
}
pubkey, err := wgtypes.ParseKey(peer.PublicKey)
if err != nil {
return models.PeerUpdate{}, err
@@ -138,8 +151,8 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
return models.PeerUpdate{}, err
}
}
// set_allowedips
allowedips := GetAllowedIPs(node, &peer)
allowedips := GetAllowedIPs(node, &peer, metrics)
var keepalive time.Duration
if node.PersistentKeepalive != 0 {
// set_keepalive
@@ -247,64 +260,9 @@ func getExtPeers(node *models.Node) ([]wgtypes.PeerConfig, []models.IDandAddr, e
}
// GetAllowedIPs - calculates the wireguard allowedip field for a peer of a node based on the peer and node settings
func GetAllowedIPs(node, peer *models.Node) []net.IPNet {
func GetAllowedIPs(node, peer *models.Node, metrics *models.Metrics) []net.IPNet {
var allowedips []net.IPNet
if peer.Address != "" {
var peeraddr = net.IPNet{
IP: net.ParseIP(peer.Address),
Mask: net.CIDRMask(32, 32),
}
allowedips = append(allowedips, peeraddr)
}
if peer.Address6 != "" {
var addr6 = net.IPNet{
IP: net.ParseIP(peer.Address6),
Mask: net.CIDRMask(128, 128),
}
allowedips = append(allowedips, addr6)
}
// handle manually set peers
for _, allowedIp := range peer.AllowedIPs {
// parsing as a CIDR first. If valid CIDR, append
if _, ipnet, err := net.ParseCIDR(allowedIp); err == nil {
nodeEndpointArr := strings.Split(node.Endpoint, ":")
if !ipnet.Contains(net.IP(nodeEndpointArr[0])) && ipnet.IP.String() != peer.Address { // don't need to add an allowed ip that already exists..
allowedips = append(allowedips, *ipnet)
}
} else { // parsing as an IP second. If valid IP, check if ipv4 or ipv6, then append
if iplib.Version(net.ParseIP(allowedIp)) == 4 && allowedIp != peer.Address {
ipnet := net.IPNet{
IP: net.ParseIP(allowedIp),
Mask: net.CIDRMask(32, 32),
}
allowedips = append(allowedips, ipnet)
} else if iplib.Version(net.ParseIP(allowedIp)) == 6 && allowedIp != peer.Address6 {
ipnet := net.IPNet{
IP: net.ParseIP(allowedIp),
Mask: net.CIDRMask(128, 128),
}
allowedips = append(allowedips, ipnet)
}
}
}
// handle egress gateway peers
if peer.IsEgressGateway == "yes" {
//hasGateway = true
egressIPs := getEgressIPs(node, peer)
// remove internet gateway if server
if node.IsServer == "yes" {
for i := len(egressIPs) - 1; i >= 0; i-- {
if egressIPs[i].String() == "0.0.0.0/0" || egressIPs[i].String() == "::/0" {
egressIPs = append(egressIPs[:i], egressIPs[i+1:]...)
}
}
}
allowedips = append(allowedips, egressIPs...)
}
allowedips = getNodeAllowedIPs(peer, node)
// handle ingress gateway peers
if peer.IsIngressGateway == "yes" {
@@ -315,6 +273,27 @@ func GetAllowedIPs(node, peer *models.Node) []net.IPNet {
for _, extPeer := range extPeers {
allowedips = append(allowedips, extPeer.AllowedIPs...)
}
// if node is a failover node, add allowed ips from nodes it is handling
if peer.Failover == "yes" && metrics.FailoverPeers != nil {
// traverse through nodes that need handling
logger.Log(3, "peer", peer.Name, "was found to be failover for", node.Name, "checking failover peers...")
for k := range metrics.FailoverPeers {
// if FailoverNode is me for this node, add allowedips
if metrics.FailoverPeers[k] == peer.ID {
// get original node so we can traverse the allowed ips
nodeToFailover, err := GetNodeByID(k)
if err == nil {
failoverNodeMetrics, err := GetMetrics(nodeToFailover.ID)
if err == nil && failoverNodeMetrics != nil {
if len(failoverNodeMetrics.NodeName) > 0 {
allowedips = append(allowedips, getNodeAllowedIPs(&nodeToFailover, peer)...)
logger.Log(0, "failing over node", nodeToFailover.Name, nodeToFailover.PrimaryAddress(), "to failover node", peer.Name)
}
}
}
}
}
}
}
// handle relay gateway peers
if peer.IsRelay == "yes" {
@@ -559,3 +538,64 @@ func getEgressIPs(node, peer *models.Node) []net.IPNet {
}
return allowedips
}
func getNodeAllowedIPs(peer, node *models.Node) []net.IPNet {
var allowedips = []net.IPNet{}
if peer.Address != "" {
var peeraddr = net.IPNet{
IP: net.ParseIP(peer.Address),
Mask: net.CIDRMask(32, 32),
}
allowedips = append(allowedips, peeraddr)
}
if peer.Address6 != "" {
var addr6 = net.IPNet{
IP: net.ParseIP(peer.Address6),
Mask: net.CIDRMask(128, 128),
}
allowedips = append(allowedips, addr6)
}
// handle manually set peers
for _, allowedIp := range peer.AllowedIPs {
// parsing as a CIDR first. If valid CIDR, append
if _, ipnet, err := net.ParseCIDR(allowedIp); err == nil {
nodeEndpointArr := strings.Split(node.Endpoint, ":")
if !ipnet.Contains(net.IP(nodeEndpointArr[0])) && ipnet.IP.String() != peer.Address { // don't need to add an allowed ip that already exists..
allowedips = append(allowedips, *ipnet)
}
} else { // parsing as an IP second. If valid IP, check if ipv4 or ipv6, then append
if iplib.Version(net.ParseIP(allowedIp)) == 4 && allowedIp != peer.Address {
ipnet := net.IPNet{
IP: net.ParseIP(allowedIp),
Mask: net.CIDRMask(32, 32),
}
allowedips = append(allowedips, ipnet)
} else if iplib.Version(net.ParseIP(allowedIp)) == 6 && allowedIp != peer.Address6 {
ipnet := net.IPNet{
IP: net.ParseIP(allowedIp),
Mask: net.CIDRMask(128, 128),
}
allowedips = append(allowedips, ipnet)
}
}
}
// handle egress gateway peers
if peer.IsEgressGateway == "yes" {
//hasGateway = true
egressIPs := getEgressIPs(node, peer)
// remove internet gateway if server
if node.IsServer == "yes" {
for i := len(egressIPs) - 1; i >= 0; i-- {
if egressIPs[i].String() == "0.0.0.0/0" || egressIPs[i].String() == "::/0" {
egressIPs = append(egressIPs[:i], egressIPs[i+1:]...)
}
}
}
allowedips = append(allowedips, egressIPs...)
}
return allowedips
}
+12 -2
View File
@@ -18,7 +18,17 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
var EnterpriseCheckFuncs []interface{}
// EnterpriseCheckFuncs - can be set to run functions for EE
var EnterpriseCheckFuncs []func()
// EnterpriseFailoverFunc - interface to control failover funcs
var EnterpriseFailoverFunc func(node *models.Node) error
// EnterpriseResetFailoverFunc - interface to control reset failover funcs
var EnterpriseResetFailoverFunc func(network string) error
// EnterpriseResetAllPeersFailovers - resets all nodes that are considering a node to be failover worthy (inclusive)
var EnterpriseResetAllPeersFailovers func(nodeid, network string) error
// == Join, Checkin, and Leave for Server ==
@@ -169,7 +179,7 @@ func ServerJoin(networkSettings *models.Network) (models.Node, error) {
// EnterpriseCheck - Runs enterprise functions if presented
func EnterpriseCheck() {
for _, check := range EnterpriseCheckFuncs {
check.(func())()
check()
}
}
+6 -5
View File
@@ -4,11 +4,12 @@ import "time"
// Metrics - metrics struct
type Metrics struct {
Network string `json:"network" bson:"network" yaml:"network"`
NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"`
NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"`
IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"`
Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"`
Network string `json:"network" bson:"network" yaml:"network"`
NodeID string `json:"node_id" bson:"node_id" yaml:"node_id"`
NodeName string `json:"node_name" bson:"node_name" yaml:"node_name"`
IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"`
Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"`
FailoverPeers map[string]string `json:"needsfailover" bson:"needsfailover" yaml:"needsfailover"`
}
// Metric - holds a metric for data between nodes
+13
View File
@@ -82,6 +82,7 @@ type Node struct {
EgressGatewayNatEnabled string `json:"egressgatewaynatenabled" bson:"egressgatewaynatenabled" yaml:"egressgatewaynatenabled"`
EgressGatewayRequest EgressGatewayRequest `json:"egressgatewayrequest" bson:"egressgatewayrequest" yaml:"egressgatewayrequest"`
RelayAddrs []string `json:"relayaddrs" bson:"relayaddrs" yaml:"relayaddrs"`
FailoverNode string `json:"failovernode" bson:"failovernode" yaml:"failovernode"`
IngressGatewayRange string `json:"ingressgatewayrange" bson:"ingressgatewayrange" yaml:"ingressgatewayrange"`
IngressGatewayRange6 string `json:"ingressgatewayrange6" bson:"ingressgatewayrange6" yaml:"ingressgatewayrange6"`
// IsStatic - refers to if the Endpoint is set manually or dynamically
@@ -104,6 +105,7 @@ type Node struct {
// == PRO ==
DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"`
OwnerID string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"`
Failover string `json:"failover" bson:"failover" yaml:"failover" validate:"checkyesorno"`
}
// NodesArray - used for node sorting
@@ -297,6 +299,13 @@ func (node *Node) SetDefaultName() {
}
}
// Node.SetDefaultFailover - sets default value of failover status to no if not set
func (node *Node) SetDefaultFailover() {
if node.Failover == "" {
node.Failover = "no"
}
}
// Node.Fill - fills other node data into calling node data if not set on calling node
func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftables present
newNode.ID = currentNode.ID
@@ -452,6 +461,10 @@ func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftable
newNode.DefaultACL = currentNode.DefaultACL
}
if newNode.Failover == "" {
newNode.Failover = currentNode.Failover
}
newNode.TrafficKeys = currentNode.TrafficKeys
}
+57 -4
View File
@@ -86,6 +86,12 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
logger.Log(1, "error unmarshaling payload ", err.Error())
return
}
ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
if servercfg.Is_EE && ifaceDelta {
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
logger.Log(1, "failed to reset failover list during node update", currentNode.Name, currentNode.Network)
}
}
newNode.SetLastCheckIn()
if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
logger.Log(1, "error saving node", err.Error())
@@ -122,7 +128,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
return
}
updateNodeMetrics(&currentNode, &newMetrics)
shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics)
if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
logger.Log(1, "faield to update node metrics", id, currentNode.Name, err.Error())
@@ -135,6 +141,20 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
}
}
if newMetrics.Connectivity != nil {
err := logic.EnterpriseFailoverFunc(&currentNode)
if err != nil {
logger.Log(0, "failed to failover for node", currentNode.Name, "on network", currentNode.Network, "-", err.Error())
}
}
if shouldUpdate {
logger.Log(2, "updating peers after node", currentNode.Name, currentNode.Network, "detected connectivity issues")
if err = PublishSinglePeerUpdate(&currentNode); err != nil {
logger.Log(0, "failed to publish update after failover peer change for node", currentNode.Name, currentNode.Network)
}
}
logger.Log(1, "updated node metrics", id, currentNode.Name)
}()
}
@@ -194,11 +214,17 @@ func updateNodePeers(currentNode *models.Node) {
}
}
func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) {
func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
if newMetrics.FailoverPeers == nil {
newMetrics.FailoverPeers = make(map[string]string)
}
oldMetrics, err := logic.GetMetrics(currentNode.ID)
if err != nil {
logger.Log(1, "error finding old metrics for node", currentNode.ID, currentNode.Name)
return
return false
}
if oldMetrics.FailoverPeers == nil {
oldMetrics.FailoverPeers = make(map[string]string)
}
var attachedClients []models.ExtClient
@@ -230,13 +256,40 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) {
} else {
currMetric.PercentUp = 100.0 * (float64(currMetric.Uptime) / float64(currMetric.TotalTime))
}
totalUpMinutes := currMetric.Uptime * 5
totalUpMinutes := currMetric.Uptime * ncutils.CheckInInterval
currMetric.ActualUptime = time.Duration(totalUpMinutes) * time.Minute
delete(oldMetrics.Connectivity, k) // remove from old data
newMetrics.Connectivity[k] = currMetric
}
// add nodes that need failover
nodes, err := logic.GetNetworkNodes(currentNode.Network)
if err != nil {
logger.Log(0, "failed to retrieve nodes while updating metrics")
return false
}
for _, node := range nodes {
if !newMetrics.Connectivity[node.ID].Connected &&
len(newMetrics.Connectivity[node.ID].NodeName) > 0 &&
node.Connected == "yes" &&
len(node.FailoverNode) > 0 &&
node.Failover != "yes" {
newMetrics.FailoverPeers[node.ID] = node.FailoverNode
}
}
shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
for k, v := range oldMetrics.FailoverPeers {
if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {
shouldUpdate = true
}
if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 {
newMetrics.FailoverPeers[k] = v
}
}
for k := range oldMetrics.Connectivity { // cleanup any left over data, self healing
delete(newMetrics.Connectivity, k)
}
return shouldUpdate
}
+16 -16
View File
@@ -33,23 +33,25 @@ func PublishPeerUpdate(newNode *models.Node, publishToSelf bool) error {
//skip self
continue
}
peerUpdate, err := logic.GetPeerUpdate(&node)
err = PublishSinglePeerUpdate(&node)
if err != nil {
logger.Log(1, "error getting peer update for node", node.ID, err.Error())
continue
}
data, err := json.Marshal(&peerUpdate)
if err != nil {
logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
continue
}
if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
logger.Log(1, "failed to publish peer update for node", node.ID)
} else {
logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network)
logger.Log(1, "failed to publish peer update to node", node.Name, "on network", node.Network, ":", err.Error())
}
}
return nil
return err
}
// PublishSinglePeerUpdate --- determines and publishes a peer update to one node
func PublishSinglePeerUpdate(node *models.Node) error {
peerUpdate, err := logic.GetPeerUpdate(node)
if err != nil {
return err
}
data, err := json.Marshal(&peerUpdate)
if err != nil {
return err
}
return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data)
}
// PublishPeerUpdate --- publishes a peer update to all the peers of a node
@@ -217,9 +219,7 @@ func collectServerMetrics(networks []models.Network) {
logger.Log(2, "failed to push server metrics to exporter: ", err.Error())
}
}
}
}
}
}
+5 -11
View File
@@ -31,27 +31,21 @@ var metricsCache = new(sync.Map)
func Checkin(ctx context.Context, wg *sync.WaitGroup) {
logger.Log(2, "starting checkin goroutine")
defer wg.Done()
currentRun := 0
checkin(currentRun)
ticker := time.NewTicker(time.Second * 60)
checkin()
ticker := time.NewTicker(time.Minute * ncutils.CheckInInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
logger.Log(0, "checkin routine closed")
return
//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
case <-ticker.C:
currentRun++
checkin(currentRun)
if currentRun >= 5 {
currentRun = 0
}
checkin()
}
}
}
func checkin(currentRun int) {
func checkin() {
networks, _ := ncutils.GetSystemNetworks()
logger.Log(3, "checkin with server(s) for all networks")
for _, network := range networks {
@@ -115,7 +109,7 @@ func checkin(currentRun int) {
config.Write(&nodeCfg, nodeCfg.Network)
}
Hello(&nodeCfg)
if currentRun >= 5 && nodeCfg.Server.Is_EE {
if nodeCfg.Server.Is_EE {
logger.Log(0, "collecting metrics for node", nodeCfg.Node.Name)
publishMetrics(&nodeCfg)
}
+3
View File
@@ -7,6 +7,9 @@ import (
"github.com/gravitl/netmaker/logger"
)
// CheckInInterval - the interval for check-in time in units/minute
const CheckInInterval = 1
// BackOff - back off any function while there is an error
func BackOff(isExponential bool, maxTime int, f interface{}) (interface{}, error) {
// maxTime seconds
+1 -1
View File
@@ -106,7 +106,7 @@ func GetServerInfo() models.ServerConfig {
}
cfg.Version = GetVersion()
cfg.Server = GetServer()
cfg.Is_EE = GetServerConfig().IsEE == "yes"
cfg.Is_EE = Is_EE
return cfg
}