fix(connector): classify manual reconnect errors by stage

The manual reconnect flow wrapped DNS resolution, transport connect,
and handshake in a single tokio::time::timeout, so any stage timeout
surfaced as a generic Timeout(Elapsed(())). This made it impossible
to diagnose which stage actually failed.

Split the timeout budget across three explicit stages (resolve,
connect, handshake) inside conn_reconnect_with_ip_version, each
returning a typed ConnectError variant that names the stage and
whether it was a timeout or an inner failure.

- Add ConnectError enum (thiserror) with six failure variants and
  PeerManagerGone; use humantime for human-readable durations.
- Extract PeerManager::connect_tunnel() as pub(crate) so the manual
  connector can run connect and handshake as separate steps.
- Replace raw scheme string comparisons with matches_scheme! macro
  and TunnelScheme/IpScheme.
- Share a single Instant + total Duration budget across all stages
  so the original 2 s / 20 s reconnect window is preserved.
This commit is contained in:
fanyang89
2026-04-05 23:20:04 +08:00
parent cf6dcbc054
commit 51849fdf6a
4 changed files with 163 additions and 63 deletions
Generated
+1
View File
@@ -2207,6 +2207,7 @@ dependencies = [
"http",
"http_req",
"humansize",
"humantime",
"humantime-serde",
"idna 1.0.3",
"indoc",
+1
View File
@@ -243,6 +243,7 @@ hickory-server = { version = "0.25.2", features = [
], optional = true }
derive_builder = "0.20.2"
humantime = "2"
humantime-serde = "1.1.1"
multimap = "0.10.1"
version-compare = "0.2.0"
+149 -58
View File
@@ -1,6 +1,7 @@
use std::{
collections::BTreeSet,
sync::{Arc, Weak},
time::{Duration, Instant},
};
use dashmap::DashSet;
@@ -16,7 +17,7 @@ use crate::{
},
rpc_types::{self, controller::BaseController},
},
tunnel::{IpVersion, TunnelConnector},
tunnel::{matches_scheme, IpVersion, TunnelConnector, TunnelScheme},
utils::weak_upgrade,
};
@@ -41,6 +42,31 @@ struct ReconnResult {
conn_id: PeerConnId,
}
/// Errors that can occur during each stage of a manual reconnect attempt.
///
/// Each variant identifies the stage (resolve / connect / handshake) and
/// whether the failure was a timeout or an inner error, so log consumers
/// can immediately tell where the reconnect got stuck.
///
/// Timeout variants carry the time budget that was granted to the stage
/// when it started; a zero `Duration` means the shared reconnect budget
/// was already exhausted before the stage could begin.
#[derive(Debug, thiserror::Error)]
enum ConnectError {
// Resolve/Connect inner errors use Debug ({0:?}) for richer context.
/// Connector creation (which performs DNS resolution internally) failed.
#[error("resolve failed: {0:?}")]
ResolveFailed(Error),
/// Connector creation did not finish within the granted budget.
#[error("resolve timeout after {}", humantime::format_duration(*.0))]
ResolveTimeout(Duration),
/// Transport-layer connect returned an error.
#[error("connect failed: {0:?}")]
ConnectFailed(Error),
/// Transport-layer connect did not finish within the granted budget.
#[error("connect timeout after {}", humantime::format_duration(*.0))]
ConnectTimeout(Duration),
// Handshake errors (from peer_conn) already have readable Display.
/// Handshake / peer registration returned an error.
#[error("handshake failed: {0}")]
HandshakeFailed(Error),
/// Handshake / peer registration did not finish within the granted budget.
#[error("handshake timeout after {}", humantime::format_duration(*.0))]
HandshakeTimeout(Duration),
/// The owning `PeerManager` weak reference could not be upgraded.
#[error("connect failed: peer manager is gone")]
PeerManagerGone,
}
struct ConnectorManagerData {
connectors: ConnectorMap,
reconnecting: DashSet<url::Url>,
@@ -59,6 +85,40 @@ pub struct ManualConnectorManager {
}
impl ManualConnectorManager {
/// Total time budget for one reconnect attempt to `dead_url`.
///
/// Schemes that may need extra round-trips (HTTP upgrade, TLS, DNS
/// record lookup for txt/srv) get the long 20 s window; all other
/// schemes get the short 2 s window.
fn reconnect_timeout(dead_url: &url::Url) -> Duration {
    use crate::tunnel::IpScheme;

    const LONG_SECS: u64 = 20;
    const SHORT_SECS: u64 = 2;

    let slow_scheme = matches_scheme!(
        dead_url,
        TunnelScheme::Http
            | TunnelScheme::Https
            | TunnelScheme::Txt
            | TunnelScheme::Srv
            | TunnelScheme::Ip(IpScheme::Ws)
            | TunnelScheme::Ip(IpScheme::Wss)
    );

    Duration::from_secs(if slow_scheme { LONG_SECS } else { SHORT_SECS })
}
/// Returns the remaining time in the budget, or `None` if exhausted.
///
/// The budget counts down from `total_timeout` starting at `started_at`.
/// An exactly-spent budget yields `Some(Duration::ZERO)` (matching
/// `Duration::checked_sub` semantics); only overspending yields `None`.
fn remaining_budget(started_at: Instant, total_timeout: Duration) -> Option<Duration> {
    let spent = started_at.elapsed();
    if spent > total_timeout {
        None
    } else {
        Some(total_timeout - spent)
    }
}
/// Publishes a `ConnectError` for `dead_url` on the global event bus so
/// event consumers can see which reconnect stage failed and why.
fn emit_connect_error(
    data: &ConnectorManagerData,
    dead_url: &url::Url,
    ip_version: IpVersion,
    error: &ConnectError,
) {
    // ConnectError's Display already names the stage and timeout/inner kind.
    let event = GlobalCtxEvent::ConnectError(
        dead_url.to_string(),
        format!("{:?}", ip_version),
        error.to_string(),
    );
    data.global_ctx.issue_event(event);
}
pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<PeerManager>) -> Self {
let connectors = Arc::new(DashSet::new());
let tasks = JoinSet::new();
@@ -242,25 +302,58 @@ impl ManualConnectorManager {
async fn conn_reconnect_with_ip_version(
data: Arc<ConnectorManagerData>,
dead_url: String,
dead_url: url::Url,
ip_version: IpVersion,
) -> Result<ReconnResult, Error> {
let connector =
create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?;
started_at: Instant,
total_timeout: Duration,
) -> Result<ReconnResult, ConnectError> {
// Stage 1: Resolve — create connector (involves DNS internally)
let remaining = Self::remaining_budget(started_at, total_timeout)
.ok_or(ConnectError::ResolveTimeout(Duration::ZERO))?;
let connector = match timeout(
remaining,
create_connector_by_url(dead_url.as_str(), &data.global_ctx, ip_version),
)
.await
{
Ok(Ok(c)) => c,
Ok(Err(e)) => return Err(ConnectError::ResolveFailed(e)),
Err(_) => return Err(ConnectError::ResolveTimeout(remaining)),
};
data.global_ctx
.issue_event(GlobalCtxEvent::Connecting(connector.remote_url()));
tracing::info!("reconnect try connect... conn: {:?}", connector);
let Some(pm) = data.peer_manager.upgrade() else {
return Err(Error::AnyhowError(anyhow::anyhow!(
"peer manager is gone, cannot reconnect"
)));
return Err(ConnectError::PeerManagerGone);
};
// Stage 2: Connect — transport-layer connect
let remaining = Self::remaining_budget(started_at, total_timeout)
.ok_or(ConnectError::ConnectTimeout(Duration::ZERO))?;
let tunnel = match timeout(remaining, pm.connect_tunnel(connector)).await {
Ok(Ok(t)) => t,
Ok(Err(e)) => return Err(ConnectError::ConnectFailed(e)),
Err(_) => return Err(ConnectError::ConnectTimeout(remaining)),
};
// Stage 3: Handshake — noise handshake + peer registration
let remaining = Self::remaining_budget(started_at, total_timeout)
.ok_or(ConnectError::HandshakeTimeout(Duration::ZERO))?;
let (peer_id, conn_id) = match timeout(
remaining,
pm.add_client_tunnel_with_peer_id_hint(tunnel, true, None),
)
.await
{
Ok(Ok(r)) => r,
Ok(Err(e)) => return Err(ConnectError::HandshakeFailed(e)),
Err(_) => return Err(ConnectError::HandshakeTimeout(remaining)),
};
let (peer_id, conn_id) = pm.try_direct_connect(connector).await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult {
dead_url,
dead_url: dead_url.to_string(),
peer_id,
conn_id,
})
@@ -269,26 +362,44 @@ impl ManualConnectorManager {
async fn conn_reconnect(
data: Arc<ConnectorManagerData>,
dead_url: url::Url,
) -> Result<ReconnResult, Error> {
) -> Result<ReconnResult, ConnectError> {
tracing::info!("reconnect: {}", dead_url);
let total_timeout = Self::reconnect_timeout(&dead_url);
let started_at = Instant::now();
let mut ip_versions = vec![];
if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" {
if matches_scheme!(
dead_url,
TunnelScheme::Ring | TunnelScheme::Txt | TunnelScheme::Srv
) {
ip_versions.push(IpVersion::Both);
} else {
let converted_dead_url = crate::common::idn::convert_idn_to_ascii(dead_url.clone())?;
let addrs = match socket_addrs(&converted_dead_url, || Some(1000)).await {
Ok(addrs) => addrs,
Err(e) => {
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", IpVersion::Both),
format!("{:?}", e),
));
return Err(Error::AnyhowError(anyhow::anyhow!(
"get ip from url failed: {:?}",
e
)));
let converted_dead_url =
match crate::common::idn::convert_idn_to_ascii(dead_url.clone()) {
Ok(url) => url,
Err(error) => {
let error = ConnectError::ResolveFailed(error.into());
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
return Err(error);
}
};
let addrs = match timeout(
total_timeout,
socket_addrs(&converted_dead_url, || Some(1000)),
)
.await
{
Ok(Ok(addrs)) => addrs,
Ok(Err(error)) => {
let error = ConnectError::ResolveFailed(error);
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
return Err(error);
}
Err(_) => {
let error = ConnectError::ResolveTimeout(total_timeout);
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
return Err(error);
}
};
tracing::info!(?addrs, ?dead_url, "get ip from url done");
@@ -309,50 +420,30 @@ impl ManualConnectorManager {
}
}
let mut reconn_ret = Err(Error::AnyhowError(anyhow::anyhow!(
"cannot get ip from url"
let mut reconn_ret = Err(ConnectError::ResolveFailed(Error::AnyhowError(
anyhow::anyhow!("cannot get ip from url"),
)));
for ip_version in ip_versions {
let use_long_timeout = dead_url.scheme() == "http"
|| dead_url.scheme() == "https"
|| dead_url.scheme() == "ws"
|| dead_url.scheme() == "wss"
|| dead_url.scheme() == "txt"
|| dead_url.scheme() == "srv";
let ret = timeout(
// allow http/websocket connector to wait longer
std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.to_string(),
ip_version,
),
let ret = Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.clone(),
ip_version,
started_at,
total_timeout,
)
.await;
tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
match ret {
Ok(Ok(_)) => {
// 外层和内层都成功:解包并跳出
reconn_ret = ret.unwrap();
Ok(result) => {
reconn_ret = Ok(result);
break;
}
Ok(Err(e)) => {
// 外层成功,内层失败
reconn_ret = Err(e);
}
Err(e) => {
// 外层失败
reconn_ret = Err(e.into());
Err(error) => {
Self::emit_connect_error(&data, &dead_url, ip_version, &error);
reconn_ret = Err(error);
}
}
// 发送事件(只有在未 break 时才执行)
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", ip_version),
format!("{:?}", reconn_ret),
));
}
reconn_ret
+12 -5
View File
@@ -624,20 +624,27 @@ impl PeerManager {
#[tracing::instrument]
pub async fn try_direct_connect_with_peer_id_hint<C>(
&self,
mut connector: C,
connector: C,
peer_id_hint: Option<PeerId>,
) -> Result<(PeerId, PeerConnId), Error>
where
C: TunnelConnector + Debug,
{
let ns = self.global_ctx.net_ns.clone();
let t = ns
.run_async(|| async move { connector.connect().await })
.await?;
let t = self.connect_tunnel(connector).await?;
self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint)
.await
}
/// Opens the transport-layer tunnel for `connector`.
///
/// The connect runs inside this peer manager's network namespace so the
/// socket is created in the correct netns. Exposed as `pub(crate)` so
/// callers can run the connect step separately from the handshake.
pub(crate) async fn connect_tunnel<C>(&self, mut connector: C) -> Result<Box<dyn Tunnel>, Error>
where
    C: TunnelConnector + Debug,
{
    let ns = self.global_ctx.net_ns.clone();
    let tunnel = ns
        .run_async(|| async move { connector.connect().await })
        .await?;
    Ok(tunnel)
}
// avoid loop back to virtual network
fn check_remote_addr_not_from_virtual_network(
&self,