fix peer establish direct conn with subnet proxy to one of local interface (#1782)

* fix peer establish direct conn with subnet proxy to one of local interface

* fix peer mgr ref loop
This commit is contained in:
KKRainbow
2026-01-15 01:00:32 +08:00
committed by GitHub
parent f8b34e3c86
commit 53264f67bf
21 changed files with 354 additions and 170 deletions
+54 -32
View File
@@ -1,6 +1,6 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::{atomic::AtomicBool, Arc},
sync::{atomic::AtomicBool, Arc, Weak},
time::Duration,
};
@@ -46,25 +46,35 @@ struct UdpNatEntry {
src_peer_id: PeerId,
my_peer_id: PeerId,
src_socket: SocketAddr,
socket: UdpSocket,
socket: Option<UdpSocket>,
forward_task: Mutex<Option<JoinHandle<()>>>,
stopped: AtomicBool,
start_time: std::time::Instant,
last_active_time: AtomicCell<std::time::Instant>,
denied: bool,
}
impl UdpNatEntry {
#[tracing::instrument(err(level = Level::WARN))]
fn new(src_peer_id: PeerId, my_peer_id: PeerId, src_socket: SocketAddr) -> Result<Self, Error> {
fn new(
src_peer_id: PeerId,
my_peer_id: PeerId,
src_socket: SocketAddr,
denied: bool,
) -> Result<Self, Error> {
// TODO: try use src port, so we will be ip restricted nat type
let socket2_socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)?;
let dst_socket_addr = "0.0.0.0:0".parse().unwrap();
setup_sokcet2(&socket2_socket, &dst_socket_addr)?;
let socket = UdpSocket::from_std(socket2_socket.into())?;
let socket = if denied {
None
} else {
let socket2_socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)?;
let dst_socket_addr = "0.0.0.0:0".parse().unwrap();
setup_sokcet2(&socket2_socket, &dst_socket_addr)?;
Some(UdpSocket::from_std(socket2_socket.into())?)
};
Ok(Self {
src_peer_id,
@@ -75,6 +85,7 @@ impl UdpNatEntry {
stopped: AtomicBool::new(false),
start_time: std::time::Instant::now(),
last_active_time: AtomicCell::new(std::time::Instant::now()),
denied,
})
}
@@ -165,7 +176,11 @@ impl UdpNatEntry {
let (len, src_socket) = match timeout(
Duration::from_secs(120),
self_clone.socket.recv_buf_from(&mut cur_buf),
self_clone
.socket
.as_ref()
.unwrap()
.recv_buf_from(&mut cur_buf),
)
.await
{
@@ -239,7 +254,7 @@ impl UdpNatEntry {
#[derive(Debug)]
pub struct UdpProxy {
global_ctx: ArcGlobalCtx,
peer_manager: Arc<PeerManager>,
peer_manager: Weak<PeerManager>,
cidr_set: CidrSet,
@@ -299,22 +314,7 @@ impl UdpProxy {
};
// TODO: should it be async.
let dst_socket = if Some(ipv4.get_destination())
== self.global_ctx.get_ipv4().as_ref().map(Ipv4Inet::address)
{
if self
.global_ctx
.is_port_in_running_listeners(udp_packet.get_destination(), true)
&& self
.global_ctx
.is_ip_in_same_network(&std::net::IpAddr::V4(ipv4.get_source()))
{
tracing::debug!(
dst_port = udp_packet.get_destination(),
"dst socket is in running listeners, ignore it"
);
return Some(());
}
let dst_socket = if self.global_ctx.is_ip_local_virtual_ip(&real_dst_ip.into()) {
format!("127.0.0.1:{}", udp_packet.get_destination())
.parse()
.unwrap()
@@ -337,16 +337,29 @@ impl UdpProxy {
.entry(nat_key)
.or_try_insert_with::<Error>(|| {
tracing::info!(?packet, ?ipv4, ?udp_packet, "udp nat table entry created");
let denied = self.global_ctx.should_deny_proxy(
&SocketAddr::new(real_dst_ip.into(), udp_packet.get_destination()),
true,
);
let _g = self.global_ctx.net_ns.guard();
Ok(Arc::new(UdpNatEntry::new(
hdr.from_peer_id.get(),
hdr.to_peer_id.get(),
nat_key.src_socket,
denied,
)?))
})
.ok()?
.clone();
if nat_entry.denied {
tracing::debug!(
dst_port = udp_packet.get_destination(),
"dst socket is in running listeners, ignore it"
);
return Some(());
}
if nat_entry.forward_task.lock().await.is_none() {
nat_entry
.forward_task
@@ -367,6 +380,8 @@ impl UdpProxy {
let _g = self.global_ctx.net_ns.guard();
nat_entry
.socket
.as_ref()
.unwrap()
.send_to(udp_packet.payload(), dst_socket)
.await
};
@@ -405,7 +420,7 @@ impl UdpProxy {
let (sender, receiver) = channel(1024);
let ret = Self {
global_ctx,
peer_manager,
peer_manager: Arc::downgrade(&peer_manager),
cidr_set,
nat_table: Arc::new(DashMap::new()),
sender,
@@ -417,7 +432,10 @@ impl UdpProxy {
}
pub async fn start(self: &Arc<Self>) -> Result<(), Error> {
self.peer_manager
let Some(peer_manager) = self.peer_manager.upgrade() else {
return Err(anyhow::anyhow!("peer manager is gone").into());
};
peer_manager
.add_packet_process_pipeline(Box::new(self.clone()))
.await;
@@ -457,7 +475,11 @@ impl UdpProxy {
hdr.set_latency_first(is_latency_first);
let to_peer_id = hdr.to_peer_id.into();
tracing::trace!(?msg, ?to_peer_id, "udp nat packet response send");
let ret = peer_manager.send_msg_for_proxy(msg, to_peer_id).await;
let Some(pm) = peer_manager.upgrade() else {
tracing::warn!("peer manager is gone, udp proxy send loop exit");
return;
};
let ret = pm.send_msg_for_proxy(msg, to_peer_id).await;
if ret.is_err() {
tracing::error!("send icmp packet to peer failed: {:?}", ret);
}