fix: flush when handshaking

This commit is contained in:
Yujia Qiao 2022-01-13 11:34:12 +08:00 committed by Yujia Qiao
parent 0b2cb15dae
commit 7a35e9e4f2
2 changed files with 10 additions and 1 deletions

View File

@ -186,6 +186,7 @@ async fn do_data_channel_handshake<T: Transport>(
let v: &[u8; HASH_WIDTH_IN_BYTES] = args.session_key[..].try_into().unwrap(); let v: &[u8; HASH_WIDTH_IN_BYTES] = args.session_key[..].try_into().unwrap();
let hello = Hello::DataChannelHello(CURRENT_PROTO_VERSION, v.to_owned()); let hello = Hello::DataChannelHello(CURRENT_PROTO_VERSION, v.to_owned());
conn.write_all(&bincode::serialize(&hello).unwrap()).await?; conn.write_all(&bincode::serialize(&hello).unwrap()).await?;
conn.flush().await?;
Ok(conn) Ok(conn)
} }
@ -387,6 +388,7 @@ impl<T: 'static + Transport> ControlChannel<T> {
Hello::ControlChannelHello(CURRENT_PROTO_VERSION, self.digest[..].try_into().unwrap()); Hello::ControlChannelHello(CURRENT_PROTO_VERSION, self.digest[..].try_into().unwrap());
conn.write_all(&bincode::serialize(&hello_send).unwrap()) conn.write_all(&bincode::serialize(&hello_send).unwrap())
.await?; .await?;
conn.flush().await?;
// Read hello // Read hello
debug!("Reading hello"); debug!("Reading hello");
@ -408,6 +410,7 @@ impl<T: 'static + Transport> ControlChannel<T> {
let session_key = protocol::digest(&concat); let session_key = protocol::digest(&concat);
let auth = Auth(session_key); let auth = Auth(session_key);
conn.write_all(&bincode::serialize(&auth).unwrap()).await?; conn.write_all(&bincode::serialize(&auth).unwrap()).await?;
conn.flush().await?;
// Read ack // Read ack
debug!("Reading ack"); debug!("Reading ack");

View File

@ -265,6 +265,7 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
); );
conn.write_all(&bincode::serialize(&hello_send).unwrap()) conn.write_all(&bincode::serialize(&hello_send).unwrap())
.await?; .await?;
conn.flush().await?;
// Lookup the service // Lookup the service
let service_config = match services.read().await.get(&service_digest) { let service_config = match services.read().await.get(&service_digest) {
@ -314,6 +315,7 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
// Send ack // Send ack
conn.write_all(&bincode::serialize(&Ack::Ok).unwrap()) conn.write_all(&bincode::serialize(&Ack::Ok).unwrap())
.await?; .await?;
conn.flush().await?;
info!(service = %service_config.name, "Control channel established"); info!(service = %service_config.name, "Control channel established");
let handle = ControlChannelHandle::new(conn, service_config); let handle = ControlChannelHandle::new(conn, service_config);
@ -467,7 +469,11 @@ impl<T: Transport> ControlChannel<T> {
val = self.data_ch_req_rx.recv() => { val = self.data_ch_req_rx.recv() => {
match val { match val {
Some(_) => { Some(_) => {
if let Err(e) = self.conn.write_all(&cmd).await.with_context(||"Failed to write data cmds") { if let Err(e) = self.conn.write_all(&cmd).await.with_context(||"Failed to write control cmds") {
error!("{:?}", e);
break;
}
if let Err(e) = self.conn.flush().await.with_context(|| "Failed to flush control cmds") {
error!("{:?}", e); error!("{:?}", e);
break; break;
} }