NET-1082: Scale Testing Fixes (#2894)

* add additional mutex lock on node acls func

* increase verbosity

* disable acls on cloud emqx

* add emqx creds creation to go routine

* add debug log of mq client id

* comment port check

* uncomment port check

* check for connection mq connection open

* use username for client id

* add write mutex on acl is allowed

* add mq connection lost handler on server

* spin off zombie init as go routine

* get whole api path from config

* Revert "get whole api path from config"

This reverts commit 392f5f4c5f.

* update extclient acls async

* add additional mutex lock on node acls func

(cherry picked from commit 5325f0e7d7)

* increase verbosity

(cherry picked from commit 705b3cf0bf)

* add emqx creds creation to go routine

(cherry picked from commit c8e65f4820)

* add debug log of mq client id

(cherry picked from commit 29c5d6ceca)

* comment port check

(cherry picked from commit db8d6d95ea)

* check for connection mq connection open

(cherry picked from commit 13b11033b0)

* use username for client id

(cherry picked from commit e90c7386de)

* add write mutex on acl is allowed

(cherry picked from commit 4cae1b0bb4)

* add mq connection lost handler on server

(cherry picked from commit c82918ad35)

* spin off zombie init as go routine

(cherry picked from commit 6d65c44c43)

* update extclient acls async

(cherry picked from commit 6557ef1ebe)

* additionl logs for oauth user flow

(cherry picked from commit 61703038ae)

* add more debug logs

(cherry picked from commit 5980beacd1)

* add more debug logs

(cherry picked from commit 4d001f0d27)

* add set auth secret

(cherry picked from commit f41cef5da5)

* fix fetch pass

(cherry picked from commit 825caf4b60)

* make sure auth secret is set only once

(cherry picked from commit ba33ed02aa)

* make sure auth secret is set only once

(cherry picked from commit 920ac4c507)

* comment usage of emqx acls

* replace  read lock with write lock on acls

* replace  read lock with write lock on acls

(cherry picked from commit 808d2135c8)

* use deadlock pkg for visibility

* add additional mutex locks

* remove race flag

* on mq re-connecting donot exit if failed

* on mq re-connecting donot exit if failed

* revert mutex package change

* set mq clean session

* remove debug log

* go mod tidy

* revert on prem emqx acls del
This commit is contained in:
Abhishek K 2024-04-11 21:18:57 +05:30 committed by GitHub
parent 0b2422b848
commit 66069fbc34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 71 additions and 179 deletions

View File

@ -308,7 +308,7 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
if !hostExists {
newHost.PersistentKeepalive = models.DefaultPersistentKeepAlive
// register host
logic.CheckHostPorts(&newHost)
//logic.CheckHostPorts(&newHost)
// create EMQX credentials and ACLs for host
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass); err != nil {

View File

@ -436,15 +436,14 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
return
}
if err := logic.SetClientDefaultACLs(&extclient); err != nil {
slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err)
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
slog.Info("created extclient", "user", r.Header.Get("user"), "network", node.Network, "clientid", extclient.ClientID)
w.WriteHeader(http.StatusOK)
go func() {
if err := logic.SetClientDefaultACLs(&extclient); err != nil {
slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err)
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
if err := mq.PublishPeerUpdate(false); err != nil {
logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
}

View File

@ -554,26 +554,27 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
logic.ReturnErrorResponse(response, request, errorResponse)
return
}
// Create EMQX creds and ACLs if not found
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
slog.Error("failed to create host credentials for EMQX: ", err.Error())
} else {
if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
}
for _, nodeID := range host.Nodes {
if node, err := logic.GetNodeByID(nodeID); err == nil {
if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
slog.Error("failed to add ACLs for EMQX node", "error", err)
go func() {
// Create EMQX creds and ACLs if not found
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
slog.Error("failed to create host credentials for EMQX: ", err.Error())
} else {
if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
}
for _, nodeID := range host.Nodes {
if node, err := logic.GetNodeByID(nodeID); err == nil {
if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
slog.Error("failed to add ACLs for EMQX node", "error", err)
}
} else {
slog.Error("failed to get node", "nodeid", nodeID, "error", err)
}
} else {
slog.Error("failed to get node", "nodeid", nodeID, "error", err)
}
}
}
}
}()
response.WriteHeader(http.StatusOK)
response.Header().Set("Content-Type", "application/json")

View File

@ -64,9 +64,9 @@ func (acl ACL) Save(containerID ContainerID, ID AclID) (ACL, error) {
// ACL.IsAllowed - sees if ID is allowed in referring ACL
func (acl ACL) IsAllowed(ID AclID) (allowed bool) {
AclMutex.RLock()
AclMutex.Lock()
allowed = acl[ID] == Allowed
AclMutex.RUnlock()
AclMutex.Unlock()
return
}
@ -88,6 +88,8 @@ func (aclContainer ACLContainer) RemoveACL(ID AclID) ACLContainer {
// ACLContainer.ChangeAccess - changes the relationship between two nodes in memory
func (networkACL ACLContainer) ChangeAccess(ID1, ID2 AclID, value byte) {
AclMutex.Lock()
defer AclMutex.Unlock()
if _, ok := networkACL[ID1]; !ok {
slog.Error("ACL missing for ", "id", ID1)
return

View File

@ -3,21 +3,26 @@ package nodeacls
import (
"encoding/json"
"fmt"
"sync"
"github.com/gravitl/netmaker/logic/acls"
)
var NodesAllowedACLMutex = &sync.Mutex{}
// AreNodesAllowed - checks if nodes are allowed to communicate in their network ACL
func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
NodesAllowedACLMutex.Lock()
defer NodesAllowedACLMutex.Unlock()
var currentNetworkACL, err = FetchAllACLs(networkID)
if err != nil {
return false
}
var allowed bool
acls.AclMutex.RLock()
acls.AclMutex.Lock()
currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
acls.AclMutex.RUnlock()
acls.AclMutex.Unlock()
allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
return allowed
}

View File

@ -4,8 +4,8 @@ import (
"encoding/json"
"net/http"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/models"
"golang.org/x/exp/slog"
)
// FormatError - takes ErrorResponse and uses correct code
@ -62,7 +62,7 @@ func ReturnErrorResponse(response http.ResponseWriter, request *http.Request, er
if err != nil {
panic(err)
}
logger.Log(1, "processed request error:", errorMessage.Message)
slog.Debug("processed request error", "err", errorMessage.Message)
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(errorMessage.Code)
response.Write(jsonResponse)

View File

@ -76,7 +76,7 @@ func checkForZombieHosts(h *models.Host) {
// ManageZombies - goroutine which adds/removes/deletes nodes from the zombie node quarantine list
func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) {
logger.Log(2, "Zombie management started")
InitializeZombies()
go InitializeZombies()
// Zombie Nodes Cleanup Four Times a Day
ticker := time.NewTicker(time.Hour * ZOMBIE_TIMEOUT)

View File

@ -155,7 +155,7 @@ func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) {
defer wg.Done()
brokerHost, _ := servercfg.GetMessageQueueEndpoint()
logger.Log(0, "connecting to mq broker at", brokerHost)
mq.SetupMQTT()
mq.SetupMQTT(true)
if mq.IsConnected() {
logger.Log(0, "connected to MQ Broker")
} else {

View File

@ -287,7 +287,7 @@ func updateAcls() {
}
// save new acls
slog.Info(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl)
slog.Debug(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl)
if _, err := networkAcl.Save(acls.ContainerID(network.NetID)); err != nil {
slog.Error(fmt.Sprintf("error during acls migration. error saving new acls for network: %s", network.NetID), "error", err)
continue

View File

@ -22,13 +22,6 @@ type userCreateReq struct {
Password string `json:"password"`
}
type cloudAcl struct {
UserName string `json:"username"`
Topic string `json:"topic"`
Action string `json:"action"`
Access string `json:"access"`
}
func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
@ -89,54 +82,7 @@ func (e *EmqxCloud) CreateEmqxUserforServer() error {
if res.StatusCode != http.StatusOK {
return errors.New("request failed " + string(body))
}
// add acls
acls := []cloudAcl{
{
UserName: servercfg.GetMqUserName(),
Topic: fmt.Sprintf("update/%s/#", servercfg.GetServer()),
Access: "allow",
Action: "sub",
},
{
UserName: servercfg.GetMqUserName(),
Topic: fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
Access: "allow",
Action: "sub",
},
{
UserName: servercfg.GetMqUserName(),
Topic: fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
Access: "allow",
Action: "sub",
},
{
UserName: servercfg.GetMqUserName(),
Topic: fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
Access: "allow",
Action: "sub",
},
{
UserName: servercfg.GetMqUserName(),
Topic: "peers/host/#",
Access: "allow",
Action: "pub",
},
{
UserName: servercfg.GetMqUserName(),
Topic: "node/update/#",
Access: "allow",
Action: "pub",
},
{
UserName: servercfg.GetMqUserName(),
Topic: "host/update/#",
Access: "allow",
Action: "pub",
},
}
return e.createacls(acls)
return nil
}
func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
@ -147,94 +93,13 @@ func (e *EmqxCloud) CreateDefaultDenyRule() error {
return nil
}
func (e *EmqxCloud) createacls(acls []cloudAcl) error {
payload, err := json.Marshal(acls)
if err != nil {
return err
}
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload)))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(e.AppID, e.AppSecret)
res, err := client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return errors.New("request failed " + string(body))
}
func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
return nil
}
func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
acls := []cloudAcl{
{
UserName: hostID,
Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
Access: "allow",
Action: "sub",
},
{
UserName: hostID,
Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
Access: "allow",
Action: "sub",
},
{
UserName: hostID,
Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
Access: "allow",
Action: "pub",
},
}
return e.createacls(acls)
}
func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
acls := []cloudAcl{
{
UserName: hostID,
Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
Access: "allow",
Action: "sub",
},
{
UserName: hostID,
Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
Access: "allow",
Action: "pubsub",
},
{
UserName: hostID,
Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
Access: "allow",
Action: "pubsub",
},
{
UserName: hostID,
Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
Access: "allow",
Action: "pubsub",
},
{
UserName: hostID,
Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
Access: "allow",
Action: "pubsub",
},
}
return nil
return e.createacls(acls)
}
func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list

View File

@ -92,7 +92,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
}
decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
if decryptErr != nil {
slog.Error("failed to decrypt message for host", "id", id, "error", decryptErr)
slog.Error("failed to decrypt message for host", "id", id, "name", currentHost.Name, "error", decryptErr)
return
}
var hostUpdate models.HostUpdate

View File

@ -8,8 +8,8 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)
// KEEPALIVE_TIMEOUT - time in seconds for timeout
@ -27,12 +27,12 @@ var mqclient mqtt.Client
func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
broker, _ := servercfg.GetMessageQueueEndpoint()
opts.AddBroker(broker)
id := logic.RandomString(23)
opts.ClientID = id
opts.ClientID = user
opts.SetUsername(user)
opts.SetPassword(password)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetCleanSession(true)
opts.SetConnectRetryInterval(time.Second * 4)
opts.SetKeepAlive(time.Minute)
opts.SetCleanSession(true)
@ -40,7 +40,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
}
// SetupMQTT creates a connection to broker and return client
func SetupMQTT() {
func SetupMQTT(fatal bool) {
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if emqx.GetType() == servercfg.EmqxOnPremDeploy {
time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
@ -70,6 +70,7 @@ func SetupMQTT() {
opts := mqtt.NewClientOptions()
setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
logger.Log(0, "Mq Client Connecting with Random ID: ", opts.ClientID)
opts.SetOnConnectHandler(func(client mqtt.Client) {
serverName := servercfg.GetServer()
if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
@ -92,6 +93,13 @@ func SetupMQTT() {
opts.SetOrderMatters(false)
opts.SetResumeSubs(true)
})
opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
slog.Warn("detected broker connection lost", "err", e.Error())
c.Disconnect(250)
slog.Info("re-initiating MQ connection")
SetupMQTT(false)
})
mqclient = mqtt.NewClient(opts)
tperiod := time.Now().Add(10 * time.Second)
for {
@ -99,9 +107,16 @@ func SetupMQTT() {
logger.Log(2, "unable to connect to broker, retrying ...")
if time.Now().After(tperiod) {
if token.Error() == nil {
logger.FatalLog("could not connect to broker, token timeout, exiting ...")
if fatal {
logger.FatalLog("could not connect to broker, token timeout, exiting ...")
}
logger.Log(0, "could not connect to broker, token timeout, exiting ...")
} else {
logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
if fatal {
logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
}
logger.Log(0, "could not connect to broker, exiting ...", token.Error().Error())
}
}
} else {
@ -125,7 +140,7 @@ func Keepalive(ctx context.Context) {
// IsConnected - function for determining if the mqclient is connected or not
func IsConnected() bool {
return mqclient != nil && mqclient.IsConnected()
return mqclient != nil && mqclient.IsConnectionOpen()
}
// CloseClient - function to close the mq connection from server

View File

@ -134,10 +134,15 @@ func failOverME(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return
}
host, err := logic.GetHost(node.HostID.String())
if err != nil {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return
}
failOverNode, exists := proLogic.FailOverExists(node.Network)
if !exists {
logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover node doesn't exist in the network"), "badrequest"))
logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("req-from: %s, failover node doesn't exist in the network", host.Name), "badrequest"))
return
}
var failOverReq models.FailOverMeReq