diff --git a/Cargo.lock b/Cargo.lock index 98685637..a72d6dd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2175,6 +2175,7 @@ dependencies = [ "byteorder", "bytes", "cfg-if", + "cfg_aliases 0.2.1", "chrono", "cidr", "clap", diff --git a/easytier-web/frontend-lib/src/locales/cn.yaml b/easytier-web/frontend-lib/src/locales/cn.yaml index 09147254..b9e03753 100644 --- a/easytier-web/frontend-lib/src/locales/cn.yaml +++ b/easytier-web/frontend-lib/src/locales/cn.yaml @@ -117,7 +117,7 @@ disable_quic_input: 禁用 QUIC 输入 disable_quic_input_help: 禁用 QUIC 入站流量,其他开启 QUIC 代理的节点仍然使用 TCP 连接到本节点。 disable_p2p: 禁用 P2P -disable_p2p_help: 禁用 P2P 模式,所有流量通过手动指定的服务器中转。 +disable_p2p_help: 禁用普通自动 P2P。开启 need-p2p 的节点仍可与当前节点建立 P2P。 p2p_only: 仅 P2P p2p_only_help: 仅与已经建立P2P连接的对等节点通信,不通过其他节点中转。 diff --git a/easytier-web/frontend-lib/src/locales/en.yaml b/easytier-web/frontend-lib/src/locales/en.yaml index 430845f5..c614d50c 100644 --- a/easytier-web/frontend-lib/src/locales/en.yaml +++ b/easytier-web/frontend-lib/src/locales/en.yaml @@ -116,7 +116,7 @@ disable_quic_input: Disable QUIC Input disable_quic_input_help: Disable inbound QUIC traffic, while nodes with QUIC proxy enabled continue to connect using TCP. disable_p2p: Disable P2P -disable_p2p_help: Disable P2P mode; route all traffic through a manually specified relay server. +disable_p2p_help: Disable ordinary automatic P2P. Nodes with need-p2p enabled can still establish P2P with this node. p2p_only: P2P Only p2p_only_help: Only communicate with peers that have already established P2P connections, do not relay through other nodes. diff --git a/easytier-web/src/client_manager/session.rs b/easytier-web/src/client_manager/session.rs index 4675c6de..2159a093 100644 --- a/easytier-web/src/client_manager/session.rs +++ b/easytier-web/src/client_manager/session.rs @@ -1,4 +1,9 @@ -use std::{collections::HashSet, fmt::Debug, str::FromStr as _, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + str::FromStr as _, + sync::Arc, +}; use anyhow::Context; use easytier::{ @@ -37,6 +42,7 @@ pub struct SessionData { storage_token: Option, binding_version: Option, + applied_config_revision: Option, notifier: broadcast::Sender, req: Option, location: Option, @@ -59,6 +65,7 @@ impl SessionData { client_url, storage_token: None, binding_version: None, + applied_config_revision: None, notifier: tx, req: None, location, @@ -117,37 +124,16 @@ struct SessionRpcService { } impl SessionRpcService { - async fn persist_webhook_network_config( - storage: &Storage, - user_id: i32, - machine_id: uuid::Uuid, - network_config: serde_json::Value, - ) -> anyhow::Result<()> { - let mut network_config = network_config; + fn normalize_network_config( + mut network_config: serde_json::Value, + inst_id: uuid::Uuid, + ) -> anyhow::Result { let network_name = network_config .get("network_name") .and_then(|v| v.as_str()) .filter(|v| !v.is_empty()) .ok_or_else(|| anyhow::anyhow!("webhook response missing network_name"))? .to_string(); - let existing_configs = storage - .db() - .list_network_configs((user_id, machine_id), ListNetworkProps::All) - .await - .map_err(|e| anyhow::anyhow!("failed to list existing network configs: {:?}", e))?; - let inst_id = existing_configs - .iter() - .find_map(|cfg| { - let value = serde_json::from_str::(&cfg.network_config).ok()?; - let cfg_network_name = value.get("network_name")?.as_str()?; - if cfg_network_name == network_name { - uuid::Uuid::parse_str(&cfg.network_instance_id).ok() - } else { - None - } - }) - .unwrap_or_else(uuid::Uuid::new_v4); - let config_obj = network_config .as_object_mut() .ok_or_else(|| anyhow::anyhow!("webhook network_config must be a JSON object"))?; @@ -157,14 +143,66 @@ impl SessionRpcService { ); config_obj .entry("instance_name".to_string()) - .or_insert_with(|| serde_json::Value::String(network_name.clone())); + .or_insert_with(|| serde_json::Value::String(network_name)); - let config = serde_json::from_value::(network_config)?; - storage + Ok(serde_json::from_value::(network_config)?) + } + + async fn reconcile_managed_network_configs( + storage: &Storage, + user_id: i32, + machine_id: uuid::Uuid, + desired_configs: Vec, + ) -> anyhow::Result<()> { + let existing_configs = storage .db() - .insert_or_update_user_network_config((user_id, machine_id), inst_id, config) + .list_network_configs((user_id, machine_id), ListNetworkProps::All) .await - .map_err(|e| anyhow::anyhow!("failed to persist webhook network config: {:?}", e))?; + .map_err(|e| anyhow::anyhow!("failed to list existing network configs: {:?}", e))?; + let existing_ids = existing_configs + .iter() + .filter_map(|cfg| uuid::Uuid::parse_str(&cfg.network_instance_id).ok()) + .collect::>(); + + let mut desired_ids = HashSet::with_capacity(desired_configs.len()); + let mut normalized = HashMap::with_capacity(desired_configs.len()); + for desired in desired_configs { + let inst_id = uuid::Uuid::parse_str(&desired.instance_id).with_context(|| { + format!( + "invalid desired managed instance id: {}", + desired.instance_id + ) + })?; + let config = Self::normalize_network_config(desired.network_config, inst_id)?; + desired_ids.insert(inst_id); + normalized.insert(inst_id, config); + } + + for (inst_id, config) in normalized { + storage + .db() + .insert_or_update_user_network_config((user_id, machine_id), inst_id, config) + .await + .map_err(|e| { + anyhow::anyhow!( + "failed to persist managed network config {}: {:?}", + inst_id, + e + ) + })?; + } + + let stale_ids = existing_ids + .difference(&desired_ids) + .copied() + .collect::>(); + if !stale_ids.is_empty() { + storage + .db() + .delete_network_configs((user_id, machine_id), &stale_ids) + .await + .map_err(|e| anyhow::anyhow!("failed to delete stale network configs: {:?}", e))?; + } Ok(()) } @@ -185,10 +223,13 @@ impl SessionRpcService { req.machine_id ))?; - let (user_id, webhook_network_config, webhook_validated, binding_version) = if data - .webhook_config - .is_enabled() - { + let ( + user_id, + webhook_managed_network_configs, + webhook_config_revision, + webhook_validated, + binding_version, + ) = if data.webhook_config.is_enabled() { let webhook_req = crate::webhook::ValidateTokenRequest { token: req.user_token.clone(), machine_id: machine_id.to_string(), @@ -223,7 +264,8 @@ impl SessionRpcService { }; ( user_id, - resp.network_config, + resp.managed_network_configs, + resp.config_revision, true, Some(resp.binding_version), ) @@ -257,21 +299,21 @@ impl SessionRpcService { ); } }; - (user_id, None, false, None) + (user_id, Vec::new(), String::new(), false, None) }; - if webhook_validated { - if let Some(network_config) = webhook_network_config { - Self::persist_webhook_network_config(&storage, user_id, machine_id, network_config) - .await - .map_err(rpc_types::error::Error::from)?; - } - } else if webhook_network_config.is_some() { - return Err(anyhow::anyhow!( - "unexpected webhook network_config for non-webhook token {:?}", - req.user_token + if webhook_validated + && data.applied_config_revision.as_deref() != Some(webhook_config_revision.as_str()) + { + Self::reconcile_managed_network_configs( + &storage, + user_id, + machine_id, + webhook_managed_network_configs, ) - .into()); + .await + .map_err(rpc_types::error::Error::from)?; + data.applied_config_revision = Some(webhook_config_revision); } if data.req.replace(req.clone()).is_none() { @@ -411,6 +453,7 @@ impl Session { rpc_client: SessionRpcClient, ) { let mut cleaned_web_managed_instances = false; + let mut last_desired_inst_ids: Option> = None; loop { heartbeat_waiter = heartbeat_waiter.resubscribe(); let req = heartbeat_waiter.recv().await; @@ -467,8 +510,15 @@ impl Session { }; let mut has_failed = false; + let should_be_alive_inst_ids = local_configs + .iter() + .map(|cfg| cfg.network_instance_id.clone()) + .collect::>(); + let desired_changed = last_desired_inst_ids + .as_ref() + .is_none_or(|last| last != &should_be_alive_inst_ids); - if !cleaned_web_managed_instances { + if !cleaned_web_managed_instances || desired_changed { let all_local_configs = match storage .db .list_network_configs((user_id, machine_id.into()), ListNetworkProps::All) @@ -486,11 +536,6 @@ impl Session { .map(|cfg| cfg.network_instance_id.clone()) .collect::>(); - let should_be_alive_inst_ids = local_configs - .iter() - .map(|cfg| cfg.network_instance_id.clone()) - .collect::>(); - let should_delete_ids = running_inst_ids .iter() .chain(all_inst_ids.iter()) @@ -519,6 +564,7 @@ impl Session { if !has_failed { cleaned_web_managed_instances = true; + last_desired_inst_ids = Some(should_be_alive_inst_ids.clone()); } } @@ -549,8 +595,7 @@ impl Session { } if !has_failed { - tracing::info!(?req, "All network instances are running"); - break; + last_desired_inst_ids = Some(should_be_alive_inst_ids); } } } @@ -585,3 +630,103 @@ impl Session { self.data.read().await.req() } } + +#[cfg(test)] +mod tests { + use easytier::rpc_service::remote_client::{ListNetworkProps, Storage as _}; + use serde_json::json; + + use super::{super::storage::Storage, *}; + + #[tokio::test] + async fn reconcile_managed_network_configs_upserts_and_deletes_exact_set() { + let storage = Storage::new(crate::db::Db::memory_db().await); + let user_id = storage + .db() + .auto_create_user("webhook-user") + .await + .unwrap() + .id; + let machine_id = uuid::Uuid::new_v4(); + let keep_id = uuid::Uuid::new_v4(); + let stale_id = uuid::Uuid::new_v4(); + let new_id = uuid::Uuid::new_v4(); + + storage + .db() + .insert_or_update_user_network_config( + (user_id, machine_id), + keep_id, + NetworkConfig { + network_name: Some("old-name".to_string()), + ..Default::default() + }, + ) + .await + .unwrap(); + storage + .db() + .insert_or_update_user_network_config( + (user_id, machine_id), + stale_id, + NetworkConfig { + network_name: Some("stale".to_string()), + ..Default::default() + }, + ) + .await + .unwrap(); + + SessionRpcService::reconcile_managed_network_configs( + &storage, + user_id, + machine_id, + vec![ + crate::webhook::ManagedNetworkConfig { + instance_id: keep_id.to_string(), + network_config: json!({ + "instance_id": keep_id.to_string(), + "network_name": "updated-name" + }), + }, + crate::webhook::ManagedNetworkConfig { + instance_id: new_id.to_string(), + network_config: json!({ + "instance_id": new_id.to_string(), + "network_name": "new-name" + }), + }, + ], + ) + .await + .unwrap(); + + let configs = storage + .db() + .list_network_configs((user_id, machine_id), ListNetworkProps::All) + .await + .unwrap(); + let config_ids = configs + .iter() + .map(|cfg| cfg.network_instance_id.clone()) + .collect::>(); + + assert_eq!(configs.len(), 2); + assert!(config_ids.contains(&keep_id.to_string())); + assert!(config_ids.contains(&new_id.to_string())); + assert!(!config_ids.contains(&stale_id.to_string())); + + let updated_keep = storage + .db() + .get_network_config((user_id, machine_id), &keep_id.to_string()) + .await + .unwrap() + .unwrap(); + let updated_keep_config: NetworkConfig = + serde_json::from_str(&updated_keep.network_config).unwrap(); + assert_eq!( + updated_keep_config.network_name.as_deref(), + Some("updated-name") + ); + } +} diff --git a/easytier-web/src/db/mod.rs b/easytier-web/src/db/mod.rs index c230ddc5..b525a0cc 100644 --- a/easytier-web/src/db/mod.rs +++ b/easytier-web/src/db/mod.rs @@ -154,13 +154,17 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for use entity::user_running_network_configs as urnc; - let on_conflict = OnConflict::column(urnc::Column::NetworkInstanceId) - .update_columns([ - urnc::Column::NetworkConfig, - urnc::Column::Disabled, - urnc::Column::UpdateTime, - ]) - .to_owned(); + let on_conflict = OnConflict::columns([ + urnc::Column::UserId, + urnc::Column::DeviceId, + urnc::Column::NetworkInstanceId, + ]) + .update_columns([ + urnc::Column::NetworkConfig, + urnc::Column::Disabled, + urnc::Column::UpdateTime, + ]) + .to_owned(); let insert_m = urnc::ActiveModel { user_id: sea_orm::Set(user_id), device_id: sea_orm::Set(device_id.to_string()), @@ -184,13 +188,14 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for async fn delete_network_configs( &self, - (user_id, _): (UserIdInDb, Uuid), + (user_id, device_id): (UserIdInDb, Uuid), network_inst_ids: &[Uuid], ) -> Result<(), DbErr> { use entity::user_running_network_configs as urnc; urnc::Entity::delete_many() .filter(urnc::Column::UserId.eq(user_id)) + .filter(urnc::Column::DeviceId.eq(device_id.to_string())) .filter( urnc::Column::NetworkInstanceId .is_in(network_inst_ids.iter().map(|id| id.to_string())), @@ -203,7 +208,7 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for async fn update_network_config_state( &self, - (user_id, _): (UserIdInDb, Uuid), + (user_id, device_id): (UserIdInDb, Uuid), network_inst_id: Uuid, disabled: bool, ) -> Result<(), DbErr> { @@ -211,6 +216,7 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for urnc::Entity::update_many() .filter(urnc::Column::UserId.eq(user_id)) + .filter(urnc::Column::DeviceId.eq(device_id.to_string())) .filter(urnc::Column::NetworkInstanceId.eq(network_inst_id.to_string())) .col_expr(urnc::Column::Disabled, Expr::value(disabled)) .col_expr( @@ -341,4 +347,60 @@ mod tests { .unwrap(); assert!(result3.is_none()); } + + #[tokio::test] + async fn test_user_network_config_same_instance_id_is_scoped_by_device() { + let db = Db::memory_db().await; + let user_id = db.auto_create_user("user-1").await.unwrap().id; + let device1 = uuid::Uuid::new_v4(); + let device2 = uuid::Uuid::new_v4(); + let inst_id = uuid::Uuid::new_v4(); + + db.insert_or_update_user_network_config( + (user_id, device1), + inst_id, + NetworkConfig { + network_name: Some("cfg-1".to_string()), + ..Default::default() + }, + ) + .await + .unwrap(); + db.insert_or_update_user_network_config( + (user_id, device2), + inst_id, + NetworkConfig { + network_name: Some("cfg-2".to_string()), + ..Default::default() + }, + ) + .await + .unwrap(); + + let first = db + .get_network_config((user_id, device1), &inst_id.to_string()) + .await + .unwrap() + .unwrap(); + let second = db + .get_network_config((user_id, device2), &inst_id.to_string()) + .await + .unwrap() + .unwrap(); + assert_eq!(first.user_id, user_id); + assert_eq!(first.device_id, device1.to_string()); + assert_eq!(second.user_id, user_id); + assert_eq!(second.device_id, device2.to_string()); + + let device1_configs = db + .list_network_configs((user_id, device1), ListNetworkProps::All) + .await + .unwrap(); + let device2_configs = db + .list_network_configs((user_id, device2), ListNetworkProps::All) + .await + .unwrap(); + assert_eq!(device1_configs.len(), 1); + assert_eq!(device2_configs.len(), 1); + } } diff --git a/easytier-web/src/migrator/m20260403_000002_scope_network_config_unique.rs b/easytier-web/src/migrator/m20260403_000002_scope_network_config_unique.rs new file mode 100644 index 00000000..84882745 --- /dev/null +++ b/easytier-web/src/migrator/m20260403_000002_scope_network_config_unique.rs @@ -0,0 +1,120 @@ +use sea_orm_migration::prelude::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20260403_000002_scope_network_config_unique" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); + + db.execute_unprepared( + r#" + CREATE TABLE user_running_network_configs_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + user_id INTEGER NOT NULL, + device_id TEXT NOT NULL, + network_instance_id TEXT NOT NULL, + network_config TEXT NOT NULL, + disabled BOOLEAN NOT NULL DEFAULT FALSE, + create_time TEXT NOT NULL, + update_time TEXT NOT NULL, + CONSTRAINT fk_user_running_network_configs_user_id_to_users_id + FOREIGN KEY (user_id) REFERENCES users(id) + ON DELETE CASCADE + ON UPDATE CASCADE + ); + + INSERT INTO user_running_network_configs_new ( + id, + user_id, + device_id, + network_instance_id, + network_config, + disabled, + create_time, + update_time + ) + SELECT + id, + user_id, + device_id, + network_instance_id, + network_config, + disabled, + create_time, + update_time + FROM user_running_network_configs; + + DROP TABLE user_running_network_configs; + ALTER TABLE user_running_network_configs_new RENAME TO user_running_network_configs; + + CREATE INDEX idx_user_running_network_configs_user_id + ON user_running_network_configs(user_id); + CREATE UNIQUE INDEX idx_user_running_network_configs_scope_inst + ON user_running_network_configs(user_id, device_id, network_instance_id); + "#, + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); + + db.execute_unprepared( + r#" + CREATE TABLE user_running_network_configs_old ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + user_id INTEGER NOT NULL, + device_id TEXT NOT NULL, + network_instance_id TEXT NOT NULL UNIQUE, + network_config TEXT NOT NULL, + disabled BOOLEAN NOT NULL DEFAULT FALSE, + create_time TEXT NOT NULL, + update_time TEXT NOT NULL, + CONSTRAINT fk_user_running_network_configs_user_id_to_users_id + FOREIGN KEY (user_id) REFERENCES users(id) + ON DELETE CASCADE + ON UPDATE CASCADE + ); + + INSERT INTO user_running_network_configs_old ( + id, + user_id, + device_id, + network_instance_id, + network_config, + disabled, + create_time, + update_time + ) + SELECT + id, + user_id, + device_id, + network_instance_id, + network_config, + disabled, + create_time, + update_time + FROM user_running_network_configs; + + DROP TABLE user_running_network_configs; + ALTER TABLE user_running_network_configs_old RENAME TO user_running_network_configs; + + CREATE INDEX idx_user_running_network_configs_user_id + ON user_running_network_configs(user_id); + "#, + ) + .await?; + + Ok(()) + } +} diff --git a/easytier-web/src/migrator/mod.rs b/easytier-web/src/migrator/mod.rs index 652f4730..e5490267 100644 --- a/easytier-web/src/migrator/mod.rs +++ b/easytier-web/src/migrator/mod.rs @@ -1,12 +1,16 @@ use sea_orm_migration::prelude::*; mod m20241029_000001_init; +mod m20260403_000002_scope_network_config_unique; pub struct Migrator; #[async_trait::async_trait] impl MigratorTrait for Migrator { fn migrations() -> Vec> { - vec![Box::new(m20241029_000001_init::Migration)] + vec![ + Box::new(m20241029_000001_init::Migration), + Box::new(m20260403_000002_scope_network_config_unique::Migration), + ] } } diff --git a/easytier-web/src/webhook.rs b/easytier-web/src/webhook.rs index 29ad5043..f93a14ea 100644 --- a/easytier-web/src/webhook.rs +++ b/easytier-web/src/webhook.rs @@ -65,7 +65,14 @@ pub struct ValidateTokenResponse { pub pre_approved: bool, #[serde(default)] pub binding_version: u64, - pub network_config: Option, + pub managed_network_configs: Vec, + pub config_revision: String, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ManagedNetworkConfig { + pub instance_id: String, + pub network_config: serde_json::Value, } #[derive(Debug, Serialize)] diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index c6be067a..bc6cb7d3 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -152,8 +152,8 @@ core_clap: 如果该参数为空,则禁用转发。默认允许所有网络。 例如:'*'(所有网络),'def*'(以def为前缀的网络),'net1 net2'(只允许net1和net2)" disable_p2p: - en: "disable p2p communication, will only relay packets with peers specified by --peers" - zh-CN: "禁用P2P通信,只通过--peers指定的节点转发数据包" + en: "disable ordinary automatic p2p; still establish p2p with peers marked as need-p2p, and other peers should not proactively connect to this node" + zh-CN: "禁用普通自动P2P;仍会与标记为 need-p2p 的节点建立P2P连接,其他节点不应主动与当前节点建立P2P" p2p_only: en: "only communicate with peers that already establish p2p connection" zh-CN: "仅与已经建立P2P连接的对等节点通信" diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index fa1caed8..6a61b774 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -244,6 +244,7 @@ impl GlobalCtx { feature_flags.quic_input = !flags.disable_quic_input; feature_flags.no_relay_quic = flags.disable_relay_quic; feature_flags.need_p2p = flags.need_p2p; + feature_flags.disable_p2p = flags.disable_p2p; feature_flags } @@ -743,12 +744,13 @@ pub mod tests { feature_flags.is_public_server = true; global_ctx.set_feature_flags(feature_flags); - let mut flags = global_ctx.get_flags(); + let mut flags = global_ctx.get_flags().clone(); flags.disable_kcp_input = true; flags.disable_relay_kcp = true; flags.disable_quic_input = true; flags.disable_relay_quic = true; flags.need_p2p = true; + flags.disable_p2p = true; global_ctx.set_flags(flags); let feature_flags = global_ctx.get_feature_flags(); @@ -757,6 +759,7 @@ pub mod tests { assert!(!feature_flags.quic_input); assert!(feature_flags.no_relay_quic); assert!(feature_flags.need_p2p); + assert!(feature_flags.disable_p2p); assert!(feature_flags.support_conn_list_sync); assert!(feature_flags.avoid_relay_data); assert!(feature_flags.is_public_server); diff --git a/easytier/src/common/stats_manager.rs b/easytier/src/common/stats_manager.rs index 38448f8e..2218263a 100644 --- a/easytier/src/common/stats_manager.rs +++ b/easytier/src/common/stats_manager.rs @@ -581,9 +581,9 @@ impl StatsManager { break; }; - // Remove entries that haven't been updated for 3 minutes - counters.retain(|_, metric_data: &mut Arc| unsafe { - metric_data.get_last_updated() > cutoff_time + counters.retain(|_, metric_data: &mut Arc| { + Arc::strong_count(metric_data) > 1 + || unsafe { metric_data.get_last_updated() > cutoff_time } }); counters.shrink_to_fit(); } @@ -900,6 +900,33 @@ mod tests { assert_eq!(counter2.get(), 25); } + #[tokio::test] + async fn test_cleanup_keeps_metrics_with_live_handles() { + let stats = StatsManager::new(); + let counter = stats.get_simple_counter(MetricName::TrafficBytesForwarded); + counter.set(1); + + let cutoff_time = Instant::now().checked_add(Duration::from_secs(1)).unwrap(); + stats + .counters + .retain(|_, metric_data: &mut Arc| { + Arc::strong_count(metric_data) > 1 + || unsafe { metric_data.get_last_updated() > cutoff_time } + }); + + assert_eq!(stats.metric_count(), 1); + assert_eq!(stats.get_all_metrics().len(), 1); + + drop(counter); + stats + .counters + .retain(|_, metric_data: &mut Arc| { + Arc::strong_count(metric_data) > 1 + || unsafe { metric_data.get_last_updated() > cutoff_time } + }); + assert_eq!(stats.metric_count(), 0); + } + #[tokio::test] async fn test_stats_rpc_data_structures() { // Test GetStatsRequest diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 0d630058..00a88265 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -62,7 +62,8 @@ impl PeerManagerForDirectConnector for PeerManager { async fn list_peers(&self) -> Vec { let mut ret = vec![]; let allow_public_server = use_global_var!(DIRECT_CONNECT_TO_PUBLIC_SERVER); - let lazy_p2p = self.get_global_ctx().get_flags().lazy_p2p; + let flags = self.get_global_ctx().get_flags(); + let lazy_p2p = flags.lazy_p2p; let now = Instant::now(); let routes = self.list_routes().await; @@ -71,10 +72,15 @@ impl PeerManagerForDirectConnector for PeerManager { route.feature_flag.as_ref(), allow_public_server, lazy_p2p, + flags.disable_p2p, + flags.need_p2p, ); - let dynamic_allowed = - should_try_p2p_with_peer(route.feature_flag.as_ref(), allow_public_server) - && self.has_recent_traffic(route.peer_id, now); + let dynamic_allowed = should_try_p2p_with_peer( + route.feature_flag.as_ref(), + allow_public_server, + flags.disable_p2p, + flags.need_p2p, + ) && self.has_recent_traffic(route.peer_id, now); if static_allowed || dynamic_allowed { ret.push(route.peer_id); } @@ -650,10 +656,6 @@ impl DirectConnectorManager { } pub fn run(&mut self) { - if self.global_ctx.get_flags().disable_p2p { - return; - } - self.run_as_server(); self.run_as_client(); } diff --git a/easytier/src/connector/mod.rs b/easytier/src/connector/mod.rs index bc6a4d64..c6ab6a28 100644 --- a/easytier/src/connector/mod.rs +++ b/easytier/src/connector/mod.rs @@ -26,19 +26,31 @@ pub mod http_connector; pub(crate) fn should_try_p2p_with_peer( feature_flag: Option<&PeerFeatureFlag>, allow_public_server: bool, + local_disable_p2p: bool, + local_need_p2p: bool, ) -> bool { feature_flag - .map(|flag| allow_public_server || !flag.is_public_server) - .unwrap_or(true) + .map(|flag| { + (allow_public_server || !flag.is_public_server) + && (!local_disable_p2p || flag.need_p2p) + && (!flag.disable_p2p || local_need_p2p) + }) + .unwrap_or(!local_disable_p2p) } pub(crate) fn should_background_p2p_with_peer( feature_flag: Option<&PeerFeatureFlag>, allow_public_server: bool, lazy_p2p: bool, + local_disable_p2p: bool, + local_need_p2p: bool, ) -> bool { - should_try_p2p_with_peer(feature_flag, allow_public_server) - && (!lazy_p2p || feature_flag.map(|flag| flag.need_p2p).unwrap_or(false)) + should_try_p2p_with_peer( + feature_flag, + allow_public_server, + local_disable_p2p, + local_need_p2p, + ) && (!lazy_p2p || feature_flag.map(|flag| flag.need_p2p).unwrap_or(false)) } async fn set_bind_addr_for_peer_connector( @@ -162,17 +174,23 @@ mod tests { assert!(should_background_p2p_with_peer( Some(&no_need_p2p), false, + false, + false, false )); assert!(!should_background_p2p_with_peer( Some(&no_need_p2p), false, - true + true, + false, + false )); assert!(should_background_p2p_with_peer( Some(&need_p2p), false, - true + true, + false, + false )); } @@ -183,16 +201,93 @@ mod tests { ..Default::default() }; - assert!(!should_try_p2p_with_peer(Some(&public_server), false)); - assert!(should_try_p2p_with_peer(Some(&public_server), true)); + assert!(!should_try_p2p_with_peer( + Some(&public_server), + false, + false, + false + )); + assert!(should_try_p2p_with_peer( + Some(&public_server), + true, + false, + false + )); assert!(!should_background_p2p_with_peer( Some(&public_server), false, + false, + false, false )); assert!(should_background_p2p_with_peer( Some(&public_server), true, + false, + false, + false + )); + } + + #[test] + fn disable_p2p_only_allows_need_p2p_exceptions() { + let normal_peer = PeerFeatureFlag::default(); + let need_peer = PeerFeatureFlag { + need_p2p: true, + ..Default::default() + }; + let disable_peer = PeerFeatureFlag { + disable_p2p: true, + ..Default::default() + }; + let disable_need_peer = PeerFeatureFlag { + disable_p2p: true, + need_p2p: true, + ..Default::default() + }; + + assert!(should_try_p2p_with_peer( + Some(&normal_peer), + false, + false, + false + )); + assert!(should_try_p2p_with_peer(None, false, false, false)); + assert!(!should_try_p2p_with_peer(None, false, true, false)); + assert!(!should_try_p2p_with_peer( + Some(&normal_peer), + false, + true, + false + )); + assert!(should_try_p2p_with_peer( + Some(&need_peer), + false, + true, + false + )); + assert!(!should_try_p2p_with_peer( + Some(&disable_peer), + false, + false, + false + )); + assert!(should_try_p2p_with_peer( + Some(&disable_peer), + false, + false, + true + )); + assert!(should_try_p2p_with_peer( + Some(&disable_need_peer), + false, + true, + true + )); + assert!(!should_try_p2p_with_peer( + Some(&disable_need_peer), + false, + true, false )); } diff --git a/easytier/src/connector/tcp_hole_punch.rs b/easytier/src/connector/tcp_hole_punch.rs index 3bf8e06f..7ce4f2b4 100644 --- a/easytier/src/connector/tcp_hole_punch.rs +++ b/easytier/src/connector/tcp_hole_punch.rs @@ -420,7 +420,8 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher { #[tracing::instrument(skip(self, data))] async fn collect_peers_need_task(&self, data: &Self::Data) -> Vec { let global_ctx = data.peer_mgr.get_global_ctx(); - let lazy_p2p = global_ctx.get_flags().lazy_p2p; + let flags = global_ctx.get_flags(); + let lazy_p2p = flags.lazy_p2p; let my_tcp_nat_type = NatType::try_from( global_ctx .get_stun_info_collector() @@ -443,10 +444,19 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher { let mut peers_to_connect = Vec::new(); for route in data.peer_mgr.list_routes().await.iter() { - let static_allowed = - should_background_p2p_with_peer(route.feature_flag.as_ref(), false, lazy_p2p); - let dynamic_allowed = should_try_p2p_with_peer(route.feature_flag.as_ref(), false) - && data.peer_mgr.has_recent_traffic(route.peer_id, now); + let static_allowed = should_background_p2p_with_peer( + route.feature_flag.as_ref(), + false, + lazy_p2p, + flags.disable_p2p, + flags.need_p2p, + ); + let dynamic_allowed = should_try_p2p_with_peer( + route.feature_flag.as_ref(), + false, + flags.disable_p2p, + flags.need_p2p, + ) && data.peer_mgr.has_recent_traffic(route.peer_id, now); if !static_allowed && !dynamic_allowed { continue; } @@ -554,10 +564,9 @@ impl TcpHolePunchConnector { pub async fn run(&mut self) -> Result<(), Error> { let flags = self.peer_mgr.get_global_ctx().get_flags(); - if flags.disable_p2p || flags.disable_tcp_hole_punching { + if flags.disable_tcp_hole_punching { tracing::debug!( - "tcp hole punch disabled by disable_p2p(={}) or disable_tcp_hole_punching(={});", - flags.disable_p2p, + "tcp hole punch disabled by disable_tcp_hole_punching(={});", flags.disable_tcp_hole_punching ); return Ok(()); diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index f097bdab..66197d01 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -428,7 +428,8 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher { } let my_peer_id = data.peer_mgr.my_peer_id(); - let lazy_p2p = data.peer_mgr.get_global_ctx().get_flags().lazy_p2p; + let flags = data.peer_mgr.get_global_ctx().get_flags(); + let lazy_p2p = flags.lazy_p2p; let now = Instant::now(); data.blacklist.cleanup(); @@ -438,10 +439,19 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher { // 2. peers is full cone (any restricted type); // 3. peers not in blacklist; for route in data.peer_mgr.list_routes().await.iter() { - let static_allowed = - should_background_p2p_with_peer(route.feature_flag.as_ref(), false, lazy_p2p); - let dynamic_allowed = should_try_p2p_with_peer(route.feature_flag.as_ref(), false) - && data.peer_mgr.has_recent_traffic(route.peer_id, now); + let static_allowed = should_background_p2p_with_peer( + route.feature_flag.as_ref(), + false, + lazy_p2p, + flags.disable_p2p, + flags.need_p2p, + ); + let dynamic_allowed = should_try_p2p_with_peer( + route.feature_flag.as_ref(), + false, + flags.disable_p2p, + flags.need_p2p, + ) && data.peer_mgr.has_recent_traffic(route.peer_id, now); if !static_allowed && !dynamic_allowed { continue; } @@ -565,9 +575,6 @@ impl UdpHolePunchConnector { pub async fn run(&mut self) -> Result<(), Error> { let global_ctx = self.peer_mgr.get_global_ctx(); - if global_ctx.get_flags().disable_p2p { - return Ok(()); - } if global_ctx.get_flags().disable_udp_hole_punching { return Ok(()); } diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 26a4759a..a49396b9 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -222,6 +222,7 @@ message PeerFeatureFlag { bool no_relay_quic = 7; bool is_credential_peer = 8; bool need_p2p = 9; + bool disable_p2p = 10; } enum SocketType { diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 0e2a887b..dafabd8b 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -2565,6 +2565,73 @@ pub async fn need_p2p_overrides_lazy_p2p() { drop_insts(insts).await; } +#[tokio::test] +#[serial_test::serial] +pub async fn disable_p2p_still_connects_to_need_p2p_peers() { + let insts = init_lazy_p2p_three_node_ex("udp", |cfg| { + let mut flags = cfg.get_flags(); + if cfg.get_inst_name() == "inst1" { + flags.disable_p2p = true; + } + if cfg.get_inst_name() == "inst3" { + flags.need_p2p = true; + } + cfg.set_flags(flags); + cfg + }) + .await; + + let inst3_peer_id = insts[2].peer_id(); + wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await; + wait_for_condition( + || async { + insts[0] + .get_peer_manager() + .get_peer_map() + .has_peer(inst3_peer_id) + }, + Duration::from_secs(10), + ) + .await; + wait_route_cost(&insts[0], inst3_peer_id, 1, Duration::from_secs(10)).await; + + drop_insts(insts).await; +} + +#[tokio::test] +#[serial_test::serial] +pub async fn ordinary_nodes_do_not_proactively_connect_to_disable_p2p_peers() { + let insts = init_lazy_p2p_three_node_ex("udp", |cfg| { + if cfg.get_inst_name() == "inst3" { + let mut flags = cfg.get_flags(); + flags.disable_p2p = true; + cfg.set_flags(flags); + } + cfg + }) + .await; + + let inst3_peer_id = insts[2].peer_id(); + wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await; + assert!( + ping_test("net_a", "10.144.144.3", None).await, + "relay traffic to disable-p2p peers should still succeed" + ); + + tokio::time::sleep(Duration::from_secs(3)).await; + + assert!( + !insts[0] + .get_peer_manager() + .get_peer_map() + .has_peer(inst3_peer_id), + "ordinary nodes should not proactively establish p2p with disable-p2p peers" + ); + wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(3)).await; + + drop_insts(insts).await; +} + #[tokio::test] #[serial_test::serial] pub async fn lazy_p2p_warms_up_before_p2p_only_send() {