From 51849fdf6a895d47279d6e78102729f83a5e961f Mon Sep 17 00:00:00 2001
From: fanyang89
Date: Sun, 5 Apr 2026 23:20:04 +0800
Subject: [PATCH] fix(connector): classify manual reconnect errors by stage

The manual reconnect flow wrapped DNS resolution, transport connect, and
handshake in a single tokio::time::timeout, so any stage timeout surfaced
as a generic Timeout(Elapsed(())). This made it impossible to diagnose
which stage actually failed.

Split the timeout budget across three explicit stages (resolve, connect,
handshake) inside conn_reconnect_with_ip_version, each returning a typed
ConnectError variant that names the stage and whether it was a timeout
or an inner failure.

- Add ConnectError enum (thiserror) with six failure variants and
  PeerManagerGone; use humantime for human-readable durations.
- Extract PeerManager::connect_tunnel() as pub(crate) so the manual
  connector can run connect and handshake as separate steps.
- Replace raw scheme string comparisons with matches_scheme! macro and
  TunnelScheme/IpScheme.
- Share a single Instant + total Duration budget across all stages so
  the original 2 s / 20 s reconnect window is preserved.
--- Cargo.lock | 1 + easytier/Cargo.toml | 1 + easytier/src/connector/manual.rs | 207 +++++++++++++++++++++-------- easytier/src/peers/peer_manager.rs | 17 ++- 4 files changed, 163 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a72d6dd0..5291266a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2207,6 +2207,7 @@ dependencies = [ "http", "http_req", "humansize", + "humantime", "humantime-serde", "idna 1.0.3", "indoc", diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 410726c8..7c91e006 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -243,6 +243,7 @@ hickory-server = { version = "0.25.2", features = [ ], optional = true } derive_builder = "0.20.2" +humantime = "2" humantime-serde = "1.1.1" multimap = "0.10.1" version-compare = "0.2.0" diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 98bbc1a4..dc97ccbf 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -1,6 +1,7 @@ use std::{ collections::BTreeSet, sync::{Arc, Weak}, + time::{Duration, Instant}, }; use dashmap::DashSet; @@ -16,7 +17,7 @@ use crate::{ }, rpc_types::{self, controller::BaseController}, }, - tunnel::{IpVersion, TunnelConnector}, + tunnel::{matches_scheme, IpVersion, TunnelConnector, TunnelScheme}, utils::weak_upgrade, }; @@ -41,6 +42,31 @@ struct ReconnResult { conn_id: PeerConnId, } +/// Errors that can occur during each stage of a manual reconnect attempt. +/// +/// Each variant identifies the stage (resolve / connect / handshake) and +/// whether the failure was a timeout or an inner error, so log consumers +/// can immediately tell where the reconnect got stuck. +#[derive(Debug, thiserror::Error)] +enum ConnectError { + // Resolve/Connect inner errors use Debug ({0:?}) for richer context. 
+ #[error("resolve failed: {0:?}")] + ResolveFailed(Error), + #[error("resolve timeout after {}", humantime::format_duration(*.0))] + ResolveTimeout(Duration), + #[error("connect failed: {0:?}")] + ConnectFailed(Error), + #[error("connect timeout after {}", humantime::format_duration(*.0))] + ConnectTimeout(Duration), + // Handshake errors (from peer_conn) already have readable Display. + #[error("handshake failed: {0}")] + HandshakeFailed(Error), + #[error("handshake timeout after {}", humantime::format_duration(*.0))] + HandshakeTimeout(Duration), + #[error("connect failed: peer manager is gone")] + PeerManagerGone, +} + struct ConnectorManagerData { connectors: ConnectorMap, reconnecting: DashSet, @@ -59,6 +85,40 @@ pub struct ManualConnectorManager { } impl ManualConnectorManager { + fn reconnect_timeout(dead_url: &url::Url) -> Duration { + use crate::tunnel::IpScheme; + + let use_long_timeout = matches_scheme!( + dead_url, + TunnelScheme::Http + | TunnelScheme::Https + | TunnelScheme::Txt + | TunnelScheme::Srv + | TunnelScheme::Ip(IpScheme::Ws) + | TunnelScheme::Ip(IpScheme::Wss) + ); + + Duration::from_secs(if use_long_timeout { 20 } else { 2 }) + } + + /// Returns the remaining time in the budget, or `None` if exhausted. 
+ fn remaining_budget(started_at: Instant, total_timeout: Duration) -> Option { + total_timeout.checked_sub(started_at.elapsed()) + } + + fn emit_connect_error( + data: &ConnectorManagerData, + dead_url: &url::Url, + ip_version: IpVersion, + error: &ConnectError, + ) { + data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( + dead_url.to_string(), + format!("{:?}", ip_version), + error.to_string(), + )); + } + pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc) -> Self { let connectors = Arc::new(DashSet::new()); let tasks = JoinSet::new(); @@ -242,25 +302,58 @@ impl ManualConnectorManager { async fn conn_reconnect_with_ip_version( data: Arc, - dead_url: String, + dead_url: url::Url, ip_version: IpVersion, - ) -> Result { - let connector = - create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?; + started_at: Instant, + total_timeout: Duration, + ) -> Result { + // Stage 1: Resolve — create connector (involves DNS internally) + let remaining = Self::remaining_budget(started_at, total_timeout) + .ok_or(ConnectError::ResolveTimeout(Duration::ZERO))?; + let connector = match timeout( + remaining, + create_connector_by_url(dead_url.as_str(), &data.global_ctx, ip_version), + ) + .await + { + Ok(Ok(c)) => c, + Ok(Err(e)) => return Err(ConnectError::ResolveFailed(e)), + Err(_) => return Err(ConnectError::ResolveTimeout(remaining)), + }; data.global_ctx .issue_event(GlobalCtxEvent::Connecting(connector.remote_url())); tracing::info!("reconnect try connect... 
conn: {:?}", connector); let Some(pm) = data.peer_manager.upgrade() else { - return Err(Error::AnyhowError(anyhow::anyhow!( - "peer manager is gone, cannot reconnect" - ))); + return Err(ConnectError::PeerManagerGone); + }; + + // Stage 2: Connect — transport-layer connect + let remaining = Self::remaining_budget(started_at, total_timeout) + .ok_or(ConnectError::ConnectTimeout(Duration::ZERO))?; + let tunnel = match timeout(remaining, pm.connect_tunnel(connector)).await { + Ok(Ok(t)) => t, + Ok(Err(e)) => return Err(ConnectError::ConnectFailed(e)), + Err(_) => return Err(ConnectError::ConnectTimeout(remaining)), + }; + + // Stage 3: Handshake — noise handshake + peer registration + let remaining = Self::remaining_budget(started_at, total_timeout) + .ok_or(ConnectError::HandshakeTimeout(Duration::ZERO))?; + let (peer_id, conn_id) = match timeout( + remaining, + pm.add_client_tunnel_with_peer_id_hint(tunnel, true, None), + ) + .await + { + Ok(Ok(r)) => r, + Ok(Err(e)) => return Err(ConnectError::HandshakeFailed(e)), + Err(_) => return Err(ConnectError::HandshakeTimeout(remaining)), }; - let (peer_id, conn_id) = pm.try_direct_connect(connector).await?; tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); Ok(ReconnResult { - dead_url, + dead_url: dead_url.to_string(), peer_id, conn_id, }) @@ -269,26 +362,44 @@ impl ManualConnectorManager { async fn conn_reconnect( data: Arc, dead_url: url::Url, - ) -> Result { + ) -> Result { tracing::info!("reconnect: {}", dead_url); + let total_timeout = Self::reconnect_timeout(&dead_url); + let started_at = Instant::now(); + let mut ip_versions = vec![]; - if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" { + if matches_scheme!( + dead_url, + TunnelScheme::Ring | TunnelScheme::Txt | TunnelScheme::Srv + ) { ip_versions.push(IpVersion::Both); } else { - let converted_dead_url = crate::common::idn::convert_idn_to_ascii(dead_url.clone())?; - let addrs = match 
socket_addrs(&converted_dead_url, || Some(1000)).await { - Ok(addrs) => addrs, - Err(e) => { - data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( - dead_url.to_string(), - format!("{:?}", IpVersion::Both), - format!("{:?}", e), - )); - return Err(Error::AnyhowError(anyhow::anyhow!( - "get ip from url failed: {:?}", - e - ))); + let converted_dead_url = + match crate::common::idn::convert_idn_to_ascii(dead_url.clone()) { + Ok(url) => url, + Err(error) => { + let error = ConnectError::ResolveFailed(error.into()); + Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error); + return Err(error); + } + }; + let addrs = match timeout( + total_timeout, + socket_addrs(&converted_dead_url, || Some(1000)), + ) + .await + { + Ok(Ok(addrs)) => addrs, + Ok(Err(error)) => { + let error = ConnectError::ResolveFailed(error); + Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error); + return Err(error); + } + Err(_) => { + let error = ConnectError::ResolveTimeout(total_timeout); + Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error); + return Err(error); } }; tracing::info!(?addrs, ?dead_url, "get ip from url done"); @@ -309,50 +420,30 @@ impl ManualConnectorManager { } } - let mut reconn_ret = Err(Error::AnyhowError(anyhow::anyhow!( - "cannot get ip from url" + let mut reconn_ret = Err(ConnectError::ResolveFailed(Error::AnyhowError( + anyhow::anyhow!("cannot get ip from url"), ))); for ip_version in ip_versions { - let use_long_timeout = dead_url.scheme() == "http" - || dead_url.scheme() == "https" - || dead_url.scheme() == "ws" - || dead_url.scheme() == "wss" - || dead_url.scheme() == "txt" - || dead_url.scheme() == "srv"; - let ret = timeout( - // allow http/websocket connector to wait longer - std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }), - Self::conn_reconnect_with_ip_version( - data.clone(), - dead_url.to_string(), - ip_version, - ), + let ret = Self::conn_reconnect_with_ip_version( + data.clone(), + 
dead_url.clone(), + ip_version, + started_at, + total_timeout, ) .await; tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret); match ret { - Ok(Ok(_)) => { - // 外层和内层都成功:解包并跳出 - reconn_ret = ret.unwrap(); + Ok(result) => { + reconn_ret = Ok(result); break; } - Ok(Err(e)) => { - // 外层成功,内层失败 - reconn_ret = Err(e); - } - Err(e) => { - // 外层失败 - reconn_ret = Err(e.into()); + Err(error) => { + Self::emit_connect_error(&data, &dead_url, ip_version, &error); + reconn_ret = Err(error); } } - - // 发送事件(只有在未 break 时才执行) - data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( - dead_url.to_string(), - format!("{:?}", ip_version), - format!("{:?}", reconn_ret), - )); } reconn_ret diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 57b9ede8..7ea9e2c6 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -624,20 +624,27 @@ impl PeerManager { #[tracing::instrument] pub async fn try_direct_connect_with_peer_id_hint( &self, - mut connector: C, + connector: C, peer_id_hint: Option, ) -> Result<(PeerId, PeerConnId), Error> where C: TunnelConnector + Debug, { - let ns = self.global_ctx.net_ns.clone(); - let t = ns - .run_async(|| async move { connector.connect().await }) - .await?; + let t = self.connect_tunnel(connector).await?; self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint) .await } + pub(crate) async fn connect_tunnel(&self, mut connector: C) -> Result, Error> + where + C: TunnelConnector + Debug, + { + let ns = self.global_ctx.net_ns.clone(); + Ok(ns + .run_async(|| async move { connector.connect().await }) + .await?) + } + // avoid loop back to virtual network fn check_remote_addr_not_from_virtual_network( &self,