Streams: Async Sequences

While a Future represents a single value that will be available later, a Stream represents a sequence of values that arrive over time. Streams are the async equivalent of iterators.

Understanding Streams

Think about these real-world scenarios:

  • Messages arriving in a chat application
  • Log entries being written to a file
  • Sensor readings coming from IoT devices
  • Chunks of data downloading from the network

Each produces multiple values over time, not just one. That's what streams model.

// Iterator: produces values synchronously
for item in collection.iter() {
    process(item)
}

// Stream: produces values asynchronously
while let Some(item) = await stream.next() {
    await process(item)
}
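
To make the iterator analogy concrete, here is a minimal sketch in plain Rust (standard library only, not Oxide syntax) of what a stream looks like underneath. The names `MiniStream`, `IterStream`, `Next`, and `block_on` are hypothetical stand-ins written for this illustration; the real `Stream` trait and executors come from crates such as futures and tokio and are more capable.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for a `Stream` trait: like `Iterator::next`,
/// but each call may answer "not ready yet" (`Poll::Pending`).
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Wraps an iterator so it can be consumed as a stream (always ready).
struct IterStream<I> {
    iter: I,
}

impl<I: Iterator + Unpin> MiniStream for IterStream<I> {
    type Item = I::Item;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.iter.next())
    }
}

/// Future that resolves to the next item of a stream.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<'a, S: MiniStream + Unpin> Future for Next<'a, S> {
    type Output = Option<S::Item>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

/// Waker that does nothing; enough for a busy-polling toy executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Drains a three-element stream with the same `while let` shape used above.
fn collect_all() -> Vec<i32> {
    block_on(async {
        let mut s = IterStream { iter: vec![1, 2, 3].into_iter() };
        let mut seen = Vec::new();
        while let Some(n) = (Next { stream: &mut s }).await {
            seen.push(n);
        }
        seen
    })
}

fn main() {
    println!("{:?}", collect_all());
}
```

The point of the sketch is only the shape: a stream is polled for its next item, and `None` marks the end, just as with an iterator.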

Creating Streams

From Iterators

The simplest way to create a stream is from an existing iterator:

import futures.stream.{ self, StreamExt }

async fn processNumbers() {
    let numbers = vec![1, 2, 3, 4, 5]
    var stream = stream.iter(numbers)

    while let Some(n) = await stream.next() {
        println!("Got: \(n)")
    }
}

From Channels

Channels naturally produce streams:

import tokio.sync.mpsc
import tokio.time.{ sleep, Duration }

async fn receiveMessages() {
    let (tx, mut rx) = mpsc.channel(100)

    // Producer sends messages
    tokio.spawn(async move {
        for i in 0..5 {
            await tx.send(format!("Message \(i)"))
            await sleep(Duration.fromMillis(100))
        }
    })

    // Receiver processes the stream
    while let Some(msg) = await rx.recv() {
        println!("Received: \(msg)")
    }
}
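
The receive loop above ends because the sender is dropped when the producer task finishes. As a rough illustration of that shutdown behavior, here is a synchronous analogue in plain Rust using only `std::sync::mpsc` and a thread (not tokio, and not Oxide syntax). The function name `receive_all` is made up for this sketch.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns a producer that sends `count` messages, then returns everything received.
fn receive_all(count: u32) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // Producer: `tx` is moved into the thread and dropped when the thread finishes.
    let producer = thread::spawn(move || {
        for i in 0..count {
            tx.send(format!("Message {i}")).expect("receiver is still alive");
        }
    });

    // Receiver: `recv` returns Err once every sender is gone, which ends the loop.
    let mut received = Vec::new();
    while let Ok(msg) = rx.recv() {
        received.push(msg);
    }

    producer.join().expect("producer thread panicked");
    received
}

fn main() {
    for msg in receive_all(3) {
        println!("Received: {msg}");
    }
}
```

If a sender were kept alive elsewhere, the loop would wait indefinitely, so dropping every sender is what signals the end of the sequence.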

Using stream! Macro

The async-stream crate provides a convenient macro:

import asyncStream.stream
import futures.stream.{ Stream, StreamExt }
import tokio.time.{ sleep, Duration }

fn countingStream(max: Int): impl Stream<Item = Int> {
    stream! {
        for i in 0..max {
            await sleep(Duration.fromMillis(100))
            yield i
        }
    }
}

async fn useCountingStream() {
    // A stream built with stream! must be pinned before next() can be called
    var stream = Box.pin(countingStream(5))

    while let Some(n) = await stream.next() {
        println!("Count: \(n)")
    }
}

Stream Combinators

Like iterators, streams have powerful combinators for transformation and filtering. You need to import StreamExt to access these methods.

Map

Transform each element:

import futures.stream.{ self, StreamExt }

async fn doubleStream() {
    let numbers = stream.iter(vec![1, 2, 3, 4, 5])

    var doubled = numbers.map { it * 2 }

    while let Some(n) = await doubled.next() {
        println!("\(n)")  // 2, 4, 6, 8, 10
    }
}

Filter

Keep only matching elements:

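import futures.future

async fn filterStream() {
    let numbers = stream.iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

    // With futures' StreamExt, the predicate returns a future, so wrap the bool in ready()
    var evens = numbers.filter { n -> future.ready(n % 2 == 0) }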

    while let Some(n) = await evens.next() {
        println!("\(n)")  // 2, 4, 6, 8, 10
    }
}

Filter Map

Combine filter and map in one step: return Some to include an element, null to skip it:

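import futures.future

async fn filterMapStream() {
    let items = stream.iter(vec!["1", "two", "3", "four", "5"])

    // With futures' StreamExt, the closure returns a future resolving to Some or null
    var parsed = items.filterMap { s ->
        future.ready(s.parse<Int>().ok())  // Only keeps successfully parsed numbers
    }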

    while let Some(n) = await parsed.next() {
        println!("\(n)")  // 1, 3, 5
    }
}

Take and Skip

Limit the stream:

async fn limitStream() {
    let numbers = stream.iter(0..100)

    // Skip first 10, then take 5
    var limited = numbers.skip(10).take(5)

    while let Some(n) = await limited.next() {
        println!("\(n)")  // 10, 11, 12, 13, 14
    }
}

Collect

Gather all stream elements into a collection:

async fn collectStream() {
    let numbers = stream.iter(vec![1, 2, 3, 4, 5])
    let doubled = numbers.map { it * 2 }

    // Collect all results into a Vec
    let results: Vec<Int> = await doubled.collect()
    println!("Results: \(results:?)")  // [2, 4, 6, 8, 10]
}

Fold

Reduce a stream to a single value:

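import futures.future

async fn sumStream() {
    let numbers = stream.iter(vec![1, 2, 3, 4, 5])

    // With futures' StreamExt, the folding closure returns a future for the new accumulator
    let sum = await numbers.fold(0, { acc, n -> future.ready(acc + n) })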
    println!("Sum: \(sum)")  // 15
}

Async Operations in Streams

Then

Apply an async function to each element:

async fn asyncTransform() {
    let urls = stream.iter(vec![
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
    ])

    // fetch is async, so we use then
    var responses = urls.then { url -> fetch(url) }

    while let Some(response) = await responses.next() {
        println!("Got response: \(response.status())")
    }
}

Buffered Processing

Process multiple items concurrently with bounded parallelism:

async fn bufferedFetch() {
    let urls = stream.iter(getUrls())

    // Create futures (doesn't execute yet)
    let fetches = urls.map { url -> fetch(url) }

    // Execute up to 5 concurrently
    var results = fetches.bufferUnordered(5)

    while let Some(result) = await results.next() {
        println!("Completed: \(result:?)")
    }
}

Results arrive in completion order, not original order. Use buffered(n) if you need original order.

For-Await Loops

Oxide provides a convenient syntax for iterating over streams:

import tokio_stream.StreamExt

async fn forAwaitExample() {
    // A stream built with stream! must be pinned before next() can be called
    var stream = Box.pin(countingStream(5))

    // for-await iterates over a stream
    for n in await stream.next() {
        println!("Got: \(n)")
    }
}

Note: This uses the pattern for item in await stream.next() rather than a special for await syntax. Each iteration awaits the next item.

For a cleaner loop, you can use while let:

async fn whileLetStream() {
    // A stream built with stream! must be pinned before next() can be called
    var stream = Box.pin(countingStream(5))

    while let Some(n) = await stream.next() {
        println!("Got: \(n)")
    }
}

Combining Streams

Merge

Combine multiple streams into one, interleaving items:

import futures.stream.{ self, select, StreamExt }

async fn mergeStreams() {
    let streamA = stream.iter(vec![1, 3, 5])
    let streamB = stream.iter(vec![2, 4, 6])

    // Items from both streams interleave
    var merged = select(streamA, streamB)

    while let Some(n) = await merged.next() {
        println!("\(n)")  // Order depends on timing
    }
}

Chain

Concatenate streams (one after another):

async fn chainStreams() {
    let first = stream.iter(vec![1, 2, 3])
    let second = stream.iter(vec![4, 5, 6])

    var chained = first.chain(second)

    while let Some(n) = await chained.next() {
        println!("\(n)")  // 1, 2, 3, 4, 5, 6 (in order)
    }
}

Zip

Pair items from two streams:

async fn zipStreams() {
    let names = stream.iter(vec!["Alice", "Bob", "Carol"])
    let ages = stream.iter(vec![30, 25, 35])

    var zipped = names.zip(ages)

    while let Some((name, age)) = await zipped.next() {
        println!("\(name) is \(age) years old")
    }
}

Real-World Example: Log Tail

Here's a practical example - watching a log file for new lines:

import tokio.fs.File
import tokio.io.{ AsyncBufReadExt, BufReader }
import tokio.time.{ sleep, Duration }

async fn tailLog(path: &str) {
    let file = await File.open(path).unwrap()
    var reader = BufReader.new(file)
    var line = String.new()

    println!("Watching \(path) for new lines...")

    loop {
        line.clear()
        match await reader.readLine(&mut line) {
            Ok(0) -> {
                // End of file, wait and try again
                await sleep(Duration.fromMillis(100))
            },
            Ok(_) -> {
                print!("\(line)")  // Line already has newline
            },
            Err(e) -> {
                println!("Error reading: \(e)")
                break
            },
        }
    }
}

Real-World Example: Event Throttling

Throttle UI events so that handlers are not overwhelmed. This version keeps only the most recent event and emits it at most once per period:

import asyncStream.stream
import tokio.time.{ interval, Duration, Interval }
import futures.stream.{ Stream, StreamExt }

struct ThrottledStream<S> {
    inner: S,
    interval: Interval,
    pending: S.Item?,
}

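// Not async: the function returns the stream itself, not a future of a stream
fn throttle<S: Stream>(stream: S, period: Duration): impl Stream<Item = S.Item> {
    var interval = interval(period)
    var pending: S.Item? = null
    var stream = Box.pin(stream)

    stream! {
        loop {
            tokio.select! {
                item = stream.next() -> {
                    match item {
                        // Keep only the most recent item between ticks
                        Some(i) -> pending = Some(i),
                        null -> {
                            // Source ended: flush the last pending item, then stop
                            if let Some(p) = pending.take() {
                                yield p
                            }
                            return
                        },
                    }
                },
                _ = interval.tick() -> {
                    // On each tick, emit the pending item if there is one
                    if let Some(p) = pending.take() {
                        yield p
                    }
                },
            }
        }
    }
}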

// Usage: emit at most one event per 100 ms
async fn handleEvents() {
    let events = getEventStream()
    // Pin the throttled stream so next() can be called on it
    var throttled = Box.pin(throttle(events, Duration.fromMillis(100)))

    while let Some(event) = await throttled.next() {
        await handleEvent(event)
    }
}
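
The "latest item wins" behavior can be hard to see in async code, so here is a synchronous simulation in plain Rust (standard library only, not Oxide syntax). It is a sketch under stated assumptions, not the throttle implementation itself: `throttle_latest` is a hypothetical helper, events carry an arrival time in milliseconds, ticks fire at multiples of the period, a tick that coincides with an event is assumed to fire first, and the source is assumed to end right after its last event.

```rust
/// Simulates throttling over `(arrival_ms, item)` pairs sorted by arrival time.
/// Returns the items that would be emitted, in order.
fn throttle_latest<T: Clone>(events: &[(u64, T)], period_ms: u64) -> Vec<T> {
    assert!(period_ms > 0, "period must be positive");

    let mut emitted = Vec::new();
    let mut pending: Option<T> = None;
    let mut next_tick = period_ms;

    for (arrival_ms, item) in events {
        // Fire every tick that happens at or before this event's arrival.
        while next_tick <= *arrival_ms {
            if let Some(p) = pending.take() {
                emitted.push(p);
            }
            next_tick += period_ms;
        }
        // A newer event overwrites any event still waiting for a tick.
        pending = Some(item.clone());
    }

    // Source ended: flush whatever is still pending.
    if let Some(p) = pending.take() {
        emitted.push(p);
    }
    emitted
}

fn main() {
    let events = [(10, "a"), (20, "b"), (30, "c"), (150, "d"), (160, "e")];
    // "a" and "b" are overwritten by "c" before the 100 ms tick;
    // "d" is overwritten by "e", which is flushed when the source ends.
    println!("{:?}", throttle_latest(&events, 100));
}
```

Dropping intermediate events is a deliberate trade-off: it suits cases such as mouse moves or resize events, where only the latest state matters, and is a poor fit when every event must be handled.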

Stream vs Iterator Comparison

Operation    Iterator              Stream
Next item    iter.next()           await stream.next()
Transform    iter.map(f)           stream.map(f)
Filter       iter.filter(p)        stream.filter(p)
Collect      iter.collect()        await stream.collect()
Fold         iter.fold(init, f)    await stream.fold(init, f)
For loop     for x in iter         while let Some(x) = await stream.next()

The patterns are nearly identical - the key difference is adding await where async operations occur.
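
For reference, the iterator side of the comparison can be sketched in plain Rust (standard library only, not Oxide syntax), reusing the inputs from the stream examples in this chapter. The function names are illustrative only.

```rust
/// Transform: double every number.
fn doubled(nums: &[i32]) -> Vec<i32> {
    nums.iter().map(|n| n * 2).collect()
}

/// Filter: keep only even numbers.
fn evens(nums: &[i32]) -> Vec<i32> {
    nums.iter().copied().filter(|n| n % 2 == 0).collect()
}

/// Filter map: keep only the strings that parse as integers.
fn parsed(items: &[&str]) -> Vec<i32> {
    items.iter().filter_map(|s| s.parse::<i32>().ok()).collect()
}

/// Fold: reduce the numbers to their sum.
fn sum(nums: &[i32]) -> i32 {
    nums.iter().fold(0, |acc, n| acc + n)
}

/// Skip and take: drop the first 10 values of 0..100, then keep 5.
fn window() -> Vec<i32> {
    (0..100).skip(10).take(5).collect()
}

fn main() {
    println!("{:?}", doubled(&[1, 2, 3, 4, 5]));
    println!("{:?}", evens(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
    println!("{:?}", parsed(&["1", "two", "3", "four", "5"]));
    println!("{}", sum(&[1, 2, 3, 4, 5]));
    println!("{:?}", window());
}
```

Each call here runs to completion immediately; the stream versions produce the same values but may suspend between items.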

Summary

Streams are the async version of iterators:

  • Create streams from iterators, channels, or the stream! macro
  • Transform streams with familiar combinators: map, filter, take, etc.
  • Process async operations with then and bufferUnordered
  • Combine streams by merging (select), chaining (chain), and pairing (zip)
  • Iterate using while let Some(item) = await stream.next()

Remember: stream operations that produce values use prefix await: await stream.next(), await stream.collect(), await stream.fold(...).

In the final section, we'll look at the underlying traits that make async programming possible: Future, Pin, and Stream.