diff --git a/Cargo.lock b/Cargo.lock index 9c78ca9..066bf76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ansi_term" version = "0.12.1" @@ -16,6 +31,9 @@ name = "anyhow" version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" +dependencies = [ + "backtrace", +] [[package]] name = "atty" @@ -34,6 +52,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e121dee8023ce33ab248d9ce1493df03c3b38a659b240096fcbd7048ff9c31f" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -47,26 +80,26 @@ dependencies = [ "anyhow", "clap", "dashmap", - "rmp-serde", "serde", + "serde_json", "tokio", "tracing", "tracing-subscriber", "uuid", ] -[[package]] -name = "byteorder" -version = "1.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" - [[package]] name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + [[package]] name = "cfg-if" version = "1.0.0" @@ -125,6 +158,12 @@ dependencies = [ "wasi 0.10.2+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "hashbrown" version = "0.11.2" @@ -156,6 +195,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + [[package]] name = "lazy_static" version = "1.4.0" @@ -193,6 +238,16 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.8.2" @@ -225,15 +280,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num-traits" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" -dependencies = [ - "autocfg", -] - [[package]] name = "num_cpus" version = "1.13.1" @@ -244,6 +290,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.10.0" @@ -340,25 +395,16 @@ dependencies = [ ] [[package]] -name = "rmp" -version = "0.8.10" +name = "rustc-demangle" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f55e5fa1446c4d5dd1f5daeed2a4fe193071771a2636274d0d7a3b082aa7ad6" -dependencies = [ - "byteorder", - "num-traits", -] +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" [[package]] -name = "rmp-serde" -version = "1.0.0" +name = "ryu" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3eedffbfcc6a428f230c04baf8f59bd73c1781361e4286111fe900849aaddaf" -dependencies = [ - "byteorder", - "rmp", - "serde", -] +checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" [[package]] name = "scopeguard" @@ -386,6 +432,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.4" diff --git a/Cargo.toml b/Cargo.toml index 4fc90e2..2b20b59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,11 +16,11 @@ name = "bore" path = "src/main.rs" [dependencies] -anyhow = "1.0.56" +anyhow = { version = "1.0.56", features = ["backtrace"] } clap = { version = "3.1.8", features = ["derive"] } dashmap = "5.2.0" -rmp-serde = "1.0.0" serde = { version = "1.0.136", features = ["derive"] } +serde_json = "1.0.79" tokio = { version = "1.17.0", features = ["full"] } tracing = "0.1.32" tracing-subscriber = "0.3.10" diff --git a/src/client.rs b/src/client.rs index 7fa7770..79e4082 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1 +1,97 @@ //! Client implementation for the `bore` service. + +use std::sync::Arc; + +use anyhow::{bail, Context, Result}; +use tokio::{io::BufReader, net::TcpStream}; +use tracing::{error, info, info_span, warn, Instrument}; +use uuid::Uuid; + +use crate::shared::{proxy, recv_json, send_json, ClientMessage, ServerMessage, CONTROL_PORT}; + +/// State structure for the client. +pub struct Client { + /// Control connection to the server. + conn: Option>, + + /// Destination address of the server. + to: String, + + /// Local port that is forwarded. + local_port: u16, + + /// Port that is publicly available on the remote. + remote_port: u16, +} + +impl Client { + /// Create a new client. + pub async fn new(local_port: u16, to: &str, port: u16) -> Result { + let stream = TcpStream::connect((to, CONTROL_PORT)).await?; + let mut stream = BufReader::new(stream); + + send_json(&mut stream, ClientMessage::Hello(port)).await?; + let remote_port = match recv_json(&mut stream, &mut Vec::new()).await? { + Some(ServerMessage::Hello(remote_port)) => remote_port, + Some(ServerMessage::Error(message)) => bail!("server error: {message}"), + Some(_) => bail!("unexpected initial non-hello message"), + None => bail!("unexpected EOF"), + }; + info!(remote_port, "connected to server"); + info!("listening at {to}:{remote_port}"); + + Ok(Client { + conn: Some(stream), + to: to.to_string(), + local_port, + remote_port, + }) + } + + /// Returns the port publicly available on the remote. + pub fn remote_port(&self) -> u16 { + self.remote_port + } + + /// Start the client, listening for new connections. + pub async fn listen(mut self) -> Result<()> { + let mut conn = self.conn.take().unwrap(); + let this = Arc::new(self); + let mut buf = Vec::new(); + loop { + let msg = recv_json(&mut conn, &mut buf).await?; + match msg { + Some(ServerMessage::Hello(_)) => warn!("unexpected hello"), + Some(ServerMessage::Heartbeat) => (), + Some(ServerMessage::Connection(id)) => { + let this = Arc::clone(&this); + tokio::spawn( + async move { + info!("new connection"); + match this.handle_connection(id).await { + Ok(_) => info!("connection exited"), + Err(err) => warn!(%err, "connection exited with error"), + } + } + .instrument(info_span!("proxy", %id)), + ); + } + Some(ServerMessage::Error(err)) => error!(%err, "server error"), + None => return Ok(()), + } + } + } + + async fn handle_connection(&self, id: Uuid) -> Result<()> { + let local_conn = TcpStream::connect(("localhost", self.local_port)) + .await + .context("failed TCP connection to local port")?; + let mut remote_conn = TcpStream::connect((&self.to[..], CONTROL_PORT)) + .await + .context("failed TCP connection to remote port")?; + + send_json(&mut remote_conn, ClientMessage::Accept(id)).await?; + proxy(local_conn, remote_conn).await?; + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index 6a06953..cdcafad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use bore_cli::server::Server; +use bore_cli::{client::Client, server::Server}; use clap::{Parser, Subcommand}; #[derive(Parser, Debug)] @@ -45,8 +45,8 @@ async fn main() -> Result<()> { to, port, } => { - let _ = (local_port, to, port); - todo!() + let client = Client::new(local_port, &to, port).await?; + client.listen().await?; } Command::Server { min_port } => { Server::new(min_port).listen().await?; diff --git a/src/server.rs b/src/server.rs index bf1d76f..f8b523e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,22 +4,20 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use anyhow::{Context, Result}; +use anyhow::Result; use dashmap::DashMap; -use serde::de::DeserializeOwned; -use serde::Serialize; -use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::io::BufReader; use tokio::net::{TcpListener, TcpStream}; use tokio::time::{sleep, timeout}; use tracing::{info, info_span, warn, Instrument}; use uuid::Uuid; -use crate::shared::{proxy, ClientMessage, ServerMessage, CONTROL_PORT}; +use crate::shared::{proxy, recv_json, send_json, ClientMessage, ServerMessage, CONTROL_PORT}; /// State structure for the server. pub struct Server { /// The minimum TCP port that can be forwarded. - pub min_port: u16, + min_port: u16, /// Concurrent map of IDs to incoming connections. conns: Arc>, @@ -27,7 +25,7 @@ pub struct Server { impl Server { /// Create a new server with a specified minimum port number. - pub fn new(min_port: u16) -> Server { + pub fn new(min_port: u16) -> Self { Server { min_port, conns: Arc::new(DashMap::new()), @@ -62,18 +60,28 @@ impl Server { let mut stream = BufReader::new(stream); let mut buf = Vec::new(); - let msg = next_mp(&mut stream, &mut buf).await?; + let msg = recv_json(&mut stream, &mut buf).await?; match msg { Some(ClientMessage::Hello(port)) => { - if port < self.min_port { + if port != 0 && port < self.min_port { warn!(?port, "client port number too low"); return Ok(()); } info!(?port, "new client"); - let listener = TcpListener::bind(("::", port)).await?; + let listener = match TcpListener::bind(("::", port)).await { + Ok(listener) => listener, + Err(_) => { + warn!(?port, "could not bind to local port"); + send_json(&mut stream, "port already in use").await?; + return Ok(()); + } + }; + let port = listener.local_addr()?.port(); + send_json(&mut stream, ServerMessage::Hello(port)).await?; + loop { - if send_mp(&mut stream, ServerMessage::Heartbeat) + if send_json(&mut stream, ServerMessage::Heartbeat) .await .is_err() { @@ -92,18 +100,18 @@ impl Server { // Remove stale entries to avoid memory leaks. sleep(Duration::from_secs(10)).await; if conns.remove(&id).is_some() { - warn!(?id, "removed stale connection"); + warn!(%id, "removed stale connection"); } }); - send_mp(&mut stream, ServerMessage::Connection(id)).await?; + send_json(&mut stream, ServerMessage::Connection(id)).await?; } } } Some(ClientMessage::Accept(id)) => { - info!(?id, "forwarding connection"); + info!(%id, "forwarding connection"); match self.conns.remove(&id) { Some((_, stream2)) => proxy(stream, stream2).await?, - None => warn!(?id, "missing connection ID"), + None => warn!(%id, "missing connection"), } Ok(()) } @@ -120,27 +128,3 @@ impl Default for Server { Server::new(1024) } } - -/// Read the next null-delimited MessagePack instruction from a stream. -async fn next_mp( - reader: &mut (impl AsyncBufRead + Unpin), - buf: &mut Vec, -) -> Result> { - buf.clear(); - reader.read_until(0, buf).await?; - if buf.is_empty() { - return Ok(None); - } - if buf.last() == Some(&0) { - buf.pop(); - } - Ok(rmp_serde::from_slice(buf).context("failed to parse MessagePack")?) -} - -/// Send a null-terminated MessagePack instruction on a stream. -async fn send_mp(writer: &mut (impl AsyncWrite + Unpin), msg: T) -> Result<()> { - let msg = rmp_serde::to_vec(&msg)?; - writer.write_all(&msg).await?; - writer.write_all(&[0]).await?; - Ok(()) -} diff --git a/src/shared.rs b/src/shared.rs index af33470..bf18806 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -1,7 +1,9 @@ //! Shared data structures, utilities, and protocol definitions. +use anyhow::{Context, Result}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use tokio::io::{self, AsyncRead, AsyncWrite}; +use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt}; use uuid::Uuid; /// TCP port used for control connections with the server. @@ -20,11 +22,17 @@ pub enum ClientMessage { /// A message from the server on the control connection. #[derive(Serialize, Deserialize)] pub enum ServerMessage { + /// Response to a client's initial message, with actual public port. + Hello(u16), + /// No-op used to test if the client is still reachable. Heartbeat, /// Asks the client to accept a forwarded TCP connection. Connection(Uuid), + + /// Indicates a server error that terminates the connection. + Error(String), } /// Copy data mutually between two read/write streams. @@ -41,3 +49,27 @@ where )?; Ok(()) } + +/// Read the next null-delimited JSON instruction from a stream. +pub async fn recv_json( + reader: &mut (impl AsyncBufRead + Unpin), + buf: &mut Vec, +) -> Result> { + buf.clear(); + reader.read_until(0, buf).await?; + if buf.is_empty() { + return Ok(None); + } + if buf.last() == Some(&0) { + buf.pop(); + } + Ok(serde_json::from_slice(buf).context("failed to parse JSON")?) +} + +/// Send a null-terminated JSON instruction on a stream. +pub async fn send_json(writer: &mut (impl AsyncWrite + Unpin), msg: T) -> Result<()> { + let msg = serde_json::to_vec(&msg)?; + writer.write_all(&msg).await?; + writer.write_all(&[0]).await?; + Ok(()) +}