mirror of https://github.com/rapiz1/rathole.git
docs: add comments and update README.md
This commit is contained in:
parent
b6c353e7bf
commit
347b958e84
14
README.md
14
README.md
|
@ -3,7 +3,7 @@
|
||||||
|
|
||||||
A fast and stable reverse proxy for NAT traversal, written in Rust
|
A fast and stable reverse proxy for NAT traversal, written in Rust
|
||||||
|
|
||||||
rathole, like frp, can help to expose the service on the device behind the NAT to the Internet, via a server with a public IP.
|
rathole, like [frp](https://github.com/fatedier/frp), can help to expose the service on the device behind the NAT to the Internet, via a server with a public IP.
|
||||||
|
|
||||||
## Quickstart
|
## Quickstart
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ remote_addr = "example.com:2333" # Necessary. The address of the server
|
||||||
default_token = "default_token_if_not_specify" # Optional. The default token of services, if they don't define their own ones
|
default_token = "default_token_if_not_specify" # Optional. The default token of services, if they don't define their own ones
|
||||||
|
|
||||||
[client.transport]
|
[client.transport]
|
||||||
type = "tcp" # Necessary if multiple transport blocks present. Possibile values: ["tcp", "tls"]. Default: "tcp"
|
type = "tcp" # Optional. Possibile values: ["tcp", "tls"]. Default: "tcp"
|
||||||
[client.transport.tls] # Necessary if `type` is "tls"
|
[client.transport.tls] # Necessary if `type` is "tls"
|
||||||
trusted_root = "ca.pem" # Necessary. The certificate of CA that signed the server's certificate
|
trusted_root = "ca.pem" # Necessary. The certificate of CA that signed the server's certificate
|
||||||
hostname = "example.com" # Optional. The hostname that the client uses to validate the certificate. If not set, fallback to `client.remote_addr`
|
hostname = "example.com" # Optional. The hostname that the client uses to validate the certificate. If not set, fallback to `client.remote_addr`
|
||||||
|
@ -80,7 +80,7 @@ default_token = "default_token_if_not_specify" # Optional
|
||||||
|
|
||||||
[server.transport]
|
[server.transport]
|
||||||
type = "tcp" # Same as `[client.transport]`
|
type = "tcp" # Same as `[client.transport]`
|
||||||
[server.transport.tls]
|
[server.transport.tls] # Necessary if `type` is "tls"
|
||||||
pkcs12 = "identify.pfx" # Necessary. pkcs12 file of server's certificate and private key
|
pkcs12 = "identify.pfx" # Necessary. pkcs12 file of server's certificate and private key
|
||||||
pkcs12_password = "password" # Necessary. Password of the pkcs12 file
|
pkcs12_password = "password" # Necessary. Password of the pkcs12 file
|
||||||
|
|
||||||
|
@ -92,9 +92,9 @@ bind_addr = "0.0.0.0:8081" # Necessary. The address of the service is exposed at
|
||||||
bind_addr = "0.0.0.1:8082"
|
bind_addr = "0.0.0.1:8082"
|
||||||
```
|
```
|
||||||
|
|
||||||
# Benchmark
|
## Benchmark
|
||||||
|
|
||||||
rathole has similiar latency to frp, but can handle more connections. Also it can provide much better bandwidth than frp.
|
rathole has similiar latency to [frp](https://github.com/fatedier/frp), but can handle more connections. Also it can provide much better bandwidth than frp.
|
||||||
|
|
||||||
See also [Benchmark](./doc/benchmark.md).
|
See also [Benchmark](./doc/benchmark.md).
|
||||||
|
|
||||||
|
@ -102,10 +102,10 @@ See also [Benchmark](./doc/benchmark.md).
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
# Development
|
## Development Status
|
||||||
|
|
||||||
`rathole` is in active development. A load of features is on the way:
|
`rathole` is in active development. A load of features is on the way:
|
||||||
|
- [x] TLS support
|
||||||
- [ ] UDP support
|
- [ ] UDP support
|
||||||
- [ ] Hot reloading
|
- [ ] Hot reloading
|
||||||
- [ ] HTTP APIs for configuration
|
- [ ] HTTP APIs for configuration
|
||||||
|
|
|
@ -17,6 +17,7 @@ use tokio::sync::oneshot;
|
||||||
use tokio::time::{self, Duration};
|
use tokio::time::{self, Duration};
|
||||||
use tracing::{debug, error, info, instrument, Instrument, Span};
|
use tracing::{debug, error, info, instrument, Instrument, Span};
|
||||||
|
|
||||||
|
// The entrypoint of running a client
|
||||||
pub async fn run_client(config: &Config) -> Result<()> {
|
pub async fn run_client(config: &Config) -> Result<()> {
|
||||||
let config = match &config.client {
|
let config = match &config.client {
|
||||||
Some(v) => v,
|
Some(v) => v,
|
||||||
|
@ -40,6 +41,7 @@ pub async fn run_client(config: &Config) -> Result<()> {
|
||||||
type ServiceDigest = protocol::Digest;
|
type ServiceDigest = protocol::Digest;
|
||||||
type Nonce = protocol::Digest;
|
type Nonce = protocol::Digest;
|
||||||
|
|
||||||
|
// Holds the state of a client
|
||||||
struct Client<'a, T: Transport> {
|
struct Client<'a, T: Transport> {
|
||||||
config: &'a ClientConfig,
|
config: &'a ClientConfig,
|
||||||
service_handles: HashMap<String, ControlChannelHandle>,
|
service_handles: HashMap<String, ControlChannelHandle>,
|
||||||
|
@ -47,6 +49,7 @@ struct Client<'a, T: Transport> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: 'static + Transport> Client<'a, T> {
|
impl<'a, T: 'static + Transport> Client<'a, T> {
|
||||||
|
// Create a Client from `[client]` config block
|
||||||
async fn from(config: &'a ClientConfig) -> Result<Client<'a, T>> {
|
async fn from(config: &'a ClientConfig) -> Result<Client<'a, T>> {
|
||||||
Ok(Client {
|
Ok(Client {
|
||||||
config,
|
config,
|
||||||
|
@ -55,8 +58,10 @@ impl<'a, T: 'static + Transport> Client<'a, T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The entrypoint of Client
|
||||||
async fn run(&mut self) -> Result<()> {
|
async fn run(&mut self) -> Result<()> {
|
||||||
for (name, config) in &self.config.services {
|
for (name, config) in &self.config.services {
|
||||||
|
// Create a control channel for each service defined
|
||||||
let handle = ControlChannelHandle::new(
|
let handle = ControlChannelHandle::new(
|
||||||
(*config).clone(),
|
(*config).clone(),
|
||||||
self.config.remote_addr.clone(),
|
self.config.remote_addr.clone(),
|
||||||
|
@ -65,6 +70,8 @@ impl<'a, T: 'static + Transport> Client<'a, T> {
|
||||||
self.service_handles.insert(name.clone(), handle);
|
self.service_handles.insert(name.clone(), handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Maybe wait for a config change signal for hot reloading
|
||||||
|
// Wait for the shutdown signal
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
val = tokio::signal::ctrl_c() => {
|
val = tokio::signal::ctrl_c() => {
|
||||||
|
@ -130,14 +137,17 @@ async fn run_data_channel<T: Transport>(args: Arc<RunDataChannelArgs<T>>) -> Res
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Control channel, using T as the transport layer
|
||||||
struct ControlChannel<T: Transport> {
|
struct ControlChannel<T: Transport> {
|
||||||
digest: ServiceDigest,
|
digest: ServiceDigest, // SHA256 of the service name
|
||||||
service: ClientServiceConfig,
|
service: ClientServiceConfig, // `[client.services.foo]` config block
|
||||||
shutdown_rx: oneshot::Receiver<u8>,
|
shutdown_rx: oneshot::Receiver<u8>, // Receives the shutdown signal
|
||||||
remote_addr: String,
|
remote_addr: String, // `client.remote_addr`
|
||||||
transport: Arc<T>,
|
transport: Arc<T>, // Wrapper around the transport layer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle of a control channel
|
||||||
|
// Dropping it will also drop the actual control channel
|
||||||
struct ControlChannelHandle {
|
struct ControlChannelHandle {
|
||||||
shutdown_tx: oneshot::Sender<u8>,
|
shutdown_tx: oneshot::Sender<u8>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,9 @@ struct RawItem<K1, K2, V>(*mut (K1, K2, V));
|
||||||
unsafe impl<K1, K2, V> Send for RawItem<K1, K2, V> {}
|
unsafe impl<K1, K2, V> Send for RawItem<K1, K2, V> {}
|
||||||
unsafe impl<K1, K2, V> Sync for RawItem<K1, K2, V> {}
|
unsafe impl<K1, K2, V> Sync for RawItem<K1, K2, V> {}
|
||||||
|
|
||||||
|
/// MultiMap is a hash map that can index an item by two keys
|
||||||
|
/// For example, after an item with key (a, b) is insert, `map.get1(a)` and
|
||||||
|
/// `map.get2(b)` both returns the item. Likewise the `remove1` and `remove2`.
|
||||||
pub struct MultiMap<K1, K2, V> {
|
pub struct MultiMap<K1, K2, V> {
|
||||||
map1: HashMap<Key<K1>, RawItem<K1, K2, V>>,
|
map1: HashMap<Key<K1>, RawItem<K1, K2, V>>,
|
||||||
map2: HashMap<Key<K2>, RawItem<K1, K2, V>>,
|
map2: HashMap<Key<K2>, RawItem<K1, K2, V>>,
|
||||||
|
|
100
src/server.rs
100
src/server.rs
|
@ -19,12 +19,13 @@ use tokio::sync::{mpsc, oneshot, RwLock};
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tracing::{debug, error, info, info_span, warn, Instrument};
|
use tracing::{debug, error, info, info_span, warn, Instrument};
|
||||||
|
|
||||||
type ServiceDigest = protocol::Digest;
|
type ServiceDigest = protocol::Digest; // SHA256 of a service name
|
||||||
type Nonce = protocol::Digest;
|
type Nonce = protocol::Digest; // Also called `session_key`
|
||||||
|
|
||||||
const POOL_SIZE: usize = 64;
|
const POOL_SIZE: usize = 64; // The number of cached connections
|
||||||
const CHAN_SIZE: usize = 2048;
|
const CHAN_SIZE: usize = 2048; // The capacity of various chans
|
||||||
|
|
||||||
|
// The entrypoint of running a server
|
||||||
pub async fn run_server(config: &Config) -> Result<()> {
|
pub async fn run_server(config: &Config) -> Result<()> {
|
||||||
let config = match &config.server {
|
let config = match &config.server {
|
||||||
Some(config) => config,
|
Some(config) => config,
|
||||||
|
@ -32,6 +33,8 @@ pub async fn run_server(config: &Config) -> Result<()> {
|
||||||
return Err(anyhow!("Try to run as a server, but the configuration is missing. Please add the `[server]` block"))
|
return Err(anyhow!("Try to run as a server, but the configuration is missing. Please add the `[server]` block"))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//TODO: Maybe use a Box<dyn trait> here to reduce duplicated code
|
||||||
match config.transport.transport_type {
|
match config.transport.transport_type {
|
||||||
TransportType::Tcp => {
|
TransportType::Tcp => {
|
||||||
let mut server = Server::<TcpTransport>::from(config).await?;
|
let mut server = Server::<TcpTransport>::from(config).await?;
|
||||||
|
@ -42,17 +45,30 @@ pub async fn run_server(config: &Config) -> Result<()> {
|
||||||
server.run().await?;
|
server.run().await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A hash map of ControlChannelHandles, indexed by ServiceDigest or Nonce
|
||||||
|
// See also MultiMap
|
||||||
type ControlChannelMap<T> = MultiMap<ServiceDigest, Nonce, ControlChannelHandle<T>>;
|
type ControlChannelMap<T> = MultiMap<ServiceDigest, Nonce, ControlChannelHandle<T>>;
|
||||||
|
|
||||||
|
// Server holds all states of running a server
|
||||||
struct Server<'a, T: Transport> {
|
struct Server<'a, T: Transport> {
|
||||||
|
// `[server]` config
|
||||||
config: &'a ServerConfig,
|
config: &'a ServerConfig,
|
||||||
|
|
||||||
|
// TODO: Maybe the rwlock is unnecessary.
|
||||||
|
// Keep it until the hot reloading feature is implemented
|
||||||
|
// `[server.services]` config, indexed by ServiceDigest
|
||||||
services: Arc<RwLock<HashMap<ServiceDigest, ServerServiceConfig>>>,
|
services: Arc<RwLock<HashMap<ServiceDigest, ServerServiceConfig>>>,
|
||||||
|
// Collection of contorl channels
|
||||||
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
|
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
|
||||||
|
// Wrapper around the transport layer
|
||||||
transport: Arc<T>,
|
transport: Arc<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Generate a hash map of services which is indexed by ServiceDigest
|
||||||
fn generate_service_hashmap(
|
fn generate_service_hashmap(
|
||||||
server_config: &ServerConfig,
|
server_config: &ServerConfig,
|
||||||
) -> HashMap<ServiceDigest, ServerServiceConfig> {
|
) -> HashMap<ServiceDigest, ServerServiceConfig> {
|
||||||
|
@ -64,6 +80,7 @@ fn generate_service_hashmap(
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: 'static + Transport> Server<'a, T> {
|
impl<'a, T: 'static + Transport> Server<'a, T> {
|
||||||
|
// Create a server from `[server]`
|
||||||
pub async fn from(config: &'a ServerConfig) -> Result<Server<'a, T>> {
|
pub async fn from(config: &'a ServerConfig) -> Result<Server<'a, T>> {
|
||||||
Ok(Server {
|
Ok(Server {
|
||||||
config,
|
config,
|
||||||
|
@ -73,7 +90,9 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The entry point of Server
|
||||||
pub async fn run(&mut self) -> Result<()> {
|
pub async fn run(&mut self) -> Result<()> {
|
||||||
|
// Listen at `server.bind_addr`
|
||||||
let l = self
|
let l = self
|
||||||
.transport
|
.transport
|
||||||
.bind(&self.config.bind_addr)
|
.bind(&self.config.bind_addr)
|
||||||
|
@ -88,14 +107,17 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
// Listen for incoming control or data channels
|
// Wait for connections and shutdown signals
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
// Wait for incoming control and data channels
|
||||||
ret = self.transport.accept(&l) => {
|
ret = self.transport.accept(&l) => {
|
||||||
match ret {
|
match ret {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
// Detects whether it's an IO error
|
||||||
if let Some(err) = err.downcast_ref::<io::Error>() {
|
if let Some(err) = err.downcast_ref::<io::Error>() {
|
||||||
// Possibly a EMFILE. So sleep for a while and retry
|
// If it is an IO error, then it's possibly an
|
||||||
|
// EMFILE. So sleep for a while and retry
|
||||||
if let Some(d) = backoff.next_backoff() {
|
if let Some(d) = backoff.next_backoff() {
|
||||||
error!("Failed to accept: {}. Retry in {:?}...", err, d);
|
error!("Failed to accept: {}. Retry in {:?}...", err, d);
|
||||||
time::sleep(d).await;
|
time::sleep(d).await;
|
||||||
|
@ -105,6 +127,8 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// If it's not an IO error, then it comes from
|
||||||
|
// the transport layer, so just ignore it
|
||||||
}
|
}
|
||||||
Ok((conn, addr)) => {
|
Ok((conn, addr)) => {
|
||||||
backoff.reset();
|
backoff.reset();
|
||||||
|
@ -120,6 +144,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
// Wait for the shutdown signal
|
||||||
_ = tokio::signal::ctrl_c() => {
|
_ = tokio::signal::ctrl_c() => {
|
||||||
info!("Shuting down gracefully...");
|
info!("Shuting down gracefully...");
|
||||||
break;
|
break;
|
||||||
|
@ -131,6 +156,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle connections to `server.bind_addr`
|
||||||
async fn handle_connection<T: 'static + Transport>(
|
async fn handle_connection<T: 'static + Transport>(
|
||||||
mut conn: T::Stream,
|
mut conn: T::Stream,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
|
@ -203,8 +229,16 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
|
||||||
);
|
);
|
||||||
bail!("Service {} failed the authentication", service_name);
|
bail!("Service {} failed the authentication", service_name);
|
||||||
} else {
|
} else {
|
||||||
|
// TODO: Here could use some refactor:
|
||||||
|
// 1. Clone the config and drop `services_guard` earlier
|
||||||
|
// 2. Use the result of `insert` to warn. Then no need to call `remove1`
|
||||||
|
|
||||||
let mut h = control_channels.write().await;
|
let mut h = control_channels.write().await;
|
||||||
|
|
||||||
|
// If there's already a control channel for the service, then drop the old one.
|
||||||
|
// Because a control channel doesn't report back when it's dead,
|
||||||
|
// the handle in the map could be stall, dropping the old handle enables
|
||||||
|
// the client to reconnect.
|
||||||
if let Some(_) = h.remove1(&service_digest) {
|
if let Some(_) = h.remove1(&service_digest) {
|
||||||
warn!(
|
warn!(
|
||||||
"Dropping previous control channel for digest {}",
|
"Dropping previous control channel for digest {}",
|
||||||
|
@ -213,6 +247,8 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
|
||||||
}
|
}
|
||||||
|
|
||||||
let service_config = service_config.clone();
|
let service_config = service_config.clone();
|
||||||
|
|
||||||
|
// Drop the rwlock as soon as possible when we're done with it
|
||||||
drop(services_guard);
|
drop(services_guard);
|
||||||
|
|
||||||
// Send ack
|
// Send ack
|
||||||
|
@ -222,7 +258,7 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
|
||||||
info!(service = %service_config.name, "Control channel established");
|
info!(service = %service_config.name, "Control channel established");
|
||||||
let handle = ControlChannelHandle::new(conn, service_config);
|
let handle = ControlChannelHandle::new(conn, service_config);
|
||||||
|
|
||||||
// Drop the old handle
|
// Insert the new handle
|
||||||
let _ = h.insert(service_digest, session_key, handle);
|
let _ = h.insert(service_digest, session_key, handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,38 +278,55 @@ async fn do_data_channel_handshake<T: Transport>(
|
||||||
c_ch.conn_pool.data_ch_tx.send(conn).await?;
|
c_ch.conn_pool.data_ch_tx.send(conn).await?;
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
// TODO: Maybe print IP here
|
||||||
warn!("Data channel has incorrect nonce");
|
warn!("Data channel has incorrect nonce");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Control channel, using T as the transport layer
|
||||||
struct ControlChannel<T: Transport> {
|
struct ControlChannel<T: Transport> {
|
||||||
conn: T::Stream,
|
conn: T::Stream, // The connection of control channel
|
||||||
service: ServerServiceConfig,
|
service: ServerServiceConfig, // A copy of the corresponding service config
|
||||||
shutdown_rx: oneshot::Receiver<bool>,
|
shutdown_rx: oneshot::Receiver<bool>, // Receives the shutdown signal
|
||||||
visitor_tx: mpsc::Sender<TcpStream>,
|
visitor_tx: mpsc::Sender<TcpStream>, // Receives visitor connections
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The handle of a control channel, along with the handle of a connection pool
|
||||||
|
// Dropping it will drop the actual control channel, because `visitor_tx`
|
||||||
|
// and `shutdown_tx` are closed
|
||||||
struct ControlChannelHandle<T: Transport> {
|
struct ControlChannelHandle<T: Transport> {
|
||||||
|
// Shutdown the control channel.
|
||||||
|
// Not used for now, but can be used for hot reloading
|
||||||
_shutdown_tx: oneshot::Sender<bool>,
|
_shutdown_tx: oneshot::Sender<bool>,
|
||||||
conn_pool: ConnectionPoolHandle<T>,
|
conn_pool: ConnectionPoolHandle<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: 'static + Transport> ControlChannelHandle<T> {
|
impl<T: 'static + Transport> ControlChannelHandle<T> {
|
||||||
|
// Create a control channel handle, where the control channel handling task
|
||||||
|
// and the connection pool task are created.
|
||||||
fn new(conn: T::Stream, service: ServerServiceConfig) -> ControlChannelHandle<T> {
|
fn new(conn: T::Stream, service: ServerServiceConfig) -> ControlChannelHandle<T> {
|
||||||
let (_shutdown_tx, shutdown_rx) = oneshot::channel::<bool>();
|
// Save the name string for logging
|
||||||
let name = service.name.clone();
|
let name = service.name.clone();
|
||||||
|
|
||||||
|
// Create a shutdown channel. The sender is not used for now, but for future use
|
||||||
|
let (_shutdown_tx, shutdown_rx) = oneshot::channel::<bool>();
|
||||||
|
|
||||||
|
// Create and run the connection pool, where the visitors and data channels meet
|
||||||
let conn_pool = ConnectionPoolHandle::new();
|
let conn_pool = ConnectionPoolHandle::new();
|
||||||
let actor: ControlChannel<T> = ControlChannel {
|
|
||||||
|
// Create the control channel
|
||||||
|
let ch: ControlChannel<T> = ControlChannel {
|
||||||
conn,
|
conn,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
service,
|
service,
|
||||||
visitor_tx: conn_pool.visitor_tx.clone(),
|
visitor_tx: conn_pool.visitor_tx.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Run the control channel
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(err) = actor.run().await {
|
if let Err(err) = ch.run().await {
|
||||||
error!(%name, "{}", err);
|
error!(%name, "{}", err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -286,8 +339,10 @@ impl<T: 'static + Transport> ControlChannelHandle<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Transport> ControlChannel<T> {
|
impl<T: Transport> ControlChannel<T> {
|
||||||
|
// Run a control channel
|
||||||
#[tracing::instrument(skip(self), fields(service = %self.service.name))]
|
#[tracing::instrument(skip(self), fields(service = %self.service.name))]
|
||||||
async fn run(mut self) -> Result<()> {
|
async fn run(mut self) -> Result<()> {
|
||||||
|
// Where the service is exposed
|
||||||
let l = match TcpListener::bind(&self.service.bind_addr).await {
|
let l = match TcpListener::bind(&self.service.bind_addr).await {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -303,7 +358,11 @@ impl<T: Transport> ControlChannel<T> {
|
||||||
|
|
||||||
info!("Listening at {}", &self.service.bind_addr);
|
info!("Listening at {}", &self.service.bind_addr);
|
||||||
|
|
||||||
|
// Each `u8` in the chan indicates a data channel creation request
|
||||||
let (data_req_tx, mut data_req_rx) = mpsc::unbounded_channel::<u8>();
|
let (data_req_tx, mut data_req_rx) = mpsc::unbounded_channel::<u8>();
|
||||||
|
|
||||||
|
// The control channel is moved into the task, and sends CreateDataChannel
|
||||||
|
// comamnds to the client when needed
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let cmd = bincode::serialize(&ControlChannelCmd::CreateDataChannel).unwrap();
|
let cmd = bincode::serialize(&ControlChannelCmd::CreateDataChannel).unwrap();
|
||||||
while data_req_rx.recv().await.is_some() {
|
while data_req_rx.recv().await.is_some() {
|
||||||
|
@ -313,32 +372,43 @@ impl<T: Transport> ControlChannel<T> {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Cache some data channels for later use
|
||||||
for _i in 0..POOL_SIZE {
|
for _i in 0..POOL_SIZE {
|
||||||
if let Err(e) = data_req_tx.send(0) {
|
if let Err(e) = data_req_tx.send(0) {
|
||||||
error!("Failed to request data channel {}", e);
|
error!("Failed to request data channel {}", e);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retry at least every 1s
|
||||||
let mut backoff = ExponentialBackoff {
|
let mut backoff = ExponentialBackoff {
|
||||||
max_interval: Duration::from_secs(1),
|
max_interval: Duration::from_secs(1),
|
||||||
max_elapsed_time: None,
|
max_elapsed_time: None,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Wait for visitors and the shutdown signal
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
// Wait for visitors
|
||||||
val = l.accept() => {
|
val = l.accept() => {
|
||||||
match val {
|
match val {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
// `l` is a TCP listener so this must be a IO error
|
||||||
|
// Possibly a EMFILE. So sleep for a while
|
||||||
error!("{}. Sleep for a while", e);
|
error!("{}. Sleep for a while", e);
|
||||||
if let Some(d) = backoff.next_backoff() {
|
if let Some(d) = backoff.next_backoff() {
|
||||||
time::sleep(d).await;
|
time::sleep(d).await;
|
||||||
} else {
|
} else {
|
||||||
|
// This branch will never be reached for current backoff policy
|
||||||
error!("Too many retries. Aborting...");
|
error!("Too many retries. Aborting...");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok((incoming, addr)) => {
|
Ok((incoming, addr)) => {
|
||||||
|
// For every visitor, request to create a data channel
|
||||||
if let Err(e) = data_req_tx.send(0) {
|
if let Err(e) = data_req_tx.send(0) {
|
||||||
|
// An error indicates the control channel is broken
|
||||||
|
// So break the loop
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
|
@ -347,10 +417,12 @@ impl<T: Transport> ControlChannel<T> {
|
||||||
|
|
||||||
debug!("New visitor from {}", addr);
|
debug!("New visitor from {}", addr);
|
||||||
|
|
||||||
|
// Send the visitor to the connection pool
|
||||||
let _ = self.visitor_tx.send(incoming).await;
|
let _ = self.visitor_tx.send(incoming).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
// Wait for the shutdown signal
|
||||||
_ = &mut self.shutdown_rx => {
|
_ = &mut self.shutdown_rx => {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ use std::net::SocketAddr;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio::net::ToSocketAddrs;
|
use tokio::net::ToSocketAddrs;
|
||||||
|
|
||||||
|
// Specify a transport layer, like TCP, TLS
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait Transport: Debug + Send + Sync {
|
pub trait Transport: Debug + Send + Sync {
|
||||||
type Acceptor: Send + Sync;
|
type Acceptor: Send + Sync;
|
||||||
|
|
Loading…
Reference in New Issue