fix: use unbounded channels in `config_watcher` (#127)

This commit is contained in:
Yujia Qiao 2022-02-07 18:12:13 +08:00 committed by GitHub
parent 4c08779ff6
commit 90aa0a4e44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 6 additions and 8 deletions

View File

@ -93,19 +93,17 @@ impl InstanceConfig for ClientConfig {
} }
pub struct ConfigWatcherHandle { pub struct ConfigWatcherHandle {
pub event_rx: mpsc::Receiver<ConfigChange>, pub event_rx: mpsc::UnboundedReceiver<ConfigChange>,
} }
impl ConfigWatcherHandle { impl ConfigWatcherHandle {
pub async fn new(path: &Path, shutdown_rx: broadcast::Receiver<bool>) -> Result<Self> { pub async fn new(path: &Path, shutdown_rx: broadcast::Receiver<bool>) -> Result<Self> {
let (event_tx, event_rx) = mpsc::channel(16); let (event_tx, event_rx) = mpsc::unbounded_channel();
let origin_cfg = Config::from_file(path).await?; let origin_cfg = Config::from_file(path).await?;
// Initial start // Initial start
event_tx event_tx
.send(ConfigChange::General(Box::new(origin_cfg.clone()))) .send(ConfigChange::General(Box::new(origin_cfg.clone())))
.await
.unwrap(); .unwrap();
tokio::spawn(config_watcher( tokio::spawn(config_watcher(
@ -124,7 +122,7 @@ impl ConfigWatcherHandle {
async fn config_watcher( async fn config_watcher(
_path: PathBuf, _path: PathBuf,
mut shutdown_rx: broadcast::Receiver<bool>, mut shutdown_rx: broadcast::Receiver<bool>,
_event_tx: mpsc::Sender<ConfigChange>, _event_tx: mpsc::UnboundedSender<ConfigChange>,
_old: Config, _old: Config,
) -> Result<()> { ) -> Result<()> {
// Do nothing except waiting for ctrl-c // Do nothing except waiting for ctrl-c
@ -137,7 +135,7 @@ async fn config_watcher(
async fn config_watcher( async fn config_watcher(
path: PathBuf, path: PathBuf,
mut shutdown_rx: broadcast::Receiver<bool>, mut shutdown_rx: broadcast::Receiver<bool>,
event_tx: mpsc::Sender<ConfigChange>, event_tx: mpsc::UnboundedSender<ConfigChange>,
mut old: Config, mut old: Config,
) -> Result<()> { ) -> Result<()> {
let (fevent_tx, mut fevent_rx) = mpsc::channel(16); let (fevent_tx, mut fevent_rx) = mpsc::channel(16);
@ -146,7 +144,7 @@ async fn config_watcher(
notify::recommended_watcher(move |res: Result<notify::Event, _>| match res { notify::recommended_watcher(move |res: Result<notify::Event, _>| match res {
Ok(e) => { Ok(e) => {
if let EventKind::Modify(ModifyKind::Data(_)) = e.kind { if let EventKind::Modify(ModifyKind::Data(_)) = e.kind {
let _ = fevent_tx.blocking_send(true); let _ = fevent_tx.send(true);
} }
} }
Err(e) => error!("watch error: {:#}", e), Err(e) => error!("watch error: {:#}", e),
@ -171,7 +169,7 @@ async fn config_watcher(
}; };
for event in calculate_events(&old, &new) { for event in calculate_events(&old, &new) {
event_tx.send(event).await?; event_tx.send(event)?;
} }
old = new; old = new;