diff --git a/src/client.rs b/src/client.rs index 40231d0..dfe0740 100644 --- a/src/client.rs +++ b/src/client.rs @@ -186,6 +186,7 @@ async fn do_data_channel_handshake( let v: &[u8; HASH_WIDTH_IN_BYTES] = args.session_key[..].try_into().unwrap(); let hello = Hello::DataChannelHello(CURRENT_PROTO_VERSION, v.to_owned()); conn.write_all(&bincode::serialize(&hello).unwrap()).await?; + conn.flush().await?; Ok(conn) } @@ -387,6 +388,7 @@ impl ControlChannel { Hello::ControlChannelHello(CURRENT_PROTO_VERSION, self.digest[..].try_into().unwrap()); conn.write_all(&bincode::serialize(&hello_send).unwrap()) .await?; + conn.flush().await?; // Read hello debug!("Reading hello"); @@ -408,6 +410,7 @@ impl ControlChannel { let session_key = protocol::digest(&concat); let auth = Auth(session_key); conn.write_all(&bincode::serialize(&auth).unwrap()).await?; + conn.flush().await?; // Read ack debug!("Reading ack"); diff --git a/src/server.rs b/src/server.rs index 26e0b96..61bc083 100644 --- a/src/server.rs +++ b/src/server.rs @@ -265,6 +265,7 @@ async fn do_control_channel_handshake( ); conn.write_all(&bincode::serialize(&hello_send).unwrap()) .await?; + conn.flush().await?; // Lookup the service let service_config = match services.read().await.get(&service_digest) { @@ -314,6 +315,7 @@ async fn do_control_channel_handshake( // Send ack conn.write_all(&bincode::serialize(&Ack::Ok).unwrap()) .await?; + conn.flush().await?; info!(service = %service_config.name, "Control channel established"); let handle = ControlChannelHandle::new(conn, service_config); @@ -467,7 +469,11 @@ impl ControlChannel { val = self.data_ch_req_rx.recv() => { match val { Some(_) => { - if let Err(e) = self.conn.write_all(&cmd).await.with_context(||"Failed to write data cmds") { + if let Err(e) = self.conn.write_all(&cmd).await.with_context(||"Failed to write control cmds") { + error!("{:?}", e); + break; + } + if let Err(e) = self.conn.flush().await.with_context(|| "Failed to flush control cmds") { error!("{:?}", e); break; }