fix(go): add tenant id to flow logs;

This commit is contained in:
VishalDalwadi
2026-04-06 11:16:23 +05:30
parent f8a0cfd744
commit 55c2d19c30
8 changed files with 34 additions and 6 deletions
+9 -1
View File
@@ -23,7 +23,10 @@ var ErrConnNotFound = errors.New("no connection in context")
//go:embed initdb.d/02_create_flows_table.sql
var createFlowsTableScript string
func Initialize() error {
//go:embed initdb.d/03_alter_flows_table_tenant_id.sql
var alterFlowsTableTenantIDScript string
func Initialize(defaultTenantID string) error {
if ch != nil {
return nil
}
@@ -49,6 +52,11 @@ func Initialize() error {
return err
}
err = chConn.Exec(ctx, fmt.Sprintf(alterFlowsTableTenantIDScript, defaultTenantID))
if err != nil {
return err
}
ch = chConn
return nil
}
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS flows (
-- Identity
flow_id String,
tenant_id String,
host_id String,
host_name String,
network_id String,
@@ -0,0 +1,2 @@
ALTER TABLE flows
ADD COLUMN IF NOT EXISTS tenant_id String DEFAULT '%s';
+1 -1
View File
@@ -272,7 +272,7 @@ func updateSettings(w http.ResponseWriter, r *http.Request) {
if currSettings.EnableFlowLogs != req.EnableFlowLogs {
if req.EnableFlowLogs {
err := ch.Initialize()
err := ch.Initialize(servercfg.GetNetmakerTenantID())
if err != nil {
err = fmt.Errorf("failed to enable flow logs: %v", err)
logic.ReturnErrorResponse(w, r, logic.FormatError(err, logic.Internal))
+12 -3
View File
@@ -291,7 +291,8 @@ type FlowEvent struct {
// Version used by ClickHouse for merging.
// Must be strictly increasing for START → DESTROY.
// Usually equal to the netclient event timestamp (ms).
Version int64 `protobuf:"varint,21,opt,name=version,proto3" json:"version,omitempty"`
Version int64 `protobuf:"varint,21,opt,name=version,proto3" json:"version,omitempty"`
TenantId string `protobuf:"bytes,22,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -473,6 +474,13 @@ func (x *FlowEvent) GetVersion() int64 {
return 0
}
func (x *FlowEvent) GetTenantId() string {
if x != nil {
return x.TenantId
}
return ""
}
// *
// Envelope sent by netclients containing multiple FlowEvents.
type FlowEnvelope struct {
@@ -582,7 +590,7 @@ const file_grpc_flow_flow_proto_rawDesc = "" +
"\x02ip\x18\x01 \x01(\tR\x02ip\x122\n" +
"\x04type\x18\x02 \x01(\x0e2\x1e.netmaker.flow.ParticipantTypeR\x04type\x12\x0e\n" +
"\x02id\x18\x03 \x01(\tR\x02id\x12\x12\n" +
"\x04name\x18\x04 \x01(\tR\x04name\"\xc1\x05\n" +
"\x04name\x18\x04 \x01(\tR\x04name\"\xde\x05\n" +
"\tFlowEvent\x12,\n" +
"\x04type\x18\x01 \x01(\x0e2\x18.netmaker.flow.EventTypeR\x04type\x12\x17\n" +
"\aflow_id\x18\x02 \x01(\tR\x06flowId\x12\x17\n" +
@@ -608,7 +616,8 @@ const file_grpc_flow_flow_proto_rawDesc = "" +
"\fpackets_sent\x18\x12 \x01(\x04R\vpacketsSent\x12!\n" +
"\fpackets_recv\x18\x13 \x01(\x04R\vpacketsRecv\x12\x16\n" +
"\x06status\x18\x14 \x01(\rR\x06status\x12\x18\n" +
"\aversion\x18\x15 \x01(\x03R\aversion\"@\n" +
"\aversion\x18\x15 \x01(\x03R\aversion\x12\x1b\n" +
"\ttenant_id\x18\x16 \x01(\tR\btenantId\"@\n" +
"\fFlowEnvelope\x120\n" +
"\x06events\x18\x01 \x03(\v2\x18.netmaker.flow.FlowEventR\x06events\">\n" +
"\fFlowResponse\x12\x18\n" +
+2
View File
@@ -111,6 +111,8 @@ message FlowEvent {
* Usually equal to the netclient event timestamp (ms).
*/
int64 version = 21;
string tenant_id = 22;
}
// ============================================================
+6
View File
@@ -13,6 +13,7 @@ import (
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logic"
proLogic "github.com/gravitl/netmaker/pro/logic"
"github.com/gravitl/netmaker/servercfg"
)
func FlowHandlers(r *mux.Router) {
@@ -217,6 +218,11 @@ func handleListFlows(w http.ResponseWriter, r *http.Request) {
args = append(args, srcTypeStr, srcEntity, dstTypeStr, dstEntity)
}
if servercfg.GetNetmakerTenantID() != "" {
whereParts = append(whereParts, "tenant_id = ?")
args = append(args, servercfg.GetNetmakerTenantID())
}
// Pagination
page := parseIntOrDefault(q.Get("page"), 1)
perPage := parseIntOrDefault(q.Get("per_page"), 100)
+1 -1
View File
@@ -118,7 +118,7 @@ func InitPro() {
addJitExpiryHookWithEmail()
if proLogic.GetFeatureFlags().EnableFlowLogs && logic.GetServerSettings().EnableFlowLogs {
err := ch.Initialize()
err := ch.Initialize(servercfg.GetNetmakerTenantID())
if err != nil {
logger.Log(0, "error connecting to clickhouse:", err.Error())
}