support shared tun

This commit is contained in:
sijie.sun
2026-04-19 21:52:31 +08:00
parent f63054e937
commit 0ee551a285
7 changed files with 1814 additions and 83 deletions
+1
View File
@@ -42,6 +42,7 @@ pub type NetworkIdentity = crate::common::config::NetworkIdentity;
pub enum GlobalCtxEvent {
TunDeviceReady(String),
TunDeviceError(String),
TunDeviceFallback(String),
PeerAdded(PeerId),
PeerRemoved(PeerId),
+286 -83
View File
@@ -1,5 +1,3 @@
#[cfg(feature = "tun")]
use std::any::Any;
use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::atomic::{AtomicBool, Ordering};
@@ -65,6 +63,10 @@ use crate::vpn_portal::{self, VpnPortal};
#[cfg(feature = "magic-dns")]
use super::dns_server::{MAGIC_DNS_FAKE_IP, runner::DnsRunner};
use super::listeners::ListenerManager;
#[cfg(feature = "tun")]
use super::shared_tun::{
SharedTunAccess, SharedTunAttachError, SharedTunMemberHandle, try_attach_shared_tun,
};
#[cfg(feature = "socks5")]
use crate::gateway::socks5::Socks5Server;
@@ -133,6 +135,13 @@ impl IpProxy {
#[cfg(feature = "tun")]
type NicCtx = super::virtual_nic::NicCtx;
#[cfg(feature = "tun")]
enum NicRuntime {
Dedicated(NicCtx),
Shared(SharedTunMemberHandle),
Dummy(JoinSet<()>),
}
#[cfg(feature = "magic-dns")]
struct MagicDnsContainer {
dns_runner_task: ScopedTask<()>,
@@ -142,7 +151,7 @@ struct MagicDnsContainer {
// nic container will be cleared when dhcp ip changed
#[cfg(feature = "tun")]
pub struct NicCtxContainer {
nic_ctx: Option<Box<dyn Any + 'static + Send>>,
nic_ctx: Option<NicRuntime>,
#[cfg(feature = "magic-dns")]
magic_dns: Option<MagicDnsContainer>,
}
@@ -150,14 +159,14 @@ pub struct NicCtxContainer {
#[cfg(feature = "tun")]
impl NicCtxContainer {
#[cfg(not(feature = "magic-dns"))]
fn new(nic_ctx: NicCtx) -> Self {
fn new_dedicated(nic_ctx: NicCtx) -> Self {
Self {
nic_ctx: Some(Box::new(nic_ctx)),
nic_ctx: Some(NicRuntime::Dedicated(nic_ctx)),
}
}
#[cfg(feature = "magic-dns")]
fn new(nic_ctx: NicCtx, dns_runner: Option<DnsRunner>) -> Self {
fn new_dedicated(nic_ctx: NicCtx, dns_runner: Option<DnsRunner>) -> Self {
if let Some(mut dns_runner) = dns_runner {
let token = CancellationToken::new();
let token_clone = token.clone();
@@ -165,7 +174,7 @@ impl NicCtxContainer {
let _ = dns_runner.run(token_clone).await;
});
Self {
nic_ctx: Some(Box::new(nic_ctx)),
nic_ctx: Some(NicRuntime::Dedicated(nic_ctx)),
magic_dns: Some(MagicDnsContainer {
dns_runner_task: task.into(),
dns_runner_cancel_token: token,
@@ -173,15 +182,45 @@ impl NicCtxContainer {
}
} else {
Self {
nic_ctx: Some(Box::new(nic_ctx)),
nic_ctx: Some(NicRuntime::Dedicated(nic_ctx)),
magic_dns: None,
}
}
}
fn new_with_any<T: 'static + Send>(ctx: T) -> Self {
#[cfg(not(feature = "magic-dns"))]
fn new_shared(handle: SharedTunMemberHandle) -> Self {
Self {
nic_ctx: Some(Box::new(ctx)),
nic_ctx: Some(NicRuntime::Shared(handle)),
}
}
/// Builds a container around a shared-TUN membership handle, optionally
/// spawning a Magic DNS runner alongside it.
///
/// Mirrors `new_dedicated` (magic-dns variant): when a `DnsRunner` is
/// supplied, it is driven on a spawned task that can be stopped via the
/// stored cancellation token; otherwise no DNS task is created.
#[cfg(feature = "magic-dns")]
fn new_shared(handle: SharedTunMemberHandle, dns_runner: Option<DnsRunner>) -> Self {
if let Some(mut dns_runner) = dns_runner {
// Token pair: one clone drives the runner, the original is kept so
// cleanup can cancel the task later.
let token = CancellationToken::new();
let token_clone = token.clone();
let task = tokio::spawn(async move {
// Runner errors are intentionally ignored; DNS is best-effort here.
let _ = dns_runner.run(token_clone).await;
});
Self {
nic_ctx: Some(NicRuntime::Shared(handle)),
magic_dns: Some(MagicDnsContainer {
dns_runner_task: task.into(),
dns_runner_cancel_token: token,
}),
}
} else {
Self {
nic_ctx: Some(NicRuntime::Shared(handle)),
magic_dns: None,
}
}
}
fn new_dummy(tasks: JoinSet<()>) -> Self {
Self {
nic_ctx: Some(NicRuntime::Dummy(tasks)),
#[cfg(feature = "magic-dns")]
magic_dns: None,
}
@@ -666,19 +705,31 @@ impl Instance {
// use a mock nic ctx to consume packets.
#[cfg(feature = "tun")]
async fn clear_nic_ctx(
arc_nic_ctx: ArcNicCtx,
packet_recv: Arc<Mutex<PacketRecvChanReceiver>>,
) {
async fn cleanup_nic_ctx(mut old_ctx: NicCtxContainer) {
if let Some(runtime) = old_ctx.nic_ctx.take() {
match runtime {
NicRuntime::Shared(handle) => handle.shutdown().await,
NicRuntime::Dedicated(_) | NicRuntime::Dummy(_) => {}
}
}
#[cfg(feature = "magic-dns")]
if let Some(old_ctx) = arc_nic_ctx.lock().await.take()
&& let Some(dns_runner) = old_ctx.magic_dns
{
if let Some(dns_runner) = old_ctx.magic_dns.take() {
dns_runner.dns_runner_cancel_token.cancel();
tracing::debug!("cancelling dns runner task");
let ret = dns_runner.dns_runner_task.await;
tracing::debug!("dns runner task cancelled, ret: {:?}", ret);
};
}
}
#[cfg(feature = "tun")]
async fn clear_nic_ctx(
arc_nic_ctx: ArcNicCtx,
packet_recv: Arc<Mutex<PacketRecvChanReceiver>>,
) {
if let Some(old_ctx) = arc_nic_ctx.lock().await.take() {
Self::cleanup_nic_ctx(old_ctx).await;
}
let mut tasks = JoinSet::new();
tasks.spawn(async move {
@@ -690,7 +741,7 @@ impl Instance {
arc_nic_ctx
.lock()
.await
.replace(NicCtxContainer::new_with_any(tasks));
.replace(NicCtxContainer::new_dummy(tasks));
tracing::debug!("nic ctx cleared.");
}
@@ -716,13 +767,13 @@ impl Instance {
}
#[cfg(feature = "tun")]
async fn use_new_nic_ctx(
async fn use_new_dedicated_nic_ctx(
arc_nic_ctx: ArcNicCtx,
nic_ctx: NicCtx,
#[cfg(feature = "magic-dns")] magic_dns: Option<DnsRunner>,
) {
let mut g = arc_nic_ctx.lock().await;
*g = Some(NicCtxContainer::new(
*g = Some(NicCtxContainer::new_dedicated(
nic_ctx,
#[cfg(feature = "magic-dns")]
magic_dns,
@@ -730,6 +781,91 @@ impl Instance {
tracing::debug!("nic ctx updated.");
}
/// Installs a shared-TUN membership handle as the instance's active NIC
/// context, replacing whatever container was previously stored.
///
/// NOTE(review): the previous container is dropped here without the
/// explicit `cleanup_nic_ctx` path — presumably safe because callers clear
/// the context first; confirm against call sites.
#[cfg(feature = "tun")]
async fn use_new_shared_nic_ctx(
arc_nic_ctx: ArcNicCtx,
handle: SharedTunMemberHandle,
#[cfg(feature = "magic-dns")] magic_dns: Option<DnsRunner>,
) {
let mut g = arc_nic_ctx.lock().await;
*g = Some(NicCtxContainer::new_shared(
handle,
#[cfg(feature = "magic-dns")]
magic_dns,
));
tracing::debug!("shared nic ctx updated.");
}
/// Creates and starts a dedicated (non-shared) TUN NIC context with the
/// given addresses, then installs it as the active NIC context.
///
/// With `magic-dns` enabled, a DNS runner is created only when an IPv4
/// address is present (Magic DNS requires an IPv4 fake-IP binding).
/// Errors from `NicCtx::run` (device creation / address assignment) are
/// propagated to the caller.
#[cfg(all(not(mobile), feature = "tun"))]
async fn setup_dedicated_nic_ctx(
arc_nic_ctx: ArcNicCtx,
global_ctx: ArcGlobalCtx,
peer_mgr: Arc<PeerManager>,
peer_packet_receiver: Arc<Mutex<PacketRecvChanReceiver>>,
close_notifier: Arc<Notify>,
ipv4_addr: Option<Ipv4Inet>,
ipv6_addr: Option<cidr::Ipv6Inet>,
) -> Result<(), Error> {
let mut new_nic_ctx =
NicCtx::new(global_ctx, &peer_mgr, peer_packet_receiver, close_notifier);
new_nic_ctx.run(ipv4_addr, ipv6_addr).await?;
#[cfg(feature = "magic-dns")]
{
// ifname must be read before the ctx is moved into the container.
let ifname = new_nic_ctx.ifname().await;
let dns_runner =
ipv4_addr.and_then(|ipv4| Self::create_magic_dns_runner(peer_mgr, ifname, ipv4));
Self::use_new_dedicated_nic_ctx(arc_nic_ctx, new_nic_ctx, dns_runner).await;
}
#[cfg(not(feature = "magic-dns"))]
Self::use_new_dedicated_nic_ctx(arc_nic_ctx, new_nic_ctx).await;
Ok(())
}
/// Attempts to join an existing shared TUN device instead of creating a
/// dedicated one.
///
/// Returns `Ok(true)` when attached (and the shared NIC context is
/// installed), `Ok(false)` when the shared path is unavailable and the
/// caller should fall back to a dedicated TUN, or `Err` on a fatal
/// attach failure. A `TunDeviceReady` event is emitted on success and a
/// `TunDeviceFallback` event on the fallback path.
#[cfg(all(not(mobile), feature = "tun"))]
async fn try_setup_shared_tun(
arc_nic_ctx: ArcNicCtx,
global_ctx: ArcGlobalCtx,
peer_mgr: Arc<PeerManager>,
peer_packet_receiver: Arc<Mutex<PacketRecvChanReceiver>>,
close_notifier: Arc<Notify>,
ipv4_addr: Option<Ipv4Inet>,
) -> Result<bool, Error> {
match try_attach_shared_tun(
global_ctx.clone(),
peer_mgr.clone(),
peer_packet_receiver,
close_notifier,
SharedTunAccess::Native,
)
.await
{
Ok(attached) => {
global_ctx.issue_event(GlobalCtxEvent::TunDeviceReady(attached.ifname.clone()));
// Magic DNS needs an IPv4 address; without one no runner is made.
#[cfg(feature = "magic-dns")]
let dns_runner = ipv4_addr
.and_then(|ip| {
Self::create_magic_dns_runner(peer_mgr, Some(attached.ifname.clone()), ip)
});
Self::use_new_shared_nic_ctx(
arc_nic_ctx,
attached.handle,
#[cfg(feature = "magic-dns")]
dns_runner,
)
.await;
Ok(true)
}
// Fallback is expected and non-fatal (e.g. sharing not possible in
// this environment); the caller sets up a dedicated TUN instead.
Err(SharedTunAttachError::Fallback(reason)) => {
tracing::info!(instance_id = %global_ctx.get_id(), %reason, "shared tun unavailable, falling back to dedicated tun");
global_ctx.issue_event(GlobalCtxEvent::TunDeviceFallback(reason));
Ok(false)
}
Err(SharedTunAttachError::Fatal(err)) => Err(err),
}
}
// Warning, if there is an IP conflict in the network when using DHCP, the IP will be automatically changed.
fn check_dhcp_ip_conflict(&self) {
use rand::Rng;
@@ -813,37 +949,57 @@ impl Instance {
continue;
}
global_ctx_c.set_ipv4(Some(ip));
#[cfg(all(not(mobile), feature = "tun"))]
{
let mut new_nic_ctx = NicCtx::new(
match Self::try_setup_shared_tun(
nic_ctx.clone(),
global_ctx_c.clone(),
&peer_manager_c,
peer_manager_c.clone(),
_peer_packet_receiver.clone(),
nic_closed_notifier.clone(),
);
if let Err(e) = new_nic_ctx.run(Some(ip), global_ctx_c.get_ipv6()).await {
tracing::error!(
?current_dhcp_ip,
?candidate_ipv4_addr,
?e,
"add ip failed"
);
global_ctx_c.set_ipv4(None);
continue;
}
#[cfg(feature = "magic-dns")]
let ifname = new_nic_ctx.ifname().await;
Self::use_new_nic_ctx(
nic_ctx.clone(),
new_nic_ctx,
#[cfg(feature = "magic-dns")]
Self::create_magic_dns_runner(peer_manager_c.clone(), ifname, ip),
Some(ip),
)
.await;
.await
{
Ok(true) => {}
Ok(false) => {
if let Err(e) = Self::setup_dedicated_nic_ctx(
nic_ctx.clone(),
global_ctx_c.clone(),
peer_manager_c.clone(),
_peer_packet_receiver.clone(),
nic_closed_notifier.clone(),
Some(ip),
global_ctx_c.get_ipv6(),
)
.await
{
tracing::error!(
?current_dhcp_ip,
?candidate_ipv4_addr,
?e,
"add ip failed"
);
global_ctx_c.set_ipv4(None);
continue;
}
}
Err(e) => {
tracing::error!(
?current_dhcp_ip,
?candidate_ipv4_addr,
?e,
"shared tun attach failed"
);
global_ctx_c.set_ipv4(None);
continue;
}
}
}
current_dhcp_ip = Some(ip);
global_ctx_c.set_ipv4(Some(ip));
global_ctx_c.issue_event(GlobalCtxEvent::DhcpIpv4Changed(last_ip, Some(ip)));
} else {
current_dhcp_ip = None;
@@ -883,36 +1039,51 @@ impl Instance {
return;
};
let mut new_nic_ctx = NicCtx::new(
let shared_ready = match Self::try_setup_shared_tun(
nic_ctx.clone(),
peer_mgr.get_global_ctx(),
&peer_mgr,
peer_mgr.clone(),
peer_packet_receiver.clone(),
close_notifier.clone(),
);
if let Err(e) = new_nic_ctx.run(ipv4_addr, ipv6_addr).await {
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(e));
return;
}
tracing::error!("failed to create new nic ctx, err: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
// Create Magic DNS runner only if we have IPv4
#[cfg(feature = "magic-dns")]
ipv4_addr,
)
.await
{
let ifname = new_nic_ctx.ifname().await;
let dns_runner = if let Some(ipv4) = ipv4_addr {
Self::create_magic_dns_runner(peer_mgr, ifname, ipv4)
} else {
None
};
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx, dns_runner).await;
Ok(ready) => ready,
Err(e) => {
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(e));
return;
}
tracing::error!("failed to attach shared tun, err: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
if shared_ready {
// shared nic context is installed
} else {
if let Err(e) = Self::setup_dedicated_nic_ctx(
nic_ctx.clone(),
peer_mgr.get_global_ctx(),
peer_mgr.clone(),
peer_packet_receiver.clone(),
close_notifier.clone(),
ipv4_addr,
ipv6_addr,
)
.await
{
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(e));
return;
}
tracing::error!("failed to create new nic ctx, err: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
#[cfg(not(feature = "magic-dns"))]
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx).await;
}
if let Some(output_tx) = output_tx.take() {
@@ -1480,30 +1651,60 @@ impl Instance {
return Ok(());
}
let close_notifier = Arc::new(Notify::new());
let mut new_nic_ctx = NicCtx::new(
match try_attach_shared_tun(
global_ctx.clone(),
&peer_manager,
peer_manager.clone(),
peer_packet_receiver.clone(),
close_notifier.clone(),
);
new_nic_ctx
.run_for_mobile(fd)
.await
.with_context(|| "add ip failed")?;
SharedTunAccess::MobileFd(fd),
)
.await
{
Ok(attached) => {
global_ctx.issue_event(GlobalCtxEvent::TunDeviceReady(attached.ifname.clone()));
let magic_dns_runner = if let Some(ipv4) = global_ctx.get_ipv4() {
Self::create_magic_dns_runner(
peer_manager.clone(),
Some(attached.ifname.clone()),
ipv4,
)
} else {
None
};
Self::use_new_shared_nic_ctx(nic_ctx, attached.handle, magic_dns_runner).await;
}
Err(SharedTunAttachError::Fallback(reason)) => {
tracing::info!(instance_id = %global_ctx.get_id(), %reason, "shared mobile tun unavailable, falling back to dedicated tun");
global_ctx.issue_event(GlobalCtxEvent::TunDeviceFallback(reason));
let mut dedicated_nic_ctx = NicCtx::new(
global_ctx.clone(),
&peer_manager,
peer_packet_receiver.clone(),
close_notifier.clone(),
);
dedicated_nic_ctx
.run_for_mobile(fd)
.await
.with_context(|| "add ip failed")?;
let magic_dns_runner = if let Some(ipv4) = global_ctx.get_ipv4() {
Self::create_magic_dns_runner(peer_manager.clone(), None, ipv4)
} else {
None
};
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx, magic_dns_runner).await;
let magic_dns_runner = if let Some(ipv4) = global_ctx.get_ipv4() {
Self::create_magic_dns_runner(peer_manager.clone(), None, ipv4)
} else {
None
};
Self::use_new_dedicated_nic_ctx(nic_ctx, dedicated_nic_ctx, magic_dns_runner).await;
}
Err(SharedTunAttachError::Fatal(err)) => return Err(err.into()),
}
Ok(())
}
pub async fn clear_resources(&mut self) {
self.peer_manager.clear_resources().await;
#[cfg(feature = "tun")]
let _ = self.nic_ctx.lock().await.take();
if let Some(old_ctx) = self.nic_ctx.lock().await.take() {
Self::cleanup_nic_ctx(old_ctx).await;
}
}
}
@@ -1515,7 +1716,9 @@ impl Drop for Instance {
let nic_ctx = self.nic_ctx.clone();
tokio::spawn(async move {
#[cfg(feature = "tun")]
nic_ctx.lock().await.take();
if let Some(old_ctx) = nic_ctx.lock().await.take() {
Instance::cleanup_nic_ctx(old_ctx).await;
}
if let Some(pm) = pm.upgrade() {
pm.clear_resources().await;
};
+3
View File
@@ -6,5 +6,8 @@ pub mod listeners;
pub mod proxy_cidrs_monitor;
#[cfg(feature = "tun")]
pub mod shared_tun;
#[cfg(feature = "tun")]
pub mod virtual_nic;
File diff suppressed because it is too large Load Diff
+26
View File
@@ -742,6 +742,22 @@ impl VirtualNic {
Ok(())
}
/// Deletes the IPv4 route `address`/`cidr` from this virtual interface.
///
/// Interface configuration must run inside the instance's network
/// namespace, so the namespace guard is held for the whole call.
pub async fn remove_route(&self, address: Ipv4Addr, cidr: u8) -> Result<(), Error> {
    let _ns_guard = self.global_ctx.net_ns.guard();
    let removal = self
        .ifcfg
        .remove_ipv4_route(self.ifname(), address, cidr)
        .await;
    Ok(removal?)
}
/// Deletes the IPv6 route `address`/`cidr` from this virtual interface.
///
/// Runs under the instance's network-namespace guard, mirroring the IPv4
/// variant above.
pub async fn remove_ipv6_route(&self, address: Ipv6Addr, cidr: u8) -> Result<(), Error> {
    let _ns_guard = self.global_ctx.net_ns.guard();
    let removal = self
        .ifcfg
        .remove_ipv6_route(self.ifname(), address, cidr)
        .await;
    Ok(removal?)
}
pub async fn remove_ip(&self, ip: Option<Ipv4Inet>) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg.remove_ip(self.ifname(), ip).await?;
@@ -770,6 +786,12 @@ impl VirtualNic {
Ok(())
}
/// Sets the interface MTU.
///
/// The `u16` argument is widened losslessly to the `u32` the interface
/// configurator expects; runs under the network-namespace guard.
pub async fn set_mtu(&self, mtu: u16) -> Result<(), Error> {
    let _ns_guard = self.global_ctx.net_ns.guard();
    self.ifcfg.set_mtu(self.ifname(), u32::from(mtu)).await?;
    Ok(())
}
pub fn get_ifcfg(&self) -> impl IfConfiguerTrait + use<> {
IfConfiger {}
}
@@ -943,6 +965,10 @@ impl NicCtx {
}
}
/// Forwards one packet read from the NIC to peers via the private
/// forwarding path. Crate-visible wrapper — presumably added so the new
/// shared-tun module can reuse this path; confirm at call sites.
pub(crate) async fn forward_nic_packet_to_peers(ret: ZCPacket, mgr: &PeerManager) {
Self::do_forward_nic_to_peers(ret, mgr).await;
}
fn do_forward_nic_to_peers_task(
&mut self,
mut stream: Pin<Box<dyn ZCPacketStream>>,
+4
View File
@@ -363,6 +363,10 @@ fn handle_event(
event!(error, %err, "[{}] tun device error", instance_id);
}
GlobalCtxEvent::TunDeviceFallback(reason) => {
event!(warn, %reason, "[{}] tun device fallback", instance_id);
}
GlobalCtxEvent::Connecting(dst) => {
event!(info, category: "CONNECTION", %dst, "[{}] connecting to peer", instance_id);
}
+492
View File
@@ -258,6 +258,211 @@ pub async fn drop_insts(insts: Vec<Instance>) {
while set.join_next().await.is_some() {}
}
async fn wait_for_tun_ready_event(
receiver: &mut tokio::sync::broadcast::Receiver<crate::common::global_ctx::GlobalCtxEvent>,
) -> String {
tokio::time::timeout(Duration::from_secs(5), async {
loop {
if let crate::common::global_ctx::GlobalCtxEvent::TunDeviceReady(ifname) = receiver.recv().await.unwrap() {
return ifname;
}
}
})
.await
.unwrap()
}
async fn assert_no_tun_ready_event(
receiver: &mut tokio::sync::broadcast::Receiver<crate::common::global_ctx::GlobalCtxEvent>,
timeout: Duration,
) {
tokio::time::timeout(timeout, async {
loop {
if let crate::common::global_ctx::GlobalCtxEvent::TunDeviceReady(ifname) = receiver.recv().await.unwrap() {
panic!("unexpected TunDeviceReady event: {ifname}");
}
}
})
.await
.ok();
}
async fn assert_no_tun_fallback_event(
receiver: &mut tokio::sync::broadcast::Receiver<crate::common::global_ctx::GlobalCtxEvent>,
timeout: Duration,
) {
tokio::time::timeout(timeout, async {
loop {
if let crate::common::global_ctx::GlobalCtxEvent::TunDeviceFallback(reason) = receiver.recv().await.unwrap() {
panic!("unexpected TunDeviceFallback event: {reason}");
}
}
})
.await
.ok();
}
/// Asserts that at least one `TcpProxyConnect` metric on `inst` carries a
/// `Protocol` label matching `protocol` and a value of at least
/// `min_value` — i.e. proxy traffic actually went over that transport.
fn assert_tcp_proxy_metric_has_protocol(
inst: &Instance,
protocol: TcpProxyEntryTransportType,
min_value: u64,
) {
let metrics = inst
.get_global_ctx()
.stats_manager()
.get_metrics_by_prefix(&MetricName::TcpProxyConnect.to_string());
assert!(
metrics.iter().any(|metric| {
metric.value >= min_value
// A metric counts only if one of its labels is exactly the
// Protocol label for the requested transport.
&& metric.labels.labels().iter().any(|l| {
let t = LabelType::Protocol(protocol.as_str_name().to_string());
t.key() == l.key && t.value() == l.value
})
}),
"metrics: {:?}",
metrics
);
}
/// End-to-end subnet-proxy scenario over a shared TUN, parameterized by
/// proxy transport (KCP or QUIC) and by which side shares the TUN.
///
/// Topology: a `center` relay plus three nodes. When `source_shared` is
/// true, two source-side instances in the same netns share one TUN and a
/// separate destination exports the proxied CIDR; otherwise the sharing
/// pair is on the destination side. The test verifies all sharers see the
/// same TUN ifname, ICMP/TCP/UDP reach the proxied subnet, the transport
/// metric is recorded, and no fallback-to-dedicated-TUN event fires.
async fn shared_tun_subnet_proxy_transport_test(
transport: TcpProxyEntryTransportType,
source_shared: bool,
) {
prepare_linux_namespaces();
let center_cfg = get_inst_config("center", Some("net_b"), "10.144.144.100", "fd00::64/64");
center_cfg.set_listeners(vec![]);
let mut center = Instance::new(center_cfg);
// Event subscriptions of the TUN-sharing instances; index 0 is the
// first sharer, used as the ifname reference.
let mut shared_events = Vec::new();
let mut insts = Vec::new();
// Index into `insts` of the instance exporting the proxied CIDR.
let dst_idx;
if source_shared {
let source_cfg = get_inst_config("src_shared", Some("net_a"), "10.144.144.1", "fd00::1/64");
source_cfg.set_listeners(vec![]);
source_cfg.set_socks5_portal(None);
let mut source_flags = source_cfg.get_flags();
// Same dev_name on both sharers is what triggers shared-TUN attach.
source_flags.dev_name = "et_ssrc0".to_string();
match transport {
TcpProxyEntryTransportType::Kcp => source_flags.enable_kcp_proxy = true,
TcpProxyEntryTransportType::Quic => source_flags.enable_quic_proxy = true,
_ => unreachable!(),
}
source_cfg.set_flags(source_flags.clone());
let source = Instance::new(source_cfg);
shared_events.push(source.get_global_ctx().subscribe());
let source_peer = get_inst_config("src_peer", Some("net_a"), "10.144.144.2", "fd00::2/64");
source_peer.set_listeners(vec![]);
source_peer.set_socks5_portal(None);
source_peer.set_flags(source_flags);
let source_peer = Instance::new(source_peer);
shared_events.push(source_peer.get_global_ctx().subscribe());
let dst_cfg = get_inst_config("dst", Some("net_c"), "10.144.144.3", "fd00::3/64");
dst_cfg.set_listeners(vec![]);
dst_cfg.set_socks5_portal(None);
dst_cfg
.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
.unwrap();
let dst = Instance::new(dst_cfg);
insts.push(source);
insts.push(source_peer);
insts.push(dst);
dst_idx = 2;
} else {
let src_cfg = get_inst_config("src", Some("net_a"), "10.144.144.1", "fd00::1/64");
src_cfg.set_listeners(vec![]);
src_cfg.set_socks5_portal(None);
let mut src_flags = src_cfg.get_flags();
match transport {
TcpProxyEntryTransportType::Kcp => src_flags.enable_kcp_proxy = true,
TcpProxyEntryTransportType::Quic => src_flags.enable_quic_proxy = true,
_ => unreachable!(),
}
src_cfg.set_flags(src_flags);
let src = Instance::new(src_cfg);
let dst_cfg = get_inst_config("dst_shared", Some("net_c"), "10.144.144.3", "fd00::3/64");
dst_cfg.set_listeners(vec![]);
dst_cfg.set_socks5_portal(None);
dst_cfg
.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
.unwrap();
let mut dst_flags = dst_cfg.get_flags();
dst_flags.dev_name = "et_sdst0".to_string();
dst_cfg.set_flags(dst_flags.clone());
let dst = Instance::new(dst_cfg);
shared_events.push(dst.get_global_ctx().subscribe());
let dst_peer = get_inst_config("dst_peer", Some("net_c"), "10.144.144.4", "fd00::4/64");
dst_peer.set_listeners(vec![]);
dst_peer.set_socks5_portal(None);
dst_peer.set_flags(dst_flags);
let dst_peer = Instance::new(dst_peer);
shared_events.push(dst_peer.get_global_ctx().subscribe());
insts.push(src);
insts.push(dst);
insts.push(dst_peer);
dst_idx = 1;
}
center.run().await.unwrap();
for inst in &mut insts {
inst.run().await.unwrap();
}
// All sharers must report the same TUN interface name.
let ifname = wait_for_tun_ready_event(&mut shared_events[0]).await;
for receiver in shared_events.iter_mut().skip(1) {
assert_eq!(ifname, wait_for_tun_ready_event(receiver).await);
}
insts[0]
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
insts[dst_idx]
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
wait_for_condition(
|| async {
insts[0].get_peer_manager().list_routes().await.len() >= 2
&& insts[dst_idx].get_peer_manager().list_routes().await.len() >= 2
},
Duration::from_secs(8),
)
.await;
wait_proxy_route_appear(
&insts[0].get_peer_manager(),
"10.144.144.3/24",
insts[dst_idx].peer_id(),
"10.1.2.0/24",
)
.await;
// Exercise the proxied subnet over all three protocols.
subnet_proxy_test_icmp("10.1.2.4", Duration::from_secs(8)).await;
subnet_proxy_test_tcp("10.1.2.4", "10.1.2.4", Duration::from_secs(8)).await;
subnet_proxy_test_udp("10.1.2.4", "10.1.2.4", Duration::from_secs(8)).await;
assert_tcp_proxy_metric_has_protocol(&insts[0], transport, 1);
// Nobody should have fallen back from the shared TUN to a dedicated one.
for receiver in &mut shared_events {
assert_no_tun_fallback_event(receiver, Duration::from_secs(2)).await;
}
let mut all_insts = vec![center];
all_insts.extend(insts);
drop_insts(all_insts).await;
}
async fn ping_test(from_netns: &str, target_ip: &str, payload_size: Option<usize>) -> bool {
let _g = NetNS::new(Some(ROOT_NETNS_NAME.to_owned())).guard();
let code = tokio::process::Command::new("ip")
@@ -994,6 +1199,293 @@ pub async fn foreign_network_forward_nic_data() {
drop_insts(vec![center_inst, inst1, inst2]).await;
}
/// Two instances in the same netns configured with the same `dev_name`
/// must share one real TUN device (identical ifname events) while a
/// remote node in another netns retains bidirectional connectivity to
/// both of them through the relay.
#[tokio::test]
#[serial_test::serial]
pub async fn shared_tun_same_namespace_real_tun() {
prepare_linux_namespaces();
let center_cfg = get_inst_config("center", Some("net_a"), "10.144.144.1", "fd00::1/64");
center_cfg.set_listeners(vec![]);
let mut center = Instance::new(center_cfg);
let shared_cfg_1 = get_inst_config("shared_1", Some("net_b"), "10.144.144.2", "fd00::2/64");
shared_cfg_1.set_listeners(vec![]);
shared_cfg_1.set_socks5_portal(None);
let mut shared_flags = shared_cfg_1.get_flags();
// Identical dev_name on both instances is what requests TUN sharing.
shared_flags.dev_name = "et_shared0".to_string();
shared_cfg_1.set_flags(shared_flags.clone());
let mut shared_1 = Instance::new(shared_cfg_1);
let shared_cfg_2 = get_inst_config("shared_2", Some("net_b"), "10.144.144.3", "fd00::3/64");
shared_cfg_2.set_listeners(vec![]);
shared_cfg_2.set_socks5_portal(None);
shared_cfg_2.set_flags(shared_flags);
let mut shared_2 = Instance::new(shared_cfg_2);
let remote_cfg = get_inst_config("remote", Some("net_c"), "10.144.144.4", "fd00::4/64");
remote_cfg.set_listeners(vec![]);
let mut remote = Instance::new(remote_cfg);
// Subscribe before run() so no early event is missed.
let mut shared_1_events = shared_1.get_global_ctx().subscribe();
let mut shared_2_events = shared_2.get_global_ctx().subscribe();
center.run().await.unwrap();
shared_1.run().await.unwrap();
shared_2.run().await.unwrap();
remote.run().await.unwrap();
// Both instances must end up on the very same TUN device.
let shared_1_tun = wait_for_tun_ready_event(&mut shared_1_events).await;
let shared_2_tun = wait_for_tun_ready_event(&mut shared_2_events).await;
assert_eq!(shared_1_tun, shared_2_tun);
shared_1
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
shared_2
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
remote
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
wait_for_condition(
|| async {
shared_1.get_peer_manager().list_routes().await.len() == 3
&& shared_2.get_peer_manager().list_routes().await.len() == 3
&& remote.get_peer_manager().list_routes().await.len() == 3
},
Duration::from_secs(8),
)
.await;
// Remote -> both sharers, and a sharer netns -> remote.
wait_for_condition(
|| async { ping_test("net_c", "10.144.144.2", None).await },
Duration::from_secs(8),
)
.await;
wait_for_condition(
|| async { ping_test("net_c", "10.144.144.3", None).await },
Duration::from_secs(8),
)
.await;
wait_for_condition(
|| async { ping_test("net_b", "10.144.144.4", None).await },
Duration::from_secs(8),
)
.await;
drop_insts(vec![center, shared_1, shared_2, remote]).await;
}
/// Like `shared_tun_same_namespace_real_tun`, but the first sharer also
/// exports a proxy CIDR (10.1.2.0/24). Verifies the proxied subnet is
/// reachable from two other namespaces and that no sharer fell back to a
/// dedicated TUN.
#[tokio::test]
#[serial_test::serial]
pub async fn shared_tun_proxy_cidr_same_namespace_real_tun() {
prepare_linux_namespaces();
let center_cfg = get_inst_config("center", Some("net_a"), "10.144.144.1", "fd00::1/64");
center_cfg.set_listeners(vec![]);
let mut center = Instance::new(center_cfg);
let shared_cfg_1 = get_inst_config("shared_1", Some("net_c"), "10.144.144.2", "fd00::2/64");
shared_cfg_1.set_listeners(vec![]);
shared_cfg_1.set_socks5_portal(None);
// shared_1 is the subnet-proxy exporter.
shared_cfg_1
.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
.unwrap();
let mut shared_flags = shared_cfg_1.get_flags();
shared_flags.dev_name = "et_shp0".to_string();
shared_cfg_1.set_flags(shared_flags.clone());
let mut shared_1 = Instance::new(shared_cfg_1);
let shared_cfg_2 = get_inst_config("shared_2", Some("net_c"), "10.144.144.3", "fd00::3/64");
shared_cfg_2.set_listeners(vec![]);
shared_cfg_2.set_socks5_portal(None);
shared_cfg_2.set_flags(shared_flags);
let mut shared_2 = Instance::new(shared_cfg_2);
let remote_cfg = get_inst_config("remote", Some("net_b"), "10.144.144.4", "fd00::4/64");
remote_cfg.set_listeners(vec![]);
let mut remote = Instance::new(remote_cfg);
// Subscribe before run() so no early event is missed.
let mut shared_1_events = shared_1.get_global_ctx().subscribe();
let mut shared_2_events = shared_2.get_global_ctx().subscribe();
center.run().await.unwrap();
shared_1.run().await.unwrap();
shared_2.run().await.unwrap();
remote.run().await.unwrap();
let shared_1_tun = wait_for_tun_ready_event(&mut shared_1_events).await;
let shared_2_tun = wait_for_tun_ready_event(&mut shared_2_events).await;
assert_eq!(shared_1_tun, shared_2_tun);
shared_1
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
shared_2
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
remote
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
wait_for_condition(
|| async {
shared_1.get_peer_manager().list_routes().await.len() == 3
&& shared_2.get_peer_manager().list_routes().await.len() == 3
&& remote.get_peer_manager().list_routes().await.len() == 3
},
Duration::from_secs(8),
)
.await;
// Both non-exporting nodes must learn the proxied route via shared_1.
wait_proxy_route_appear(
&center.get_peer_manager(),
"10.144.144.2/24",
shared_1.peer_id(),
"10.1.2.0/24",
)
.await;
wait_proxy_route_appear(
&remote.get_peer_manager(),
"10.144.144.2/24",
shared_1.peer_id(),
"10.1.2.0/24",
)
.await;
wait_for_condition(
|| async { ping_test("net_a", "10.1.2.4", None).await },
Duration::from_secs(8),
)
.await;
wait_for_condition(
|| async { ping_test("net_b", "10.1.2.4", None).await },
Duration::from_secs(8),
)
.await;
assert_no_tun_fallback_event(&mut shared_1_events, Duration::from_secs(2)).await;
assert_no_tun_fallback_event(&mut shared_2_events, Duration::from_secs(2)).await;
drop_insts(vec![center, shared_1, shared_2, remote]).await;
}
// The four transport/side combinations of the shared-TUN subnet-proxy
// scenario, each a thin wrapper over shared_tun_subnet_proxy_transport_test.
#[tokio::test]
#[serial_test::serial]
pub async fn shared_tun_kcp_proxy_with_source_shared_tun() {
shared_tun_subnet_proxy_transport_test(TcpProxyEntryTransportType::Kcp, true).await;
}
#[tokio::test]
#[serial_test::serial]
pub async fn shared_tun_quic_proxy_with_source_shared_tun() {
shared_tun_subnet_proxy_transport_test(TcpProxyEntryTransportType::Quic, true).await;
}
#[tokio::test]
#[serial_test::serial]
pub async fn shared_tun_kcp_proxy_with_destination_shared_tun() {
shared_tun_subnet_proxy_transport_test(TcpProxyEntryTransportType::Kcp, false).await;
}
#[tokio::test]
#[serial_test::serial]
pub async fn shared_tun_quic_proxy_with_destination_shared_tun() {
shared_tun_subnet_proxy_transport_test(TcpProxyEntryTransportType::Quic, false).await;
}
/// With `no_tun = true`, two same-namespace instances sharing a dev_name
/// must NOT create any TUN (no TunDeviceReady events), yet peer-to-peer
/// connectivity through the relay still works from a third namespace.
#[tokio::test]
#[serial_test::serial]
pub async fn same_namespace_no_tun_skips_shared_tun_and_keeps_connectivity() {
prepare_linux_namespaces();
let center_cfg = get_inst_config("center", Some("net_a"), "10.144.144.1", "fd00::1/64");
center_cfg.set_listeners(vec![]);
let mut center = Instance::new(center_cfg);
let no_tun_cfg_1 = get_inst_config("no_tun_1", Some("net_b"), "10.144.144.2", "fd00::2/64");
no_tun_cfg_1.set_listeners(vec![]);
no_tun_cfg_1.set_socks5_portal(None);
let mut no_tun_flags = no_tun_cfg_1.get_flags();
// dev_name would normally trigger sharing, but no_tun must win.
no_tun_flags.dev_name = "et_shared_disabled0".to_string();
no_tun_flags.no_tun = true;
no_tun_cfg_1.set_flags(no_tun_flags.clone());
let mut no_tun_1 = Instance::new(no_tun_cfg_1);
let no_tun_cfg_2 = get_inst_config("no_tun_2", Some("net_b"), "10.144.144.3", "fd00::3/64");
no_tun_cfg_2.set_listeners(vec![]);
no_tun_cfg_2.set_socks5_portal(None);
no_tun_cfg_2.set_flags(no_tun_flags);
let mut no_tun_2 = Instance::new(no_tun_cfg_2);
let remote_cfg = get_inst_config("remote", Some("net_c"), "10.144.144.4", "fd00::4/64");
remote_cfg.set_listeners(vec![]);
let mut remote = Instance::new(remote_cfg);
// Subscribe before run() so no early event is missed.
let mut no_tun_1_events = no_tun_1.get_global_ctx().subscribe();
let mut no_tun_2_events = no_tun_2.get_global_ctx().subscribe();
center.run().await.unwrap();
no_tun_1.run().await.unwrap();
no_tun_2.run().await.unwrap();
remote.run().await.unwrap();
// Neither no_tun instance may bring up a TUN device.
assert_no_tun_ready_event(&mut no_tun_1_events, Duration::from_secs(2)).await;
assert_no_tun_ready_event(&mut no_tun_2_events, Duration::from_secs(2)).await;
no_tun_1
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
no_tun_2
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
remote
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", center.id()).parse().unwrap(),
));
wait_for_condition(
|| async {
no_tun_1.get_peer_manager().list_routes().await.len() == 3
&& no_tun_2.get_peer_manager().list_routes().await.len() == 3
&& remote.get_peer_manager().list_routes().await.len() == 3
},
Duration::from_secs(8),
)
.await;
// Connectivity is still expected; presumably served by the remote's
// own TUN plus in-process forwarding on the no_tun side — confirm.
wait_for_condition(
|| async { ping_test("net_c", "10.144.144.2", None).await },
Duration::from_secs(8),
)
.await;
wait_for_condition(
|| async { ping_test("net_c", "10.144.144.3", None).await },
Duration::from_secs(8),
)
.await;
drop_insts(vec![center, no_tun_1, no_tun_2, remote]).await;
}
use std::{net::SocketAddr, str::FromStr};
use defguard_wireguard_rs::{