Working With More Futures
Now that we understand the basics of async and concurrency, let's explore more advanced patterns. We'll learn about yielding control, building custom async abstractions, and handling dynamic collections of futures.
Yielding Control to the Runtime
Async code is cooperatively scheduled: the runtime can only switch between
tasks at await points. If your code does a lot of work without awaiting, it
can starve other tasks:
import tokio
import tokio.time.{ sleep, Duration }
async fn greedyTask() {
    // This blocks other tasks for the entire loop!
    for i in 0..1_000_000 {
        heavyComputation(i) // No await points
    }
    println!("Greedy task done")
}
async fn politeTask() {
    println!("Polite task trying to run...")
}
#[tokio.main]
async fn main() {
    await tokio.join!(
        greedyTask(),
        politeTask(),
    )
}
In this example, politeTask can't run until greedyTask finishes because
there are no await points where the runtime can switch tasks.
Solution 1: Add Await Points
Break up long-running work with await:
async fn friendlyTask() {
    for i in 0..1_000_000 {
        heavyComputation(i)
        // Periodically yield control
        if i % 10_000 == 0 {
            await sleep(Duration.fromMillis(0))
        }
    }
    println!("Friendly task done")
}
Solution 2: Use yieldNow
A more explicit way to yield control:
import tokio.task.yieldNow
async fn yieldingTask() {
    for i in 0..1_000_000 {
        heavyComputation(i)
        if i % 10_000 == 0 {
            await yieldNow() // Explicitly give other tasks a chance
        }
    }
    println!("Yielding task done")
}
yieldNow() is more efficient than sleep(0) and clearly expresses intent.
Solution 3: Spawn Blocking Work
For truly CPU-intensive work, use spawnBlocking to run it on a dedicated
thread pool:
import tokio.task.spawnBlocking
async fn heavyLifting() {
    // Run on a blocking thread, not the async runtime
    let result = (await spawnBlocking {
        // This can take as long as it needs
        expensiveComputation()
    }).unwrap()
    println!("Result: \(result)")
}
This keeps the async runtime responsive for I/O tasks while heavy computation runs elsewhere.
Building Custom Async Abstractions
One of async's strengths is composability. Let's build a timeout function
that races any future against a timer:
import tokio
import tokio.time.{ sleep, Duration }
async fn timeout<T, F>(
    future: F,
    duration: Duration,
): Result<T, TimeoutError>
where
    F: Future<Output = T>,
{
    await tokio.select! {
        result = future -> Ok(result),
        _ = sleep(duration) -> Err(TimeoutError.new(duration)),
    }
}

#[derive(Debug)]
struct TimeoutError {
    duration: Duration,
}

extension TimeoutError {
    static fn new(duration: Duration): TimeoutError {
        TimeoutError { duration }
    }
}
Now we can use it with any async operation:
#[tokio.main]
async fn main() {
    match await timeout(fetchData(), Duration.fromSecs(5)) {
        Ok(data) -> println!("Got data: \(data)"),
        Err(e) -> println!("Timed out after \(e.duration:?)"),
    }
}
Building a Retry Function
Here's a more complex example: a function that retries failed operations:
async fn retry<T, E, F, Fut>(
    operation: F,
    maxAttempts: UInt32,
    delayBetween: Duration,
): Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    var lastError: Option<E> = None
    for attempt in 1..=maxAttempts {
        match await operation() {
            Ok(value) -> return Ok(value),
            Err(e) -> {
                println!("Attempt \(attempt) failed: \(e:?)")
                lastError = Some(e)
                if attempt < maxAttempts {
                    await sleep(delayBetween)
                }
            }
        }
    }
    Err(lastError.unwrap())
}
// Usage
#[tokio.main]
async fn main() {
    let result = await retry(
        { -> fetchUnreliableData() },
        3,                    // max attempts
        Duration.fromSecs(1), // delay between attempts
    )
    match result {
        Ok(data) -> println!("Success: \(data)"),
        Err(e) -> println!("All attempts failed: \(e)"),
    }
}
Combining Patterns
You can combine timeout and retry:
async fn fetchWithRetryAndTimeout(): Result<Data, TimeoutError> {
    await retry(
        { -> timeout(fetchData(), Duration.fromSecs(5)) },
        3,
        Duration.fromSecs(1),
    )
}
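The nesting order matters here: each attempt gets its own 5-second budget, so the whole call can take roughly three timeouts plus the delays between attempts. To bound the total time instead, wrap the retry future in an outer timeout. A sketch using only the functions defined above:

```
async fn fetchWithOverallDeadline(): Result<Data, TimeoutError> {
    // Calling retry without await just builds the future
    let attempts = retry(
        { -> timeout(fetchData(), Duration.fromSecs(5)) },
        3,
        Duration.fromSecs(1),
    )
    // The outer timeout bounds the *total* time across all attempts
    // and delays, not just each individual attempt
    match await timeout(attempts, Duration.fromSecs(10)) {
        Ok(result) -> result,       // inner retry result
        Err(e) -> Err(e),           // overall deadline exceeded
    }
}
```

Per-attempt timeouts keep one slow attempt from eating the whole budget; an overall deadline guarantees the caller an upper bound. Many services use both, as here.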
Working with Dynamic Collections of Futures
Sometimes you don't know at compile time how many futures you'll have. The
FuturesUnordered collection handles this:
import futures.stream.{ FuturesUnordered, StreamExt }
async fn fetchAll(urls: Vec<String>): Vec<Response> {
    var futures = FuturesUnordered.new()
    // Add all fetch operations
    for url in urls {
        futures.push(fetchUrl(url))
    }
    // Collect results as they complete
    var results = vec![]
    while let Some(result) = await futures.next() {
        results.push(result)
    }
    results
}
Results come back in completion order, not submission order. This is efficient when you don't care about order but want results as fast as possible.
Preserving Order
If you need results in the original order, use indices:
async fn fetchAllOrdered(urls: Vec<String>): Vec<Response> {
    var futures = FuturesUnordered.new()
    // Track indices with each future
    for (i, url) in urls.intoIter().enumerate() {
        futures.push(async move {
            let response = await fetchUrl(&url)
            (i, response)
        })
    }
    // Collect and sort by index
    var results: Vec<(UIntSize, Response)> = vec![]
    while let Some(result) = await futures.next() {
        results.push(result)
    }
    results.sortBy { it.0 }
    results.intoIter().map { it.1 }.collect()
}
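If you only need every result in submission order and don't care about reacting to completions as they happen, a join-all combinator is simpler. A sketch, assuming a `joinAll` function analogous to the futures crate's `join_all`, which awaits a collection of futures and returns their outputs in input order:

```
import futures.future.joinAll

async fn fetchAllOrderedSimple(urls: Vec<String>): Vec<Response> {
    // One future per URL; joinAll preserves the input order,
    // so no index bookkeeping is needed
    let futures = urls.intoIter().map { url -> fetchUrl(url) }
    await joinAll(futures)
}
```

The trade-off: `joinAll` only completes when every future has finished, whereas FuturesUnordered lets you process each result the moment it arrives.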
Limiting Concurrency
Too many concurrent operations can overwhelm resources. Use a semaphore to limit concurrency:
import tokio.sync.Semaphore
import std.sync.Arc
async fn fetchWithLimit(urls: Vec<String>, maxConcurrent: UIntSize): Vec<Response> {
    let semaphore = Arc.new(Semaphore.new(maxConcurrent))
    var handles = vec![]
    for url in urls {
        let semaphore = Arc.clone(&semaphore)
        let handle = tokio.spawn(async move {
            // Acquire a permit (waits if at the limit)
            let _permit = (await semaphore.acquire()).unwrap()
            // The permit is dropped when this scope ends, releasing it
            await fetchUrl(&url)
        })
        handles.push(handle)
    }
    var results = vec![]
    for handle in handles {
        results.push((await handle).unwrap())
    }
    results
}
#[tokio.main]
async fn main() {
    let urls = getUrlList() // Might be thousands of URLs
    // Only 10 concurrent fetches at a time
    let results = await fetchWithLimit(urls, 10)
}
Buffered Streams
For processing streams with limited concurrency, use bufferUnordered:
import futures.stream.{ self, StreamExt }
async fn processAll(items: Vec<Item>): Vec<ProcessedItem> {
    await stream.iter(items)
        .map { item -> processItem(item) } // Create futures
        .bufferUnordered(10)               // Run up to 10 concurrently
        .collect()                         // Collect results
}
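bufferUnordered yields results in completion order. When input order matters, the futures crate offers an ordered counterpart. A sketch, assuming a corresponding `buffered` combinator and the same hypothetical `ProcessedItem` output type:

```
async fn processAllOrdered(items: Vec<Item>): Vec<ProcessedItem> {
    // buffered still runs up to 10 futures at once, but yields their
    // results in the same order the items were submitted
    await stream.iter(items)
        .map { item -> processItem(item) }
        .buffered(10)
        .collect()
}
```

The ordered variant can sit idle waiting on a slow head-of-line item while later items are already done, so prefer bufferUnordered when order is irrelevant.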
Graceful Shutdown
Real applications need to shut down cleanly. Here's a pattern using a shutdown signal:
import tokio
import tokio.sync.broadcast
import tokio.time.{ sleep, Duration }
async fn worker(id: Int, mut shutdown: broadcast.Receiver<()>) {
    loop {
        await tokio.select! {
            _ = shutdown.recv() -> {
                println!("Worker \(id) shutting down")
                return
            },
            _ = doWork() -> {
                println!("Worker \(id) completed work")
            },
        }
    }
}
#[tokio.main]
async fn main() {
    let (shutdownTx, _) = broadcast.channel(1)
    var handles = vec![]
    for i in 0..3 {
        let rx = shutdownTx.subscribe()
        handles.push(tokio.spawn(worker(i, rx)))
    }
    // Let the workers run for a while
    await sleep(Duration.fromSecs(5))
    // Signal shutdown
    println!("Sending shutdown signal...")
    shutdownTx.send(()).unwrap()
    // Wait for all workers to finish
    for handle in handles {
        (await handle).unwrap()
    }
    println!("All workers stopped")
}
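In a real service, the shutdown signal usually comes from the operating system rather than a fixed timer. A sketch, assuming tokio exposes a `ctrlC` helper analogous to Tokio's `ctrl_c`:

```
import tokio.signal

#[tokio.main]
async fn main() {
    let (shutdownTx, _) = broadcast.channel(1)
    // ... spawn workers as above ...

    // Block until the process receives Ctrl-C (SIGINT)
    (await tokio.signal.ctrlC()).unwrap()
    println!("Ctrl-C received, shutting down...")
    shutdownTx.send(()).unwrap()
    // ... await worker handles as above ...
}
```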
Error Handling in Concurrent Operations
When running multiple operations, you need to decide how to handle errors:
Fail Fast (Stop on First Error)
async fn fetchAllFailFast(urls: Vec<String>): Result<Vec<Response>, Error> {
    var futures = FuturesUnordered.new()
    for url in urls {
        futures.push(fetchUrl(url))
    }
    var results = vec![]
    while let Some(result) = await futures.next() {
        // Return immediately on error; dropping the collection
        // cancels any fetches still in flight
        results.push(result?)
    }
    Ok(results)
}
Collect All Results (Errors and Successes)
async fn fetchAllResults(urls: Vec<String>): Vec<Result<Response, Error>> {
    var futures = FuturesUnordered.new()
    for url in urls {
        futures.push(fetchUrl(url))
    }
    var results = vec![]
    while let Some(result) = await futures.next() {
        results.push(result)
    }
    results
}

// Later, separate successes from failures
fn processResults(results: Vec<Result<Response, Error>>) {
    let (successes, failures): (Vec<Result<Response, Error>>, Vec<Result<Response, Error>>) =
        results.intoIter().partition { it.isOk() }
    println!("\(successes.len()) succeeded, \(failures.len()) failed")
}
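partition keeps the values wrapped in Result. If you want the unwrapped responses and errors themselves, a plain loop splits them in one pass, using only constructs already shown above:

```
fn splitResults(results: Vec<Result<Response, Error>>): (Vec<Response>, Vec<Error>) {
    var successes = vec![]
    var failures = vec![]
    for result in results {
        match result {
            Ok(response) -> successes.push(response),
            Err(error) -> failures.push(error),
        }
    }
    (successes, failures)
}
```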
Summary
This section covered advanced async patterns:
- Yielding control with yieldNow() prevents task starvation
- spawnBlocking handles CPU-intensive work without blocking the runtime
- Custom abstractions like timeout and retry compose naturally
- FuturesUnordered handles dynamic collections of futures
- Semaphores limit concurrent operations
- Buffered streams process items with bounded concurrency
- Graceful shutdown uses channels to coordinate stopping
- Error handling strategies for concurrent operations
Remember: async code in Oxide uses prefix await syntax. All the examples
above demonstrate this: await timeout(...), await semaphore.acquire(),
await futures.next(), and so on.
Next, we'll explore streams - async sequences of values that arrive over time.