Add lazy P2P demand tracking and need_p2p override (#2003)

- add lazy_p2p so nodes only start background P2P for peers that actually have recent business traffic
- add need_p2p so specific peers can still request eager background P2P even when other nodes enable lazy mode
- cover the new behavior with focused connector/peer-manager tests plus three-node integration tests that verify relay-to-direct route transition
This commit is contained in:
KKRainbow
2026-03-23 09:38:57 +08:00
committed by GitHub
parent 2bfdd44759
commit 1d89ddbb16
22 changed files with 876 additions and 91 deletions
@@ -93,10 +93,12 @@ const bool_flags: BoolFlag[] = [
{ field: 'disable_quic_input', help: 'disable_quic_input_help' },
{ field: 'disable_p2p', help: 'disable_p2p_help' },
{ field: 'p2p_only', help: 'p2p_only_help' },
{ field: 'lazy_p2p', help: 'lazy_p2p_help' },
{ field: 'bind_device', help: 'bind_device_help' },
{ field: 'no_tun', help: 'no_tun_help' },
{ field: 'enable_exit_node', help: 'enable_exit_node_help' },
{ field: 'relay_all_peer_rpc', help: 'relay_all_peer_rpc_help' },
{ field: 'need_p2p', help: 'need_p2p_help' },
{ field: 'multi_thread', help: 'multi_thread_help' },
{ field: 'proxy_forward_by_system', help: 'proxy_forward_by_system_help' },
{ field: 'disable_encryption', help: 'disable_encryption_help' },
@@ -113,6 +113,9 @@ disable_p2p_help: 禁用 P2P 模式,所有流量通过手动指定的服务器
p2p_only: 仅 P2P
p2p_only_help: 仅与已经建立P2P连接的对等节点通信,不通过其他节点中转。
lazy_p2p: 延迟 P2P
lazy_p2p_help: 仅在实际流量需要某个对等节点时才尝试建立 P2P。开启 need-p2p 的节点仍会被主动连接。
bind_device: 仅使用物理网卡
bind_device_help: 仅使用物理网卡,避免 EasyTier 通过其他虚拟网建立连接。
@@ -127,6 +130,9 @@ relay_all_peer_rpc_help: |
允许转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。
这可以帮助白名单外网络中的对等节点建立P2P连接。
need_p2p: 需要 P2P
need_p2p_help: 即使其他节点启用了 lazy p2p,也要求它们主动与当前节点建立 P2P 连接。
multi_thread: 启用多线程
multi_thread_help: 使用多线程运行时
@@ -112,6 +112,9 @@ disable_p2p_help: Disable P2P mode; route all traffic through a manually specifi
p2p_only: P2P Only
p2p_only_help: Only communicate with peers that have already established P2P connections, do not relay through other nodes.
lazy_p2p: Lazy P2P
lazy_p2p_help: Only try to establish P2P when traffic actually targets a peer. Peers with need-p2p enabled are still connected proactively.
bind_device: Bind to Physical Device Only
bind_device_help: Use only the physical network interface to prevent EasyTier from connecting via virtual networks.
@@ -126,6 +129,9 @@ relay_all_peer_rpc_help: |
Relay all peer rpc packets, even if the peer is not in the relay network whitelist.
This can help peers not in relay network whitelist to establish p2p connection.
need_p2p: Need P2P
need_p2p_help: Ask other peers to proactively establish P2P connections to this node even when they enable lazy P2P.
multi_thread: Multi Thread
multi_thread_help: Use multi-thread runtime
@@ -53,10 +53,12 @@ export interface NetworkConfig {
disable_quic_input?: boolean
disable_p2p?: boolean
p2p_only?: boolean
lazy_p2p?: boolean
bind_device?: boolean
no_tun?: boolean
enable_exit_node?: boolean
relay_all_peer_rpc?: boolean
need_p2p?: boolean
multi_thread?: boolean
proxy_forward_by_system?: boolean
disable_encryption?: boolean
@@ -125,10 +127,12 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
disable_quic_input: false,
disable_p2p: false,
p2p_only: false,
lazy_p2p: false,
bind_device: true,
no_tun: false,
enable_exit_node: false,
relay_all_peer_rpc: false,
need_p2p: false,
multi_thread: true,
proxy_forward_by_system: false,
disable_encryption: false,
+6
View File
@@ -157,6 +157,12 @@ core_clap:
p2p_only:
en: "only communicate with peers that already establish p2p connection"
zh-CN: "仅与已经建立P2P连接的对等节点通信"
lazy_p2p:
en: "only try to establish p2p when traffic actually needs the peer; peers marked as need-p2p are still connected proactively"
zh-CN: "仅在实际流量需要某个对等节点时才尝试建立P2P;被标记为 need-p2p 的节点仍会主动建立连接"
need_p2p:
en: "announce that other peers should proactively establish p2p connections to this node even when they enable lazy-p2p"
zh-CN: "声明即使其他节点启用了 lazy-p2p,也应主动与当前节点建立P2P连接"
disable_tcp_hole_punching:
en: "disable tcp hole punching"
zh-CN: "禁用TCP打洞功能"
+2
View File
@@ -40,6 +40,7 @@ pub fn gen_default_flags() -> Flags {
relay_network_whitelist: "*".to_string(),
disable_p2p: false,
p2p_only: false,
lazy_p2p: false,
relay_all_peer_rpc: false,
disable_tcp_hole_punching: false,
disable_udp_hole_punching: false,
@@ -63,6 +64,7 @@ pub fn gen_default_flags() -> Flags {
tld_dns_zone: DEFAULT_ET_DNS_ZONE.to_string(),
quic_listen_port: u32::MAX,
need_p2p: false,
}
}
+61 -27
View File
@@ -202,10 +202,7 @@ pub struct GlobalCtx {
running_listeners: Mutex<Vec<url::Url>>,
enable_exit_node: bool,
proxy_forward_by_system: bool,
no_tun: bool,
p2p_only: bool,
flags: ArcSwap<Flags>,
feature_flags: AtomicCell<PeerFeatureFlag>,
@@ -237,6 +234,17 @@ impl std::fmt::Debug for GlobalCtx {
pub type ArcGlobalCtx = std::sync::Arc<GlobalCtx>;
impl GlobalCtx {
fn derive_feature_flags(flags: &Flags, current: Option<PeerFeatureFlag>) -> PeerFeatureFlag {
let mut feature_flags = current.unwrap_or_default();
feature_flags.kcp_input = !flags.disable_kcp_input;
feature_flags.no_relay_kcp = flags.disable_relay_kcp;
feature_flags.support_conn_list_sync = true;
feature_flags.quic_input = !flags.disable_quic_input;
feature_flags.no_relay_quic = flags.disable_relay_quic;
feature_flags.need_p2p = flags.need_p2p;
feature_flags
}
pub fn new(config_fs: impl ConfigLoader + 'static) -> Self {
let id = config_fs.get_id();
let network = config_fs.get_network_identity();
@@ -261,19 +269,9 @@ impl GlobalCtx {
let stun_info_collector = Arc::new(stun_info_collector);
let enable_exit_node = config_fs.get_flags().enable_exit_node || cfg!(target_env = "ohos");
let proxy_forward_by_system = config_fs.get_flags().proxy_forward_by_system;
let no_tun = config_fs.get_flags().no_tun;
let p2p_only = config_fs.get_flags().p2p_only;
let flags = config_fs.get_flags();
let feature_flags = PeerFeatureFlag {
kcp_input: !config_fs.get_flags().disable_kcp_input,
no_relay_kcp: config_fs.get_flags().disable_relay_kcp,
support_conn_list_sync: true, // Enable selective peer list sync by default
quic_input: !config_fs.get_flags().disable_quic_input,
no_relay_quic: config_fs.get_flags().disable_relay_quic,
..Default::default()
};
let feature_flags = Self::derive_feature_flags(&flags, None);
let credential_storage_path = config_fs.get_credential_file();
let credential_manager = Arc::new(CredentialManager::new(credential_storage_path));
@@ -301,10 +299,7 @@ impl GlobalCtx {
running_listeners: Mutex::new(Vec::new()),
enable_exit_node,
proxy_forward_by_system,
no_tun,
p2p_only,
flags: ArcSwap::new(Arc::new(flags)),
feature_flags: AtomicCell::new(feature_flags),
@@ -455,11 +450,20 @@ impl GlobalCtx {
}
pub fn get_flags(&self) -> Flags {
self.config.get_flags()
self.flags.load().as_ref().clone()
}
pub fn set_flags(&self, flags: Flags) {
self.config.set_flags(flags);
self.config.set_flags(flags.clone());
self.feature_flags.store(Self::derive_feature_flags(
&flags,
Some(self.feature_flags.load()),
));
self.flags.store(Arc::new(flags));
}
pub fn flags_arc(&self) -> Arc<Flags> {
self.flags.load_full()
}
pub fn get_128_key(&self) -> [u8; 16] {
@@ -503,15 +507,15 @@ impl GlobalCtx {
}
pub fn enable_exit_node(&self) -> bool {
self.enable_exit_node
self.flags.load().enable_exit_node || cfg!(target_env = "ohos")
}
pub fn proxy_forward_by_system(&self) -> bool {
self.proxy_forward_by_system
self.flags.load().proxy_forward_by_system
}
pub fn no_tun(&self) -> bool {
self.no_tun
self.flags.load().no_tun
}
pub fn get_feature_flags(&self) -> PeerFeatureFlag {
@@ -611,12 +615,13 @@ impl GlobalCtx {
}
pub fn p2p_only(&self) -> bool {
self.p2p_only
self.flags.load().p2p_only
}
pub fn latency_first(&self) -> bool {
// NOTICE: p2p only is conflict with latency first
self.config.get_flags().latency_first && !self.p2p_only
let flags = self.flags.load();
flags.latency_first && !flags.p2p_only
}
fn is_port_in_running_listeners(&self, port: u16, is_udp: bool) -> bool {
@@ -730,6 +735,35 @@ pub mod tests {
));
}
#[tokio::test]
async fn set_flags_keeps_derived_feature_flags_in_sync() {
let config = TomlConfigLoader::default();
let global_ctx = GlobalCtx::new(config);
let mut feature_flags = global_ctx.get_feature_flags();
feature_flags.avoid_relay_data = true;
feature_flags.is_public_server = true;
global_ctx.set_feature_flags(feature_flags);
let mut flags = global_ctx.get_flags();
flags.disable_kcp_input = true;
flags.disable_relay_kcp = true;
flags.disable_quic_input = true;
flags.disable_relay_quic = true;
flags.need_p2p = true;
global_ctx.set_flags(flags);
let feature_flags = global_ctx.get_feature_flags();
assert!(!feature_flags.kcp_input);
assert!(feature_flags.no_relay_kcp);
assert!(!feature_flags.quic_input);
assert!(feature_flags.no_relay_quic);
assert!(feature_flags.need_p2p);
assert!(feature_flags.support_conn_list_sync);
assert!(feature_flags.avoid_relay_data);
assert!(feature_flags.is_public_server);
}
pub fn get_mock_global_ctx_with_network(
network_identy: Option<NetworkIdentity>,
) -> ArcGlobalCtx {
+26 -11
View File
@@ -8,7 +8,7 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
time::{Duration, Instant},
};
use crate::{
@@ -40,7 +40,10 @@ use rand::Rng;
use tokio::{net::UdpSocket, task::JoinSet, time::timeout};
use url::Host;
use super::{create_connector_by_url, udp_hole_punch};
use super::{
create_connector_by_url, should_background_p2p_with_peer, should_try_p2p_with_peer,
udp_hole_punch,
};
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
@@ -58,14 +61,22 @@ impl PeerManagerForDirectConnector for PeerManager {
async fn list_peers(&self) -> Vec<PeerId> {
let mut ret = vec![];
let allow_public_server = use_global_var!(DIRECT_CONNECT_TO_PUBLIC_SERVER);
let lazy_p2p = self.get_global_ctx().get_flags().lazy_p2p;
let now = Instant::now();
let routes = self.list_routes().await;
for r in routes.iter().filter(|r| {
r.feature_flag
.map(|r| allow_public_server || !r.is_public_server)
.unwrap_or(true)
}) {
ret.push(r.peer_id);
for route in routes.iter() {
let static_allowed = should_background_p2p_with_peer(
route.feature_flag.as_ref(),
allow_public_server,
lazy_p2p,
);
let dynamic_allowed =
should_try_p2p_with_peer(route.feature_flag.as_ref(), allow_public_server)
&& self.has_recent_traffic(route.peer_id, now);
if static_allowed || dynamic_allowed {
ret.push(route.peer_id);
}
}
ret
@@ -625,7 +636,11 @@ impl DirectConnectorManager {
global_ctx.clone(),
peer_manager.clone(),
));
let client = PeerTaskManager::new(DirectConnectorLauncher(data.clone()), peer_manager);
let client = PeerTaskManager::new_with_external_signal(
DirectConnectorLauncher(data.clone()),
peer_manager.clone(),
Some(peer_manager.p2p_demand_notify()),
);
Self {
global_ctx,
data,
@@ -696,7 +711,7 @@ mod tests {
let mut f = p_a.get_global_ctx().get_flags();
f.bind_device = false;
p_a.get_global_ctx().config.set_flags(f);
p_a.get_global_ctx().set_flags(f);
p_c.get_global_ctx()
.config
@@ -765,7 +780,7 @@ mod tests {
}
let mut f = p_c.get_global_ctx().config.get_flags();
f.enable_ipv6 = ipv6;
p_c.get_global_ctx().config.set_flags(f);
p_c.get_global_ctx().set_flags(f);
let mut lis_c = ListenerManager::new(p_c.get_global_ctx(), p_c.clone());
lis_c.prepare_listeners().await.unwrap();
+1 -1
View File
@@ -334,7 +334,7 @@ mod tests {
let mut flags = global_ctx.config.get_flags();
flags.bind_device = false;
global_ctx.config.set_flags(flags);
global_ctx.set_flags(flags);
let mut connector = HttpTunnelConnector::new(test_url.clone(), global_ctx.clone());
let mut listener = TcpTunnelListener::new("tcp://0.0.0.0:25888".parse().unwrap());
+75
View File
@@ -15,6 +15,7 @@ use crate::tunnel::unix::UnixSocketTunnelConnector;
use crate::tunnel::wireguard::{WgConfig, WgTunnelConnector};
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, idn, network::IPCollector},
proto::common::PeerFeatureFlag,
tunnel::{
check_scheme_and_get_socket_addr, ring::RingTunnelConnector, tcp::TcpTunnelConnector,
udp::UdpTunnelConnector, IpVersion, TunnelConnector,
@@ -29,6 +30,24 @@ pub mod udp_hole_punch;
pub mod dns_connector;
pub mod http_connector;
pub(crate) fn should_try_p2p_with_peer(
feature_flag: Option<&PeerFeatureFlag>,
allow_public_server: bool,
) -> bool {
feature_flag
.map(|flag| allow_public_server || !flag.is_public_server)
.unwrap_or(true)
}
pub(crate) fn should_background_p2p_with_peer(
feature_flag: Option<&PeerFeatureFlag>,
allow_public_server: bool,
lazy_p2p: bool,
) -> bool {
should_try_p2p_with_peer(feature_flag, allow_public_server)
&& (!lazy_p2p || feature_flag.map(|flag| flag.need_p2p).unwrap_or(false))
}
async fn set_bind_addr_for_peer_connector(
connector: &mut (impl TunnelConnector + ?Sized),
is_ipv4: bool,
@@ -197,3 +216,59 @@ pub async fn create_connector_by_url(
Ok(connector)
}
#[cfg(test)]
mod tests {
use crate::proto::common::PeerFeatureFlag;
use super::{should_background_p2p_with_peer, should_try_p2p_with_peer};
#[test]
fn lazy_background_p2p_requires_need_p2p() {
let no_need_p2p = PeerFeatureFlag {
need_p2p: false,
..Default::default()
};
let need_p2p = PeerFeatureFlag {
need_p2p: true,
..Default::default()
};
assert!(should_background_p2p_with_peer(
Some(&no_need_p2p),
false,
false
));
assert!(!should_background_p2p_with_peer(
Some(&no_need_p2p),
false,
true
));
assert!(should_background_p2p_with_peer(
Some(&need_p2p),
false,
true
));
}
#[test]
fn p2p_policy_respects_public_server_setting() {
let public_server = PeerFeatureFlag {
is_public_server: true,
..Default::default()
};
assert!(!should_try_p2p_with_peer(Some(&public_server), false));
assert!(should_try_p2p_with_peer(Some(&public_server), true));
assert!(!should_background_p2p_with_peer(
Some(&public_server),
false,
false
));
assert!(should_background_p2p_with_peer(
Some(&public_server),
true,
false
));
}
}
+58 -7
View File
@@ -1,7 +1,7 @@
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
sync::Arc,
time::Duration,
time::{Duration, Instant},
};
use anyhow::{Context, Error};
@@ -29,6 +29,8 @@ use crate::{
},
};
use crate::connector::{should_background_p2p_with_peer, should_try_p2p_with_peer};
pub const BLACKLIST_TIMEOUT_SEC: u64 = 3600;
fn handle_rpc_result<T>(
@@ -418,6 +420,7 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher {
#[tracing::instrument(skip(self, data))]
async fn collect_peers_need_task(&self, data: &Self::Data) -> Vec<Self::CollectPeerItem> {
let global_ctx = data.peer_mgr.get_global_ctx();
let lazy_p2p = global_ctx.get_flags().lazy_p2p;
let my_tcp_nat_type = NatType::try_from(
global_ctx
.get_stun_info_collector()
@@ -434,16 +437,17 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher {
}
let my_peer_id = data.peer_mgr.my_peer_id();
let now = Instant::now();
data.blacklist.cleanup();
let mut peers_to_connect = Vec::new();
for route in data.peer_mgr.list_routes().await.iter() {
if route
.feature_flag
.map(|x| x.is_public_server)
.unwrap_or(false)
{
let static_allowed =
should_background_p2p_with_peer(route.feature_flag.as_ref(), false, lazy_p2p);
let dynamic_allowed = should_try_p2p_with_peer(route.feature_flag.as_ref(), false)
&& data.peer_mgr.has_recent_traffic(route.peer_id, now);
if !static_allowed && !dynamic_allowed {
continue;
}
@@ -520,7 +524,11 @@ impl TcpHolePunchConnector {
pub fn new(peer_mgr: Arc<PeerManager>) -> Self {
Self {
server: TcpHolePunchServer::new(peer_mgr.clone()),
client: PeerTaskManager::new(TcpHolePunchPeerTaskLauncher {}, peer_mgr.clone()),
client: PeerTaskManager::new_with_external_signal(
TcpHolePunchPeerTaskLauncher {},
peer_mgr.clone(),
Some(peer_mgr.p2p_demand_notify()),
),
peer_mgr,
}
}
@@ -570,12 +578,15 @@ mod tests {
connector::tcp_hole_punch::TcpHolePunchConnector,
peers::{
peer_manager::PeerManager,
peer_task::PeerTaskLauncher,
tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
},
proto::common::{NatType, StunInfo},
tunnel::common::tests::wait_for_condition,
};
use super::TcpHolePunchPeerTaskLauncher;
struct MockStunInfoCollector {
udp_nat_type: NatType,
tcp_nat_type: NatType,
@@ -619,6 +630,17 @@ mod tests {
.replace_stun_info_collector(collector);
}
async fn collect_lazy_punch_peers(peer_mgr: Arc<PeerManager>) -> Vec<u32> {
let launcher = TcpHolePunchPeerTaskLauncher {};
let data = launcher.new_data(peer_mgr);
launcher
.collect_peers_need_task(&data)
.await
.into_iter()
.map(|task| task.dst_peer_id)
.collect()
}
#[tokio::test]
async fn tcp_hole_punch_connects() {
let p_a = create_mock_peer_manager().await;
@@ -701,4 +723,33 @@ mod tests {
.map(|c| c.is_empty())
.unwrap_or(true));
}
#[tokio::test]
async fn lazy_p2p_collects_tcp_hole_punch_tasks_only_after_recent_traffic() {
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
let p_c = create_mock_peer_manager().await;
replace_stun_info_collector(p_a.clone(), NatType::PortRestricted);
replace_stun_info_collector(p_b.clone(), NatType::PortRestricted);
replace_stun_info_collector(p_c.clone(), NatType::PortRestricted);
let mut flags = p_a.get_global_ctx().get_flags();
flags.lazy_p2p = true;
p_a.get_global_ctx().set_flags(flags);
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
assert!(!collect_lazy_punch_peers(p_a.clone())
.await
.contains(&p_c.my_peer_id()));
p_a.mark_recent_traffic(p_c.my_peer_id());
assert!(collect_lazy_punch_peers(p_a.clone())
.await
.contains(&p_c.my_peer_id()));
}
}
+53 -8
View File
@@ -1,6 +1,6 @@
use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
time::{Duration, Instant},
};
use anyhow::{Context, Error};
@@ -32,6 +32,8 @@ use crate::{
tunnel::Tunnel,
};
use crate::connector::{should_background_p2p_with_peer, should_try_p2p_with_peer};
pub(crate) mod both_easy_sym;
pub(crate) mod common;
pub(crate) mod cone;
@@ -426,6 +428,8 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher {
}
let my_peer_id = data.peer_mgr.my_peer_id();
let lazy_p2p = data.peer_mgr.get_global_ctx().get_flags().lazy_p2p;
let now = Instant::now();
data.blacklist.cleanup();
@@ -434,11 +438,11 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher {
// 2. peers is full cone (any restricted type);
// 3. peers not in blacklist;
for route in data.peer_mgr.list_routes().await.iter() {
if route
.feature_flag
.map(|x| x.is_public_server)
.unwrap_or(false)
{
let static_allowed =
should_background_p2p_with_peer(route.feature_flag.as_ref(), false, lazy_p2p);
let dynamic_allowed = should_try_p2p_with_peer(route.feature_flag.as_ref(), false)
&& data.peer_mgr.has_recent_traffic(route.peer_id, now);
if !static_allowed && !dynamic_allowed {
continue;
}
@@ -531,7 +535,11 @@ impl UdpHolePunchConnector {
pub fn new(peer_mgr: Arc<PeerManager>) -> Self {
Self {
server: UdpHolePunchServer::new(peer_mgr.clone()),
client: PeerTaskManager::new(UdpHolePunchPeerTaskLauncher {}, peer_mgr.clone()),
client: PeerTaskManager::new_with_external_signal(
UdpHolePunchPeerTaskLauncher {},
peer_mgr.clone(),
Some(peer_mgr.p2p_demand_notify()),
),
peer_mgr,
}
}
@@ -580,12 +588,13 @@ pub mod tests {
use crate::common::stun::MockStunInfoCollector;
use crate::peers::{
peer_manager::PeerManager,
peer_task::PeerTaskLauncher,
tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
};
use crate::proto::common::NatType;
use crate::tunnel::common::tests::wait_for_condition;
use super::{UdpHolePunchConnector, RUN_TESTING};
use super::{UdpHolePunchConnector, UdpHolePunchPeerTaskLauncher, RUN_TESTING};
pub fn replace_stun_info_collector(peer_mgr: Arc<PeerManager>, udp_nat_type: NatType) {
let collector = Box::new(MockStunInfoCollector { udp_nat_type });
@@ -602,6 +611,17 @@ pub mod tests {
p_a
}
async fn collect_lazy_punch_peers(peer_mgr: Arc<PeerManager>) -> Vec<u32> {
let launcher = UdpHolePunchPeerTaskLauncher {};
let data = launcher.new_data(peer_mgr);
launcher
.collect_peers_need_task(&data)
.await
.into_iter()
.map(|task| task.dst_peer_id)
.collect()
}
#[rstest::rstest]
#[tokio::test]
pub async fn test_hole_punching_blacklist(
@@ -634,4 +654,29 @@ pub mod tests {
)
.await;
}
#[tokio::test]
async fn lazy_p2p_collects_udp_hole_punch_tasks_only_after_recent_traffic() {
let p_a = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await;
let p_b = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await;
let p_c = create_mock_peer_manager_with_mock_stun(NatType::PortRestricted).await;
let mut flags = p_a.get_global_ctx().get_flags();
flags.lazy_p2p = true;
p_a.get_global_ctx().set_flags(flags);
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
assert!(!collect_lazy_punch_peers(p_a.clone())
.await
.contains(&p_c.my_peer_id()));
p_a.mark_recent_traffic(p_c.my_peer_id());
assert!(collect_lazy_punch_peers(p_a.clone())
.await
.contains(&p_c.my_peer_id()));
}
}
+20
View File
@@ -404,6 +404,15 @@ struct NetworkOptions {
)]
p2p_only: Option<bool>,
#[arg(
long,
env = "ET_LAZY_P2P",
help = t!("core_clap.lazy_p2p").to_string(),
num_args = 0..=1,
default_missing_value = "true"
)]
lazy_p2p: Option<bool>,
#[arg(
long,
env = "ET_DISABLE_P2P",
@@ -449,6 +458,15 @@ struct NetworkOptions {
)]
relay_all_peer_rpc: Option<bool>,
#[arg(
long,
env = "ET_NEED_P2P",
help = t!("core_clap.need_p2p").to_string(),
num_args = 0..=1,
default_missing_value = "true"
)]
need_p2p: Option<bool>,
#[cfg(feature = "socks5")]
#[arg(
long,
@@ -1012,6 +1030,7 @@ impl NetworkOptions {
}
f.disable_p2p = self.disable_p2p.unwrap_or(f.disable_p2p);
f.p2p_only = self.p2p_only.unwrap_or(f.p2p_only);
f.lazy_p2p = self.lazy_p2p.unwrap_or(f.lazy_p2p);
f.disable_tcp_hole_punching = self
.disable_tcp_hole_punching
.unwrap_or(f.disable_tcp_hole_punching);
@@ -1019,6 +1038,7 @@ impl NetworkOptions {
.disable_udp_hole_punching
.unwrap_or(f.disable_udp_hole_punching);
f.relay_all_peer_rpc = self.relay_all_peer_rpc.unwrap_or(f.relay_all_peer_rpc);
f.need_p2p = self.need_p2p.unwrap_or(f.need_p2p);
f.multi_thread = self.multi_thread.unwrap_or(f.multi_thread);
if let Some(compression) = &self.compression {
f.data_compress_algo = match compression.as_str() {
+1 -1
View File
@@ -45,7 +45,7 @@ pub async fn prepare_env_with_tld_dns_zone(
if let Some(zone) = tld_dns_zone {
flags.tld_dns_zone = zone.to_string();
}
ctx.config.set_flags(flags);
ctx.set_flags(flags);
}
let (s, r) = create_packet_recv_chan();
+8 -5
View File
@@ -593,9 +593,12 @@ impl Instance {
let mut direct_conn_manager =
DirectConnectorManager::new(global_ctx.clone(), peer_manager.clone());
direct_conn_manager.run();
let direct_conn_manager = Arc::new(direct_conn_manager);
let udp_hole_puncher = UdpHolePunchConnector::new(peer_manager.clone());
let tcp_hole_puncher = TcpHolePunchConnector::new(peer_manager.clone());
let udp_hole_puncher =
Arc::new(Mutex::new(UdpHolePunchConnector::new(peer_manager.clone())));
let tcp_hole_puncher =
Arc::new(Mutex::new(TcpHolePunchConnector::new(peer_manager.clone())));
let peer_center = Arc::new(PeerCenterInstance::new(peer_manager.clone()));
@@ -618,9 +621,9 @@ impl Instance {
peer_manager,
listener_manager,
conn_manager,
direct_conn_manager: Arc::new(direct_conn_manager),
udp_hole_puncher: Arc::new(Mutex::new(udp_hole_puncher)),
tcp_hole_puncher: Arc::new(Mutex::new(tcp_hole_puncher)),
direct_conn_manager,
udp_hole_puncher,
tcp_hole_puncher,
ip_proxy: None,
#[cfg(feature = "kcp")]
+12
View File
@@ -762,6 +762,10 @@ impl NetworkConfig {
flags.p2p_only = p2p_only;
}
if let Some(lazy_p2p) = self.lazy_p2p {
flags.lazy_p2p = lazy_p2p;
}
if let Some(bind_device) = self.bind_device {
flags.bind_device = bind_device;
}
@@ -778,6 +782,10 @@ impl NetworkConfig {
flags.relay_all_peer_rpc = relay_all_peer_rpc;
}
if let Some(need_p2p) = self.need_p2p {
flags.need_p2p = need_p2p;
}
if let Some(multi_thread) = self.multi_thread {
flags.multi_thread = multi_thread;
}
@@ -956,10 +964,12 @@ impl NetworkConfig {
result.disable_quic_input = Some(flags.disable_quic_input);
result.disable_p2p = Some(flags.disable_p2p);
result.p2p_only = Some(flags.p2p_only);
result.lazy_p2p = Some(flags.lazy_p2p);
result.bind_device = Some(flags.bind_device);
result.no_tun = Some(flags.no_tun);
result.enable_exit_node = Some(flags.enable_exit_node);
result.relay_all_peer_rpc = Some(flags.relay_all_peer_rpc);
result.need_p2p = Some(flags.need_p2p);
result.multi_thread = Some(flags.multi_thread);
result.proxy_forward_by_system = Some(flags.proxy_forward_by_system);
result.disable_encryption = Some(!flags.enable_encryption);
@@ -1214,10 +1224,12 @@ mod tests {
flags.disable_quic_input = rng.gen_bool(0.3);
flags.disable_p2p = rng.gen_bool(0.2);
flags.p2p_only = rng.gen_bool(0.2);
flags.lazy_p2p = rng.gen_bool(0.3);
flags.bind_device = rng.gen_bool(0.3);
flags.no_tun = rng.gen_bool(0.1);
flags.enable_exit_node = rng.gen_bool(0.4);
flags.relay_all_peer_rpc = rng.gen_bool(0.5);
flags.need_p2p = rng.gen_bool(0.3);
flags.multi_thread = rng.gen_bool(0.7);
flags.proxy_forward_by_system = rng.gen_bool(0.3);
flags.enable_encryption = rng.gen_bool(0.8);
@@ -1182,7 +1182,7 @@ pub mod tests {
tracing::debug!("pm_center: {:?}", pm_center.my_peer_id());
let mut flag = pm_center.get_global_ctx().get_flags();
flag.relay_network_whitelist = ["net1".to_string(), "net2*".to_string()].join(" ");
pm_center.get_global_ctx().config.set_flags(flag);
pm_center.get_global_ctx().set_flags(flag);
let pma_net1 = create_mock_peer_manager_for_foreign_network(name.as_str()).await;
@@ -1209,7 +1209,7 @@ pub mod tests {
let mut flag = pm_center.get_global_ctx().get_flags();
flag.relay_network_whitelist = "".to_string();
flag.relay_all_peer_rpc = true;
pm_center.get_global_ctx().config.set_flags(flag);
pm_center.get_global_ctx().set_flags(flag);
tracing::debug!("pm_center: {:?}", pm_center.my_peer_id());
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
+276 -15
View File
@@ -2,7 +2,7 @@ use std::{
fmt::Debug,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::{atomic::AtomicBool, Arc, Weak},
time::{Instant, SystemTime},
time::{Duration, Instant, SystemTime},
};
use anyhow::Context;
@@ -63,6 +63,7 @@ use super::{
peer_map::PeerMap,
peer_ospf_route::PeerRoute,
peer_rpc::PeerRpcManager,
peer_task::ExternalTaskSignal,
relay_peer_map::RelayPeerMap,
route_trait::{ArcRoute, Route},
BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChan, PacketRecvChanReceiver,
@@ -161,6 +162,8 @@ pub struct PeerManager {
exit_nodes: RwLock<Vec<IpAddr>>,
reserved_my_peer_id_map: DashMap<String, PeerId>,
recent_have_traffic: Arc<DashMap<PeerId, Instant>>,
p2p_demand_notify: Arc<ExternalTaskSignal>,
allow_loopback_tunnel: AtomicBool,
@@ -181,6 +184,39 @@ impl Debug for PeerManager {
}
impl PeerManager {
// Keep lazy-p2p demand alive across the 5s task rescan interval and a full on-demand
// connect attempt, without retaining extra per-task state in the hot path.
const RECENT_HAVE_TRAFFIC_TTL: Duration = Duration::from_secs(30);
fn should_mark_recent_traffic_for_fanout(total_dst_peers: usize) -> bool {
total_dst_peers <= 1
}
fn gc_recent_traffic_entries<F>(
recent_have_traffic: &DashMap<PeerId, Instant>,
now: Instant,
mut has_directly_connected_conn: F,
) where
F: FnMut(PeerId) -> bool,
{
let mut to_remove = Vec::new();
for entry in recent_have_traffic.iter() {
let peer_id = *entry.key();
let expired =
now.saturating_duration_since(*entry.value()) > Self::RECENT_HAVE_TRAFFIC_TTL;
if expired || has_directly_connected_conn(peer_id) {
to_remove.push(peer_id);
}
}
if !to_remove.is_empty() {
for peer_id in to_remove {
recent_have_traffic.remove(&peer_id);
}
shrink_dashmap(recent_have_traffic, None);
}
}
pub fn new(
route_algo: RouteAlgoType,
global_ctx: ArcGlobalCtx,
@@ -334,6 +370,8 @@ impl PeerManager {
exit_nodes: RwLock::new(exit_nodes),
reserved_my_peer_id_map: DashMap::new(),
recent_have_traffic: Arc::new(DashMap::new()),
p2p_demand_notify: Arc::new(ExternalTaskSignal::new()),
allow_loopback_tunnel: AtomicBool::new(true),
@@ -349,6 +387,57 @@ impl PeerManager {
.store(allow_loopback_tunnel, std::sync::atomic::Ordering::Relaxed);
}
pub fn mark_recent_traffic(&self, dst_peer_id: PeerId) {
if dst_peer_id == self.my_peer_id {
return;
}
let flags = self.global_ctx.flags_arc();
if flags.disable_p2p || !flags.lazy_p2p || self.has_directly_connected_conn(dst_peer_id) {
return;
}
let now = Instant::now();
if let Some(mut last_seen) = self.recent_have_traffic.get_mut(&dst_peer_id) {
let should_notify =
now.saturating_duration_since(*last_seen) > Self::RECENT_HAVE_TRAFFIC_TTL;
*last_seen = now;
if !should_notify {
return;
}
} else {
self.recent_have_traffic.insert(dst_peer_id, now);
}
self.p2p_demand_notify.notify();
}
pub fn has_recent_traffic(&self, peer_id: PeerId, now: Instant) -> bool {
if self.has_directly_connected_conn(peer_id) {
return false;
}
self.recent_have_traffic
.get(&peer_id)
.map(|last_seen| {
now.saturating_duration_since(*last_seen) <= Self::RECENT_HAVE_TRAFFIC_TTL
})
.unwrap_or(false)
}
pub fn clear_recent_traffic(&self, peer_id: PeerId) {
self.recent_have_traffic.remove(&peer_id);
}
pub fn p2p_demand_notify(&self) -> Arc<ExternalTaskSignal> {
self.p2p_demand_notify.clone()
}
fn gc_recent_traffic(&self) {
Self::gc_recent_traffic_entries(&self.recent_have_traffic, Instant::now(), |peer_id| {
self.has_directly_connected_conn(peer_id)
});
}
fn build_foreign_network_manager_accessor(
peer_map: &Arc<PeerMap>,
) -> Box<dyn GlobalForeignNetworkAccessor> {
@@ -405,7 +494,9 @@ impl PeerManager {
"network identity not match".to_string(),
));
}
let peer_id = peer_conn.get_peer_id();
self.peers.add_new_peer_conn(peer_conn).await?;
self.clear_recent_traffic(peer_id);
Ok(())
}
@@ -1152,6 +1243,7 @@ impl PeerManager {
mut msg: ZCPacket,
dst_peer_id: PeerId,
) -> Result<(), Error> {
self.mark_recent_traffic(dst_peer_id);
self.check_p2p_only_before_send(dst_peer_id)?;
self.self_tx_counters
@@ -1229,19 +1321,32 @@ impl PeerManager {
}
}
pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec<PeerId>, bool) {
let mut is_exit_node = false;
let mut dst_peers = vec![];
fn is_all_peers_broadcast_ipv4(&self, ipv4_addr: &Ipv4Addr) -> bool {
let network_length = self
.global_ctx
.get_ipv4()
.map(|x| x.network_length())
.unwrap_or(24);
let ipv4_inet = cidr::Ipv4Inet::new(*ipv4_addr, network_length).unwrap();
if ipv4_addr.is_broadcast()
ipv4_addr.is_broadcast()
|| ipv4_addr.is_multicast()
|| *ipv4_addr == ipv4_inet.last_address()
{
}
fn is_all_peers_broadcast_ipv6(&self, ipv6_addr: &Ipv6Addr) -> bool {
let network_length = self
.global_ctx
.get_ipv6()
.map(|x| x.network_length())
.unwrap_or(64);
let ipv6_inet = cidr::Ipv6Inet::new(*ipv6_addr, network_length).unwrap();
ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address()
}
pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec<PeerId>, bool) {
let mut is_exit_node = false;
let mut dst_peers = vec![];
if self.is_all_peers_broadcast_ipv4(ipv4_addr) {
dst_peers.extend(self.peers.list_routes().await.iter().filter_map(|x| {
if *x.key() != self.my_peer_id {
Some(*x.key())
@@ -1284,13 +1389,7 @@ impl PeerManager {
pub async fn get_msg_dst_peer_ipv6(&self, ipv6_addr: &Ipv6Addr) -> (Vec<PeerId>, bool) {
let mut is_exit_node = false;
let mut dst_peers = vec![];
let network_length = self
.global_ctx
.get_ipv6()
.map(|x| x.network_length())
.unwrap_or(64);
let ipv6_inet = cidr::Ipv6Inet::new(*ipv6_addr, network_length).unwrap();
if ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address() {
if self.is_all_peers_broadcast_ipv6(ipv6_addr) {
dst_peers.extend(self.peers.list_routes().await.iter().map(|x| *x.key()));
} else if let Some(peer_id) = self.peers.get_peer_id_by_ipv6(ipv6_addr).await {
dst_peers.push(peer_id);
@@ -1350,6 +1449,7 @@ impl PeerManager {
}
let cur_to_peer_id = msg.peer_manager_header().unwrap().to_peer_id.into();
if cur_to_peer_id != 0 {
self.mark_recent_traffic(cur_to_peer_id);
return Self::send_msg_internal(
&self.peers,
&self.foreign_network_client,
@@ -1395,7 +1495,12 @@ impl PeerManager {
let mut errs: Vec<Error> = vec![];
let mut msg = Some(msg);
let total_dst_peers = dst_peers.len();
let should_mark_recent_traffic =
Self::should_mark_recent_traffic_for_fanout(total_dst_peers);
for (i, peer_id) in dst_peers.iter().enumerate() {
if should_mark_recent_traffic {
self.mark_recent_traffic(*peer_id);
}
if let Err(e) = self.check_p2p_only_before_send(*peer_id) {
errs.push(e);
continue;
@@ -1467,6 +1572,28 @@ impl PeerManager {
});
}
async fn run_recent_traffic_gc_routine(&self) {
let recent_have_traffic = self.recent_have_traffic.clone();
let peers = self.peers.clone();
let foreign_network_client = self.foreign_network_client.clone();
self.tasks.lock().await.spawn(async move {
loop {
PeerManager::gc_recent_traffic_entries(
recent_have_traffic.as_ref(),
Instant::now(),
|peer_id| {
if let Some(peer) = peers.get_peer_by_id(peer_id) {
peer.has_directly_connected_conn()
} else {
foreign_network_client.get_peer_map().has_peer(peer_id)
}
},
);
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
}
});
}
async fn run_peer_session_gc_routine(&self) {
let peer_session_store = self.peer_session_store.clone();
self.tasks.lock().await.spawn(async move {
@@ -1499,6 +1626,7 @@ impl PeerManager {
self.start_peer_recv().await;
self.run_clean_peer_without_conn_routine().await;
self.run_relay_session_gc_routine().await;
self.run_recent_traffic_gc_routine().await;
self.run_peer_session_gc_routine().await;
self.run_foriegn_network().await;
@@ -1729,7 +1857,11 @@ impl PeerManager {
#[cfg(test)]
mod tests {
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{
fmt::Debug,
sync::Arc,
time::{Duration, Instant},
};
use crate::{
common::{
@@ -1766,6 +1898,135 @@ mod tests {
use super::PeerManager;
async fn create_lazy_peer_manager() -> Arc<PeerManager> {
let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let mut flags = peer_mgr.get_global_ctx().get_flags();
flags.lazy_p2p = true;
peer_mgr.get_global_ctx().set_flags(flags);
peer_mgr
}
#[test]
fn recent_traffic_fanout_policy_only_marks_single_peer() {
assert!(PeerManager::should_mark_recent_traffic_for_fanout(0));
assert!(PeerManager::should_mark_recent_traffic_for_fanout(1));
assert!(!PeerManager::should_mark_recent_traffic_for_fanout(2));
}
#[test]
fn gc_recent_traffic_removes_expired_and_connected_entries() {
let stale_peer = 1;
let direct_peer = 2;
let active_peer = 3;
let recent_have_traffic = dashmap::DashMap::new();
recent_have_traffic.insert(
stale_peer,
Instant::now() - PeerManager::RECENT_HAVE_TRAFFIC_TTL - Duration::from_millis(1),
);
recent_have_traffic.insert(direct_peer, Instant::now());
recent_have_traffic.insert(active_peer, Instant::now());
let future_peer = 4;
recent_have_traffic.insert(future_peer, Instant::now() + Duration::from_secs(1));
PeerManager::gc_recent_traffic_entries(&recent_have_traffic, Instant::now(), |peer_id| {
peer_id == direct_peer
});
assert!(!recent_have_traffic.contains_key(&stale_peer));
assert!(!recent_have_traffic.contains_key(&direct_peer));
assert!(recent_have_traffic.contains_key(&active_peer));
assert!(recent_have_traffic.contains_key(&future_peer));
}
#[tokio::test]
async fn recent_traffic_skips_direct_peers_and_clears_after_direct_connect() {
let peer_mgr_a = create_lazy_peer_manager().await;
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_b_id = peer_mgr_b.my_peer_id();
peer_mgr_a.mark_recent_traffic(peer_b_id);
assert!(peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()));
let (a_ring, b_ring) = create_ring_tunnel_pair();
let (client_ret, server_ret) = tokio::join!(
peer_mgr_a.add_client_tunnel(a_ring, true),
peer_mgr_b.add_tunnel_as_server(b_ring, true)
);
client_ret.unwrap();
server_ret.unwrap();
wait_for_condition(
|| {
let peer_mgr_a = peer_mgr_a.clone();
async move { peer_mgr_a.has_directly_connected_conn(peer_b_id) }
},
Duration::from_secs(5),
)
.await;
wait_for_condition(
|| {
let peer_mgr_a = peer_mgr_a.clone();
async move { !peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()) }
},
Duration::from_secs(5),
)
.await;
peer_mgr_a.mark_recent_traffic(peer_b_id);
assert!(
!peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()),
"directly connected peers should not be tracked as lazy-p2p demand"
);
}
#[tokio::test]
async fn recent_traffic_notifies_only_when_demand_becomes_active() {
let peer_mgr_a = create_lazy_peer_manager().await;
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_b_id = peer_mgr_b.my_peer_id();
let signal = peer_mgr_a.p2p_demand_notify();
let initial_version = signal.version();
peer_mgr_a.mark_recent_traffic(peer_b_id);
assert_eq!(signal.version(), initial_version + 1);
let first_seen = *peer_mgr_a.recent_have_traffic.get(&peer_b_id).unwrap();
tokio::time::sleep(Duration::from_millis(5)).await;
peer_mgr_a.mark_recent_traffic(peer_b_id);
assert_eq!(
signal.version(),
initial_version + 1,
"fresh demand should not wake all p2p workers again"
);
let refreshed_seen = *peer_mgr_a.recent_have_traffic.get(&peer_b_id).unwrap();
assert!(refreshed_seen > first_seen);
if let Some(mut last_seen) = peer_mgr_a.recent_have_traffic.get_mut(&peer_b_id) {
*last_seen =
Instant::now() - PeerManager::RECENT_HAVE_TRAFFIC_TTL - Duration::from_millis(1);
}
peer_mgr_a.mark_recent_traffic(peer_b_id);
assert_eq!(signal.version(), initial_version + 2);
}
#[tokio::test]
async fn recent_traffic_tolerates_future_timestamps() {
let peer_mgr_a = create_lazy_peer_manager().await;
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_b_id = peer_mgr_b.my_peer_id();
peer_mgr_a
.recent_have_traffic
.insert(peer_b_id, Instant::now() + Duration::from_secs(1));
assert!(peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()));
peer_mgr_a.mark_recent_traffic(peer_b_id);
}
#[tokio::test]
async fn drop_peer_manager() {
let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
@@ -2175,7 +2436,7 @@ mod tests {
let create_mgr = |enable_encryption| async move {
let (s, _r) = create_packet_recv_chan();
let mock_global_ctx = get_mock_global_ctx();
mock_global_ctx.config.set_flags(Flags {
mock_global_ctx.set_flags(Flags {
enable_encryption,
data_compress_algo: CompressionAlgoPb::Zstd.into(),
..Default::default()
+82 -10
View File
@@ -1,5 +1,9 @@
use std::result::Result;
use std::sync::{Arc, Mutex};
use std::{
result::Result,
sync::{atomic::Ordering, Arc, Mutex},
};
use atomic_shim::AtomicU64;
use async_trait::async_trait;
use dashmap::DashMap;
@@ -12,6 +16,39 @@ use anyhow::Error;
use super::peer_manager::PeerManager;
pub struct ExternalTaskSignal {
version: AtomicU64,
notify: Notify,
}
impl Default for ExternalTaskSignal {
fn default() -> Self {
Self::new()
}
}
impl ExternalTaskSignal {
pub fn new() -> Self {
Self {
version: AtomicU64::new(0),
notify: Notify::new(),
}
}
pub fn notify(&self) {
self.version.fetch_add(1, Ordering::Relaxed);
self.notify.notify_waiters();
}
pub fn version(&self) -> u64 {
self.version.load(Ordering::Relaxed)
}
pub fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
self.notify.notified()
}
}
#[async_trait]
pub trait PeerTaskLauncher: Send + Sync + Clone + 'static {
type Data;
@@ -35,9 +72,9 @@ pub trait PeerTaskLauncher: Send + Sync + Clone + 'static {
pub struct PeerTaskManager<Launcher: PeerTaskLauncher> {
launcher: Launcher,
peer_mgr: Arc<PeerManager>,
main_loop_task: Mutex<Option<ScopedTask<()>>>,
run_signal: Arc<Notify>,
external_signal: Option<Arc<ExternalTaskSignal>>,
data: Launcher::Data,
}
@@ -49,12 +86,20 @@ where
L: PeerTaskLauncher<Data = D, CollectPeerItem = C, TaskRet = T> + 'static,
{
pub fn new(launcher: L, peer_mgr: Arc<PeerManager>) -> Self {
Self::new_with_external_signal(launcher, peer_mgr, None)
}
pub fn new_with_external_signal(
launcher: L,
peer_mgr: Arc<PeerManager>,
external_signal: Option<Arc<ExternalTaskSignal>>,
) -> Self {
let data = launcher.new_data(peer_mgr.clone());
Self {
launcher,
peer_mgr,
main_loop_task: Mutex::new(None),
run_signal: Arc::new(Notify::new()),
external_signal,
data,
}
}
@@ -64,13 +109,20 @@ where
self.launcher.clone(),
self.data.clone(),
self.run_signal.clone(),
self.external_signal.clone(),
))
.into();
self.main_loop_task.lock().unwrap().replace(task);
}
async fn main_loop(launcher: L, data: D, signal: Arc<Notify>) {
async fn main_loop(
launcher: L,
data: D,
signal: Arc<Notify>,
external_signal: Option<Arc<ExternalTaskSignal>>,
) {
let peer_task_map = Arc::new(DashMap::<C, ScopedTask<Result<T, Error>>>::new());
let mut external_signal_version = external_signal.as_ref().map(|signal| signal.version());
loop {
let peers_to_connect = launcher.collect_peers_need_task(&data).await;
@@ -113,11 +165,31 @@ where
launcher.all_task_done(&data).await;
}
select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(
launcher.loop_interval_ms(),
)) => {},
_ = signal.notified() => {}
if let Some(external_signal) = external_signal.as_ref() {
let notified = external_signal.notified();
tokio::pin!(notified);
let cur_version = external_signal.version();
if external_signal_version != Some(cur_version) {
external_signal_version = Some(cur_version);
continue;
}
select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(
launcher.loop_interval_ms(),
)) => {},
_ = signal.notified() => {},
_ = &mut notified => {
external_signal_version = Some(external_signal.version());
}
}
} else {
select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(
launcher.loop_interval_ms(),
)) => {},
_ = signal.notified() => {}
}
}
}
}
+2
View File
@@ -85,6 +85,8 @@ message NetworkConfig {
common.SecureModeConfig secure_mode = 55;
reserved 56;
optional string credential_file = 57;
optional bool lazy_p2p = 58;
optional bool need_p2p = 59;
}
message PortForwardConfig {
+4
View File
@@ -70,6 +70,9 @@ message FlagsInConfig {
bool p2p_only = 32;
bool disable_tcp_hole_punching = 34;
bool lazy_p2p = 37;
bool need_p2p = 38;
}
message RpcDescriptor {
@@ -217,6 +220,7 @@ message PeerFeatureFlag {
bool quic_input = 6;
bool no_relay_quic = 7;
bool is_credential_peer = 8;
bool need_p2p = 9;
}
enum SocketType {
+169 -4
View File
@@ -48,17 +48,20 @@ pub fn prepare_linux_namespaces() {
del_netns("net_b");
del_netns("net_c");
del_netns("net_d");
del_netns("net_e");
create_netns("net_a", "10.1.1.1/24", "fd11::1/64");
create_netns("net_b", "10.1.1.2/24", "fd11::2/64");
create_netns("net_c", "10.1.2.3/24", "fd12::3/64");
create_netns("net_d", "10.1.2.4/24", "fd12::4/64");
create_netns("net_e", "10.1.1.3/24", "fd11::3/64");
prepare_bridge("br_a");
prepare_bridge("br_b");
add_ns_to_bridge("br_a", "net_a");
add_ns_to_bridge("br_a", "net_b");
add_ns_to_bridge("br_a", "net_e");
add_ns_to_bridge("br_b", "net_c");
add_ns_to_bridge("br_b", "net_d");
}
@@ -89,10 +92,13 @@ pub async fn init_three_node(proto: &str) -> Vec<Instance> {
init_three_node_ex(proto, |cfg| cfg, false).await
}
pub async fn init_three_node_ex<F: Fn(TomlConfigLoader) -> TomlConfigLoader>(
async fn init_three_node_ex_with_inst3<F: Fn(TomlConfigLoader) -> TomlConfigLoader>(
proto: &str,
cfg_cb: F,
use_public_server: bool,
inst3_ns: &str,
inst3_ipv4: &str,
inst3_ipv6: &str,
) -> Vec<Instance> {
prepare_linux_namespaces();
@@ -110,9 +116,9 @@ pub async fn init_three_node_ex<F: Fn(TomlConfigLoader) -> TomlConfigLoader>(
)));
let mut inst3 = Instance::new(cfg_cb(get_inst_config(
"inst3",
Some("net_c"),
"10.144.144.3",
"fd00::3/64",
Some(inst3_ns),
inst3_ipv4,
inst3_ipv6,
)));
inst1.run().await.unwrap();
@@ -211,6 +217,29 @@ pub async fn init_three_node_ex<F: Fn(TomlConfigLoader) -> TomlConfigLoader>(
vec![inst1, inst2, inst3]
}
pub async fn init_three_node_ex<F: Fn(TomlConfigLoader) -> TomlConfigLoader>(
proto: &str,
cfg_cb: F,
use_public_server: bool,
) -> Vec<Instance> {
init_three_node_ex_with_inst3(
proto,
cfg_cb,
use_public_server,
"net_c",
"10.144.144.3",
"fd00::3/64",
)
.await
}
async fn init_lazy_p2p_three_node_ex<F: Fn(TomlConfigLoader) -> TomlConfigLoader>(
proto: &str,
cfg_cb: F,
) -> Vec<Instance> {
init_three_node_ex_with_inst3(proto, cfg_cb, false, "net_e", "10.144.144.3", "fd00::3/64").await
}
pub async fn drop_insts(insts: Vec<Instance>) {
let mut set = JoinSet::new();
for mut inst in insts {
@@ -2079,6 +2108,24 @@ where
}
}
async fn wait_route_cost(inst: &Instance, peer_id: u32, cost: i32, timeout: Duration) {
let peer_manager = inst.get_peer_manager();
wait_for_condition(
move || {
let peer_manager = peer_manager.clone();
async move {
peer_manager
.list_routes()
.await
.iter()
.any(|route| route.peer_id == peer_id && route.cost == cost)
}
},
timeout,
)
.await;
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
@@ -2400,6 +2447,124 @@ pub async fn acl_group_base_test(
drop_insts(insts).await;
}
#[tokio::test]
#[serial_test::serial]
pub async fn lazy_p2p_builds_direct_connection_on_demand() {
let insts = init_lazy_p2p_three_node_ex("udp", |cfg| {
if cfg.get_inst_name() == "inst1" {
let mut flags = cfg.get_flags();
flags.lazy_p2p = true;
cfg.set_flags(flags);
}
cfg
})
.await;
let inst3_peer_id = insts[2].peer_id();
assert!(
!insts[0]
.get_peer_manager()
.get_peer_map()
.has_peer(inst3_peer_id),
"inst1 should not proactively connect to inst3 when lazy_p2p is enabled"
);
wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await;
assert!(
ping_test("net_a", "10.144.144.3", None).await,
"initial relay traffic should still succeed"
);
wait_for_condition(
|| async {
insts[0]
.get_peer_manager()
.get_peer_map()
.has_peer(inst3_peer_id)
},
Duration::from_secs(10),
)
.await;
wait_route_cost(&insts[0], inst3_peer_id, 1, Duration::from_secs(10)).await;
drop_insts(insts).await;
}
#[tokio::test]
#[serial_test::serial]
pub async fn need_p2p_overrides_lazy_p2p() {
let insts = init_lazy_p2p_three_node_ex("udp", |cfg| {
let mut flags = cfg.get_flags();
if cfg.get_inst_name() == "inst1" {
flags.lazy_p2p = true;
}
if cfg.get_inst_name() == "inst3" {
flags.need_p2p = true;
}
cfg.set_flags(flags);
cfg
})
.await;
let inst3_peer_id = insts[2].peer_id();
wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await;
wait_for_condition(
|| async {
insts[0]
.get_peer_manager()
.get_peer_map()
.has_peer(inst3_peer_id)
},
Duration::from_secs(10),
)
.await;
wait_route_cost(&insts[0], inst3_peer_id, 1, Duration::from_secs(10)).await;
drop_insts(insts).await;
}
#[tokio::test]
#[serial_test::serial]
pub async fn lazy_p2p_warms_up_before_p2p_only_send() {
let insts = init_lazy_p2p_three_node_ex("udp", |cfg| {
if cfg.get_inst_name() == "inst1" {
let mut flags = cfg.get_flags();
flags.lazy_p2p = true;
flags.p2p_only = true;
cfg.set_flags(flags);
}
cfg
})
.await;
let inst3_peer_id = insts[2].peer_id();
wait_route_cost(&insts[0], inst3_peer_id, 2, Duration::from_secs(5)).await;
assert!(
!ping_test("net_a", "10.144.144.3", None).await,
"the first send should still fail under p2p_only before direct connectivity exists"
);
wait_for_condition(
|| async {
insts[0]
.get_peer_manager()
.get_peer_map()
.has_peer(inst3_peer_id)
},
Duration::from_secs(10),
)
.await;
wait_route_cost(&insts[0], inst3_peer_id, 1, Duration::from_secs(10)).await;
wait_for_condition(
|| async { ping_test("net_a", "10.144.144.3", None).await },
Duration::from_secs(6),
)
.await;
drop_insts(insts).await;
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]