From ecd1ea6f8c2857aca28fe2624e3f9a7aef6f97ea Mon Sep 17 00:00:00 2001 From: KKRainbow <443152178@qq.com> Date: Tue, 10 Mar 2026 08:48:08 +0800 Subject: [PATCH] feat(web): implement secure core-web tunnel with Noise protocol (#1976) Implement end-to-end encryption for core-web connections using the Noise protocol framework with the following changes: Client-side (easytier/src/web_client/): - Add security.rs module with Noise handshake implementation - Add upgrade_client_tunnel() for client-side handshake - Add Noise frame encryption/decryption via TunnelFilter - Integrate GetFeature RPC for capability negotiation - Support secure_mode option to enforce encrypted connections - Handle graceful fallback for backward compatibility Server-side (easytier-web/): - Accept Noise handshake in client_manager - Expose encryption support via GetFeature RPC The implementation uses Noise_NN_25519_ChaChaPoly_SHA256 pattern for encryption without authentication. Provides backward compatibility with automatic fallback to plaintext connections. --- easytier-gui/src-tauri/src/lib.rs | 16 +- easytier-web/src/client_manager/mod.rs | 45 ++-- easytier-web/src/client_manager/session.rs | 10 + easytier/src/core.rs | 1 + easytier/src/proto/web.proto | 9 +- easytier/src/web_client/mod.rs | 73 ++++++- easytier/src/web_client/security.rs | 229 +++++++++++++++++++++ easytier/src/web_client/session.rs | 55 +++-- 8 files changed, 404 insertions(+), 34 deletions(-) create mode 100644 easytier/src/web_client/security.rs diff --git a/easytier-gui/src-tauri/src/lib.rs b/easytier-gui/src-tauri/src/lib.rs index 354bce90..b72a5025 100644 --- a/easytier-gui/src-tauri/src/lib.rs +++ b/easytier-gui/src-tauri/src/lib.rs @@ -472,11 +472,17 @@ async fn init_web_client(app: AppHandle, url: Option) -> Result<(), Stri let hooks = Arc::new(manager::GuiHooks { app: app.clone() }); - let web_client = - web_client::run_web_client(url.as_str(), None, None, instance_manager, Some(hooks)) - .await - .with_context(|| "Failed to initialize web client") - .map_err(|e| format!("{:#}", e))?; + let web_client = web_client::run_web_client( + url.as_str(), + None, + None, + false, + instance_manager, + Some(hooks), + ) + .await + .with_context(|| "Failed to initialize web client") + .map_err(|e| format!("{:#}", e))?; *web_client_guard = Some(web_client); Ok(()) } diff --git a/easytier-web/src/client_manager/mod.rs b/easytier-web/src/client_manager/mod.rs index a3e510bc..390daccb 100644 --- a/easytier-web/src/client_manager/mod.rs +++ b/easytier-web/src/client_manager/mod.rs @@ -13,6 +13,7 @@ use easytier::{ }, rpc_service::remote_client::{self, RemoteClientManager}, tunnel::TunnelListener, + web_client::security, }; use maxminddb::geoip2; use session::{Location, Session}; @@ -99,12 +100,20 @@ impl ClientManager { let feature_flags = self.feature_flags.clone(); self.tasks.spawn(async move { while let Ok(tunnel) = listener.accept().await { + let (tunnel, secure) = match security::accept_or_upgrade_server_tunnel(tunnel).await { + Ok(v) => v, + Err(error) => { + tracing::warn!(%error, "failed to accept secure tunnel, dropping connection"); + continue; + } + }; let info = tunnel.info().unwrap(); let client_url: url::Url = info.remote_addr.unwrap().into(); let location = Self::lookup_location(&client_url, geoip_db.clone()); tracing::info!( - "New session from {:?}, location: {:?}", + "New session from {:?}, secure: {}, location: {:?}", client_url, + secure, location ); let mut session = Session::new( @@ -326,26 +335,36 @@ mod tests { connector, "test", "test", + false, Arc::new(NetworkInstanceManager::new()), None, ); wait_for_condition( - || async { mgr.client_sessions.len() == 1 }, - Duration::from_secs(6), + || async { !mgr.client_sessions.is_empty() }, + Duration::from_secs(12), ) .await; - let mut a = mgr - .client_sessions - .iter() - .next() - .unwrap() - .data() - .read() - .await - .heartbeat_waiter(); - let req = a.recv().await.unwrap(); + let req = tokio::time::timeout(Duration::from_secs(12), async { + loop { + let session = mgr + .client_sessions + .iter() + .next() + .map(|item| item.value().clone()); + let Some(session) = session else { + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + }; + let mut waiter = session.data().read().await.heartbeat_waiter(); + if let Ok(req) = waiter.recv().await { + break req; + } + } + }) + .await + .unwrap(); println!("{:?}", req); println!("{:?}", mgr); } diff --git a/easytier-web/src/client_manager/session.rs b/easytier-web/src/client_manager/session.rs index 4e7a3ecd..40b3980e 100644 --- a/easytier-web/src/client_manager/session.rs +++ b/easytier-web/src/client_manager/session.rs @@ -169,6 +169,16 @@ impl WebServerService for SessionRpcService { } ret } + + async fn get_feature( + &self, + _: BaseController, + _: easytier::proto::web::GetFeatureRequest, + ) -> rpc_types::error::Result { + Ok(easytier::proto::web::GetFeatureResponse { + support_encryption: true, + }) + } } pub struct Session { diff --git a/easytier/src/core.rs b/easytier/src/core.rs index 0a8c6f97..a3434138 100644 --- a/easytier/src/core.rs +++ b/easytier/src/core.rs @@ -1281,6 +1281,7 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> { config_server_url_s, cli.machine_id.clone(), cli.network_options.hostname.clone(), + cli.network_options.secure_mode.unwrap_or(false), manager.clone(), None, ) diff --git a/easytier/src/proto/web.proto b/easytier/src/proto/web.proto index 68429bc9..0b283254 100644 --- a/easytier/src/proto/web.proto +++ b/easytier/src/proto/web.proto @@ -18,6 +18,13 @@ message HeartbeatRequest { message HeartbeatResponse {} +message GetFeatureRequest {} + +message GetFeatureResponse { + bool support_encryption = 1; +} + service WebServerService { rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); -} \ No newline at end of file + rpc GetFeature(GetFeatureRequest) returns (GetFeatureResponse); +} diff --git a/easytier/src/web_client/mod.rs b/easytier/src/web_client/mod.rs index d3dfb28d..c2953079 100644 --- a/easytier/src/web_client/mod.rs +++ b/easytier/src/web_client/mod.rs @@ -36,6 +36,7 @@ pub struct DefaultHooks; impl WebClientHooks for DefaultHooks {} pub mod controller; +pub mod security; pub mod session; use std::sync::atomic::{AtomicBool, Ordering}; @@ -52,6 +53,7 @@ impl WebClient { connector: T, token: S, hostname: H, + secure_mode: bool, manager: Arc, hooks: Option>, ) -> Self { @@ -68,7 +70,13 @@ impl WebClient { let controller_clone = controller.clone(); let connected_clone = connected.clone(); let tasks = ScopedTask::from(tokio::spawn(async move { - Self::routine(controller_clone, connected_clone, Box::new(connector)).await; + Self::routine( + controller_clone, + connected_clone, + secure_mode, + Box::new(connector), + ) + .await; })); WebClient { @@ -82,6 +90,7 @@ impl WebClient { async fn routine( controller: Arc, connected: Arc, + secure_mode: bool, mut connector: Box, ) { loop { @@ -99,6 +108,65 @@ impl WebClient { log::info!("Successfully connected to {:?}", conn.info()); let mut session = session::Session::new(conn, controller.clone()); + let support_encryption = match tokio::time::timeout( + std::time::Duration::from_secs(3), + session.get_feature(), + ) + .await + { + Ok(Ok(feature)) => feature.support_encryption, + Ok(Err(error)) => { + log::warn!(%error, "GetFeature rpc failed, fallback to legacy tunnel"); + false + } + Err(_) => { + log::warn!("GetFeature rpc timeout, fallback to legacy tunnel"); + false + } + }; + + if support_encryption { + log::info!("Server supports encryption, reconnecting with secure tunnel"); + drop(session); + + let conn = match connector.connect().await { + Ok(conn) => conn, + Err(error) => { + connected.store(false, Ordering::Release); + let wait = 1; + log::warn!(%error, "Failed to reconnect secure tunnel, retrying in {} seconds...", wait); + tokio::time::sleep(std::time::Duration::from_secs(wait)).await; + continue; + } + }; + + let conn = match security::upgrade_client_tunnel(conn).await { + Ok(conn) => conn, + Err(error) => { + connected.store(false, Ordering::Release); + let wait = 1; + log::warn!(%error, "Noise handshake failed, retrying in {} seconds...", wait); + tokio::time::sleep(std::time::Duration::from_secs(wait)).await; + continue; + } + }; + + let mut session = session::Session::new(conn, controller.clone()); + session.start_heartbeat().await; + session.wait().await; + connected.store(false, Ordering::Release); + continue; + } + + if secure_mode { + connected.store(false, Ordering::Release); + let wait = 1; + log::warn!("secure-mode enabled but server does not support encryption, retrying in {} seconds...", wait); + tokio::time::sleep(std::time::Duration::from_secs(wait)).await; + continue; + } + + session.start_heartbeat().await; session.wait().await; connected.store(false, Ordering::Release); } @@ -113,6 +181,7 @@ pub async fn run_web_client( config_server_url_s: &str, machine_id: Option, hostname: Option, + secure_mode: bool, manager: Arc, hooks: Option>, ) -> Result { @@ -160,6 +229,7 @@ pub async fn run_web_client( create_connector_by_url(c_url.as_str(), &global_ctx, IpVersion::Both).await?, token.to_string(), hostname, + secure_mode, manager.clone(), hooks, )) @@ -178,6 +248,7 @@ mod tests { format!("ring://{}/test", uuid::Uuid::new_v4()).as_str(), None, None, + false, manager.clone(), None, ) diff --git a/easytier/src/web_client/security.rs b/easytier/src/web_client/security.rs new file mode 100644 index 00000000..27ba3869 --- /dev/null +++ b/easytier/src/web_client/security.rs @@ -0,0 +1,229 @@ +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use bytes::BytesMut; +use futures::{SinkExt, StreamExt}; +use snow::{params::NoiseParams, Builder, TransportState}; + +use crate::{ + proto::common::TunnelInfo, + tunnel::{ + filter::{TunnelFilter, TunnelWithFilter}, + packet_def::{PacketType, ZCPacket, ZCPacketType}, + SplitTunnel, StreamItem, Tunnel, TunnelError, ZCPacketSink, ZCPacketStream, + }, +}; + +const NOISE_MAGIC: &[u8] = b"ET_WEB_NOISE_V1:"; +const NOISE_PROLOGUE: &[u8] = b"easytier-webclient-noise-v1"; +const NOISE_PATTERN: &str = "Noise_NN_25519_ChaChaPoly_SHA256"; + +struct RawSplitTunnel { + info: Option, + split: Mutex>, +} + +impl RawSplitTunnel { + fn new( + info: Option, + stream: std::pin::Pin>, + sink: std::pin::Pin>, + ) -> Self { + Self { + info, + split: Mutex::new(Some((stream, sink))), + } + } +} + +impl Tunnel for RawSplitTunnel { + fn split(&self) -> SplitTunnel { + self.split + .lock() + .unwrap() + .take() + .expect("split can only be called once") + } + + fn info(&self) -> Option { + self.info.clone() + } +} + +struct NoiseTunnelFilter { + transport: Arc>, +} + +impl TunnelFilter for NoiseTunnelFilter { + type FilterOutput = (); + + fn before_send(&self, data: ZCPacket) -> Option { + let plain = data.tunnel_payload(); + let mut encrypted = vec![0u8; plain.len() + 64]; + let len = self + .transport + .lock() + .unwrap() + .write_message(plain, &mut encrypted) + .ok()?; + let mut packet = ZCPacket::new_with_payload(&encrypted[..len]); + packet.fill_peer_manager_hdr(0, 0, PacketType::Data as u8); + Some(packet) + } + + fn after_received(&self, data: StreamItem) -> Option { + let packet = match data { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + let cipher = packet.payload(); + let mut plain = vec![0u8; cipher.len() + 64]; + let len = match self + .transport + .lock() + .unwrap() + .read_message(cipher, &mut plain) + { + Ok(v) => v, + Err(e) => { + return Some(Err(TunnelError::InvalidPacket(format!( + "noise decrypt failed: {e}" + )))); + } + }; + Some(Ok(ZCPacket::new_from_buf( + BytesMut::from(&plain[..len]), + ZCPacketType::DummyTunnel, + ))) + } + + fn filter_output(&self) {} +} + +fn pack_control_packet(payload: &[u8]) -> ZCPacket { + let mut packet = ZCPacket::new_with_payload(payload); + packet.fill_peer_manager_hdr(0, 0, PacketType::Data as u8); + packet +} + +fn encode_noise_payload(buf: &[u8]) -> Vec { + let mut payload = Vec::with_capacity(NOISE_MAGIC.len() + buf.len()); + payload.extend_from_slice(NOISE_MAGIC); + payload.extend_from_slice(buf); + payload +} + +fn decode_noise_payload(payload: &[u8]) -> Option<&[u8]> { + payload.strip_prefix(NOISE_MAGIC) +} + +fn wrap_secure_tunnel( + info: Option, + stream: std::pin::Pin>, + sink: std::pin::Pin>, + transport: TransportState, +) -> Box { + let raw = RawSplitTunnel::new(info, stream, sink); + Box::new(TunnelWithFilter::new( + raw, + NoiseTunnelFilter { + transport: Arc::new(Mutex::new(transport)), + }, + )) +} + +pub async fn upgrade_client_tunnel( + tunnel: Box, +) -> Result, TunnelError> { + let info = tunnel.info(); + let (mut stream, mut sink) = tunnel.split(); + + let params: NoiseParams = NOISE_PATTERN + .parse() + .map_err(|e| TunnelError::InternalError(format!("parse noise params failed: {e}")))?; + let mut state = Builder::new(params) + .prologue(NOISE_PROLOGUE) + .map_err(|e| TunnelError::InternalError(format!("set prologue failed: {e}")))? + .build_initiator() + .map_err(|e| TunnelError::InternalError(format!("build initiator failed: {e}")))?; + + let mut msg1 = vec![0u8; 1024]; + let msg1_len = state + .write_message(&[], &mut msg1) + .map_err(|e| TunnelError::InternalError(format!("write noise msg1 failed: {e}")))?; + sink.send(pack_control_packet(&encode_noise_payload( + &msg1[..msg1_len], + ))) + .await?; + + let msg2_packet = stream.next().await.ok_or(TunnelError::Shutdown)??; + let msg2_cipher = decode_noise_payload(msg2_packet.payload()) + .ok_or_else(|| TunnelError::InvalidPacket("invalid noise msg2 magic".to_string()))?; + let mut msg2 = vec![0u8; 1024]; + state + .read_message(msg2_cipher, &mut msg2) + .map_err(|e| TunnelError::InvalidPacket(format!("read noise msg2 failed: {e}")))?; + + let transport = state + .into_transport_mode() + .map_err(|e| TunnelError::InternalError(format!("switch transport mode failed: {e}")))?; + + Ok(wrap_secure_tunnel(info, stream, sink, transport)) +} + +pub async fn accept_or_upgrade_server_tunnel( + tunnel: Box, +) -> Result<(Box, bool), TunnelError> { + let info = tunnel.info(); + let (stream, sink) = tunnel.split(); + let mut stream = stream; + let mut sink = sink; + + let first_packet = match tokio::time::timeout(Duration::from_secs(1), stream.next()).await { + Ok(Some(Ok(packet))) => packet, + Ok(Some(Err(error))) => return Err(error), + Ok(None) => return Err(TunnelError::Shutdown), + Err(_) => { + return Ok(( + Box::new(RawSplitTunnel::new(info, stream, sink)) as Box, + false, + )); + } + }; + let Some(msg1_cipher) = decode_noise_payload(first_packet.payload()) else { + let stream = Box::pin(futures::stream::once(async move { Ok(first_packet) }).chain(stream)); + return Ok(( + Box::new(RawSplitTunnel::new(info, stream, sink)) as Box, + false, + )); + }; + + let params: NoiseParams = NOISE_PATTERN + .parse() + .map_err(|e| TunnelError::InternalError(format!("parse noise params failed: {e}")))?; + let mut state = Builder::new(params) + .prologue(NOISE_PROLOGUE) + .map_err(|e| TunnelError::InternalError(format!("set prologue failed: {e}")))? + .build_responder() + .map_err(|e| TunnelError::InternalError(format!("build responder failed: {e}")))?; + + let mut msg1 = vec![0u8; 1024]; + state + .read_message(msg1_cipher, &mut msg1) + .map_err(|e| TunnelError::InvalidPacket(format!("read noise msg1 failed: {e}")))?; + + let mut msg2 = vec![0u8; 1024]; + let msg2_len = state + .write_message(&[], &mut msg2) + .map_err(|e| TunnelError::InternalError(format!("write noise msg2 failed: {e}")))?; + sink.send(pack_control_packet(&encode_noise_payload( + &msg2[..msg2_len], + ))) + .await?; + + let transport = state + .into_transport_mode() + .map_err(|e| TunnelError::InternalError(format!("switch transport mode failed: {e}")))?; + + Ok((wrap_secure_tunnel(info, stream, sink, transport), true)) +} diff --git a/easytier/src/web_client/session.rs b/easytier/src/web_client/session.rs index fdec67e8..c4e0d251 100644 --- a/easytier/src/web_client/session.rs +++ b/easytier/src/web_client/session.rs @@ -12,7 +12,10 @@ use crate::{ api::manage::WebClientServiceServer, rpc_impl::bidirect::BidirectRpcManager, rpc_types::controller::BaseController, - web::{HeartbeatRequest, HeartbeatResponse, WebServerServiceClientFactory}, + web::{ + GetFeatureRequest, GetFeatureResponse, HeartbeatRequest, HeartbeatResponse, + WebServerServiceClientFactory, + }, }, tunnel::Tunnel, }; @@ -30,6 +33,7 @@ pub struct Session { controller: Arc, heartbeat_ctx: HeartbeatCtx, + heartbeat_started: std::sync::atomic::AtomicBool, tasks: Mutex>, } @@ -44,15 +48,18 @@ impl Session { "", ); - let mut tasks: JoinSet<()> = JoinSet::new(); - let heartbeat_ctx = - Self::heartbeat_routine(&rpc_mgr, Arc::downgrade(&controller), &mut tasks); + let (tx, _rx1) = broadcast::channel(2); + let heartbeat_ctx = HeartbeatCtx { + notifier: Arc::new(tx), + resp: Arc::new(Mutex::new(None)), + }; Session { rpc_mgr, controller, heartbeat_ctx, - tasks: Mutex::new(tasks), + heartbeat_started: std::sync::atomic::AtomicBool::new(false), + tasks: Mutex::new(JoinSet::new()), } } @@ -60,14 +67,8 @@ impl Session { rpc_mgr: &BidirectRpcManager, controller: Weak, tasks: &mut JoinSet<()>, - ) -> HeartbeatCtx { - let (tx, _rx1) = broadcast::channel(2); - - let ctx = HeartbeatCtx { - notifier: Arc::new(tx), - resp: Arc::new(Mutex::new(None)), - }; - + ctx: HeartbeatCtx, + ) { let mid = get_machine_id(); let inst_id = uuid::Uuid::new_v4(); let token = controller.upgrade().unwrap().token(); @@ -118,8 +119,22 @@ impl Session { } } }); + } - ctx + pub async fn start_heartbeat(&self) { + if self + .heartbeat_started + .swap(true, std::sync::atomic::Ordering::AcqRel) + { + return; + } + let mut tasks = self.tasks.lock().await; + Self::heartbeat_routine( + &self.rpc_mgr, + Arc::downgrade(&self.controller), + &mut tasks, + self.heartbeat_ctx.clone(), + ); } async fn wait_routines(&self) { @@ -135,6 +150,18 @@ impl Session { } } + pub async fn get_feature( + &self, + ) -> Result { + let client = self + .rpc_mgr + .rpc_client() + .scoped_client::>(1, 1, "".to_string()); + client + .get_feature(BaseController::default(), GetFeatureRequest {}) + .await + } + pub async fn wait_next_heartbeat(&self) -> Option { let mut rx = self.heartbeat_ctx.notifier.subscribe(); rx.recv().await.ok()