I published the async-fn-stream
crate which is a lightweight alternative to async-stream
.
Rust provides support for async iteration via the Stream
trait.
It allows to decouple producer of data from the consumer,
just like an Iterator
allows the same for blocking code.
This is immensely useful if the producer is expected to produce a large (or infinite) sequence.
Both Stream
and Iterator
are quite easy to use:
// Iterator
for x in produce_iter() {
println!("{x}");
}
fn produce_iter() -> impl Iterator<Item = i32> { ... }
// Stream
let stream = produce_stream().await;
pin_mut!(stream);
while let Some(x) = stream.next().await {
println!("{x}");
}
async fn produce_stream() -> impl Stream<Item = i32> { ... }
We can see that the usage of Stream
has somewhat more boilerplate code but it's still quite OK. Most likely Rust will in the future gain language support for using Stream
s in for
loops.
async-stream
crateCurrently, it's not too ergonomic to hand-code implementations of either Stream
or Iterator
.
The Iterator::next()
method might sometimes be cumbersome to implement, but usually, it's doable without too much effort.
But Stream::poll_next()
is much harder to implement.
Often this necessitates storing (and swapping) futures inside of Stream
implementation struct which is quite hard to do without unsafe code.
To tackle this issue, Tokio team has written a great crate async-stream
which I recommend checking out.
This macro lets us create an async stream with ease:
use std::path::Path;
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
async fn read_lines(path: &Path) -> std::io::Result<impl Stream<Item = std::io::Result<String>>> {
let file = File::open(path).await?;
let mut file = BufReader::new(file);
let result = stream! {
let mut buf = String::new();
loop {
buf.clear();
match file.read_line(&mut buf).await {
Ok(0) => break,
Ok(_) => yield Ok(buf.clone()),
Err(error) => yield Err(error),
}
};
};
Ok(result)
}
And it even runs and provides correct output!
async-stream
in production codeSo, is the problem solved? Maybe so, but for me, unfortunately, this solution leaves a lot to be desired.
And I must say upfront that this is not a fault of async-stream
- those are general problems associated with macros in Rust.
async-stream
just makes it more apparent because stream! { ... }
often encompasses large chunks of code.
Let's look closely at this code.
The first that I notice is that rustfmt
does not even try to reformat the code inside stream!
macro.
For example, if I mess up the formatting of this code like so:
let result = stream! {
let mut buf = String::new();
loop
{
buf.clear(); match file.read_line(&mut buf).await {
Ok(0) => break,
Ok(_ ) => yield Ok(buf.clone()), Err(error) => yield Err(error),
}
};
};
then rustfmt
will leave this code as-is.
And if you use rustfmt --check
in the CI pipeline to ensure that formatting stays consistent, it also won't notice any inconsistencies in formatting.
The second and much more important downside is that some features of the IDE do not work as well as they should. Here's what happens if I try to use autocomplete in Visual Studio Code with rust-analyzer:
async-stream
Having found this solution unsatisfactory for my needs, I went ahead and published my take on it. Meet the async-fn-stream
crate.
It's the same idea as async-stream
but implemented without resorting to macros.
Here's what the code looks like and the summary of differences:
async_stream::stream! { ... }
is replaced with async_fn_stream::fn_stream(|emitter| async move { ... })
(there's also a try_fn_stream
which is similar to try_stream!
);yield X
is replaced with emitter.emit(X).await
.use std::path::Path;
use async_fn_stream::fn_stream;
use futures::{pin_mut, Stream, StreamExt};
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
async fn read_lines(path: &Path) -> std::io::Result<impl Stream<Item = std::io::Result<String>>> {
let file = File::open(path).await?;
let mut file = BufReader::new(file);
let result = fn_stream(|emitter| async move {
let mut buf = String::new();
loop {
buf.clear();
match file.read_line(&mut buf).await {
Ok(0) => break,
Ok(_) => emitter.emit(Ok(buf.clone())).await,
Err(error) => emitter.emit(Err(error)).await,
}
}
});
Ok(result)
}
And now, code completion works fine:
If you've also tried using async-stream
and met similar issues, I recommend checking my crate out.