fix: log error when failed to listen on udp port

This commit is contained in:
Yujia Qiao 2022-01-07 15:57:26 +08:00 committed by Yujia Qiao
parent f1f4044f7b
commit a071b0786b
2 changed files with 70 additions and 33 deletions

View File

@ -17,7 +17,7 @@ use tokio::io::{self, copy_bidirectional, AsyncWriteExt};
use tokio::net::{TcpStream, UdpSocket};
use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
use tokio::time::{self, Duration};
use tracing::{debug, error, info, instrument, warn, Instrument, Span};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
#[cfg(feature = "noise")]
use crate::transport::NoiseTransport;
@ -236,8 +236,13 @@ async fn run_data_channel_for_udp<T: Transport>(conn: T::Stream, local_addr: &st
// Keep sending items from the outbound channel to the server
tokio::spawn(async move {
while let Some(t) = outbound_rx.recv().await {
debug!("outbound {:?}", t);
if t.write(&mut wr).await.is_err() {
trace!("outbound {:?}", t);
if let Err(e) = t
.write(&mut wr)
.await
.with_context(|| "Failed to forward UDP traffic to the server")
{
debug!("{:?}", e);
break;
}
}
@ -245,7 +250,10 @@ async fn run_data_channel_for_udp<T: Transport>(conn: T::Stream, local_addr: &st
loop {
// Read a packet from the server
let packet = UdpTraffic::read(&mut rd).await?;
let hdr_len = rd.read_u16().await?;
let packet = UdpTraffic::read(&mut rd, hdr_len)
.await
.with_context(|| "Failed to read UDPTraffic from the server")?;
let m = port_map.read().await;
if m.get(&packet.from).is_none() {
@ -290,6 +298,7 @@ async fn run_data_channel_for_udp<T: Transport>(conn: T::Stream, local_addr: &st
}
// Run a UdpSocket for the visitor `from`
#[instrument(skip_all, fields(from))]
async fn run_udp_forwarder(
s: UdpSocket,
mut inbound_rx: mpsc::Receiver<Bytes>,
@ -297,6 +306,7 @@ async fn run_udp_forwarder(
from: SocketAddr,
port_map: UdpPortMap,
) -> Result<()> {
debug!("Forwarder created");
let mut buf = BytesMut::new();
buf.resize(UDP_BUFFER_SIZE, 0);
@ -336,6 +346,7 @@ async fn run_udp_forwarder(
let mut port_map = port_map.write().await;
port_map.remove(&from);
debug!("Forwarder dropped");
Ok(())
}

View File

@ -162,12 +162,12 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
}
Ok((conn, addr)) => {
backoff.reset();
debug!("Incomming connection from {}", addr);
debug!("Incoming connection from {}", addr);
let services = self.services.clone();
let control_channels = self.control_channels.clone();
tokio::spawn(async move {
if let Err(err) = handle_connection(conn, addr, services, control_channels).await.with_context(||"Failed to handle a connection to `server.bind_addr`") {
if let Err(err) = handle_connection(conn, services, control_channels).await {
error!("{:?}", err);
}
}.instrument(info_span!("handle_connection", %addr)));
@ -215,7 +215,6 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
// Handle connections to `server.bind_addr`
async fn handle_connection<T: 'static + Transport>(
mut conn: T::Stream,
addr: SocketAddr,
services: Arc<RwLock<HashMap<ServiceDigest, ServerServiceConfig>>>,
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
) -> Result<()> {
@ -223,8 +222,7 @@ async fn handle_connection<T: 'static + Transport>(
let hello = read_hello(&mut conn).await?;
match hello {
ControlChannelHello(_, service_digest) => {
do_control_channel_handshake(conn, addr, services, control_channels, service_digest)
.await?;
do_control_channel_handshake(conn, services, control_channels, service_digest).await?;
}
DataChannelHello(_, nonce) => {
do_data_channel_handshake(conn, control_channels, nonce).await?;
@ -235,12 +233,11 @@ async fn handle_connection<T: 'static + Transport>(
async fn do_control_channel_handshake<T: 'static + Transport>(
mut conn: T::Stream,
addr: SocketAddr,
services: Arc<RwLock<HashMap<ServiceDigest, ServerServiceConfig>>>,
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
service_digest: ServiceDigest,
) -> Result<()> {
info!("New control channel incomming from {}", addr);
info!("New control channel incoming");
// Generate a nonce
let mut nonce = vec![0u8; HASH_WIDTH_IN_BYTES];
@ -321,6 +318,8 @@ async fn do_data_channel_handshake<T: 'static + Transport>(
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
nonce: Nonce,
) -> Result<()> {
info!("New control channel incoming");
// Validate
let control_channels_guard = control_channels.read().await;
match control_channels_guard.get2(&nonce) {
@ -358,27 +357,6 @@ where
// Store data channel creation requests
let (data_ch_req_tx, data_ch_req_rx) = mpsc::unbounded_channel();
match service.service_type {
ServiceType::Tcp => tokio::spawn(
run_tcp_connection_pool::<T>(
service.bind_addr.clone(),
data_ch_rx,
data_ch_req_tx.clone(),
shutdown_tx.subscribe(),
)
.instrument(Span::current()),
),
ServiceType::Udp => tokio::spawn(
run_udp_connection_pool::<T>(
service.bind_addr.clone(),
data_ch_rx,
data_ch_req_tx.clone(),
shutdown_tx.subscribe(),
)
.instrument(Span::current()),
),
};
// Cache some data channels for later use
let pool_size = match service.service_type {
ServiceType::Tcp => TCP_POOL_SIZE,
@ -391,6 +369,43 @@ where
};
}
let shutdown_rx_clone = shutdown_tx.subscribe();
let bind_addr = service.bind_addr.clone();
match service.service_type {
ServiceType::Tcp => tokio::spawn(
async move {
if let Err(e) = run_tcp_connection_pool::<T>(
bind_addr,
data_ch_rx,
data_ch_req_tx,
shutdown_rx_clone,
)
.await
.with_context(|| "Failed to run TCP connection pool")
{
error!("{:?}", e);
}
}
.instrument(Span::current()),
),
ServiceType::Udp => tokio::spawn(
async move {
if let Err(e) = run_udp_connection_pool::<T>(
bind_addr,
data_ch_rx,
data_ch_req_tx,
shutdown_rx_clone,
)
.await
.with_context(|| "Failed to run TCP connection pool")
{
error!("{:?}", e);
}
}
.instrument(Span::current()),
),
};
// Create the control channel
let ch = ControlChannel::<T> {
conn,
@ -568,7 +583,16 @@ async fn run_udp_connection_pool<T: Transport>(
// TODO: Load balance
let l: UdpSocket = backoff::future::retry(listen_backoff(), || async {
Ok(UdpSocket::bind(&bind_addr).await?)
Ok(match UdpSocket::bind(&bind_addr)
.await
.with_context(|| "Failed to listen for the service")
{
Err(e) => {
error!("{:?}", e);
Err(e)
}
v => v,
}?)
})
.await
.with_context(|| "Failed to listen for the service")?;
@ -605,5 +629,7 @@ async fn run_udp_connection_pool<T: Transport>(
}
}
debug!("UDP pool dropped");
Ok(())
}