diff --git a/examples/hyper-server.rs b/examples/hyper-server.rs index 3152778..e5af019 100644 --- a/examples/hyper-server.rs +++ b/examples/hyper-server.rs @@ -149,8 +149,8 @@ impl AsyncWrite for SmolStream { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { - Self::Plain(s) => Pin::new(s).poll_close(cx), - Self::Tls(s) => Pin::new(s).poll_close(cx), + Self::Plain(s) => Pin::new(s).poll_flush(cx), + Self::Tls(s) => Pin::new(s).poll_flush(cx), } } } diff --git a/examples/web-crawler.rs b/examples/web-crawler.rs index 2396a16..dbc8282 100644 --- a/examples/web-crawler.rs +++ b/examples/web-crawler.rs @@ -14,11 +14,30 @@ use scraper::{Html, Selector}; const ROOT: &str = "https://www.rust-lang.org"; +/// A guard that sends an empty string when dropped so the main loop never deadlocks. +struct FetchGuard { + sender: Sender, + sent: bool, +} + +impl Drop for FetchGuard { + fn drop(&mut self) { + if !self.sent { + let _ = self.sender.try_send(String::new()); + } + } +} + /// Fetches the HTML contents of a web page. async fn fetch(url: String, sender: Sender) { + let mut guard = FetchGuard { + sender: sender.clone(), + sent: false, + }; let body = surf::get(&url).recv_string().await; let body = body.unwrap_or_default(); - sender.send(body).await.ok(); + let _ = sender.send(body).await; + guard.sent = true; } /// Extracts links from an HTML body. diff --git a/examples/websocket-server.rs b/examples/websocket-server.rs index beea595..3fc8485 100644 --- a/examples/websocket-server.rs +++ b/examples/websocket-server.rs @@ -16,7 +16,7 @@ use std::net::{TcpListener, TcpStream}; use std::pin::Pin; use std::task::{Context, Poll}; -use anyhow::{Context as _, Result}; +use anyhow::Result; use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use async_tungstenite::{tungstenite, WebSocketStream}; use futures::sink::{Sink, SinkExt}; @@ -25,8 +25,10 @@ use tungstenite::Message; /// Echoes messages from the client back to it. async fn echo(mut stream: WsStream) -> Result<()> { - let msg = stream.next().await.context("expected a message")??; - stream.send(Message::text(msg.to_string())).await?; + while let Some(msg) = stream.next().await { + let msg = msg?; + stream.send(Message::text(msg.to_string())).await?; + } Ok(()) } diff --git a/src/spawn.rs b/src/spawn.rs index daed4d4..365a571 100644 --- a/src/spawn.rs +++ b/src/spawn.rs @@ -41,6 +41,7 @@ pub fn spawn(future: impl Future + Send + 'static .ok() .and_then(|s| s.parse().ok()) .unwrap_or(1) + .max(1) }; for n in 1..=num_threads {