From fe1c8ad0e94b1810e4577b3fdc901211def060cf Mon Sep 17 00:00:00 2001 From: Eric Zhang Date: Wed, 6 Apr 2022 01:08:35 -0400 Subject: [PATCH] Implement shared protocol and server --- Cargo.lock | 157 +++++++++++++++++++++++++++++++++++++++++++++----- Cargo.toml | 6 +- src/lib.rs | 10 +--- src/main.rs | 24 ++++++-- src/server.rs | 145 ++++++++++++++++++++++++++++++++++++++++++++++ src/shared.rs | 43 ++++++++++++++ 6 files changed, 355 insertions(+), 30 deletions(-) create mode 100644 src/shared.rs diff --git a/Cargo.lock b/Cargo.lock index 852873f..9c78ca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + +[[package]] +name = "anyhow" +version = "1.0.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" + [[package]] name = "atty" version = "0.2.14" @@ -29,13 +44,23 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" name = "bore-cli" version = "0.1.0" dependencies = [ + "anyhow", "clap", + "dashmap", + "rmp-serde", "serde", - "serde_json", "tokio", + "tracing", + "tracing-subscriber", "uuid", ] +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.1.0" @@ -78,6 +103,17 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c" +dependencies = [ + "cfg-if", + "num_cpus", + "parking_lot", +] + [[package]] name = "getrandom" version = "0.2.6" @@ -120,12 +156,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "itoa" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" - [[package]] name = "lazy_static" version = "1.4.0" @@ -195,6 +225,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.1" @@ -301,10 +340,25 @@ dependencies = [ ] [[package]] -name = "ryu" -version = "1.0.9" +name = "rmp" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +checksum = "4f55e5fa1446c4d5dd1f5daeed2a4fe193071771a2636274d0d7a3b082aa7ad6" +dependencies = [ + "byteorder", + "num-traits", +] + +[[package]] +name = "rmp-serde" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3eedffbfcc6a428f230c04baf8f59bd73c1781361e4286111fe900849aaddaf" +dependencies = [ + "byteorder", + "rmp", + "serde", +] [[package]] name = "scopeguard" @@ -333,14 +387,12 @@ dependencies = [ ] [[package]] -name = "serde_json" -version = "1.0.79" +name = "sharded-slab" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" dependencies = [ - "itoa", - "ryu", - "serde", + "lazy_static", ] [[package]] @@ -400,6 +452,15 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" +[[package]] +name = "thread_local" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +dependencies = [ + "once_cell", +] + [[package]] name = "tokio" version = "1.17.0" @@ -431,6 +492,64 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee" +dependencies = [ + "lazy_static", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9df98b037d039d03400d9dd06b0f8ce05486b5f25e9a2d7d36196e142ebbc52" +dependencies = [ + "ansi_term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-xid" version = "0.2.2" @@ -447,6 +566,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index 8a61535..4fc90e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,12 @@ name = "bore" path = "src/main.rs" [dependencies] +anyhow = "1.0.56" clap = { version = "3.1.8", features = ["derive"] } +dashmap = "5.2.0" +rmp-serde = "1.0.0" serde = { version = "1.0.136", features = ["derive"] } -serde_json = "1.0.79" tokio = { version = "1.17.0", features = ["full"] } +tracing = "0.1.32" +tracing-subscriber = "0.3.10" uuid = { version = "0.8.2", features = ["serde", "v4"] } diff --git a/src/lib.rs b/src/lib.rs index 2cc80d5..4c595e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,12 +17,4 @@ pub mod client; pub mod server; - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - let result = 2 + 2; - assert_eq!(result, 4); - } -} +pub mod shared; diff --git a/src/main.rs b/src/main.rs index 97927f7..6a06953 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use anyhow::Result; +use bore_cli::server::Server; use clap::{Parser, Subcommand}; #[derive(Parser, Debug)] @@ -32,10 +34,24 @@ enum Command { }, } -fn main() { +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + let args = Args::parse(); + match args.command { + Command::Local { + local_port, + to, + port, + } => { + let _ = (local_port, to, port); + todo!() + } + Command::Server { min_port } => { + Server::new(min_port).listen().await?; + } + } - println!("{:?}", args); - - println!("bore cli running"); + Ok(()) } diff --git a/src/server.rs b/src/server.rs index 30eb1b9..bf1d76f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1 +1,146 @@ //! Server implementation for the `bore` service. + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{Context, Result}; +use dashmap::DashMap; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::time::{sleep, timeout}; +use tracing::{info, info_span, warn, Instrument}; +use uuid::Uuid; + +use crate::shared::{proxy, ClientMessage, ServerMessage, CONTROL_PORT}; + +/// State structure for the server. +pub struct Server { + /// The minimum TCP port that can be forwarded. + pub min_port: u16, + + /// Concurrent map of IDs to incoming connections. + conns: Arc>, +} + +impl Server { + /// Create a new server with a specified minimum port number. + pub fn new(min_port: u16) -> Server { + Server { + min_port, + conns: Arc::new(DashMap::new()), + } + } + + /// Start the server, listening for new connections. + pub async fn listen(self) -> Result<()> { + let this = Arc::new(self); + let addr = SocketAddr::from(([0, 0, 0, 0], CONTROL_PORT)); + let listener = TcpListener::bind(&addr).await?; + info!(?addr, "server listening"); + + loop { + let (stream, addr) = listener.accept().await?; + let this = Arc::clone(&this); + tokio::spawn( + async move { + info!("incoming connection"); + if let Err(err) = this.handle_connection(stream).await { + warn!(%err, "connection exited with error"); + } else { + info!("connection exited"); + } + } + .instrument(info_span!("control", ?addr)), + ); + } + } + + async fn handle_connection(&self, stream: TcpStream) -> Result<()> { + let mut stream = BufReader::new(stream); + + let mut buf = Vec::new(); + let msg = next_mp(&mut stream, &mut buf).await?; + + match msg { + Some(ClientMessage::Hello(port)) => { + if port < self.min_port { + warn!(?port, "client port number too low"); + return Ok(()); + } + info!(?port, "new client"); + let listener = TcpListener::bind(("::", port)).await?; + loop { + if send_mp(&mut stream, ServerMessage::Heartbeat) + .await + .is_err() + { + // Assume that the TCP connection has been dropped. + return Ok(()); + } + const TIMEOUT: Duration = Duration::from_millis(500); + if let Ok(result) = timeout(TIMEOUT, listener.accept()).await { + let (stream2, addr) = result?; + info!(?addr, ?port, "new connection"); + + let id = Uuid::new_v4(); + let conns = Arc::clone(&self.conns); + conns.insert(id, stream2); + tokio::spawn(async move { + // Remove stale entries to avoid memory leaks. + sleep(Duration::from_secs(10)).await; + if conns.remove(&id).is_some() { + warn!(?id, "removed stale connection"); + } + }); + send_mp(&mut stream, ServerMessage::Connection(id)).await?; + } + } + } + Some(ClientMessage::Accept(id)) => { + info!(?id, "forwarding connection"); + match self.conns.remove(&id) { + Some((_, stream2)) => proxy(stream, stream2).await?, + None => warn!(?id, "missing connection ID"), + } + Ok(()) + } + None => { + warn!("unexpected EOF"); + Ok(()) + } + } + } +} + +impl Default for Server { + fn default() -> Self { + Server::new(1024) + } +} + +/// Read the next null-delimited MessagePack instruction from a stream. +async fn next_mp( + reader: &mut (impl AsyncBufRead + Unpin), + buf: &mut Vec, +) -> Result> { + buf.clear(); + reader.read_until(0, buf).await?; + if buf.is_empty() { + return Ok(None); + } + if buf.last() == Some(&0) { + buf.pop(); + } + Ok(rmp_serde::from_slice(buf).context("failed to parse MessagePack")?) +} + +/// Send a null-terminated MessagePack instruction on a stream. +async fn send_mp(writer: &mut (impl AsyncWrite + Unpin), msg: T) -> Result<()> { + let msg = rmp_serde::to_vec(&msg)?; + writer.write_all(&msg).await?; + writer.write_all(&[0]).await?; + Ok(()) +} diff --git a/src/shared.rs b/src/shared.rs new file mode 100644 index 0000000..af33470 --- /dev/null +++ b/src/shared.rs @@ -0,0 +1,43 @@ +//! Shared data structures, utilities, and protocol definitions. + +use serde::{Deserialize, Serialize}; +use tokio::io::{self, AsyncRead, AsyncWrite}; +use uuid::Uuid; + +/// TCP port used for control connections with the server. +pub const CONTROL_PORT: u16 = 7835; + +/// A message from the client on the control connection. +#[derive(Serialize, Deserialize)] +pub enum ClientMessage { + /// Initial client message specifying a port to forward. + Hello(u16), + + /// Accepts an incoming TCP connection, using this stream as a proxy. + Accept(Uuid), +} + +/// A message from the server on the control connection. +#[derive(Serialize, Deserialize)] +pub enum ServerMessage { + /// No-op used to test if the client is still reachable. + Heartbeat, + + /// Asks the client to accept a forwarded TCP connection. + Connection(Uuid), +} + +/// Copy data mutually between two read/write streams. +pub async fn proxy(stream1: S1, stream2: S2) -> io::Result<()> +where + S1: AsyncRead + AsyncWrite + Unpin, + S2: AsyncRead + AsyncWrite + Unpin, +{ + let (mut s1_read, mut s1_write) = io::split(stream1); + let (mut s2_read, mut s2_write) = io::split(stream2); + tokio::try_join!( + io::copy(&mut s1_read, &mut s2_write), + io::copy(&mut s2_read, &mut s1_write), + )?; + Ok(()) +}