From 1d89ddbb1681a20427ef41ac52cd0858afa0ac5b Mon Sep 17 00:00:00 2001 From: KKRainbow <443152178@qq.com> Date: Mon, 23 Mar 2026 09:38:57 +0800 Subject: [PATCH] Add lazy P2P demand tracking and need_p2p override (#2003) - add lazy_p2p so nodes only start background P2P for peers that actually have recent business traffic - add need_p2p so specific peers can still request eager background P2P even when other nodes enable lazy mode - cover the new behavior with focused connector/peer-manager tests plus three-node integration tests that verify relay-to-direct route transition --- .../frontend-lib/src/components/Config.vue | 2 + easytier-web/frontend-lib/src/locales/cn.yaml | 6 + easytier-web/frontend-lib/src/locales/en.yaml | 6 + .../frontend-lib/src/types/network.ts | 4 + easytier/locales/app.yml | 6 + easytier/src/common/config.rs | 2 + easytier/src/common/global_ctx.rs | 88 ++++-- easytier/src/connector/direct.rs | 37 ++- easytier/src/connector/http_connector.rs | 2 +- easytier/src/connector/mod.rs | 75 +++++ easytier/src/connector/tcp_hole_punch.rs | 65 +++- easytier/src/connector/udp_hole_punch/mod.rs | 61 +++- easytier/src/core.rs | 20 ++ easytier/src/instance/dns_server/tests.rs | 2 +- easytier/src/instance/instance.rs | 13 +- easytier/src/launcher.rs | 12 + easytier/src/peers/foreign_network_manager.rs | 4 +- easytier/src/peers/peer_manager.rs | 291 +++++++++++++++++- easytier/src/peers/peer_task.rs | 92 +++++- easytier/src/proto/api_manage.proto | 2 + easytier/src/proto/common.proto | 4 + easytier/src/tests/three_node.rs | 173 ++++++++++- 22 files changed, 876 insertions(+), 91 deletions(-) diff --git a/easytier-web/frontend-lib/src/components/Config.vue b/easytier-web/frontend-lib/src/components/Config.vue index 0e49ada4..8b13a30c 100644 --- a/easytier-web/frontend-lib/src/components/Config.vue +++ b/easytier-web/frontend-lib/src/components/Config.vue @@ -93,10 +93,12 @@ const bool_flags: BoolFlag[] = [ { field: 'disable_quic_input', help: 'disable_quic_input_help' }, { field: 'disable_p2p', help: 'disable_p2p_help' }, { field: 'p2p_only', help: 'p2p_only_help' }, + { field: 'lazy_p2p', help: 'lazy_p2p_help' }, { field: 'bind_device', help: 'bind_device_help' }, { field: 'no_tun', help: 'no_tun_help' }, { field: 'enable_exit_node', help: 'enable_exit_node_help' }, { field: 'relay_all_peer_rpc', help: 'relay_all_peer_rpc_help' }, + { field: 'need_p2p', help: 'need_p2p_help' }, { field: 'multi_thread', help: 'multi_thread_help' }, { field: 'proxy_forward_by_system', help: 'proxy_forward_by_system_help' }, { field: 'disable_encryption', help: 'disable_encryption_help' }, diff --git a/easytier-web/frontend-lib/src/locales/cn.yaml b/easytier-web/frontend-lib/src/locales/cn.yaml index f641e793..dc128836 100644 --- a/easytier-web/frontend-lib/src/locales/cn.yaml +++ b/easytier-web/frontend-lib/src/locales/cn.yaml @@ -113,6 +113,9 @@ disable_p2p_help: 禁用 P2P 模式,所有流量通过手动指定的服务器 p2p_only: 仅 P2P p2p_only_help: 仅与已经建立P2P连接的对等节点通信,不通过其他节点中转。 +lazy_p2p: 延迟 P2P +lazy_p2p_help: 仅在实际流量需要某个对等节点时才尝试建立 P2P。开启 need-p2p 的节点仍会被主动连接。 + bind_device: 仅使用物理网卡 bind_device_help: 仅使用物理网卡,避免 EasyTier 通过其他虚拟网建立连接。 @@ -127,6 +130,9 @@ relay_all_peer_rpc_help: | 允许转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。 这可以帮助白名单外网络中的对等节点建立P2P连接。 +need_p2p: 需要 P2P +need_p2p_help: 即使其他节点启用了 lazy p2p,也要求它们主动与当前节点建立 P2P 连接。 + multi_thread: 启用多线程 multi_thread_help: 使用多线程运行时 diff --git a/easytier-web/frontend-lib/src/locales/en.yaml b/easytier-web/frontend-lib/src/locales/en.yaml index ebf22840..110a7399 100644 --- a/easytier-web/frontend-lib/src/locales/en.yaml +++ b/easytier-web/frontend-lib/src/locales/en.yaml @@ -112,6 +112,9 @@ disable_p2p_help: Disable P2P mode; route all traffic through a manually specifi p2p_only: P2P Only p2p_only_help: Only communicate with peers that have already established P2P connections, do not relay through other nodes. +lazy_p2p: Lazy P2P +lazy_p2p_help: Only try to establish P2P when traffic actually targets a peer. Peers with need-p2p enabled are still connected proactively. + bind_device: Bind to Physical Device Only bind_device_help: Use only the physical network interface to prevent EasyTier from connecting via virtual networks. @@ -126,6 +129,9 @@ relay_all_peer_rpc_help: | Relay all peer rpc packets, even if the peer is not in the relay network whitelist. This can help peers not in relay network whitelist to establish p2p connection. +need_p2p: Need P2P +need_p2p_help: Ask other peers to proactively establish P2P connections to this node even when they enable lazy P2P. + multi_thread: Multi Thread multi_thread_help: Use multi-thread runtime diff --git a/easytier-web/frontend-lib/src/types/network.ts b/easytier-web/frontend-lib/src/types/network.ts index 2f554dbc..70684cdb 100644 --- a/easytier-web/frontend-lib/src/types/network.ts +++ b/easytier-web/frontend-lib/src/types/network.ts @@ -53,10 +53,12 @@ export interface NetworkConfig { disable_quic_input?: boolean disable_p2p?: boolean p2p_only?: boolean + lazy_p2p?: boolean bind_device?: boolean no_tun?: boolean enable_exit_node?: boolean relay_all_peer_rpc?: boolean + need_p2p?: boolean multi_thread?: boolean proxy_forward_by_system?: boolean disable_encryption?: boolean @@ -125,10 +127,12 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig { disable_quic_input: false, disable_p2p: false, p2p_only: false, + lazy_p2p: false, bind_device: true, no_tun: false, enable_exit_node: false, relay_all_peer_rpc: false, + need_p2p: false, multi_thread: true, proxy_forward_by_system: false, disable_encryption: false, diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index ff4484d2..4a94979a 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -157,6 +157,12 @@ core_clap: p2p_only: en: "only communicate with peers that already establish p2p connection" zh-CN: "仅与已经建立P2P连接的对等节点通信" + lazy_p2p: + en: "only try to establish p2p when traffic actually needs the peer; peers marked as need-p2p are still connected proactively" + zh-CN: "仅在实际流量需要某个对等节点时才尝试建立P2P;被标记为 need-p2p 的节点仍会主动建立连接" + need_p2p: + en: "announce that other peers should proactively establish p2p connections to this node even when they enable lazy-p2p" + zh-CN: "声明即使其他节点启用了 lazy-p2p,也应主动与当前节点建立P2P连接" disable_tcp_hole_punching: en: "disable tcp hole punching" zh-CN: "禁用TCP打洞功能" diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index c8bd738f..b24187f2 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -40,6 +40,7 @@ pub fn gen_default_flags() -> Flags { relay_network_whitelist: "*".to_string(), disable_p2p: false, p2p_only: false, + lazy_p2p: false, relay_all_peer_rpc: false, disable_tcp_hole_punching: false, disable_udp_hole_punching: false, @@ -63,6 +64,7 @@ pub fn gen_default_flags() -> Flags { tld_dns_zone: DEFAULT_ET_DNS_ZONE.to_string(), quic_listen_port: u32::MAX, + need_p2p: false, } } diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index dd291e9c..d879c15e 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -202,10 +202,7 @@ pub struct GlobalCtx { running_listeners: Mutex>, - enable_exit_node: bool, - proxy_forward_by_system: bool, - no_tun: bool, - p2p_only: bool, + flags: ArcSwap, feature_flags: AtomicCell, @@ -237,6 +234,17 @@ impl std::fmt::Debug for GlobalCtx { pub type ArcGlobalCtx = std::sync::Arc; impl GlobalCtx { + fn derive_feature_flags(flags: &Flags, current: Option) -> PeerFeatureFlag { + let mut feature_flags = current.unwrap_or_default(); + feature_flags.kcp_input = !flags.disable_kcp_input; + feature_flags.no_relay_kcp = flags.disable_relay_kcp; + feature_flags.support_conn_list_sync = true; + feature_flags.quic_input = !flags.disable_quic_input; + feature_flags.no_relay_quic = flags.disable_relay_quic; + feature_flags.need_p2p = flags.need_p2p; + feature_flags + } + pub fn new(config_fs: impl ConfigLoader + 'static) -> Self { let id = config_fs.get_id(); let network = config_fs.get_network_identity(); @@ -261,19 +269,9 @@ impl GlobalCtx { let stun_info_collector = Arc::new(stun_info_collector); - let enable_exit_node = config_fs.get_flags().enable_exit_node || cfg!(target_env = "ohos"); - let proxy_forward_by_system = config_fs.get_flags().proxy_forward_by_system; - let no_tun = config_fs.get_flags().no_tun; - let p2p_only = config_fs.get_flags().p2p_only; + let flags = config_fs.get_flags(); - let feature_flags = PeerFeatureFlag { - kcp_input: !config_fs.get_flags().disable_kcp_input, - no_relay_kcp: config_fs.get_flags().disable_relay_kcp, - support_conn_list_sync: true, // Enable selective peer list sync by default - quic_input: !config_fs.get_flags().disable_quic_input, - no_relay_quic: config_fs.get_flags().disable_relay_quic, - ..Default::default() - }; + let feature_flags = Self::derive_feature_flags(&flags, None); let credential_storage_path = config_fs.get_credential_file(); let credential_manager = Arc::new(CredentialManager::new(credential_storage_path)); @@ -301,10 +299,7 @@ impl GlobalCtx { running_listeners: Mutex::new(Vec::new()), - enable_exit_node, - proxy_forward_by_system, - no_tun, - p2p_only, + flags: ArcSwap::new(Arc::new(flags)), feature_flags: AtomicCell::new(feature_flags), @@ -455,11 +450,20 @@ impl GlobalCtx { } pub fn get_flags(&self) -> Flags { - self.config.get_flags() + self.flags.load().as_ref().clone() } pub fn set_flags(&self, flags: Flags) { - self.config.set_flags(flags); + self.config.set_flags(flags.clone()); + self.feature_flags.store(Self::derive_feature_flags( + &flags, + Some(self.feature_flags.load()), + )); + self.flags.store(Arc::new(flags)); + } + + pub fn flags_arc(&self) -> Arc { + self.flags.load_full() } pub fn get_128_key(&self) -> [u8; 16] { @@ -503,15 +507,15 @@ impl GlobalCtx { } pub fn enable_exit_node(&self) -> bool { - self.enable_exit_node + self.flags.load().enable_exit_node || cfg!(target_env = "ohos") } pub fn proxy_forward_by_system(&self) -> bool { - self.proxy_forward_by_system + self.flags.load().proxy_forward_by_system } pub fn no_tun(&self) -> bool { - self.no_tun + self.flags.load().no_tun } pub fn get_feature_flags(&self) -> PeerFeatureFlag { @@ -611,12 +615,13 @@ impl GlobalCtx { } pub fn p2p_only(&self) -> bool { - self.p2p_only + self.flags.load().p2p_only } pub fn latency_first(&self) -> bool { // NOTICE: p2p only is conflict with latency first - self.config.get_flags().latency_first && !self.p2p_only + let flags = self.flags.load(); + flags.latency_first && !flags.p2p_only } fn is_port_in_running_listeners(&self, port: u16, is_udp: bool) -> bool { @@ -730,6 +735,35 @@ pub mod tests { )); } + #[tokio::test] + async fn set_flags_keeps_derived_feature_flags_in_sync() { + let config = TomlConfigLoader::default(); + let global_ctx = GlobalCtx::new(config); + + let mut feature_flags = global_ctx.get_feature_flags(); + feature_flags.avoid_relay_data = true; + feature_flags.is_public_server = true; + global_ctx.set_feature_flags(feature_flags); + + let mut flags = global_ctx.get_flags(); + flags.disable_kcp_input = true; + flags.disable_relay_kcp = true; + flags.disable_quic_input = true; + flags.disable_relay_quic = true; + flags.need_p2p = true; + global_ctx.set_flags(flags); + + let feature_flags = global_ctx.get_feature_flags(); + assert!(!feature_flags.kcp_input); + assert!(feature_flags.no_relay_kcp); + assert!(!feature_flags.quic_input); + assert!(feature_flags.no_relay_quic); + assert!(feature_flags.need_p2p); + assert!(feature_flags.support_conn_list_sync); + assert!(feature_flags.avoid_relay_data); + assert!(feature_flags.is_public_server); + } + pub fn get_mock_global_ctx_with_network( network_identy: Option, ) -> ArcGlobalCtx { diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 2655de88..d84cc209 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -8,7 +8,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - time::Duration, + time::{Duration, Instant}, }; use crate::{ @@ -40,7 +40,10 @@ use rand::Rng; use tokio::{net::UdpSocket, task::JoinSet, time::timeout}; use url::Host; -use super::{create_connector_by_url, udp_hole_punch}; +use super::{ + create_connector_by_url, should_background_p2p_with_peer, should_try_p2p_with_peer, + udp_hole_punch, +}; pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1; pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300; @@ -58,14 +61,22 @@ impl PeerManagerForDirectConnector for PeerManager { async fn list_peers(&self) -> Vec { let mut ret = vec![]; let allow_public_server = use_global_var!(DIRECT_CONNECT_TO_PUBLIC_SERVER); + let lazy_p2p = self.get_global_ctx().get_flags().lazy_p2p; + let now = Instant::now(); let routes = self.list_routes().await; - for r in routes.iter().filter(|r| { - r.feature_flag - .map(|r| allow_public_server || !r.is_public_server) - .unwrap_or(true) - }) { - ret.push(r.peer_id); + for route in routes.iter() { + let static_allowed = should_background_p2p_with_peer( + route.feature_flag.as_ref(), + allow_public_server, + lazy_p2p, + ); + let dynamic_allowed = + should_try_p2p_with_peer(route.feature_flag.as_ref(), allow_public_server) + && self.has_recent_traffic(route.peer_id, now); + if static_allowed || dynamic_allowed { + ret.push(route.peer_id); + } } ret @@ -625,7 +636,11 @@ impl DirectConnectorManager { global_ctx.clone(), peer_manager.clone(), )); - let client = PeerTaskManager::new(DirectConnectorLauncher(data.clone()), peer_manager); + let client = PeerTaskManager::new_with_external_signal( + DirectConnectorLauncher(data.clone()), + peer_manager.clone(), + Some(peer_manager.p2p_demand_notify()), + ); Self { global_ctx, data, @@ -696,7 +711,7 @@ mod tests { let mut f = p_a.get_global_ctx().get_flags(); f.bind_device = false; - p_a.get_global_ctx().config.set_flags(f); + p_a.get_global_ctx().set_flags(f); p_c.get_global_ctx() .config @@ -765,7 +780,7 @@ mod tests { } let mut f = p_c.get_global_ctx().config.get_flags(); f.enable_ipv6 = ipv6; - p_c.get_global_ctx().config.set_flags(f); + p_c.get_global_ctx().set_flags(f); let mut lis_c = ListenerManager::new(p_c.get_global_ctx(), p_c.clone()); lis_c.prepare_listeners().await.unwrap(); diff --git a/easytier/src/connector/http_connector.rs b/easytier/src/connector/http_connector.rs index fdaf4d2c..bcd35192 100644 --- a/easytier/src/connector/http_connector.rs +++ b/easytier/src/connector/http_connector.rs @@ -334,7 +334,7 @@ mod tests { let mut flags = global_ctx.config.get_flags(); flags.bind_device = false; - global_ctx.config.set_flags(flags); + global_ctx.set_flags(flags); let mut connector = HttpTunnelConnector::new(test_url.clone(), global_ctx.clone()); let mut listener = TcpTunnelListener::new("tcp://0.0.0.0:25888".parse().unwrap()); diff --git a/easytier/src/connector/mod.rs b/easytier/src/connector/mod.rs index 113498d5..fdb3959e 100644 --- a/easytier/src/connector/mod.rs +++ b/easytier/src/connector/mod.rs @@ -15,6 +15,7 @@ use crate::tunnel::unix::UnixSocketTunnelConnector; use crate::tunnel::wireguard::{WgConfig, WgTunnelConnector}; use crate::{ common::{error::Error, global_ctx::ArcGlobalCtx, idn, network::IPCollector}, + proto::common::PeerFeatureFlag, tunnel::{ check_scheme_and_get_socket_addr, ring::RingTunnelConnector, tcp::TcpTunnelConnector, udp::UdpTunnelConnector, IpVersion, TunnelConnector, @@ -29,6 +30,24 @@ pub mod udp_hole_punch; pub mod dns_connector; pub mod http_connector; +pub(crate) fn should_try_p2p_with_peer( + feature_flag: Option<&PeerFeatureFlag>, + allow_public_server: bool, +) -> bool { + feature_flag + .map(|flag| allow_public_server || !flag.is_public_server) + .unwrap_or(true) +} + +pub(crate) fn should_background_p2p_with_peer( + feature_flag: Option<&PeerFeatureFlag>, + allow_public_server: bool, + lazy_p2p: bool, +) -> bool { + should_try_p2p_with_peer(feature_flag, allow_public_server) + && (!lazy_p2p || feature_flag.map(|flag| flag.need_p2p).unwrap_or(false)) +} + async fn set_bind_addr_for_peer_connector( connector: &mut (impl TunnelConnector + ?Sized), is_ipv4: bool, @@ -197,3 +216,59 @@ pub async fn create_connector_by_url( Ok(connector) } + +#[cfg(test)] +mod tests { + use crate::proto::common::PeerFeatureFlag; + + use super::{should_background_p2p_with_peer, should_try_p2p_with_peer}; + + #[test] + fn lazy_background_p2p_requires_need_p2p() { + let no_need_p2p = PeerFeatureFlag { + need_p2p: false, + ..Default::default() + }; + let need_p2p = PeerFeatureFlag { + need_p2p: true, + ..Default::default() + }; + + assert!(should_background_p2p_with_peer( + Some(&no_need_p2p), + false, + false + )); + assert!(!should_background_p2p_with_peer( + Some(&no_need_p2p), + false, + true + )); + assert!(should_background_p2p_with_peer( + Some(&need_p2p), + false, + true + )); + } + + #[test] + fn p2p_policy_respects_public_server_setting() { + let public_server = PeerFeatureFlag { + is_public_server: true, + ..Default::default() + }; + + assert!(!should_try_p2p_with_peer(Some(&public_server), false)); + assert!(should_try_p2p_with_peer(Some(&public_server), true)); + assert!(!should_background_p2p_with_peer( + Some(&public_server), + false, + false + )); + assert!(should_background_p2p_with_peer( + Some(&public_server), + true, + false + )); + } +} diff --git a/easytier/src/connector/tcp_hole_punch.rs b/easytier/src/connector/tcp_hole_punch.rs index efe30511..3bf8e06f 100644 --- a/easytier/src/connector/tcp_hole_punch.rs +++ b/easytier/src/connector/tcp_hole_punch.rs @@ -1,7 +1,7 @@ use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use anyhow::{Context, Error}; @@ -29,6 +29,8 @@ use crate::{ }, }; +use crate::connector::{should_background_p2p_with_peer, should_try_p2p_with_peer}; + pub const BLACKLIST_TIMEOUT_SEC: u64 = 3600; fn handle_rpc_result( @@ -418,6 +420,7 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher { #[tracing::instrument(skip(self, data))] async fn collect_peers_need_task(&self, data: &Self::Data) -> Vec { let global_ctx = data.peer_mgr.get_global_ctx(); + let lazy_p2p = global_ctx.get_flags().lazy_p2p; let my_tcp_nat_type = NatType::try_from( global_ctx .get_stun_info_collector() @@ -434,16 +437,17 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher { } let my_peer_id = data.peer_mgr.my_peer_id(); + let now = Instant::now(); data.blacklist.cleanup(); let mut peers_to_connect = Vec::new(); for route in data.peer_mgr.list_routes().await.iter() { - if route - .feature_flag - .map(|x| x.is_public_server) - .unwrap_or(false) - { + let static_allowed = + should_background_p2p_with_peer(route.feature_flag.as_ref(), false, lazy_p2p); + let dynamic_allowed = should_try_p2p_with_peer(route.feature_flag.as_ref(), false) + && data.peer_mgr.has_recent_traffic(route.peer_id, now); + if !static_allowed && !dynamic_allowed { continue; } @@ -520,7 +524,11 @@ impl TcpHolePunchConnector { pub fn new(peer_mgr: Arc) -> Self { Self { server: TcpHolePunchServer::new(peer_mgr.clone()), - client: PeerTaskManager::new(TcpHolePunchPeerTaskLauncher {}, peer_mgr.clone()), + client: PeerTaskManager::new_with_external_signal( + TcpHolePunchPeerTaskLauncher {}, + peer_mgr.clone(), + Some(peer_mgr.p2p_demand_notify()), + ), peer_mgr, } } @@ -570,12 +578,15 @@ mod tests { connector::tcp_hole_punch::TcpHolePunchConnector, peers::{ peer_manager::PeerManager, + peer_task::PeerTaskLauncher, tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, }, proto::common::{NatType, StunInfo}, tunnel::common::tests::wait_for_condition, }; + use super::TcpHolePunchPeerTaskLauncher; + struct MockStunInfoCollector { udp_nat_type: NatType, tcp_nat_type: NatType, @@ -619,6 +630,17 @@ mod tests { .replace_stun_info_collector(collector); } + async fn collect_lazy_punch_peers(peer_mgr: Arc) -> Vec { + let launcher = TcpHolePunchPeerTaskLauncher {}; + let data = launcher.new_data(peer_mgr); + launcher + .collect_peers_need_task(&data) + .await + .into_iter() + .map(|task| task.dst_peer_id) + .collect() + } + #[tokio::test] async fn tcp_hole_punch_connects() { let p_a = create_mock_peer_manager().await; @@ -701,4 +723,33 @@ mod tests { .map(|c| c.is_empty()) .unwrap_or(true)); } + + #[tokio::test] + async fn lazy_p2p_collects_tcp_hole_punch_tasks_only_after_recent_traffic() { + let p_a = create_mock_peer_manager().await; + let p_b = create_mock_peer_manager().await; + let p_c = create_mock_peer_manager().await; + + replace_stun_info_collector(p_a.clone(), NatType::PortRestricted); + replace_stun_info_collector(p_b.clone(), NatType::PortRestricted); + replace_stun_info_collector(p_c.clone(), NatType::PortRestricted); + + let mut flags = p_a.get_global_ctx().get_flags(); + flags.lazy_p2p = true; + p_a.get_global_ctx().set_flags(flags); + + connect_peer_manager(p_a.clone(), p_b.clone()).await; + connect_peer_manager(p_b.clone(), p_c.clone()).await; + wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap(); + + assert!(!collect_lazy_punch_peers(p_a.clone()) + .await + .contains(&p_c.my_peer_id())); + + p_a.mark_recent_traffic(p_c.my_peer_id()); + + assert!(collect_lazy_punch_peers(p_a.clone()) + .await + .contains(&p_c.my_peer_id())); + } } diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index 8c61a72d..f097bdab 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -1,6 +1,6 @@ use std::{ sync::{atomic::AtomicBool, Arc}, - time::Duration, + time::{Duration, Instant}, }; use anyhow::{Context, Error}; @@ -32,6 +32,8 @@ use crate::{ tunnel::Tunnel, }; +use crate::connector::{should_background_p2p_with_peer, should_try_p2p_with_peer}; + pub(crate) mod both_easy_sym; pub(crate) mod common; pub(crate) mod cone; @@ -426,6 +428,8 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher { } let my_peer_id = data.peer_mgr.my_peer_id(); + let lazy_p2p = data.peer_mgr.get_global_ctx().get_flags().lazy_p2p; + let now = Instant::now(); data.blacklist.cleanup(); @@ -434,11 +438,11 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher { // 2. peers is full cone (any restricted type); // 3. peers not in blacklist; for route in data.peer_mgr.list_routes().await.iter() { - if route - .feature_flag - .map(|x| x.is_public_server) - .unwrap_or(false) - { + let static_allowed = + should_background_p2p_with_peer(route.feature_flag.as_ref(), false, lazy_p2p); + let dynamic_allowed = should_try_p2p_with_peer(route.feature_flag.as_ref(), false) + && data.peer_mgr.has_recent_traffic(route.peer_id, now); + if !static_allowed && !dynamic_allowed { continue; } @@ -531,7 +535,11 @@ impl UdpHolePunchConnector { pub fn new(peer_mgr: Arc) -> Self { Self { server: UdpHolePunchServer::new(peer_mgr.clone()), - client: PeerTaskManager::new(UdpHolePunchPeerTaskLauncher {}, peer_mgr.clone()), + client: PeerTaskManager::new_with_external_signal( + UdpHolePunchPeerTaskLauncher {}, + peer_mgr.clone(), + Some(peer_mgr.p2p_demand_notify()), + ), peer_mgr, } } @@ -580,12 +588,13 @@ pub mod tests { use crate::common::stun::MockStunInfoCollector; use crate::peers::{ peer_manager::PeerManager, + peer_task::PeerTaskLauncher, tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, }; use crate::proto::common::NatType; use crate::tunnel::common::tests::wait_for_condition; - use super::{UdpHolePunchConnector, RUN_TESTING}; + use super::{UdpHolePunchConnector, UdpHolePunchPeerTaskLauncher, RUN_TESTING}; pub fn replace_stun_info_collector(peer_mgr: Arc, udp_nat_type: NatType) { let collector = Box::new(MockStunInfoCollector { udp_nat_type }); @@ -602,6 +611,17 @@ pub mod tests { p_a } + async fn collect_lazy_punch_peers(peer_mgr: Arc) -> Vec { + let launcher = UdpHolePunchPeerTaskLauncher {}; + let data = launcher.new_data(peer_mgr); + launcher + .collect_peers_need_task(&data) + .await + .into_iter() + .map(|task| task.dst_peer_id) + .collect() + } + #[rstest::rstest] #[tokio::test] pub async fn test_hole_punching_blacklist( @@ -634,4 +654,29 @@ pub mod tests { ) .await; } + + #[tokio::test] + async fn lazy_p2p_collects_udp_hole_punch_tasks_only_after_recent_traffic() { + let p_a = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await; + let p_b = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await; + let p_c = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await; + + let mut flags = p_a.get_global_ctx().get_flags(); + flags.lazy_p2p = true; + p_a.get_global_ctx().set_flags(flags); + + connect_peer_manager(p_a.clone(), p_b.clone()).await; + connect_peer_manager(p_b.clone(), p_c.clone()).await; + wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap(); + + assert!(!collect_lazy_punch_peers(p_a.clone()) + .await + .contains(&p_c.my_peer_id())); + + p_a.mark_recent_traffic(p_c.my_peer_id()); + + assert!(collect_lazy_punch_peers(p_a.clone()) + .await + .contains(&p_c.my_peer_id())); + } } diff --git a/easytier/src/core.rs b/easytier/src/core.rs index 234de969..f1b88a08 100644 --- a/easytier/src/core.rs +++ b/easytier/src/core.rs @@ -404,6 +404,15 @@ struct NetworkOptions { )] p2p_only: Option, + #[arg( + long, + env = "ET_LAZY_P2P", + help = t!("core_clap.lazy_p2p").to_string(), + num_args = 0..=1, + default_missing_value = "true" + )] + lazy_p2p: Option, + #[arg( long, env = "ET_DISABLE_P2P", @@ -449,6 +458,15 @@ struct NetworkOptions { )] relay_all_peer_rpc: Option, + #[arg( + long, + env = "ET_NEED_P2P", + help = t!("core_clap.need_p2p").to_string(), + num_args = 0..=1, + default_missing_value = "true" + )] + need_p2p: Option, + #[cfg(feature = "socks5")] #[arg( long, @@ -1012,6 +1030,7 @@ impl NetworkOptions { } f.disable_p2p = self.disable_p2p.unwrap_or(f.disable_p2p); f.p2p_only = self.p2p_only.unwrap_or(f.p2p_only); + f.lazy_p2p = self.lazy_p2p.unwrap_or(f.lazy_p2p); f.disable_tcp_hole_punching = self .disable_tcp_hole_punching .unwrap_or(f.disable_tcp_hole_punching); @@ -1019,6 +1038,7 @@ impl NetworkOptions { .disable_udp_hole_punching .unwrap_or(f.disable_udp_hole_punching); f.relay_all_peer_rpc = self.relay_all_peer_rpc.unwrap_or(f.relay_all_peer_rpc); + f.need_p2p = self.need_p2p.unwrap_or(f.need_p2p); f.multi_thread = self.multi_thread.unwrap_or(f.multi_thread); if let Some(compression) = &self.compression { f.data_compress_algo = match compression.as_str() { diff --git a/easytier/src/instance/dns_server/tests.rs b/easytier/src/instance/dns_server/tests.rs index cad28c62..2bf237c2 100644 --- a/easytier/src/instance/dns_server/tests.rs +++ b/easytier/src/instance/dns_server/tests.rs @@ -45,7 +45,7 @@ pub async fn prepare_env_with_tld_dns_zone( if let Some(zone) = tld_dns_zone { flags.tld_dns_zone = zone.to_string(); } - ctx.config.set_flags(flags); + ctx.set_flags(flags); } let (s, r) = create_packet_recv_chan(); diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index da8c0997..c17427c4 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -593,9 +593,12 @@ impl Instance { let mut direct_conn_manager = DirectConnectorManager::new(global_ctx.clone(), peer_manager.clone()); direct_conn_manager.run(); + let direct_conn_manager = Arc::new(direct_conn_manager); - let udp_hole_puncher = UdpHolePunchConnector::new(peer_manager.clone()); - let tcp_hole_puncher = TcpHolePunchConnector::new(peer_manager.clone()); + let udp_hole_puncher = + Arc::new(Mutex::new(UdpHolePunchConnector::new(peer_manager.clone()))); + let tcp_hole_puncher = + Arc::new(Mutex::new(TcpHolePunchConnector::new(peer_manager.clone()))); let peer_center = Arc::new(PeerCenterInstance::new(peer_manager.clone())); @@ -618,9 +621,9 @@ impl Instance { peer_manager, listener_manager, conn_manager, - direct_conn_manager: Arc::new(direct_conn_manager), - udp_hole_puncher: Arc::new(Mutex::new(udp_hole_puncher)), - tcp_hole_puncher: Arc::new(Mutex::new(tcp_hole_puncher)), + direct_conn_manager, + udp_hole_puncher, + tcp_hole_puncher, ip_proxy: None, #[cfg(feature = "kcp")] diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 1b97e243..85000810 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -762,6 +762,10 @@ impl NetworkConfig { flags.p2p_only = p2p_only; } + if let Some(lazy_p2p) = self.lazy_p2p { + flags.lazy_p2p = lazy_p2p; + } + if let Some(bind_device) = self.bind_device { flags.bind_device = bind_device; } @@ -778,6 +782,10 @@ impl NetworkConfig { flags.relay_all_peer_rpc = relay_all_peer_rpc; } + if let Some(need_p2p) = self.need_p2p { + flags.need_p2p = need_p2p; + } + if let Some(multi_thread) = self.multi_thread { flags.multi_thread = multi_thread; } @@ -956,10 +964,12 @@ impl NetworkConfig { result.disable_quic_input = Some(flags.disable_quic_input); result.disable_p2p = Some(flags.disable_p2p); result.p2p_only = Some(flags.p2p_only); + result.lazy_p2p = Some(flags.lazy_p2p); result.bind_device = Some(flags.bind_device); result.no_tun = Some(flags.no_tun); result.enable_exit_node = Some(flags.enable_exit_node); result.relay_all_peer_rpc = Some(flags.relay_all_peer_rpc); + result.need_p2p = Some(flags.need_p2p); result.multi_thread = Some(flags.multi_thread); result.proxy_forward_by_system = Some(flags.proxy_forward_by_system); result.disable_encryption = Some(!flags.enable_encryption); @@ -1214,10 +1224,12 @@ mod tests { flags.disable_quic_input = rng.gen_bool(0.3); flags.disable_p2p = rng.gen_bool(0.2); flags.p2p_only = rng.gen_bool(0.2); + flags.lazy_p2p = rng.gen_bool(0.3); flags.bind_device = rng.gen_bool(0.3); flags.no_tun = rng.gen_bool(0.1); flags.enable_exit_node = rng.gen_bool(0.4); flags.relay_all_peer_rpc = rng.gen_bool(0.5); + flags.need_p2p = rng.gen_bool(0.3); flags.multi_thread = rng.gen_bool(0.7); flags.proxy_forward_by_system = rng.gen_bool(0.3); flags.enable_encryption = rng.gen_bool(0.8); diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index e05724be..8c2e25b5 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -1182,7 +1182,7 @@ pub mod tests { tracing::debug!("pm_center: {:?}", pm_center.my_peer_id()); let mut flag = pm_center.get_global_ctx().get_flags(); flag.relay_network_whitelist = ["net1".to_string(), "net2*".to_string()].join(" "); - pm_center.get_global_ctx().config.set_flags(flag); + pm_center.get_global_ctx().set_flags(flag); let pma_net1 = create_mock_peer_manager_for_foreign_network(name.as_str()).await; @@ -1209,7 +1209,7 @@ pub mod tests { let mut flag = pm_center.get_global_ctx().get_flags(); flag.relay_network_whitelist = "".to_string(); flag.relay_all_peer_rpc = true; - pm_center.get_global_ctx().config.set_flags(flag); + pm_center.get_global_ctx().set_flags(flag); tracing::debug!("pm_center: {:?}", pm_center.my_peer_id()); let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 28fe540a..426a4e68 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -2,7 +2,7 @@ use std::{ fmt::Debug, net::{IpAddr, Ipv4Addr, Ipv6Addr}, sync::{atomic::AtomicBool, Arc, Weak}, - time::{Instant, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use anyhow::Context; @@ -63,6 +63,7 @@ use super::{ peer_map::PeerMap, peer_ospf_route::PeerRoute, peer_rpc::PeerRpcManager, + peer_task::ExternalTaskSignal, relay_peer_map::RelayPeerMap, route_trait::{ArcRoute, Route}, BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChan, PacketRecvChanReceiver, @@ -161,6 +162,8 @@ pub struct PeerManager { exit_nodes: RwLock>, reserved_my_peer_id_map: DashMap, + recent_have_traffic: Arc>, + p2p_demand_notify: Arc, allow_loopback_tunnel: AtomicBool, @@ -181,6 +184,39 @@ impl Debug for PeerManager { } impl PeerManager { + // Keep lazy-p2p demand alive across the 5s task rescan interval and a full on-demand + // connect attempt, without retaining extra per-task state in the hot path. + const RECENT_HAVE_TRAFFIC_TTL: Duration = Duration::from_secs(30); + + fn should_mark_recent_traffic_for_fanout(total_dst_peers: usize) -> bool { + total_dst_peers <= 1 + } + + fn gc_recent_traffic_entries( + recent_have_traffic: &DashMap, + now: Instant, + mut has_directly_connected_conn: F, + ) where + F: FnMut(PeerId) -> bool, + { + let mut to_remove = Vec::new(); + for entry in recent_have_traffic.iter() { + let peer_id = *entry.key(); + let expired = + now.saturating_duration_since(*entry.value()) > Self::RECENT_HAVE_TRAFFIC_TTL; + if expired || has_directly_connected_conn(peer_id) { + to_remove.push(peer_id); + } + } + + if !to_remove.is_empty() { + for peer_id in to_remove { + recent_have_traffic.remove(&peer_id); + } + shrink_dashmap(recent_have_traffic, None); + } + } + pub fn new( route_algo: RouteAlgoType, global_ctx: ArcGlobalCtx, @@ -334,6 +370,8 @@ impl PeerManager { exit_nodes: RwLock::new(exit_nodes), reserved_my_peer_id_map: DashMap::new(), + recent_have_traffic: Arc::new(DashMap::new()), + p2p_demand_notify: Arc::new(ExternalTaskSignal::new()), allow_loopback_tunnel: AtomicBool::new(true), @@ -349,6 +387,57 @@ impl PeerManager { .store(allow_loopback_tunnel, std::sync::atomic::Ordering::Relaxed); } + pub fn mark_recent_traffic(&self, dst_peer_id: PeerId) { + if dst_peer_id == self.my_peer_id { + return; + } + + let flags = self.global_ctx.flags_arc(); + if flags.disable_p2p || !flags.lazy_p2p || self.has_directly_connected_conn(dst_peer_id) { + return; + } + + let now = Instant::now(); + if let Some(mut last_seen) = self.recent_have_traffic.get_mut(&dst_peer_id) { + let should_notify = + now.saturating_duration_since(*last_seen) > Self::RECENT_HAVE_TRAFFIC_TTL; + *last_seen = now; + if !should_notify { + return; + } + } else { + self.recent_have_traffic.insert(dst_peer_id, now); + } + self.p2p_demand_notify.notify(); + } + + pub fn has_recent_traffic(&self, peer_id: PeerId, now: Instant) -> bool { + if self.has_directly_connected_conn(peer_id) { + return false; + } + + self.recent_have_traffic + .get(&peer_id) + .map(|last_seen| { + now.saturating_duration_since(*last_seen) <= Self::RECENT_HAVE_TRAFFIC_TTL + }) + .unwrap_or(false) + } + + pub fn clear_recent_traffic(&self, peer_id: PeerId) { + self.recent_have_traffic.remove(&peer_id); + } + + pub fn p2p_demand_notify(&self) -> Arc { + self.p2p_demand_notify.clone() + } + + fn gc_recent_traffic(&self) { + Self::gc_recent_traffic_entries(&self.recent_have_traffic, Instant::now(), |peer_id| { + self.has_directly_connected_conn(peer_id) + }); + } + fn build_foreign_network_manager_accessor( peer_map: &Arc, ) -> Box { @@ -405,7 +494,9 @@ impl PeerManager { "network identity not match".to_string(), )); } + let peer_id = peer_conn.get_peer_id(); self.peers.add_new_peer_conn(peer_conn).await?; + self.clear_recent_traffic(peer_id); Ok(()) } @@ -1152,6 +1243,7 @@ impl PeerManager { mut msg: ZCPacket, dst_peer_id: PeerId, ) -> Result<(), Error> { + self.mark_recent_traffic(dst_peer_id); self.check_p2p_only_before_send(dst_peer_id)?; self.self_tx_counters @@ -1229,19 +1321,32 @@ impl PeerManager { } } - pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec, bool) { - let mut is_exit_node = false; - let mut dst_peers = vec![]; + fn is_all_peers_broadcast_ipv4(&self, ipv4_addr: &Ipv4Addr) -> bool { let network_length = self .global_ctx .get_ipv4() .map(|x| x.network_length()) .unwrap_or(24); let ipv4_inet = cidr::Ipv4Inet::new(*ipv4_addr, network_length).unwrap(); - if ipv4_addr.is_broadcast() + ipv4_addr.is_broadcast() || ipv4_addr.is_multicast() || *ipv4_addr == ipv4_inet.last_address() - { + } + + fn is_all_peers_broadcast_ipv6(&self, ipv6_addr: &Ipv6Addr) -> bool { + let network_length = self + .global_ctx + .get_ipv6() + .map(|x| x.network_length()) + .unwrap_or(64); + let ipv6_inet = cidr::Ipv6Inet::new(*ipv6_addr, network_length).unwrap(); + ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address() + } + + pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec, bool) { + let mut is_exit_node = false; + let mut dst_peers = vec![]; + if self.is_all_peers_broadcast_ipv4(ipv4_addr) { dst_peers.extend(self.peers.list_routes().await.iter().filter_map(|x| { if *x.key() != self.my_peer_id { Some(*x.key()) @@ -1284,13 +1389,7 @@ impl PeerManager { pub async fn get_msg_dst_peer_ipv6(&self, ipv6_addr: &Ipv6Addr) -> (Vec, bool) { let mut is_exit_node = false; let mut dst_peers = vec![]; - let network_length = self - .global_ctx - .get_ipv6() - .map(|x| x.network_length()) - .unwrap_or(64); - let ipv6_inet = cidr::Ipv6Inet::new(*ipv6_addr, network_length).unwrap(); - if ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address() { + if self.is_all_peers_broadcast_ipv6(ipv6_addr) { dst_peers.extend(self.peers.list_routes().await.iter().map(|x| *x.key())); } else if let Some(peer_id) = self.peers.get_peer_id_by_ipv6(ipv6_addr).await { dst_peers.push(peer_id); @@ -1350,6 +1449,7 @@ impl PeerManager { } let cur_to_peer_id = msg.peer_manager_header().unwrap().to_peer_id.into(); if cur_to_peer_id != 0 { + self.mark_recent_traffic(cur_to_peer_id); return Self::send_msg_internal( &self.peers, &self.foreign_network_client, @@ -1395,7 +1495,12 @@ impl PeerManager { let mut errs: Vec = vec![]; let mut msg = Some(msg); let total_dst_peers = dst_peers.len(); + let should_mark_recent_traffic = + Self::should_mark_recent_traffic_for_fanout(total_dst_peers); for (i, peer_id) in dst_peers.iter().enumerate() { + if should_mark_recent_traffic { + self.mark_recent_traffic(*peer_id); + } if let Err(e) = self.check_p2p_only_before_send(*peer_id) { errs.push(e); continue; @@ -1467,6 +1572,28 @@ impl PeerManager { }); } + async fn run_recent_traffic_gc_routine(&self) { + let recent_have_traffic = self.recent_have_traffic.clone(); + let peers = self.peers.clone(); + let foreign_network_client = self.foreign_network_client.clone(); + self.tasks.lock().await.spawn(async move { + loop { + PeerManager::gc_recent_traffic_entries( + recent_have_traffic.as_ref(), + Instant::now(), + |peer_id| { + if let Some(peer) = peers.get_peer_by_id(peer_id) { + peer.has_directly_connected_conn() + } else { + foreign_network_client.get_peer_map().has_peer(peer_id) + } + }, + ); + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + } + }); + } + async fn run_peer_session_gc_routine(&self) { let peer_session_store = self.peer_session_store.clone(); self.tasks.lock().await.spawn(async move { @@ -1499,6 +1626,7 @@ impl PeerManager { self.start_peer_recv().await; self.run_clean_peer_without_conn_routine().await; self.run_relay_session_gc_routine().await; + self.run_recent_traffic_gc_routine().await; self.run_peer_session_gc_routine().await; self.run_foriegn_network().await; @@ -1729,7 +1857,11 @@ impl PeerManager { #[cfg(test)] mod tests { - use std::{fmt::Debug, sync::Arc, time::Duration}; + use std::{ + fmt::Debug, + sync::Arc, + time::{Duration, Instant}, + }; use crate::{ common::{ @@ -1766,6 +1898,135 @@ mod tests { use super::PeerManager; + async fn create_lazy_peer_manager() -> Arc { + let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let mut flags = peer_mgr.get_global_ctx().get_flags(); + flags.lazy_p2p = true; + peer_mgr.get_global_ctx().set_flags(flags); + peer_mgr + } + + #[test] + fn recent_traffic_fanout_policy_only_marks_single_peer() { + assert!(PeerManager::should_mark_recent_traffic_for_fanout(0)); + assert!(PeerManager::should_mark_recent_traffic_for_fanout(1)); + assert!(!PeerManager::should_mark_recent_traffic_for_fanout(2)); + } + + #[test] + fn gc_recent_traffic_removes_expired_and_connected_entries() { + let stale_peer = 1; + let direct_peer = 2; + let active_peer = 3; + let recent_have_traffic = dashmap::DashMap::new(); + + recent_have_traffic.insert( + stale_peer, + Instant::now() - PeerManager::RECENT_HAVE_TRAFFIC_TTL - Duration::from_millis(1), + ); + recent_have_traffic.insert(direct_peer, Instant::now()); + recent_have_traffic.insert(active_peer, Instant::now()); + + let future_peer = 4; + + recent_have_traffic.insert(future_peer, Instant::now() + Duration::from_secs(1)); + + PeerManager::gc_recent_traffic_entries(&recent_have_traffic, Instant::now(), |peer_id| { + peer_id == direct_peer + }); + + assert!(!recent_have_traffic.contains_key(&stale_peer)); + assert!(!recent_have_traffic.contains_key(&direct_peer)); + assert!(recent_have_traffic.contains_key(&active_peer)); + assert!(recent_have_traffic.contains_key(&future_peer)); + } + + #[tokio::test] + async fn recent_traffic_skips_direct_peers_and_clears_after_direct_connect() { + let peer_mgr_a = create_lazy_peer_manager().await; + let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_b_id = peer_mgr_b.my_peer_id(); + + peer_mgr_a.mark_recent_traffic(peer_b_id); + assert!(peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now())); + + let (a_ring, b_ring) = create_ring_tunnel_pair(); + let (client_ret, server_ret) = tokio::join!( + peer_mgr_a.add_client_tunnel(a_ring, true), + peer_mgr_b.add_tunnel_as_server(b_ring, true) + ); + client_ret.unwrap(); + server_ret.unwrap(); + + wait_for_condition( + || { + let peer_mgr_a = peer_mgr_a.clone(); + async move { peer_mgr_a.has_directly_connected_conn(peer_b_id) } + }, + Duration::from_secs(5), + ) + .await; + + wait_for_condition( + || { + let peer_mgr_a = peer_mgr_a.clone(); + async move { !peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()) } + }, + Duration::from_secs(5), + ) + .await; + + peer_mgr_a.mark_recent_traffic(peer_b_id); + assert!( + !peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()), + "directly connected peers should not be tracked as lazy-p2p demand" + ); + } + + #[tokio::test] + async fn recent_traffic_notifies_only_when_demand_becomes_active() { + let peer_mgr_a = create_lazy_peer_manager().await; + let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_b_id = peer_mgr_b.my_peer_id(); + let signal = peer_mgr_a.p2p_demand_notify(); + + let initial_version = signal.version(); + peer_mgr_a.mark_recent_traffic(peer_b_id); + assert_eq!(signal.version(), initial_version + 1); + + let first_seen = *peer_mgr_a.recent_have_traffic.get(&peer_b_id).unwrap(); + tokio::time::sleep(Duration::from_millis(5)).await; + peer_mgr_a.mark_recent_traffic(peer_b_id); + assert_eq!( + signal.version(), + initial_version + 1, + "fresh demand should not wake all p2p workers again" + ); + let refreshed_seen = *peer_mgr_a.recent_have_traffic.get(&peer_b_id).unwrap(); + assert!(refreshed_seen > first_seen); + + if let Some(mut last_seen) = peer_mgr_a.recent_have_traffic.get_mut(&peer_b_id) { + *last_seen = + Instant::now() - PeerManager::RECENT_HAVE_TRAFFIC_TTL - Duration::from_millis(1); + } + peer_mgr_a.mark_recent_traffic(peer_b_id); + assert_eq!(signal.version(), initial_version + 2); + } + + #[tokio::test] + async fn recent_traffic_tolerates_future_timestamps() { + let peer_mgr_a = create_lazy_peer_manager().await; + let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_b_id = peer_mgr_b.my_peer_id(); + + peer_mgr_a + .recent_have_traffic + .insert(peer_b_id, Instant::now() + Duration::from_secs(1)); + + assert!(peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now())); + peer_mgr_a.mark_recent_traffic(peer_b_id); + } + #[tokio::test] async fn drop_peer_manager() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; @@ -2175,7 +2436,7 @@ mod tests { let create_mgr = |enable_encryption| async move { let (s, _r) = create_packet_recv_chan(); let mock_global_ctx = get_mock_global_ctx(); - mock_global_ctx.config.set_flags(Flags { + mock_global_ctx.set_flags(Flags { enable_encryption, data_compress_algo: CompressionAlgoPb::Zstd.into(), ..Default::default() diff --git a/easytier/src/peers/peer_task.rs b/easytier/src/peers/peer_task.rs index d5250ae3..4af026a2 100644 --- a/easytier/src/peers/peer_task.rs +++ b/easytier/src/peers/peer_task.rs @@ -1,5 +1,9 @@ -use std::result::Result; -use std::sync::{Arc, Mutex}; +use std::{ + result::Result, + sync::{atomic::Ordering, Arc, Mutex}, +}; + +use atomic_shim::AtomicU64; use async_trait::async_trait; use dashmap::DashMap; @@ -12,6 +16,39 @@ use anyhow::Error; use super::peer_manager::PeerManager; +pub struct ExternalTaskSignal { + version: AtomicU64, + notify: Notify, +} + +impl Default for ExternalTaskSignal { + fn default() -> Self { + Self::new() + } +} + +impl ExternalTaskSignal { + pub fn new() -> Self { + Self { + version: AtomicU64::new(0), + notify: Notify::new(), + } + } + + pub fn notify(&self) { + self.version.fetch_add(1, Ordering::Relaxed); + self.notify.notify_waiters(); + } + + pub fn version(&self) -> u64 { + self.version.load(Ordering::Relaxed) + } + + pub fn notified(&self) -> impl std::future::Future + '_ { + self.notify.notified() + } +} + #[async_trait] pub trait PeerTaskLauncher: Send + Sync + Clone + 'static { type Data; @@ -35,9 +72,9 @@ pub trait PeerTaskLauncher: Send + Sync + Clone + 'static { pub struct PeerTaskManager { launcher: Launcher, - peer_mgr: Arc, main_loop_task: Mutex>>, run_signal: Arc, + external_signal: Option>, data: Launcher::Data, } @@ -49,12 +86,20 @@ where L: PeerTaskLauncher + 'static, { pub fn new(launcher: L, peer_mgr: Arc) -> Self { + Self::new_with_external_signal(launcher, peer_mgr, None) + } + + pub fn new_with_external_signal( + launcher: L, + peer_mgr: Arc, + external_signal: Option>, + ) -> Self { let data = launcher.new_data(peer_mgr.clone()); Self { launcher, - peer_mgr, main_loop_task: Mutex::new(None), run_signal: Arc::new(Notify::new()), + external_signal, data, } } @@ -64,13 +109,20 @@ where self.launcher.clone(), self.data.clone(), self.run_signal.clone(), + self.external_signal.clone(), )) .into(); self.main_loop_task.lock().unwrap().replace(task); } - async fn main_loop(launcher: L, data: D, signal: Arc) { + async fn main_loop( + launcher: L, + data: D, + signal: Arc, + external_signal: Option>, + ) { let peer_task_map = Arc::new(DashMap::>>::new()); + let mut external_signal_version = external_signal.as_ref().map(|signal| signal.version()); loop { let peers_to_connect = launcher.collect_peers_need_task(&data).await; @@ -113,11 +165,31 @@ where launcher.all_task_done(&data).await; } - select! { - _ = tokio::time::sleep(std::time::Duration::from_millis( - launcher.loop_interval_ms(), - )) => {}, - _ = signal.notified() => {} + if let Some(external_signal) = external_signal.as_ref() { + let notified = external_signal.notified(); + tokio::pin!(notified); + let cur_version = external_signal.version(); + if external_signal_version != Some(cur_version) { + external_signal_version = Some(cur_version); + continue; + } + + select! { + _ = tokio::time::sleep(std::time::Duration::from_millis( + launcher.loop_interval_ms(), + )) => {}, + _ = signal.notified() => {}, + _ = &mut notified => { + external_signal_version = Some(external_signal.version()); + } + } + } else { + select! { + _ = tokio::time::sleep(std::time::Duration::from_millis( + launcher.loop_interval_ms(), + )) => {}, + _ = signal.notified() => {} + } } } } diff --git a/easytier/src/proto/api_manage.proto b/easytier/src/proto/api_manage.proto index 55461d99..4d6ae2e1 100644 --- a/easytier/src/proto/api_manage.proto +++ b/easytier/src/proto/api_manage.proto @@ -85,6 +85,8 @@ message NetworkConfig { common.SecureModeConfig secure_mode = 55; reserved 56; optional string credential_file = 57; + optional bool lazy_p2p = 58; + optional bool need_p2p = 59; } message PortForwardConfig { diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 1ef107b2..1497bfb2 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -70,6 +70,9 @@ message FlagsInConfig { bool p2p_only = 32; bool disable_tcp_hole_punching = 34; + + bool lazy_p2p = 37; + bool need_p2p = 38; } message RpcDescriptor { @@ -217,6 +220,7 @@ message PeerFeatureFlag { bool quic_input = 6; bool no_relay_quic = 7; bool is_credential_peer = 8; + bool need_p2p = 9; } enum SocketType { diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index e1e0c897..c23b0a77 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -48,17 +48,20 @@ pub fn prepare_linux_namespaces() { del_netns("net_b"); del_netns("net_c"); del_netns("net_d"); + del_netns("net_e"); create_netns("net_a", "10.1.1.1/24", "fd11::1/64"); create_netns("net_b", "10.1.1.2/24", "fd11::2/64"); create_netns("net_c", "10.1.2.3/24", "fd12::3/64"); create_netns("net_d", "10.1.2.4/24", "fd12::4/64"); + create_netns("net_e", "10.1.1.3/24", "fd11::3/64"); prepare_bridge("br_a"); prepare_bridge("br_b"); add_ns_to_bridge("br_a", "net_a"); add_ns_to_bridge("br_a", "net_b"); + add_ns_to_bridge("br_a", "net_e"); add_ns_to_bridge("br_b", "net_c"); add_ns_to_bridge("br_b", "net_d"); } @@ -89,10 +92,13 @@ pub async fn init_three_node(proto: &str) -> Vec { init_three_node_ex(proto, |cfg| cfg, false).await } -pub async fn init_three_node_ex TomlConfigLoader>( +async fn init_three_node_ex_with_inst3 TomlConfigLoader>( proto: &str, cfg_cb: F, use_public_server: bool, + inst3_ns: &str, + inst3_ipv4: &str, + inst3_ipv6: &str, ) -> Vec { prepare_linux_namespaces(); @@ -110,9 +116,9 @@ pub async fn init_three_node_ex TomlConfigLoader>( ))); let mut inst3 = Instance::new(cfg_cb(get_inst_config( "inst3", - Some("net_c"), - "10.144.144.3", - "fd00::3/64", + Some(inst3_ns), + inst3_ipv4, + inst3_ipv6, ))); inst1.run().await.unwrap(); @@ -211,6 +217,29 @@ pub async fn init_three_node_ex TomlConfigLoader>( vec![inst1, inst2, inst3] } +pub async fn init_three_node_ex TomlConfigLoader>( + proto: &str, + cfg_cb: F, + use_public_server: bool, +) -> Vec { + init_three_node_ex_with_inst3( + proto, + cfg_cb, + use_public_server, + "net_c", + "10.144.144.3", + "fd00::3/64", + ) + .await +} + +async fn init_lazy_p2p_three_node_ex TomlConfigLoader>( + proto: &str, + cfg_cb: F, +) -> Vec { + init_three_node_ex_with_inst3(proto, cfg_cb, false, "net_e", "10.144.144.3", "fd00::3/64").await +} + pub async fn drop_insts(insts: Vec) { let mut set = JoinSet::new(); for mut inst in insts { @@ -2079,6 +2108,24 @@ where } } +async fn wait_route_cost(inst: &Instance, peer_id: u32, cost: i32, timeout: Duration) { + let peer_manager = inst.get_peer_manager(); + wait_for_condition( + move || { + let peer_manager = peer_manager.clone(); + async move { + peer_manager + .list_routes() + .await + .iter() + .any(|route| route.peer_id == peer_id && route.cost == cost) + } + }, + timeout, + ) + .await; +} + #[rstest::rstest] #[tokio::test] #[serial_test::serial] @@ -2400,6 +2447,124 @@ pub async fn acl_group_base_test( drop_insts(insts).await; } +#[tokio::test] +#[serial_test::serial] +pub async fn lazy_p2p_builds_direct_connection_on_demand() { + let insts = init_lazy_p2p_three_node_ex("udp", |cfg| { + if cfg.get_inst_name() == "inst1" { + let mut flags = cfg.get_flags(); + flags.lazy_p2p = true; + cfg.set_flags(flags); + } + cfg + }) + .await; + + let inst3_peer_id = insts[2].peer_id(); + assert!( + !insts[0] + .get_peer_manager() + .get_peer_map() + .has_peer(inst3_peer_id), + "inst1 should not proactively connect to inst3 when lazy_p2p is enabled" + ); + wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await; + + assert!( + ping_test("net_a", "10.144.144.3", None).await, + "initial relay traffic should still succeed" + ); + + wait_for_condition( + || async { + insts[0] + .get_peer_manager() + .get_peer_map() + .has_peer(inst3_peer_id) + }, + Duration::from_secs(10), + ) + .await; + wait_route_cost(&insts[0], inst3_peer_id, 1, Duration::from_secs(10)).await; + + drop_insts(insts).await; +} + +#[tokio::test] +#[serial_test::serial] +pub async fn need_p2p_overrides_lazy_p2p() { + let insts = init_lazy_p2p_three_node_ex("udp", |cfg| { + let mut flags = cfg.get_flags(); + if cfg.get_inst_name() == "inst1" { + flags.lazy_p2p = true; + } + if cfg.get_inst_name() == "inst3" { + flags.need_p2p = true; + } + cfg.set_flags(flags); + cfg + }) + .await; + + let inst3_peer_id = insts[2].peer_id(); + wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await; + wait_for_condition( + || async { + insts[0] + .get_peer_manager() + .get_peer_map() + .has_peer(inst3_peer_id) + }, + Duration::from_secs(10), + ) + .await; + wait_route_cost(&insts[0], inst3_peer_id, 1, Duration::from_secs(10)).await; + + drop_insts(insts).await; +} + +#[tokio::test] +#[serial_test::serial] +pub async fn lazy_p2p_warms_up_before_p2p_only_send() { + let insts = init_lazy_p2p_three_node_ex("udp", |cfg| { + if cfg.get_inst_name() == "inst1" { + let mut flags = cfg.get_flags(); + flags.lazy_p2p = true; + flags.p2p_only = true; + cfg.set_flags(flags); + } + cfg + }) + .await; + + let inst3_peer_id = insts[2].peer_id(); + wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await; + assert!( + !ping_test("net_a", "10.144.144.3", None).await, + "the first send should still fail under p2p_only before direct connectivity exists" + ); + + wait_for_condition( + || async { + insts[0] + .get_peer_manager() + .get_peer_map() + .has_peer(inst3_peer_id) + }, + Duration::from_secs(10), + ) + .await; + wait_route_cost(&insts[0], inst3_peer_id, 1, Duration::from_secs(10)).await; + + wait_for_condition( + || async { ping_test("net_a", "10.144.144.3", None).await }, + Duration::from_secs(6), + ) + .await; + + drop_insts(insts).await; +} + #[rstest::rstest] #[tokio::test] #[serial_test::serial]