netmaker/logic/nodes.go
Abhishek Kondur 292af315dd NM-271: Scalability Improvements (#3921)
* feat(go): add user schema;

* feat(go): migrate to user schema;

* feat(go): add audit fields;

* feat(go): remove unused fields from the network model;

* feat(go): add network schema;

* feat(go): migrate to network schema;

* refactor(go): add comment to clarify migration logic;

* fix(go): test failures;

* fix(go): test failures;

* feat(go): change membership table to store memberships at all scopes;

* feat(go): add schema for access grants;

* feat(go): remove nameservers from new networks table; ensure db passed for schema functions;

* feat(go): set max conns for sqlite to 1;

* fix(go): issues updating user account status;

* NM-236: streamline operations in HA mode

* NM-236: only master pod should subscribe to updates from clients

* refactor(go): remove converters and access grants;

* refactor(go): add json tags in schema models;

* refactor(go): rename file to migrate_v1_6_0.go;

* refactor(go): add user groups and user roles tables; use schema tables;

* refactor(go): inline get and list from schema package;

* refactor(go): inline get network and list users from schema package;

* fix(go): staticcheck issues;

* fix(go): remove test not in use; fix test case;

* fix(go): validate network;

* fix(go): resolve static checks;

* fix(go): new models errors;

* fix(go): test errors;

* fix(go): handle no records;

* fix(go): add validations for user object;

* fix(go): set correct extclient status;

* fix(go): test error;

* feat(go): make schema the base package;

* feat(go): add host schema;

* feat(go): use schema host everywhere;

* feat(go): inline get host, list hosts and delete host;

* feat(go): use non-ptr value;

* feat(go): use save to upsert all fields;

* feat(go): use save to upsert all fields;

* feat(go): save turn endpoint as string;

* feat(go): check for gorm error record not found;

* fix(go): test failures;

* fix(go): update all network fields;

* fix(go): update all network fields;

* feat(go): add paginated list networks api;

* feat(go): add paginated list users api;

* feat(go): add paginated list hosts api;

* feat(go): add pagination to list groups api;

* fix(go): comment;

* fix(go): implement marshal and unmarshal text for custom types;

* fix(go): implement marshal and unmarshal json for custom types;

* fix(go): just use the old model for unmarshalling;

* fix(go): implement marshal and unmarshal json for custom types;

* NM-271: Import swap: compress/gzip replaced with github.com/klauspost/compress/gzip (2-4x faster, wire-compatible output). Added sync import.
Two sync.Pool variables (gzipWriterPool, bufferPool): reuse gzip.Writer and bytes.Buffer across calls instead of allocating fresh ones per publish.
compressPayload rewritten: pulls writer + buffer from pools, resets them, compresses at gzip.BestSpeed (level 1), copies the result out of the pooled buffer, and returns both objects to the pools.
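
A minimal sketch of that pooled-compression path (package name and surrounding wiring are illustrative; the real implementation lives in the mq publish path):

package mq

import (
	"bytes"
	"io"
	"sync"

	"github.com/klauspost/compress/gzip"
)

var (
	bufferPool     = sync.Pool{New: func() any { return new(bytes.Buffer) }}
	gzipWriterPool = sync.Pool{New: func() any {
		w, _ := gzip.NewWriterLevel(io.Discard, gzip.BestSpeed)
		return w
	}}
)

// compressPayload gzips data using pooled writers and buffers so the hot
// publish path does not allocate a fresh writer and buffer per message.
func compressPayload(data []byte) ([]byte, error) {
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	zw := gzipWriterPool.Get().(*gzip.Writer)
	zw.Reset(buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	// copy the result out of the pooled buffer before returning it
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	gzipWriterPool.Put(zw)
	bufferPool.Put(buf)
	return out, nil
}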

* feat(go): remove paginated list networks api;

* feat(go): use custom paginated response object;

* NM-271: Improve server scalability under high host count

- Replace stdlib compress/gzip with klauspost/compress at BestSpeed and
  pool gzip writers and buffers via sync.Pool to eliminate compression
  as the dominant CPU hotspot.

- Debounce peer update broadcasts with a 500ms resettable window capped
  at 3s max-wait, coalescing rapid-fire PublishPeerUpdate calls into a
  single broadcast cycle.

- Cache HostPeerInfo (batch-refreshed by debounce worker) and
  HostPeerUpdate (stored as side-effect of each publish) so the pull API
  and peer_info API serve from pre-computed maps instead of triggering
  expensive per-host computations under thundering herd conditions.

- Warm both caches synchronously at startup before the first publish
  cycle so early pull requests are served instantly.

- Bound concurrent MQTT publishes to 5 via semaphore to prevent
  broker TCP buffer overflows that caused broken pipe disconnects.

- Remove manual Disconnect+SetupMQTT from ConnectionLostHandler and
  rely on the paho client's built-in AutoReconnect; add a 5s retry
  wait in publish() to ride out brief reconnection windows.
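
A rough sketch of the debounce window described above (timings are from the description; channel wiring and names are illustrative, not the exact implementation):

package mq

import "time"

// debouncePeerUpdates coalesces bursts of peer-update signals: each new
// signal restarts a 500ms quiet window, but a broadcast is forced at most
// 3s after the first signal in a burst. publish runs once per burst.
func debouncePeerUpdates(signals <-chan struct{}, publish func()) {
	const (
		window  = 500 * time.Millisecond
		maxWait = 3 * time.Second
	)
	for range signals {
		timer := time.NewTimer(window)
		deadline := time.NewTimer(maxWait)
	coalesce:
		for {
			select {
			case <-signals:
				// another request arrived: reset the quiet window
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(window)
			case <-timer.C:
				break coalesce
			case <-deadline.C:
				break coalesce
			}
		}
		timer.Stop()
		deadline.Stop()
		publish()
	}
}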

* NM-271: Reduce server CPU contention under high concurrent load

- Cache ServerSettings with atomic.Value to eliminate repeated DB reads
  on every pull request (was 32+ goroutines blocked on read lock)
- Batch UpdateNodeCheckin writes in memory, flush every 30s to reduce
  per-checkin write lock contention (was 88+ goroutines blocked)
- Enable SQLite WAL mode + busy_timeout and remove global dbMutex;
  let SQLite handle concurrency natively (reads no longer block writes)
- Move ResetFailedOverPeer/ResetAutoRelayedPeer to async in pull()
  handler since results don't affect the cached response
- Skip no-op UpsertNode writes in failover/relay reset functions
  (early return when node has no failover/relay state)
- Remove CheckHostPorts from hostUpdateFallback hot path
- Switch to pure-Go SQLite driver (glebarez/sqlite), set CGO_ENABLED=0
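
For reference, the WAL and busy_timeout change amounts to opening SQLite with DSN parameters along these lines (parameter names are the mattn/go-sqlite3 ones; the server's actual DSN and pool settings may differ):

package database

import (
	"database/sql"

	_ "github.com/mattn/go-sqlite3"
)

// openSQLite opens the database with WAL journaling and a busy timeout so
// concurrent readers are not blocked by writers and brief lock contention
// retries instead of failing immediately.
func openSQLite(path string) (*sql.DB, error) {
	dsn := "file:" + path + "?_journal_mode=WAL&_busy_timeout=5000"
	db, err := sql.Open("sqlite3", dsn)
	if err != nil {
		return nil, err
	}
	// SQLite allows only one writer at a time; keep the pool small.
	db.SetMaxOpenConns(1)
	return db, nil
}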

* fix(go): ensure default values for page and per_page are used when not passed;

* fix(go): rename v1.6.0 to v1.5.1;

* fix(go): check for gorm.ErrRecordNotFound instead of database.IsEmptyRecord;

* fix(go): use host id, not pending host id;

* NM-271: Revert pure-Go SQLite and FIPS disable to verify impact

Revert to CGO-based mattn/go-sqlite3 driver and re-enable FIPS to
isolate whether these changes are still needed now that the global
dbMutex has been removed and WAL mode is enabled. Keep WAL mode
pragma with mattn-compatible DSN format.

* feat(go): add filters to paginated apis;

* feat(go): add filters to paginated apis;

* feat(go): remove check for max username length;

* feat(go): add filters to count as well;

* feat(go): use library to check email address validity;

* feat(go): ignore pagination if params not passed;

* fix(go): pagination issues;

* fix(go): check exists before using;

* fix(go): remove debug log;

* NM-271: rm debug logs

* NM-271: check if caching is enabled

* NM-271: add server sync mq topic for HA mode

* NM-271: fix build

* NM-271: push metrics in batch to exporter over api

* NM-271: use basic auth for exporter metrics api

* fix(go): use gorm err record not found;

* NM-271: Add monitoring stack on demand

* NM-271: -m arg for install script should only add monitoring stack

* fix(go): use gorm err record not found;

* NM-271: update docker compose file for prometheus

* NM-271: update docker compose file for prometheus

* fix(go): use user principal name when creating pending user;

* fix(go): use schema package for consts;

* NM-236: rm duplicate network hook

* NM-271: add server topic to reset idp hooks on master node

* fix(go): prevent disabling superadmin user;

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* fix(go): swap is admin and is superadmin;

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* fix(go): remove dead code block;

https://github.com/gravitl/netmaker/pull/3910#discussion_r2928837937

* fix(go): incorrect message when trying to disable self;

https://github.com/gravitl/netmaker/pull/3910#discussion_r2928837934

* NM-271: fix stale peers on reset_failovered pull and add HTTP timeout to metrics exporter

Run the failover/relay reset synchronously in the pull handler so the
response reflects post-reset topology instead of serving stale cached
peers. Add a 30s timeout to the metrics exporter HTTP client to prevent
PushAllMetricsToExporter from blocking the Keepalive loop.
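
The exporter timeout is just a bounded HTTP client, roughly (variable name illustrative):

package metrics

import (
	"net/http"
	"time"
)

// exporterClient caps every push to the metrics exporter at 30s so a slow or
// unreachable exporter cannot stall the caller's keepalive loop.
var exporterClient = &http.Client{Timeout: 30 * time.Second}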

* NM-271: fix gzip pool corruption, MQTT topic mismatch, stale settings cache, and reduce redundant DB fetches

- Only return gzip.Writer to pool after successful Close to prevent
  silently malformed MQTT payloads from a previously errored writer.
- Fix serversync subscription to exact topic match since syncType is
  now in the message payload, not the topic path.
- Prevent zero-value ServerSettings from being cached indefinitely
  when the DB record is missing or unmarshal fails on startup.
- Return fetched hosts/nodes from RefreshHostPeerInfoCache so
  warmPeerCaches reuses them instead of querying the DB twice.
- Compute fresh HostPeerUpdate on reset_failovered pull instead of
  serving stale cache, and store result back for subsequent requests.
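
A sketch of the safer pool-return path (folding in the io.Discard reset from the follow-up commit below; the helper name is illustrative):

package mq

import (
	"io"
	"sync"

	"github.com/klauspost/compress/gzip"
)

var gzipWriterPool = sync.Pool{New: func() any { return gzip.NewWriter(io.Discard) }}

// closeAndPool finishes the gzip stream and only returns the writer to the
// pool when Close succeeded; it is reset to io.Discard first so a pooled
// writer never keeps a reference to a caller's buffer or errored state.
func closeAndPool(zw *gzip.Writer) error {
	if err := zw.Close(); err != nil {
		// drop errored writers instead of pooling corrupt state
		return err
	}
	zw.Reset(io.Discard)
	gzipWriterPool.Put(zw)
	return nil
}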

* NM-271: fix gzip writer pool leak, log checkin flush errors, and fix master pod ordinal parsing

- Reset gzip.Writer to io.Discard before returning to pool so errored
  writers are never leaked or silently reused with corrupt state.
- Track and log failed DB inserts in FlushNodeCheckins so operators
  have visibility when check-in timestamps are lost.
- Parse StatefulSet pod ordinal as integer instead of using HasSuffix
  to prevent netmaker-10 from being misidentified as master pod.
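
The ordinal check reduces to parsing the hostname suffix as an integer, along these lines (function and hostname format are illustrative):

package logic

import (
	"os"
	"strconv"
	"strings"
)

// isMasterPod reports whether this StatefulSet replica is ordinal 0 by
// parsing the numeric suffix of the hostname (e.g. "netmaker-0"), so that
// "netmaker-10" is not mistaken for the master as a HasSuffix("0") check
// would be.
func isMasterPod() bool {
	host, err := os.Hostname()
	if err != nil {
		return false
	}
	idx := strings.LastIndex(host, "-")
	if idx < 0 {
		return false
	}
	ordinal, err := strconv.Atoi(host[idx+1:])
	if err != nil {
		return false
	}
	return ordinal == 0
}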

* NM-271: simplify masterpod logic

* fix(go): use correct header;

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* fix(go): return after error response;

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* fix(go): use correct order of params;

https://github.com/gravitl/netmaker/pull/3910#discussion_r2929593036

* fix(go): set default values for page and page size; use v2 instead of /list;

* NM-271: use host name

* Update mq/serversync.go

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* NM-271: fix duplicate serversync case

* NM-271: streamline gw updates

* Update logic/auth.go

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* Update schema/user_roles.go

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* fix(go): syntax error;

* fix(go): set default values when page and per_page are not passed or 0;

* fix(go): use uuid.Parse instead of uuid.MustParse;

* fix(go): review errors;

* fix(go): review errors;

* Update controllers/user.go

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* Update controllers/user.go

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* NM-163: fix errors;

* Update db/types/options.go

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* fix(go): persist return user in event;

* Update db/types/options.go

Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>

* NM-271: signal pull on ip changes

* NM-163: duplicate lines of code

* NM-163: fix(go): fix missing return and filter parsing in user controller

- Add missing return after error response in updateUserAccountStatus
  to prevent double-response and spurious ext-client side-effects
- Use switch statements in listUsers to skip unrecognized
  account_status and mfa_status filter values

* NM-271: signal pull req on node ip change

* fix(go): check for both min and max page size;

* NM-271: refresh node object before update

* fix(go): enclose transfer superadmin in transaction;

* fix(go): review errors;

* fix(go): remove free tier checks;

* fix(go): review fixes;

* NM-271: streamline ip pool ops

* NM-271: fix tests, set max idle conns

* NM-271: fix(go): fix data races in settings cache and peer update worker

- Use pointer type in atomic.Value for serverSettingsCache to avoid
  replacing the variable non-atomically in InvalidateServerSettingsCache
- Swap peerUpdateReplace flag before draining the channel to prevent
  a concurrent replacePeers=true from being consumed by the wrong cycle
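
The first fix boils down to always storing a pointer of one concrete type in the atomic.Value, so invalidation is an atomic store rather than a reassignment of the variable; a minimal sketch with placeholder types:

package logic

import "sync/atomic"

// settings is a placeholder; the real settings type lives in the models package.
type settings struct{ Telemetry string }

// serverSettingsCache always holds a *settings (possibly a typed nil), so the
// cache write and the invalidation below are single atomic operations on the
// same atomic.Value rather than a non-atomic replacement of the variable.
var serverSettingsCache atomic.Value

func cacheServerSettings(s *settings) { serverSettingsCache.Store(s) }

func cachedServerSettings() (*settings, bool) {
	v, _ := serverSettingsCache.Load().(*settings)
	return v, v != nil
}

// InvalidateServerSettingsCache clears the cache by storing a typed nil
// pointer; storing a different concrete type (or re-making the atomic.Value)
// would either panic or race with concurrent readers.
func InvalidateServerSettingsCache() { serverSettingsCache.Store((*settings)(nil)) }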

---------

Co-authored-by: VishalDalwadi <dalwadivishal26@gmail.com>
Co-authored-by: Vishal Dalwadi <51291657+VishalDalwadi@users.noreply.github.com>
Co-authored-by: tenki-reviewer[bot] <262613592+tenki-reviewer[bot]@users.noreply.github.com>
2026-03-18 00:24:54 +05:30


package logic
import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"net"
"slices"
"sort"
"sync"
"time"
validator "github.com/go-playground/validator/v10"
"github.com/google/uuid"
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/db"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic/acls"
"github.com/gravitl/netmaker/logic/acls/nodeacls"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/schema"
"github.com/gravitl/netmaker/servercfg"
"github.com/gravitl/netmaker/validation"
"github.com/seancfoley/ipaddress-go/ipaddr"
"golang.org/x/exp/slog"
)
var (
nodeCacheMutex = &sync.RWMutex{}
nodeNetworkCacheMutex = &sync.RWMutex{}
nodesCacheMap = make(map[string]models.Node)
nodesNetworkCacheMap = make(map[string]map[string]models.Node)
DeleteNodesCh = make(chan *models.Node, 100)
)
func getNodeFromCache(nodeID string) (node models.Node, ok bool) {
nodeCacheMutex.RLock()
node, ok = nodesCacheMap[nodeID]
if node.Mutex == nil {
node.Mutex = &sync.Mutex{}
}
nodeCacheMutex.RUnlock()
return
}
func getNodesFromCache() (nodes []models.Node) {
nodeCacheMutex.RLock()
for _, node := range nodesCacheMap {
if node.Mutex == nil {
node.Mutex = &sync.Mutex{}
}
nodes = append(nodes, node)
}
nodeCacheMutex.RUnlock()
return
}
func deleteNodeFromCache(nodeID string) {
nodeCacheMutex.Lock()
delete(nodesCacheMap, nodeID)
nodeCacheMutex.Unlock()
}
func deleteNodeFromNetworkCache(nodeID string, network string) {
nodeNetworkCacheMutex.Lock()
delete(nodesNetworkCacheMap[network], nodeID)
nodeNetworkCacheMutex.Unlock()
}
func storeNodeInNetworkCache(node models.Node, network string) {
nodeNetworkCacheMutex.Lock()
if nodesNetworkCacheMap[network] == nil {
nodesNetworkCacheMap[network] = make(map[string]models.Node)
}
nodesNetworkCacheMap[network][node.ID.String()] = node
nodeNetworkCacheMutex.Unlock()
}
func storeNodeInCache(node models.Node) {
nodeCacheMutex.Lock()
nodesCacheMap[node.ID.String()] = node
nodeCacheMutex.Unlock()
}
func loadNodesIntoNetworkCache(nMap map[string]models.Node) {
nodeNetworkCacheMutex.Lock()
for _, v := range nMap {
network := v.Network
if nodesNetworkCacheMap[network] == nil {
nodesNetworkCacheMap[network] = make(map[string]models.Node)
}
nodesNetworkCacheMap[network][v.ID.String()] = v
}
nodeNetworkCacheMutex.Unlock()
}
func loadNodesIntoCache(nMap map[string]models.Node) {
nodeCacheMutex.Lock()
nodesCacheMap = nMap
nodeCacheMutex.Unlock()
}
func ClearNodeCache() {
nodeCacheMutex.Lock()
nodesCacheMap = make(map[string]models.Node)
nodesNetworkCacheMap = make(map[string]map[string]models.Node)
nodeCacheMutex.Unlock()
}
const (
// RELAY_NODE_ERR - error to return if relay node is not found
RELAY_NODE_ERR = "could not find relay for node"
// NodePurgeTime - time to wait for a node to respond to a NODE_DELETE action
NodePurgeTime = time.Second * 10
// NodePurgeCheckTime is how often to check nodes for Pending Delete
NodePurgeCheckTime = time.Second * 30
)
// GetNetworkNodes - gets the nodes of a network
func GetNetworkNodes(network string) ([]models.Node, error) {
// take the read lock before touching the per-network cache to avoid racing with writers
nodeNetworkCacheMutex.RLock()
if networkNodes, ok := nodesNetworkCacheMap[network]; ok {
nodes := slices.Collect(maps.Values(networkNodes))
nodeNetworkCacheMutex.RUnlock()
return nodes, nil
}
nodeNetworkCacheMutex.RUnlock()
allnodes, err := GetAllNodes()
if err != nil {
return []models.Node{}, err
}
return GetNetworkNodesMemory(allnodes, network), nil
}
// GetHostNodes - fetches all nodes part of the host
func GetHostNodes(host *schema.Host) []models.Node {
nodes := []models.Node{}
for _, nodeID := range host.Nodes {
node, err := GetNodeByID(nodeID)
if err == nil {
nodes = append(nodes, node)
}
}
return nodes
}
// GetNetworkNodesMemory - gets all nodes belonging to a network from list in memory
func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node {
// take the read lock before touching the per-network cache to avoid racing with writers
nodeNetworkCacheMutex.RLock()
if networkNodes, ok := nodesNetworkCacheMap[network]; ok {
nodes := slices.Collect(maps.Values(networkNodes))
nodeNetworkCacheMutex.RUnlock()
return nodes
}
nodeNetworkCacheMutex.RUnlock()
var nodes = make([]models.Node, 0, len(allNodes))
for i := range allNodes {
node := allNodes[i]
if node.Network == network {
nodes = append(nodes, node)
}
}
return nodes
}
var (
pendingCheckins = make(map[string]models.Node)
pendingCheckinsMu sync.Mutex
)
// UpdateNodeCheckin - buffers the checkin timestamp in memory when caching is enabled.
// The actual DB write is deferred to FlushNodeCheckins (every 30s).
// When caching is disabled (HA mode), writes directly to the DB.
func UpdateNodeCheckin(node *models.Node) error {
node.SetLastCheckIn()
node.EgressDetails = models.EgressDetails{}
if servercfg.CacheEnabled() {
pendingCheckinsMu.Lock()
pendingCheckins[node.ID.String()] = *node
pendingCheckinsMu.Unlock()
storeNodeInCache(*node)
storeNodeInNetworkCache(*node, node.Network)
return nil
}
data, err := json.Marshal(node)
if err != nil {
return err
}
return database.Insert(node.ID.String(), string(data), database.NODES_TABLE_NAME)
}
// FlushNodeCheckins - writes all buffered check-in updates to the DB in one batch.
// Called periodically (e.g., every 30s) to avoid per-checkin write lock contention.
func FlushNodeCheckins() {
pendingCheckinsMu.Lock()
batch := pendingCheckins
pendingCheckins = make(map[string]models.Node)
pendingCheckinsMu.Unlock()
if len(batch) == 0 {
return
}
var failed int
for id, node := range batch {
data, err := json.Marshal(node)
if err != nil {
failed++
continue
}
if err := database.Insert(id, string(data), database.NODES_TABLE_NAME); err != nil {
failed++
}
}
if failed > 0 {
slog.Error("FlushNodeCheckins: failed to persist checkins", "failed", failed, "total", len(batch))
}
}
// UpsertNode - updates node in the DB
func UpsertNode(newNode *models.Node) error {
newNode.SetLastModified()
data, err := json.Marshal(newNode)
if err != nil {
return err
}
newNode.EgressDetails = models.EgressDetails{}
err = database.Insert(newNode.ID.String(), string(data), database.NODES_TABLE_NAME)
if err != nil {
return err
}
if servercfg.CacheEnabled() {
storeNodeInCache(*newNode)
storeNodeInNetworkCache(*newNode, newNode.Network)
}
return nil
}
// UpdateNode - takes a node and updates another node with its values
func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
if newNode.Address.IP.String() != currentNode.Address.IP.String() {
network := &schema.Network{Name: newNode.Network}
if err := network.Get(db.WithContext(context.TODO())); err == nil {
if !IsAddressInCIDR(newNode.Address.IP, network.AddressRange) {
return fmt.Errorf("invalid address provided; out of network range for node %s", newNode.ID)
}
}
}
nodeACLDelta := currentNode.DefaultACL != newNode.DefaultACL
newNode.Fill(currentNode, servercfg.IsPro)
// check for un-settable server values
if err := ValidateNode(newNode, true); err != nil {
return err
}
if newNode.ID == currentNode.ID {
if nodeACLDelta {
if err := UpdateProNodeACLs(newNode); err != nil {
logger.Log(1, "failed to apply node level ACLs during creation of node", newNode.ID.String(), "-", err.Error())
return err
}
}
newNode.EgressDetails = models.EgressDetails{}
newNode.SetLastModified()
if !currentNode.Connected && newNode.Connected {
newNode.SetLastCheckIn()
}
if data, err := json.Marshal(newNode); err != nil {
return err
} else {
err = database.Insert(newNode.ID.String(), string(data), database.NODES_TABLE_NAME)
if err != nil {
return err
}
if servercfg.CacheEnabled() {
storeNodeInCache(*newNode)
storeNodeInNetworkCache(*newNode, newNode.Network)
if newNode.Address.IP != nil && !newNode.Address.IP.Equal(currentNode.Address.IP) {
AddIpToAllocatedIpMap(newNode.Network, newNode.Address.IP)
RemoveIpFromAllocatedIpMap(currentNode.Network, currentNode.Address.IP.String())
}
if newNode.Address6.IP != nil && !newNode.Address6.IP.Equal(currentNode.Address6.IP) {
AddIpToAllocatedIpMap(newNode.Network, newNode.Address6.IP)
RemoveIpFromAllocatedIpMap(currentNode.Network, currentNode.Address6.IP.String())
}
}
return nil
}
}
return fmt.Errorf("failed to update node %s, cannot change ID", currentNode.ID.String())
}
// DeleteNode - marks node for deletion (and adds to zombie list) if called by UI or deletes node if called by node
func DeleteNode(node *models.Node, purge bool) error {
alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE
node.Action = models.NODE_DELETE
//delete ext clients if node is ingress gw
if node.IsIngressGateway {
if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
slog.Error("failed to delete ext clients", "nodeid", node.ID.String(), "error", err.Error())
}
}
if node.IsRelayed {
// cleanup node from relayednodes on relay node
relayNode, err := GetNodeByID(node.RelayedBy)
if err == nil {
relayedNodes := []string{}
for _, relayedNodeID := range relayNode.RelayedNodes {
if relayedNodeID == node.ID.String() {
continue
}
relayedNodes = append(relayedNodes, relayedNodeID)
}
relayNode.RelayedNodes = relayedNodes
UpsertNode(&relayNode)
}
}
if node.FailedOverBy != uuid.Nil {
ResetFailedOverPeer(node)
}
if len(node.AutoRelayedPeers) > 0 {
ResetAutoRelayedPeer(node)
}
if node.IsRelay {
// unset all the relayed nodes
SetRelayedNodes(false, node.ID.String(), node.RelayedNodes)
}
if node.InternetGwID != "" {
inetNode, err := GetNodeByID(node.InternetGwID)
if err == nil {
clientNodeIDs := []string{}
for _, inetNodeClientID := range inetNode.InetNodeReq.InetNodeClientIDs {
if inetNodeClientID == node.ID.String() {
continue
}
clientNodeIDs = append(clientNodeIDs, inetNodeClientID)
}
inetNode.InetNodeReq.InetNodeClientIDs = clientNodeIDs
UpsertNode(&inetNode)
}
}
if node.IsInternetGateway {
UnsetInternetGw(node)
}
if !purge && !alreadyDeleted {
newnode := *node
newnode.PendingDelete = true
if err := UpdateNode(node, &newnode); err != nil {
return err
}
newZombie <- node.ID
return nil
}
if alreadyDeleted {
logger.Log(1, "forcibly deleting node", node.ID.String())
}
host := &schema.Host{
ID: node.HostID,
}
err := host.Get(db.WithContext(context.TODO()))
if err != nil {
logger.Log(1, "no host found for node", node.ID.String(), "deleting..")
if delErr := DeleteNodeByID(node); delErr != nil {
logger.Log(0, "failed to delete node", node.ID.String(), delErr.Error())
}
return err
}
if err := DissasociateNodeFromHost(node, host); err != nil {
return err
}
filters := make(map[string]bool)
if node.Address.IP != nil {
filters[node.Address.IP.String()] = true
}
if node.Address6.IP != nil {
filters[node.Address6.IP.String()] = true
}
nameservers, _ := (&schema.Nameserver{
NetworkID: node.Network,
}).ListByNetwork(db.WithContext(context.TODO()))
for _, ns := range nameservers {
ns.Servers = FilterOutIPs(ns.Servers, filters)
if len(ns.Servers) > 0 {
_ = ns.Update(db.WithContext(context.TODO()))
} else {
// TODO: deleting a nameserver dns server could cause trouble for other nodes.
// TODO: try to figure out a sequence that works the best.
_ = ns.Delete(db.WithContext(context.TODO()))
}
}
go RemoveNodeFromAclPolicy(*node)
go RemoveNodeFromEgress(*node)
return nil
}
// GetNodeByHostRef - gets the node by host id and network
func GetNodeByHostRef(hostid, network string) (node models.Node, err error) {
nodes, err := GetNetworkNodes(network)
if err != nil {
return models.Node{}, err
}
for _, node := range nodes {
if node.HostID.String() == hostid && node.Network == network {
return node, nil
}
}
return models.Node{}, errors.New("node not found")
}
// DeleteNodeByID - deletes a node from database
func DeleteNodeByID(node *models.Node) error {
var err error
var key = node.ID.String()
if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil {
if !database.IsEmptyRecord(err) {
return err
}
}
if servercfg.CacheEnabled() {
deleteNodeFromCache(node.ID.String())
deleteNodeFromNetworkCache(node.ID.String(), node.Network)
}
if servercfg.IsDNSMode() {
SetDNS()
}
_, err = nodeacls.RemoveNodeACL(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()))
if err != nil {
// ignoring for now, could hit a nil pointer if delete called twice
logger.Log(2, "attempted to remove node ACL for node", node.ID.String())
}
// removeZombie <- node.ID
if err = DeleteMetrics(node.ID.String()); err != nil {
logger.Log(1, "unable to remove metrics from DB for node", node.ID.String(), err.Error())
}
//recycle ip address
if servercfg.CacheEnabled() {
if node.Address.IP != nil {
RemoveIpFromAllocatedIpMap(node.Network, node.Address.IP.String())
}
if node.Address6.IP != nil {
RemoveIpFromAllocatedIpMap(node.Network, node.Address6.IP.String())
}
}
return nil
}
// IsNodeIDUnique - checks if node id is unique
func IsNodeIDUnique(node *models.Node) (bool, error) {
_, err := database.FetchRecord(database.NODES_TABLE_NAME, node.ID.String())
return database.IsEmptyRecord(err), err
}
// ValidateNode - validates node values
func ValidateNode(node *models.Node, isUpdate bool) error {
v := validator.New()
_ = v.RegisterValidation("id_unique", func(fl validator.FieldLevel) bool {
if isUpdate {
return true
}
isFieldUnique, _ := IsNodeIDUnique(node)
return isFieldUnique
})
_ = v.RegisterValidation("network_exists", func(fl validator.FieldLevel) bool {
err := (&schema.Network{Name: node.Network}).Get(db.WithContext(context.TODO()))
return err == nil
})
_ = v.RegisterValidation("checkyesornoorunset", func(f1 validator.FieldLevel) bool {
return validation.CheckYesOrNoOrUnset(f1)
})
err := v.Struct(node)
return err
}
// GetAllNodes - returns all nodes in the DB
func GetAllNodes() ([]models.Node, error) {
var nodes []models.Node
if servercfg.CacheEnabled() {
nodes = getNodesFromCache()
if len(nodes) != 0 {
return nodes, nil
}
}
nodesMap := make(map[string]models.Node)
if servercfg.CacheEnabled() {
defer loadNodesIntoCache(nodesMap)
defer loadNodesIntoNetworkCache(nodesMap)
}
collection, err := database.FetchRecords(database.NODES_TABLE_NAME)
if err != nil {
if database.IsEmptyRecord(err) {
return []models.Node{}, nil
}
return []models.Node{}, err
}
for _, value := range collection {
var node models.Node
// ignore legacy nodes in database
if err := json.Unmarshal([]byte(value), &node); err != nil {
logger.Log(3, "legacy node detected: ", err.Error())
continue
}
if node.Mutex == nil {
node.Mutex = &sync.Mutex{}
}
// add node to our array
nodes = append(nodes, node)
nodesMap[node.ID.String()] = node
}
return nodes, nil
}
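// AddStaticNodestoList - appends the static (ext client) nodes of every network that has an ingress gateway in the given list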
func AddStaticNodestoList(nodes []models.Node) []models.Node {
netMap := make(map[string]struct{})
for _, node := range nodes {
if _, ok := netMap[node.Network]; ok {
continue
}
if node.IsIngressGateway {
nodes = append(nodes, GetStaticNodesByNetwork(schema.NetworkID(node.Network), false)...)
netMap[node.Network] = struct{}{}
}
}
return nodes
}
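// AddStatusToNodes - populates status info on the given nodes; when statusCall is true a full ACL-aware status check is performed, otherwise only check-in status is evaluated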
func AddStatusToNodes(nodes []models.Node, statusCall bool) (nodesWithStatus []models.Node) {
aclDefaultPolicyStatusMap := make(map[string]bool)
for _, node := range nodes {
if _, ok := aclDefaultPolicyStatusMap[node.Network]; !ok {
// check default policy if all allowed return true
defaultPolicy, _ := GetDefaultPolicy(schema.NetworkID(node.Network), models.DevicePolicy)
aclDefaultPolicyStatusMap[node.Network] = defaultPolicy.Enabled
}
if statusCall {
GetNodeStatus(&node, aclDefaultPolicyStatusMap[node.Network])
} else {
getNodeCheckInStatus(&node, true)
}
nodesWithStatus = append(nodesWithStatus, node)
}
return
}
// SetNodeDefaults - sets the defaults of a node to avoid empty fields
func SetNodeDefaults(node *models.Node, resetConnected bool) {
parentNetwork := &schema.Network{Name: node.Network}
_ = parentNetwork.Get(db.WithContext(context.TODO()))
_, cidr, err := net.ParseCIDR(parentNetwork.AddressRange)
if err == nil {
node.NetworkRange = *cidr
}
_, cidr, err = net.ParseCIDR(parentNetwork.AddressRange6)
if err == nil {
node.NetworkRange6 = *cidr
}
if node.DefaultACL == "" {
node.DefaultACL = parentNetwork.DefaultACL
}
if node.FailOverPeers == nil {
node.FailOverPeers = make(map[string]struct{})
}
node.SetLastModified()
//node.SetLastCheckIn()
if resetConnected {
node.SetDefaultConnected()
}
node.SetExpirationDateTime()
if node.Tags == nil {
node.Tags = make(map[models.TagID]struct{})
}
}
// GetRecordKey - get record key
// deprecated
func GetRecordKey(id string, network string) (string, error) {
if id == "" || network == "" {
return "", errors.New("unable to get record key")
}
return id + "###" + network, nil
}
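// GetNodeByID - fetches a node by ID from the cache when enabled, falling back to the database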
func GetNodeByID(uuid string) (models.Node, error) {
if servercfg.CacheEnabled() {
if node, ok := getNodeFromCache(uuid); ok {
return node, nil
}
}
var record, err = database.FetchRecord(database.NODES_TABLE_NAME, uuid)
if err != nil {
return models.Node{}, err
}
var node models.Node
if err = json.Unmarshal([]byte(record), &node); err != nil {
return models.Node{}, err
}
if servercfg.CacheEnabled() {
storeNodeInCache(node)
storeNodeInNetworkCache(node, node.Network)
}
return node, nil
}
// GetDeletedNodeByID - get a deleted node
func GetDeletedNodeByID(uuid string) (models.Node, error) {
var node models.Node
record, err := database.FetchRecord(database.DELETED_NODES_TABLE_NAME, uuid)
if err != nil {
return models.Node{}, err
}
if err = json.Unmarshal([]byte(record), &node); err != nil {
return models.Node{}, err
}
SetNodeDefaults(&node, true)
return node, nil
}
// FindRelay - returns the node that is the relay for a relayed node
func FindRelay(node *models.Node) *models.Node {
relay, err := GetNodeByID(node.RelayedBy)
if err != nil {
logger.Log(0, "FindRelay: "+err.Error())
return nil
}
return &relay
}
// GetAllNodesAPI - get all nodes for api usage
func GetAllNodesAPI(nodes []models.Node) []models.ApiNode {
apiNodes := []models.ApiNode{}
for i := range nodes {
node := nodes[i]
if !node.IsStatic {
h := &schema.Host{
ID: node.HostID,
}
err := h.Get(db.WithContext(context.TODO()))
if err == nil {
node.Location = h.Location
node.CountryCode = h.CountryCode
}
}
newApiNode := node.ConvertToAPINode()
apiNodes = append(apiNodes, *newApiNode)
}
return apiNodes[:]
}
// GetAllNodesAPIWithLocation - get all nodes for api usage, with location populated from the host or static node
func GetAllNodesAPIWithLocation(nodes []models.Node) []models.ApiNode {
apiNodes := []models.ApiNode{}
for i := range nodes {
node := nodes[i]
newApiNode := node.ConvertToAPINode()
if node.IsStatic {
newApiNode.Location = node.StaticNode.Location
} else {
host := &schema.Host{
ID: node.HostID,
}
_ = host.Get(db.WithContext(context.TODO()))
newApiNode.Location = host.Location
}
apiNodes = append(apiNodes, *newApiNode)
}
return apiNodes[:]
}
// GetNodesStatusAPI - gets nodes status
func GetNodesStatusAPI(nodes []models.Node) map[string]models.ApiNodeStatus {
apiStatusNodesMap := make(map[string]models.ApiNodeStatus)
for i := range nodes {
newApiNode := nodes[i].ConvertToStatusNode()
apiStatusNodesMap[newApiNode.ID] = *newApiNode
}
return apiStatusNodesMap
}
// DeleteExpiredNodes - goroutine which deletes nodes which are expired
func DeleteExpiredNodes(ctx context.Context) {
// Delete Expired Nodes Every Hour
ticker := time.NewTicker(time.Hour)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
allnodes, err := GetAllNodes()
if err != nil {
slog.Error("failed to retrieve all nodes", "error", err.Error())
return
}
for _, node := range allnodes {
node := node
if time.Now().After(node.ExpirationDateTime) {
DeleteNodesCh <- &node
slog.Info("deleting expired node", "nodeid", node.ID.String())
}
}
}
}
}
// createNode - creates a node in database
func createNode(node *models.Node) error {
// lock because we need unique IPs and having it concurrent makes parallel calls result in same "unique" IPs
addressLock.Lock()
defer addressLock.Unlock()
host := &schema.Host{
ID: node.HostID,
}
err := host.Get(db.WithContext(context.TODO()))
if err != nil {
return err
}
SetNodeDefaults(node, true)
defaultACLVal := acls.Allowed
parentNetwork := &schema.Network{Name: node.Network}
err = parentNetwork.Get(db.WithContext(context.TODO()))
if err == nil {
if parentNetwork.DefaultACL != "yes" {
defaultACLVal = acls.NotAllowed
}
}
if node.DefaultACL == "" {
node.DefaultACL = "unset"
}
if node.Address.IP == nil {
if parentNetwork.AddressRange != "" {
if node.Address.IP, err = UniqueAddress(node.Network, false); err != nil {
return err
}
_, cidr, err := net.ParseCIDR(parentNetwork.AddressRange)
if err != nil {
return err
}
node.Address.Mask = net.CIDRMask(cidr.Mask.Size())
}
} else if !IsIPUnique(node.Network, node.Address.String(), database.NODES_TABLE_NAME, false) {
return fmt.Errorf("invalid address: ipv4 %s is not unique", node.Address.String())
}
if node.Address6.IP == nil {
if parentNetwork.AddressRange6 != "" {
if node.Address6.IP, err = UniqueAddress6(node.Network, false); err != nil {
return err
}
_, cidr, err := net.ParseCIDR(parentNetwork.AddressRange6)
if err != nil {
return err
}
node.Address6.Mask = net.CIDRMask(cidr.Mask.Size())
}
} else if !IsIPUnique(node.Network, node.Address6.String(), database.NODES_TABLE_NAME, true) {
return fmt.Errorf("invalid address: ipv6 %s is not unique", node.Address6.String())
}
node.ID = uuid.New()
//Create a JWT for the node
tokenString, _ := CreateJWT(node.ID.String(), host.MacAddress.String(), node.Network)
if tokenString == "" {
return errors.New("failed to create jwt token for node")
}
err = ValidateNode(node, false)
if err != nil {
return err
}
CheckZombies(node)
node.SetLastCheckIn()
nodebytes, err := json.Marshal(&node)
if err != nil {
return err
}
err = database.Insert(node.ID.String(), string(nodebytes), database.NODES_TABLE_NAME)
if err != nil {
return err
}
if servercfg.CacheEnabled() {
storeNodeInCache(*node)
storeNodeInNetworkCache(*node, node.Network)
if node.Address.IP != nil {
AddIpToAllocatedIpMap(node.Network, node.Address.IP)
}
if node.Address6.IP != nil {
AddIpToAllocatedIpMap(node.Network, node.Address6.IP)
}
}
_, err = nodeacls.CreateNodeACL(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), defaultACLVal)
if err != nil {
logger.Log(1, "failed to create node ACL for node,", node.ID.String(), "err:", err.Error())
return err
}
if err = UpdateProNodeACLs(node); err != nil {
logger.Log(1, "failed to apply node level ACLs during creation of node", node.ID.String(), "-", err.Error())
return err
}
if err = UpdateMetrics(node.ID.String(), &models.Metrics{Connectivity: make(map[string]models.Metric)}); err != nil {
logger.Log(1, "failed to initialize metrics for node", node.ID.String(), err.Error())
}
SetNetworkNodesLastModified(node.Network)
if servercfg.IsDNSMode() {
err = SetDNS()
}
return err
}
// SortApiNodes - Sorts slice of ApiNodes by their ID alphabetically with numbers first
func SortApiNodes(unsortedNodes []models.ApiNode) {
sort.Slice(unsortedNodes, func(i, j int) bool {
return unsortedNodes[i].ID < unsortedNodes[j].ID
})
}
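// ValidateParams - verifies that the node referenced by nodeid exists and belongs to the network netid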
func ValidateParams(nodeid, netid string) (models.Node, error) {
node, err := GetNodeByID(nodeid)
if err != nil {
slog.Error("error fetching node", "node", nodeid, "error", err.Error())
return node, fmt.Errorf("error fetching node during parameter validation: %v", err)
}
if node.Network != netid {
slog.Error("network url param does not match node id", "url nodeid", netid, "node", node.Network)
return node, fmt.Errorf("network url param does not match node network")
}
return node, nil
}
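// ValidateNodeIp - ensures any changed addresses on the updated node are not already allocated to another node or ext client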
func ValidateNodeIp(currentNode *models.Node, newNode *models.ApiNode) error {
if currentNode.Address.IP != nil && currentNode.Address.String() != newNode.Address {
if !IsIPUnique(newNode.Network, newNode.Address, database.NODES_TABLE_NAME, false) ||
!IsIPUnique(newNode.Network, newNode.Address, database.EXT_CLIENT_TABLE_NAME, false) {
return errors.New("ip specified is already allocated: " + newNode.Address)
}
}
if currentNode.Address6.IP != nil && currentNode.Address6.String() != newNode.Address6 {
if !IsIPUnique(newNode.Network, newNode.Address6, database.NODES_TABLE_NAME, true) ||
!IsIPUnique(newNode.Network, newNode.Address6, database.EXT_CLIENT_TABLE_NAME, true) {
return errors.New("ip specified is already allocated: " + newNode.Address6)
}
}
return nil
}
// ValidateEgressRange - ensures none of the given egress ranges overlap the network's own address ranges
func ValidateEgressRange(netID string, ranges []string) error {
network := &schema.Network{Name: netID}
err := network.Get(db.WithContext(context.TODO()))
if err != nil {
slog.Error("error getting network", "netid", netID, "error", err.Error())
return errors.New("error getting network with netid: " + netID + " " + err.Error())
}
ipv4Net := network.AddressRange
ipv6Net := network.AddressRange6
for _, v := range ranges {
if ipv4Net != "" {
if ContainsCIDR(ipv4Net, v) {
slog.Error("egress range should not be the same as or contained in the netmaker network address", "range", v, "network", ipv4Net)
return errors.New("egress range should not be the same as or contained in the netmaker network address " + v + " " + ipv4Net)
}
}
if ipv6Net != "" {
if ContainsCIDR(ipv6Net, v) {
slog.Error("egress range should not be the same as or contained in the netmaker network address", "range", v, "network", ipv6Net)
return errors.New("egress range should not be the same as or contained in the netmaker network address " + v + " " + ipv6Net)
}
}
}
return nil
}
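// ContainsCIDR - returns true if either of the given CIDRs contains or equals the other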
func ContainsCIDR(net1, net2 string) bool {
one, two := ipaddr.NewIPAddressString(net1),
ipaddr.NewIPAddressString(net2)
return one.Contains(two) || two.Contains(one)
}
// GetAllFailOvers - gets all the nodes that are failovers
func GetAllFailOvers() ([]models.Node, error) {
nodes, err := GetAllNodes()
if err != nil {
return nil, err
}
igs := make([]models.Node, 0)
for _, node := range nodes {
if node.IsFailOver {
igs = append(igs, node)
}
}
return igs, nil
}