diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index fda62d85..ddc11401 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -616,7 +616,7 @@ pub mod tests { .await .is_ok() }, - Duration::from_secs(10), + Duration::from_secs(30), ) .await; println!("{:?}", p_a.list_routes().await); diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 5506cdf0..4dde5102 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -34,7 +34,7 @@ use crate::gateway::kcp_proxy::{KcpProxyDst, KcpProxyDstRpcService, KcpProxySrc} use crate::gateway::quic_proxy::{QuicProxy, QuicProxyDstRpcService}; use crate::gateway::tcp_proxy::{NatDstTcpConnector, TcpProxy, TcpProxyRpcService}; use crate::gateway::udp_proxy::UdpProxy; -use crate::peer_center::instance::PeerCenterInstance; +use crate::peer_center::instance::{PeerCenterInstance, PeerCenterInstanceService}; use crate::peers::peer_conn::PeerConnId; use crate::peers::peer_manager::{PeerManager, RouteAlgoType}; #[cfg(feature = "tun")] @@ -54,6 +54,7 @@ use crate::proto::api::instance::{ }; use crate::proto::api::manage::NetworkConfig; use crate::proto::common::{PortForwardConfigPb, TunnelInfo}; +use crate::proto::peer_rpc::PeerCenterRpc; use crate::proto::rpc_impl::standalone::RpcServerHook; use crate::proto::rpc_types; use crate::proto::rpc_types::controller::BaseController; @@ -1311,6 +1312,7 @@ impl Instance { port_forward_manage_rpc_service: F, stats_rpc_service: G, config_rpc_service: H, + peer_center_rpc_service: Arc, } #[async_trait::async_trait] @@ -1372,6 +1374,12 @@ impl Instance { fn get_config_service(&self) -> &dyn ConfigRpc { &self.config_rpc_service } + + fn get_peer_center_service( + &self, + ) -> Arc + Send + Sync> { + self.peer_center_rpc_service.clone() + } } ApiRpcServiceImpl { @@ -1432,6 +1440,7 @@ impl Instance { port_forward_manage_rpc_service: self.get_port_forward_manager_rpc_service(), stats_rpc_service: self.get_stats_rpc_service(), config_rpc_service: self.get_config_service(), + peer_center_rpc_service: Arc::new(self.peer_center.get_rpc_service()), } } diff --git a/easytier/src/rpc_service/api.rs b/easytier/src/rpc_service/api.rs index 6a169231..37051e4d 100644 --- a/easytier/src/rpc_service/api.rs +++ b/easytier/src/rpc_service/api.rs @@ -17,6 +17,7 @@ use crate::{ logger::LoggerRpcServer, manage::WebClientServiceServer, }, + peer_rpc::PeerCenterRpcServer, rpc_impl::{service_registry::ServiceRegistry, standalone::StandAloneServer}, rpc_types::error::Error, }, @@ -24,8 +25,9 @@ use crate::{ acl_manage::AclManageRpcService, config::ConfigRpcService, connector_manage::ConnectorManageRpcService, instance_manage::InstanceManageRpcService, logger::LoggerRpcService, mapped_listener_manage::MappedListenerManageRpcService, - peer_manage::PeerManageRpcService, port_forward_manage::PortForwardManageRpcService, - proxy::TcpProxyRpcService, stats::StatsRpcService, vpn_portal::VpnPortalRpcService, + peer_center::PeerCenterManageRpcService, peer_manage::PeerManageRpcService, + port_forward_manage::PortForwardManageRpcService, proxy::TcpProxyRpcService, + stats::StatsRpcService, vpn_portal::VpnPortalRpcService, }, tunnel::{tcp::TcpTunnelListener, TunnelListener}, web_client::DefaultHooks, @@ -149,6 +151,11 @@ fn register_api_rpc_service( )), "", ); + + registry.register( + PeerCenterRpcServer::new(PeerCenterManageRpcService::new(instance_manager.clone())), + "", + ); } fn parse_rpc_portal(rpc_portal: Option) -> anyhow::Result { diff --git a/easytier/src/rpc_service/mod.rs b/easytier/src/rpc_service/mod.rs index e67c2fbe..cddec6af 100644 --- a/easytier/src/rpc_service/mod.rs +++ b/easytier/src/rpc_service/mod.rs @@ -3,6 +3,7 @@ mod api; mod config; mod connector_manage; mod mapped_listener_manage; +mod peer_center; mod peer_manage; mod port_forward_manage; mod proxy; @@ -67,6 +68,14 @@ pub trait InstanceRpcService: Sync + Send { ) -> &dyn crate::proto::api::config::ConfigRpc< Controller = crate::proto::rpc_types::controller::BaseController, >; + fn get_peer_center_service( + &self, + ) -> std::sync::Arc< + dyn crate::proto::peer_rpc::PeerCenterRpc< + Controller = crate::proto::rpc_types::controller::BaseController, + > + Send + + Sync, + >; } fn get_instance_service( diff --git a/easytier/src/rpc_service/peer_center.rs b/easytier/src/rpc_service/peer_center.rs new file mode 100644 index 00000000..66be659d --- /dev/null +++ b/easytier/src/rpc_service/peer_center.rs @@ -0,0 +1,108 @@ +use std::sync::Arc; + +use crate::{ + instance_manager::NetworkInstanceManager, + proto::{ + peer_rpc::{ + GetGlobalPeerMapRequest, GetGlobalPeerMapResponse, PeerCenterRpc, ReportPeersRequest, + ReportPeersResponse, + }, + rpc_types::controller::BaseController, + }, +}; + +#[derive(Clone)] +pub struct PeerCenterManageRpcService { + instance_manager: Arc, +} + +impl PeerCenterManageRpcService { + pub fn new(instance_manager: Arc) -> Self { + Self { instance_manager } + } +} + +#[async_trait::async_trait] +impl PeerCenterRpc for PeerCenterManageRpcService { + type Controller = BaseController; + + async fn get_global_peer_map( + &self, + ctrl: BaseController, + req: GetGlobalPeerMapRequest, + ) -> crate::proto::rpc_types::error::Result { + let instance_service = + super::get_instance_service(&self.instance_manager, &None).map_err(|e| { + let msg = e.to_string(); + if msg.contains("please specify the instance ID") { + anyhow::anyhow!( + "PeerCenter management RPC cannot select an instance automatically \ + when multiple instances are running; please use an API that allows \ + specifying an instance identifier." + ) + } else { + e + } + })?; + + instance_service + .get_peer_center_service() + .get_global_peer_map(ctrl, req) + .await + } + + async fn report_peers( + &self, + _: BaseController, + _: ReportPeersRequest, + ) -> crate::proto::rpc_types::error::Result { + Err(anyhow::anyhow!("not implemented for management API").into()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + instance_manager::NetworkInstanceManager, + proto::{ + peer_rpc::{GetGlobalPeerMapRequest, ReportPeersRequest}, + rpc_types::controller::BaseController, + }, + }; + + fn make_service() -> PeerCenterManageRpcService { + PeerCenterManageRpcService::new(Arc::new(NetworkInstanceManager::new())) + } + + #[tokio::test] + async fn get_global_peer_map_errors_when_no_instance() { + let svc = make_service(); + let result = svc + .get_global_peer_map( + BaseController::default(), + GetGlobalPeerMapRequest::default(), + ) + .await; + assert!(result.is_err()); + let msg = result.unwrap_err().to_string(); + assert!( + msg.contains("No instance matches the selector"), + "unexpected error: {msg}" + ); + } + + #[tokio::test] + async fn report_peers_always_returns_error() { + let svc = make_service(); + let result = svc + .report_peers(BaseController::default(), ReportPeersRequest::default()) + .await; + assert!(result.is_err()); + let msg = result.unwrap_err().to_string(); + assert!( + msg.contains("not implemented for management API"), + "unexpected error: {msg}" + ); + } +}