From a071b0786b9f46b21721b9040b02c05f7a80b007 Mon Sep 17 00:00:00 2001 From: Yujia Qiao Date: Fri, 7 Jan 2022 15:57:26 +0800 Subject: [PATCH] fix: log error when failed to listen on udp port --- src/client.rs | 19 +++++++++--- src/server.rs | 84 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 70 insertions(+), 33 deletions(-) diff --git a/src/client.rs b/src/client.rs index 89d0064..6ee34d5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -17,7 +17,7 @@ use tokio::io::{self, copy_bidirectional, AsyncWriteExt}; use tokio::net::{TcpStream, UdpSocket}; use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; use tokio::time::{self, Duration}; -use tracing::{debug, error, info, instrument, warn, Instrument, Span}; +use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span}; #[cfg(feature = "noise")] use crate::transport::NoiseTransport; @@ -236,8 +236,13 @@ async fn run_data_channel_for_udp(conn: T::Stream, local_addr: &st // Keep sending items from the outbound channel to the server tokio::spawn(async move { while let Some(t) = outbound_rx.recv().await { - debug!("outbound {:?}", t); - if t.write(&mut wr).await.is_err() { + trace!("outbound {:?}", t); + if let Err(e) = t + .write(&mut wr) + .await + .with_context(|| "Failed to forward UDP traffic to the server") + { + debug!("{:?}", e); break; } } @@ -245,7 +250,10 @@ async fn run_data_channel_for_udp(conn: T::Stream, local_addr: &st loop { // Read a packet from the server - let packet = UdpTraffic::read(&mut rd).await?; + let hdr_len = rd.read_u16().await?; + let packet = UdpTraffic::read(&mut rd, hdr_len) + .await + .with_context(|| "Failed to read UDPTraffic from the server")?; let m = port_map.read().await; if m.get(&packet.from).is_none() { @@ -290,6 +298,7 @@ async fn run_data_channel_for_udp(conn: T::Stream, local_addr: &st } // Run a UdpSocket for the visitor `from` +#[instrument(skip_all, fields(from))] async fn run_udp_forwarder( s: UdpSocket, mut inbound_rx: mpsc::Receiver, @@ -297,6 +306,7 @@ async fn run_udp_forwarder( from: SocketAddr, port_map: UdpPortMap, ) -> Result<()> { + debug!("Forwarder created"); let mut buf = BytesMut::new(); buf.resize(UDP_BUFFER_SIZE, 0); @@ -336,6 +346,7 @@ async fn run_udp_forwarder( let mut port_map = port_map.write().await; port_map.remove(&from); + debug!("Forwarder dropped"); Ok(()) } diff --git a/src/server.rs b/src/server.rs index 0e2bbcc..ef2c85d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -162,12 +162,12 @@ impl<'a, T: 'static + Transport> Server<'a, T> { } Ok((conn, addr)) => { backoff.reset(); - debug!("Incomming connection from {}", addr); + debug!("Incoming connection from {}", addr); let services = self.services.clone(); let control_channels = self.control_channels.clone(); tokio::spawn(async move { - if let Err(err) = handle_connection(conn, addr, services, control_channels).await.with_context(||"Failed to handle a connection to `server.bind_addr`") { + if let Err(err) = handle_connection(conn, services, control_channels).await { error!("{:?}", err); } }.instrument(info_span!("handle_connection", %addr))); @@ -215,7 +215,6 @@ impl<'a, T: 'static + Transport> Server<'a, T> { // Handle connections to `server.bind_addr` async fn handle_connection( mut conn: T::Stream, - addr: SocketAddr, services: Arc>>, control_channels: Arc>>, ) -> Result<()> { @@ -223,8 +222,7 @@ async fn handle_connection( let hello = read_hello(&mut conn).await?; match hello { ControlChannelHello(_, service_digest) => { - do_control_channel_handshake(conn, addr, services, control_channels, service_digest) - .await?; + do_control_channel_handshake(conn, services, control_channels, service_digest).await?; } DataChannelHello(_, nonce) => { do_data_channel_handshake(conn, control_channels, nonce).await?; @@ -235,12 +233,11 @@ async fn handle_connection( async fn do_control_channel_handshake( mut conn: T::Stream, - addr: SocketAddr, services: Arc>>, control_channels: Arc>>, service_digest: ServiceDigest, ) -> Result<()> { - info!("New control channel incomming from {}", addr); + info!("New control channel incoming"); // Generate a nonce let mut nonce = vec![0u8; HASH_WIDTH_IN_BYTES]; @@ -321,6 +318,8 @@ async fn do_data_channel_handshake( control_channels: Arc>>, nonce: Nonce, ) -> Result<()> { + info!("New control channel incoming"); + // Validate let control_channels_guard = control_channels.read().await; match control_channels_guard.get2(&nonce) { @@ -358,27 +357,6 @@ where // Store data channel creation requests let (data_ch_req_tx, data_ch_req_rx) = mpsc::unbounded_channel(); - match service.service_type { - ServiceType::Tcp => tokio::spawn( - run_tcp_connection_pool::( - service.bind_addr.clone(), - data_ch_rx, - data_ch_req_tx.clone(), - shutdown_tx.subscribe(), - ) - .instrument(Span::current()), - ), - ServiceType::Udp => tokio::spawn( - run_udp_connection_pool::( - service.bind_addr.clone(), - data_ch_rx, - data_ch_req_tx.clone(), - shutdown_tx.subscribe(), - ) - .instrument(Span::current()), - ), - }; - // Cache some data channels for later use let pool_size = match service.service_type { ServiceType::Tcp => TCP_POOL_SIZE, @@ -391,6 +369,43 @@ where }; } + let shutdown_rx_clone = shutdown_tx.subscribe(); + let bind_addr = service.bind_addr.clone(); + match service.service_type { + ServiceType::Tcp => tokio::spawn( + async move { + if let Err(e) = run_tcp_connection_pool::( + bind_addr, + data_ch_rx, + data_ch_req_tx, + shutdown_rx_clone, + ) + .await + .with_context(|| "Failed to run TCP connection pool") + { + error!("{:?}", e); + } + } + .instrument(Span::current()), + ), + ServiceType::Udp => tokio::spawn( + async move { + if let Err(e) = run_udp_connection_pool::( + bind_addr, + data_ch_rx, + data_ch_req_tx, + shutdown_rx_clone, + ) + .await + .with_context(|| "Failed to run TCP connection pool") + { + error!("{:?}", e); + } + } + .instrument(Span::current()), + ), + }; + // Create the control channel let ch = ControlChannel:: { conn, @@ -568,7 +583,16 @@ async fn run_udp_connection_pool( // TODO: Load balance let l: UdpSocket = backoff::future::retry(listen_backoff(), || async { - Ok(UdpSocket::bind(&bind_addr).await?) + Ok(match UdpSocket::bind(&bind_addr) + .await + .with_context(|| "Failed to listen for the service") + { + Err(e) => { + error!("{:?}", e); + Err(e) + } + v => v, + }?) }) .await .with_context(|| "Failed to listen for the service")?; @@ -605,5 +629,7 @@ async fn run_udp_connection_pool( } } + debug!("UDP pool dropped"); + Ok(()) }