Working With More Futures

Now that we understand the basics of async and concurrency, let's explore more advanced patterns. We'll learn about yielding control, building custom async abstractions, and handling dynamic collections of futures.

Yielding Control to the Runtime

Async code is cooperatively scheduled: the runtime can only switch between tasks at await points. If your code does a lot of work without awaiting, it can starve other tasks:

import tokio
import tokio.time.{ sleep, Duration }

async fn greedyTask() {
    // This blocks other tasks for the entire loop!
    for i in 0..1_000_000 {
        heavyComputation(i)  // No await points
    }
    println!("Greedy task done")
}

async fn politeTask() {
    println!("Polite task trying to run...")
}

#[tokio.main]
async fn main() {
    await tokio.join!(
        greedyTask(),
        politeTask(),
    )
}

In this example, politeTask can't run until greedyTask finishes because there are no await points where the runtime can switch tasks.

Solution 1: Add Await Points

Break up long-running work with await:

async fn friendlyTask() {
    for i in 0..1_000_000 {
        heavyComputation(i)

        // Periodically yield control
        if i % 10_000 == 0 {
            await sleep(Duration.fromMillis(0))
        }
    }
    println!("Friendly task done")
}

Solution 2: Use yieldNow

A more explicit way to yield control:

import tokio.task.yieldNow

async fn yieldingTask() {
    for i in 0..1_000_000 {
        heavyComputation(i)

        if i % 10_000 == 0 {
            await yieldNow()  // Explicitly give other tasks a chance
        }
    }
    println!("Yielding task done")
}

yieldNow() expresses the intent directly and avoids the timer machinery that a zero-duration sleep still goes through, so prefer it when all you want is to give other tasks a chance to run.

Solution 3: Spawn Blocking Work

For truly CPU-intensive work, use spawnBlocking to run it on a dedicated thread pool:

import tokio.task.spawnBlocking

async fn heavyLifting() {
    // Run on a blocking thread, not the async runtime
    let result = await spawnBlocking {
        // This can take as long as it needs
        expensiveComputation()
    }.unwrap()

    println!("Result: \(result)")
}

This keeps the async runtime responsive for I/O tasks while heavy computation runs elsewhere.

Building Custom Async Abstractions

One of async's strengths is composability. Let's build a timeout function that races any future against a timer:

import tokio
import tokio.time.{ sleep, Duration }

async fn timeout<T, F>(
    future: F,
    duration: Duration
): Result<T, TimeoutError>
where
    F: Future<Output = T>,
{
    await tokio.select! {
        result = future -> Ok(result),
        _ = sleep(duration) -> Err(TimeoutError.new(duration)),
    }
}

#[derive(Debug)]
struct TimeoutError {
    duration: Duration,
}

extension TimeoutError {
    static fn new(duration: Duration): TimeoutError {
        TimeoutError { duration }
    }
}

Now we can use it with any async operation:

#[tokio.main]
async fn main() {
    match await timeout(fetchData(), Duration.fromSecs(5)) {
        Ok(data) -> println!("Got data: \(data)"),
        Err(e) -> println!("Timed out after \(e.duration:?)"),
    }
}

Building a Retry Function

Here's a more involved example, a function that retries a failed operation up to a maximum number of attempts:

async fn retry<T, E, F, Fut>(
    operation: F,
    maxAttempts: UInt32,
    delayBetween: Duration,
): Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    var lastError: Option<E> = None

    for attempt in 1..=maxAttempts {
        match await operation() {
            Ok(value) -> return Ok(value),
            Err(e) -> {
                println!("Attempt \(attempt) failed: \(e:?)")
                lastError = Some(e)

                if attempt < maxAttempts {
                    await sleep(delayBetween)
                }
            }
        }
    }

    Err(lastError.unwrap())
}

// Usage
#[tokio.main]
async fn main() {
    let result = await retry(
        { -> fetchUnreliableData() },
        3,  // max attempts
        Duration.fromSecs(1),  // delay between attempts
    )

    match result {
        Ok(data) -> println!("Success: \(data)"),
        Err(e) -> println!("All attempts failed: \(e)"),
    }
}
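
A common refinement is exponential backoff, where the delay grows after each failure. Here's a sketch with the same signature as retry; it assumes Duration supports multiplication and a min helper, and the 30-second cap is an arbitrary illustrative choice:

async fn retryWithBackoff<T, E, F, Fut>(
    operation: F,
    maxAttempts: UInt32,
    initialDelay: Duration,
): Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    var delay = initialDelay
    var lastError: Option<E> = None

    for attempt in 1..=maxAttempts {
        match await operation() {
            Ok(value) -> return Ok(value),
            Err(e) -> {
                lastError = Some(e)

                if attempt < maxAttempts {
                    await sleep(delay)
                    // Double the delay each round, capped at 30 seconds
                    delay = min(delay * 2, Duration.fromSecs(30))
                }
            }
        }
    }

    Err(lastError.unwrap())
}

Backoff is kinder to a struggling server than retrying at a fixed interval, since each failure buys the server more recovery time.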

Combining Patterns

You can combine timeout and retry:

async fn fetchWithRetryAndTimeout(): Result<Data, Error> {
    await retry(
        { -> timeout(fetchData(), Duration.fromSecs(5)) },
        3,
        Duration.fromSecs(1),
    )
}

Working with Dynamic Collections of Futures

Sometimes you don't know at compile time how many futures you'll have. The FuturesUnordered collection handles this:

import futures.stream.{ FuturesUnordered, StreamExt }

async fn fetchAll(urls: Vec<String>): Vec<Response> {
    var futures = FuturesUnordered.new()

    // Add all fetch operations
    for url in urls {
        futures.push(fetchUrl(url))
    }

    // Collect results as they complete
    var results = vec![]
    while let Some(result) = await futures.next() {
        results.push(result)
    }

    results
}

Results come back in completion order, not submission order. This is efficient when you don't care about order but want results as fast as possible.
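
To make the ordering concrete, here's a small sketch using a hypothetical delayedValue helper that sleeps before returning its input; the shortest sleep finishes first regardless of push order:

async fn delayedValue(ms: UInt64, value: String): String {
    await sleep(Duration.fromMillis(ms))
    value
}

async fn demoCompletionOrder() {
    var futures = FuturesUnordered.new()
    futures.push(delayedValue(300, "slow"))
    futures.push(delayedValue(100, "fast"))
    futures.push(delayedValue(200, "medium"))

    // Prints "fast", "medium", "slow", completion order, not push order
    while let Some(value) = await futures.next() {
        println!("\(value)")
    }
}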

Preserving Order

If you need results in the original order, use indices:

async fn fetchAllOrdered(urls: Vec<String>): Vec<Response> {
    var futures = FuturesUnordered.new()

    // Track indices with each future
    for (i, url) in urls.intoIter().enumerate() {
        futures.push(async move {
            let response = await fetchUrl(&url)
            (i, response)
        })
    }

    // Collect and sort by index
    var results: Vec<(UIntSize, Response)> = vec![]
    while let Some(result) = await futures.next() {
        results.push(result)
    }

    results.sortBy { it.0 }
    results.intoIter().map { it.1 }.collect()
}

Limiting Concurrency

Too many concurrent operations can overwhelm resources. Use a semaphore to limit concurrency:

import tokio.sync.Semaphore
import std.sync.Arc

async fn fetchWithLimit(urls: Vec<String>, maxConcurrent: UIntSize): Vec<Response> {
    let semaphore = Arc.new(Semaphore.new(maxConcurrent))
    var handles = vec![]

    for url in urls {
        let semaphore = Arc.clone(&semaphore)
        let handle = tokio.spawn(async move {
            // Acquire permit (waits if at limit)
            let _permit = await semaphore.acquire().unwrap()

            // Permit is dropped when this scope ends, releasing it
            await fetchUrl(&url)
        })
        handles.push(handle)
    }

    var results = vec![]
    for handle in handles {
        results.push(await handle.unwrap())
    }
    results
}

#[tokio.main]
async fn main() {
    let urls = getUrlList()  // Might be thousands of URLs
    // Only 10 concurrent fetches at a time
    let results = await fetchWithLimit(urls, 10)
}

Buffered Streams

For processing streams with limited concurrency, use bufferUnordered:

import futures.stream.{ self, StreamExt }

async fn processAll(items: Vec<Item>): Vec<Output> {
    let results = stream.iter(items)
        .map { item -> processItem(item) }  // Create futures
        .bufferUnordered(10)               // Run up to 10 concurrently
        .collect()                          // Collect results

    await results
}
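
If you want bounded concurrency but results in input order, an order-preserving sibling of bufferUnordered removes the need for the manual index tracking shown earlier. This sketch assumes the stream library provides such a buffered combinator (Rust's futures crate does); Output stands for whatever processItem returns:

async fn processAllOrdered(items: Vec<Item>): Vec<Output> {
    await stream.iter(items)
        .map { item -> processItem(item) }  // Create futures
        .buffered(10)   // Up to 10 run concurrently; results keep input order
        .collect()
}

The trade-off is latency: a slow item at the front holds back results behind it, even ones that have already finished.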

Graceful Shutdown

Real applications need to shut down cleanly. Here's a pattern using a shutdown signal:

import tokio
import tokio.sync.broadcast
import tokio.time.{ sleep, Duration }

async fn worker(id: Int, var shutdown: broadcast.Receiver<()>) {
    loop {
        await tokio.select! {
            _ = shutdown.recv() -> {
                println!("Worker \(id) shutting down")
                return
            },
            _ = doWork() -> {
                println!("Worker \(id) completed work")
            },
        }
    }
}

#[tokio.main]
async fn main() {
    let (shutdownTx, _) = broadcast.channel(1)

    var handles = vec![]
    for i in 0..3 {
        let rx = shutdownTx.subscribe()
        handles.push(tokio.spawn(worker(i, rx)))
    }

    // Let workers run for a while
    await sleep(Duration.fromSecs(5))

    // Signal shutdown
    println!("Sending shutdown signal...")
    shutdownTx.send(()).unwrap()

    // Wait for all workers to finish
    for handle in handles {
        await handle.unwrap()
    }
    println!("All workers stopped")
}
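
In a real service the shutdown signal usually comes from outside rather than a fixed timer. Assuming the runtime exposes a Ctrl-C future, here called tokio.signal.ctrlC() (modeled on tokio's Ctrl-C handler), main only needs to await it in place of the sleep:

#[tokio.main]
async fn main() {
    let (shutdownTx, _) = broadcast.channel(1)

    var handles = vec![]
    for i in 0..3 {
        handles.push(tokio.spawn(worker(i, shutdownTx.subscribe())))
    }

    // Block until the user presses Ctrl-C
    await tokio.signal.ctrlC().unwrap()

    println!("Sending shutdown signal...")
    shutdownTx.send(()).unwrap()

    for handle in handles {
        await handle.unwrap()
    }
    println!("All workers stopped")
}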

Error Handling in Concurrent Operations

When running multiple operations, you need to decide how to handle errors:

Fail Fast (Stop on First Error)

async fn fetchAllFailFast(urls: Vec<String>): Result<Vec<Response>, Error> {
    var futures = FuturesUnordered.new()
    for url in urls {
        // fetchUrl here returns Result<Response, Error>
        futures.push(fetchUrl(url))
    }

    var results = vec![]
    while let Some(result) = await futures.next() {
        // Return immediately on error
        results.push(result?)
    }
    Ok(results)
}

Collect All Results (Errors and Successes)

async fn fetchAllResults(urls: Vec<String>): Vec<Result<Response, Error>> {
    var futures = FuturesUnordered.new()
    for url in urls {
        futures.push(fetchUrl(url))
    }

    var results = vec![]
    while let Some(result) = await futures.next() {
        results.push(result)
    }
    results
}

// Later, separate successes from failures
fn processResults(results: Vec<Result<Response, Error>>) {
    let (successes, failures): (Vec<Result<Response, Error>>, Vec<Result<Response, Error>>) =
        results.intoIter().partition { it.isOk() }

    println!("\(successes.len()) succeeded, \(failures.len()) failed")
}
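
Note that partition leaves both sides wrapped in Result. If you want the plain values, a small helper can split them apart; a sketch:

fn splitResults(results: Vec<Result<Response, Error>>): (Vec<Response>, Vec<Error>) {
    var successes = vec![]
    var failures = vec![]

    for result in results {
        match result {
            Ok(response) -> successes.push(response),
            Err(e) -> failures.push(e),
        }
    }

    (successes, failures)
}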

Summary

This section covered advanced async patterns:

  • Yielding control with yieldNow() prevents task starvation
  • spawnBlocking handles CPU-intensive work without blocking the runtime
  • Custom abstractions like timeout and retry compose naturally
  • FuturesUnordered handles dynamic collections of futures
  • Semaphores limit concurrent operations
  • Buffered streams process items with bounded concurrency
  • Graceful shutdown uses channels to coordinate stopping
  • Error handling strategies for concurrent operations

Remember: async code in Oxide uses prefix await syntax. All the examples above demonstrate this: await timeout(...), await semaphore.acquire(), await futures.next(), and so on.

Next, we'll explore streams - async sequences of values that arrive over time.