fix: flush DataChannelCmd::StartForward* commands (#316)

Without flushing this may sit in a kernel buffer and we won't
know if the channel is still alive. This is particularly problematic
for the TCP connection pool.
This commit is contained in:
Ryan Dearing 2024-02-13 20:30:52 -07:00 committed by GitHub
parent 915bf4d21d
commit 63221028c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 10 deletions

View File

@ -1,8 +1,9 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Context, Result};
use async_http_proxy::{http_connect_tokio, http_connect_tokio_with_basic_auth}; use async_http_proxy::{http_connect_tokio, http_connect_tokio_with_basic_auth};
use backoff::{backoff::Backoff, Notify}; use backoff::{backoff::Backoff, Notify};
use socket2::{SockRef, TcpKeepalive}; use socket2::{SockRef, TcpKeepalive};
use std::{future::Future, net::SocketAddr, time::Duration}; use std::{future::Future, net::SocketAddr, time::Duration};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::{ use tokio::{
net::{lookup_host, TcpStream, ToSocketAddrs, UdpSocket}, net::{lookup_host, TcpStream, ToSocketAddrs, UdpSocket},
sync::broadcast, sync::broadcast,
@ -144,3 +145,14 @@ where
} }
} }
} }
pub async fn write_and_flush<T>(conn: &mut T, data: &[u8]) -> Result<()>
where
T: AsyncWrite + Unpin,
{
conn.write_all(data)
.await
.with_context(|| "Failed to write data")?;
conn.flush().await.with_context(|| "Failed to flush data")?;
Ok(())
}

View File

@ -1,7 +1,7 @@
use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType}; use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType};
use crate::config_watcher::{ConfigChange, ServerServiceChange}; use crate::config_watcher::{ConfigChange, ServerServiceChange};
use crate::constants::{listen_backoff, UDP_BUFFER_SIZE}; use crate::constants::{listen_backoff, UDP_BUFFER_SIZE};
use crate::helper::retry_notify_with_deadline; use crate::helper::{retry_notify_with_deadline, write_and_flush};
use crate::multi_map::MultiMap; use crate::multi_map::MultiMap;
use crate::protocol::Hello::{ControlChannelHello, DataChannelHello}; use crate::protocol::Hello::{ControlChannelHello, DataChannelHello};
use crate::protocol::{ use crate::protocol::{
@ -498,14 +498,9 @@ struct ControlChannel<T: Transport> {
impl<T: Transport> ControlChannel<T> { impl<T: Transport> ControlChannel<T> {
async fn write_and_flush(&mut self, data: &[u8]) -> Result<()> { async fn write_and_flush(&mut self, data: &[u8]) -> Result<()> {
self.conn write_and_flush(&mut self.conn, data)
.write_all(data)
.await .await
.with_context(|| "Failed to write control cmds")?; .with_context(|| "Failed to write control cmds")?;
self.conn
.flush()
.await
.with_context(|| "Failed to flush control cmds")?;
Ok(()) Ok(())
} }
// Run a control channel // Run a control channel
@ -640,7 +635,7 @@ async fn run_tcp_connection_pool<T: Transport>(
'pool: while let Some(mut visitor) = visitor_rx.recv().await { 'pool: while let Some(mut visitor) = visitor_rx.recv().await {
loop { loop {
if let Some(mut ch) = data_ch_rx.recv().await { if let Some(mut ch) = data_ch_rx.recv().await {
if ch.write_all(&cmd).await.is_ok() { if write_and_flush(&mut ch, &cmd).await.is_ok() {
tokio::spawn(async move { tokio::spawn(async move {
let _ = copy_bidirectional(&mut ch, &mut visitor).await; let _ = copy_bidirectional(&mut ch, &mut visitor).await;
}); });
@ -690,7 +685,7 @@ async fn run_udp_connection_pool<T: Transport>(
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("No available data channels"))?; .ok_or_else(|| anyhow!("No available data channels"))?;
conn.write_all(&cmd).await?; write_and_flush(&mut conn, &cmd).await?;
let mut buf = [0u8; UDP_BUFFER_SIZE]; let mut buf = [0u8; UDP_BUFFER_SIZE];
loop { loop {