• Stars: 568
• Rank: 78,502 (Top 2 %)
• Language: Rust
• License: Other
• Created over 5 years ago
• Updated 9 months ago

Repository Details

Asynchronous streams for Rust using async & await notation

Asynchronous streams for Rust

Asynchronous stream of elements.

Provides two macros, stream! and try_stream!, allowing the caller to define asynchronous streams of elements. These are implemented using async & await notation. This crate works without unstable features.

The stream! macro returns an anonymous type implementing the Stream trait. The Item associated type is the type of the values yielded from the stream. The try_stream! macro also returns an anonymous type implementing the Stream trait, but its Item associated type is Result<T, Error>. The try_stream! macro supports using ? notation as part of the implementation.

Usage

A basic stream yielding numbers. Values are yielded using the yield keyword. The stream block must return ().

use async_stream::stream;

use futures_util::pin_mut;
use futures_util::stream::StreamExt;

#[tokio::main]
async fn main() {
    let s = stream! {
        for i in 0..3 {
            yield i;
        }
    };

    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Streams may be returned by using impl Stream<Item = T>:

use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = zero_to_three();
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Streams may be implemented in terms of other streams. To assist with this, async-stream provides a for await syntax:

use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

fn double<S: Stream<Item = u32>>(input: S)
    -> impl Stream<Item = u32>
{
    stream! {
        for await value in input {
            yield value * 2;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = double(zero_to_three());
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Rust try notation (?) can be used with the try_stream! macro. The Item of the returned stream is Result with Ok being the value yielded and Err the error type returned by ?.

use tokio::net::{TcpListener, TcpStream};

use async_stream::try_stream;
use futures_core::stream::Stream;

use std::io;
use std::net::SocketAddr;

fn bind_and_accept(addr: SocketAddr)
    -> impl Stream<Item = io::Result<TcpStream>>
{
    try_stream! {
        // `accept` takes `&self` in Tokio 1.x, so the binding does not need `mut`.
        let listener = TcpListener::bind(addr).await?;

        loop {
            let (stream, addr) = listener.accept().await?;
            println!("received on {:?}", addr);
            yield stream;
        }
    }
}

Implementation

The stream! and try_stream! macros are implemented using proc macros. The macro searches the syntax tree for instances of yield $expr and transforms them into sender.send($expr).await.

The stream uses a lightweight sender to send values from the stream implementation to the caller. When entering the stream, an Option<T> is stored on the stack. A pointer to the cell is stored in a thread local and poll is called on the async block. Inside the async block, sender.send(value) stores the value in that cell and yields back to the caller, which reads the value once poll returns.
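
The mechanism described above can be approximated with the standard library alone. The following is a minimal sketch under stated assumptions, not the crate's actual code: the names MiniStream, Sender, SendFut, NoopWake and collect_all are hypothetical, the slot is a shared Rc<Cell<Option<T>>> rather than a stack cell reached through a thread-local pointer, and a busy-polling loop stands in for a real executor. It shows the core idea: send stores the value, suspends once, and the caller picks the value up after poll returns.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// One-element slot shared by the stream and its sender.
/// Assumption: stands in for the stack cell plus thread-local pointer.
type Slot<T> = Rc<Cell<Option<T>>>;

/// Hypothetical sender handed to the stream body.
struct Sender<T> {
    slot: Slot<T>,
}

/// Future returned by `Sender::send`; it suspends exactly once.
struct SendFut<T> {
    slot: Slot<T>,
    value: Option<T>,
}

impl<T> Sender<T> {
    fn send(&self, value: T) -> SendFut<T> {
        SendFut { slot: self.slot.clone(), value: Some(value) }
    }
}

impl<T: Unpin> Future for SendFut<T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        match this.value.take() {
            // First poll: publish the value and yield back to the caller.
            Some(v) => {
                this.slot.set(Some(v));
                Poll::Pending
            }
            // Second poll: the caller has resumed the body, so carry on.
            None => Poll::Ready(()),
        }
    }
}

/// Hypothetical stand-in for the anonymous type returned by `stream!`.
struct MiniStream<T> {
    slot: Slot<T>,
    body: Pin<Box<dyn Future<Output = ()>>>,
    done: bool,
}

impl<T> MiniStream<T> {
    fn new<F, Fut>(make_body: F) -> Self
    where
        F: FnOnce(Sender<T>) -> Fut,
        Fut: Future<Output = ()> + 'static,
    {
        let slot: Slot<T> = Rc::new(Cell::new(None));
        let body = Box::pin(make_body(Sender { slot: slot.clone() }));
        MiniStream { slot, body, done: false }
    }

    /// Plays the role of `Stream::poll_next`.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        if self.done {
            return Poll::Ready(None);
        }
        let res = self.body.as_mut().poll(cx);
        // A value left in the slot means the body suspended inside `send`.
        if let Some(v) = self.slot.take() {
            return Poll::Ready(Some(v));
        }
        match res {
            Poll::Ready(()) => {
                self.done = true;
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for a busy-polling driver.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains the stream synchronously. A real executor would park until woken.
fn collect_all<T>(mut stream: MiniStream<T>) -> Vec<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(v)) => out.push(v),
            Poll::Ready(None) => return out,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

/// Hand-written equivalent of `stream! { for i in 0..3 { yield i; } }`.
fn zero_to_three() -> MiniStream<u32> {
    MiniStream::new(|tx| async move {
        for i in 0..3 {
            // `yield i;` is rewritten into a send followed by an await.
            tx.send(i).await;
        }
    })
}
```

Calling collect_all(zero_to_three()) gathers the values 0, 1 and 2 in order. The shared Rc keeps the sketch free of unsafe code; the thread-local pointer described above avoids that allocation and lets the sender stay a zero-sized handle, at the cost of unsafe internals.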

Supported Rust Versions

The current minimum supported Rust version is 1.56.

License

This project is licensed under the MIT license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in async-stream by you, shall be licensed as MIT, without any additional terms or conditions.
