Concurrency with Async
One of async programming's greatest strengths is efficiently handling multiple operations at once. In this section, we'll explore how to run async operations concurrently, communicate between tasks, and coordinate complex workflows.
Sequential vs. Concurrent Execution
First, let's understand the difference between sequential and concurrent async code:
import tokio
import tokio.time.{ sleep, Duration }

async fn fetchData(name: &str, delayMs: UInt64): String {
    println!("Starting fetch for \(name)...")
    await sleep(Duration.fromMillis(delayMs))
    println!("Finished fetch for \(name)")
    "data_\(name)".toString()
}

#[tokio.main]
async fn main() {
    // Sequential: total time ~3 seconds
    let a = await fetchData("A", 1000)
    let b = await fetchData("B", 1000)
    let c = await fetchData("C", 1000)
    println!("Sequential results: \(a), \(b), \(c)")
}
Each await waits for completion before starting the next fetch. With three
1-second fetches, this takes about 3 seconds total.
Running Futures Concurrently with join!
To run futures concurrently, use tokio.join!:
import tokio

#[tokio.main]
async fn main() {
    // Concurrent: total time ~1 second (they run together!)
    let (a, b, c) = await tokio.join!(
        fetchData("A", 1000),
        fetchData("B", 1000),
        fetchData("C", 1000)
    )
    println!("Concurrent results: \(a), \(b), \(c)")
}
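All three fetches start immediately and make progress concurrently. Because each takes about 1 second and their waits overlap, the total time is about 1 second. Keep in mind that join! interleaves the futures on the current task; it does not move them onto separate threads.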
Note the prefix await before tokio.join! - the macro produces a future
that we then await.
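The timing difference comes down to simple arithmetic: awaiting one fetch after another costs the sum of the delays, while joining them costs roughly the longest single delay. The sketch below is plain Rust using only the standard library (not Oxide), and the helper names sequential_ms and concurrent_ms are made up for illustration. It ignores scheduling overhead.

```rust
/// Total wall-clock time when each delay is awaited before the next starts.
fn sequential_ms(delays: &[u64]) -> u64 {
    delays.iter().sum()
}

/// Approximate wall-clock time when all delays overlap, as with join!.
/// The slowest operation determines when the whole group is done.
fn concurrent_ms(delays: &[u64]) -> u64 {
    delays.iter().copied().max().unwrap_or(0)
}
```

For the three 1000 ms fetches above, the first helper gives 3000 and the second gives 1000, matching the "about 3 seconds" and "about 1 second" figures.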
Spawning Independent Tasks
Sometimes you want a task to run independently in the background. Use
tokio.spawn:
import tokio
import tokio.time.{ sleep, Duration }

#[tokio.main]
async fn main() {
    // Spawn a background task
    let handle = tokio.spawn(async {
        await sleep(Duration.fromSecs(2))
        println!("Background task complete!")
        42
    })

    // Main task continues immediately
    println!("Main task doing work...")
    await sleep(Duration.fromSecs(1))
    println!("Main task still working...")

    // Wait for the spawned task to complete
    let result = await handle
    println!("Background task returned: \(result.unwrap())")
}
Output:
Main task doing work...
Main task still working...
Background task complete!
Background task returned: 42
The spawned task runs concurrently with the main task. tokio.spawn returns
a JoinHandle that you can await to get the result.
Important: Move Semantics with Spawn
Spawned tasks may outlive the current function, so they must own their data:
async fn processUser(user: User) {
    // ERROR: task may outlive this function, but borrows `user`
    tokio.spawn(async {
        println!("Processing \(user.name)")
    })

    // CORRECT: move ownership into the task
    tokio.spawn(async move {
        println!("Processing \(user.name)")
    })
}
Message Passing Between Tasks
Tasks often need to communicate. Tokio provides async channels for this:
import tokio
import tokio.sync.mpsc
import tokio.time.{ sleep, Duration }

#[tokio.main]
async fn main() {
    // Create a channel with buffer size 32
    let (tx, mut rx) = mpsc.channel(32)

    // Spawn a producer task
    let producer = tokio.spawn(async move {
        for i in 0..5 {
            await tx.send(format!("Message \(i)"))
            await sleep(Duration.fromMillis(100))
        }
        // tx is dropped here, closing the channel
    })

    // Spawn a consumer task
    let consumer = tokio.spawn(async move {
        while let Some(msg) = await rx.recv() {
            println!("Received: \(msg)")
        }
        println!("Channel closed")
    })

    // Wait for both tasks
    await tokio.join!(producer, consumer)
}
Output:
Received: Message 0
Received: Message 1
Received: Message 2
Received: Message 3
Received: Message 4
Channel closed
Key points:
- mpsc.channel(n) creates a multi-producer, single-consumer channel that buffers up to n messages
- tx.send() is async and may wait if the buffer is full
- rx.recv() is async and returns null once the channel is closed and every buffered message has been received
- Dropping all senders closes the channel
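The same closing behaviour can be observed with the standard library's thread-based channel. The sketch below is plain Rust (not Oxide, and not Tokio): sync_channel is the blocking counterpart of a bounded async channel, so send blocks the thread instead of yielding, and recv reports closure with Err rather than null. The function name produce_and_collect is made up for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends `count` messages through a bounded channel and returns everything
/// the receiver saw before the channel closed.
fn produce_and_collect(count: usize) -> Vec<String> {
    // Bounded channel: `send` blocks while 32 messages are already queued.
    let (tx, rx) = sync_channel::<String>(32);

    let producer = thread::spawn(move || {
        for i in 0..count {
            tx.send(format!("Message {}", i))
                .expect("receiver is still alive");
        }
        // `tx` is dropped here, which closes the channel.
    });

    let mut received = Vec::new();
    // `recv` returns Err once every sender is gone and the buffer is empty.
    while let Ok(msg) = rx.recv() {
        received.push(msg);
    }

    producer.join().expect("producer thread panicked");
    received
}
```

Calling produce_and_collect(5) returns the five messages in send order, and the receive loop ends on its own because the only sender was dropped.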
Multiple Producers
Clone the sender to allow multiple tasks to send:
import tokio
import tokio.sync.mpsc
import tokio.time.{ sleep, Duration }

#[tokio.main]
async fn main() {
    let (tx, mut rx) = mpsc.channel(32)

    // Spawn multiple producer tasks
    for i in 0..3 {
        let tx = tx.clone() // Clone for each task
        tokio.spawn(async move {
            for j in 0..3 {
                await tx.send(format!("Producer \(i), message \(j)"))
                await sleep(Duration.fromMillis(50))
            }
        })
    }

    // Drop the original sender so channel closes when tasks finish
    drop(tx)

    // Receive all messages
    while let Some(msg) = await rx.recv() {
        println!("\(msg)")
    }
}
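The drop of the original sender is what lets the receive loop terminate. As a rough illustration, here is the same shape in plain Rust with standard-library threads (not Oxide or Tokio); count_messages is a hypothetical helper name. If the drop(tx) line were removed, the original sender would keep the channel open and the count would never finish.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Starts `producers` threads that each send `per_producer` messages,
/// then counts everything that arrives before the channel closes.
fn count_messages(producers: usize, per_producer: usize) -> usize {
    let (tx, rx) = channel::<(usize, usize)>();
    let mut workers = Vec::new();

    for i in 0..producers {
        let tx = tx.clone(); // one sender per producer
        workers.push(thread::spawn(move || {
            for j in 0..per_producer {
                tx.send((i, j)).expect("receiver is still alive");
            }
            // this clone of `tx` is dropped when the thread finishes
        }));
    }

    // Drop the original sender; only the clones keep the channel open now.
    drop(tx);

    // The iterator ends once the last sender clone has been dropped.
    let count = rx.iter().count();

    for worker in workers {
        worker.join().expect("producer thread panicked");
    }
    count
}
```

With three producers sending three messages each, the function returns 9.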
Racing Futures with select!
Sometimes you want the result of whichever future completes first:
import tokio
import tokio.time.{ sleep, Duration }

async fn fetchFastest(): String {
    await tokio.select! {
        result = fetchFromServerA() -> result,
        result = fetchFromServerB() -> result,
    }
}

#[tokio.main]
async fn main() {
    let fastest = await fetchFastest()
    println!("Got response: \(fastest)")
}

async fn fetchFromServerA(): String {
    await sleep(Duration.fromMillis(100))
    "Response from A".toString()
}

async fn fetchFromServerB(): String {
    await sleep(Duration.fromMillis(200))
    "Response from B".toString()
}
The select! macro races the futures and returns as soon as the first one completes.
The remaining futures are dropped, which cancels them.
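To make "first to complete wins, the rest are dropped" concrete, here is a hand-rolled race in plain Rust using only the standard library (not Oxide, and not how Tokio implements select!). Countdown and race are hypothetical names: Countdown is a toy future that stays pending for a fixed number of polls, and race busy-polls both futures where a real runtime would sleep until woken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; good enough for a busy-polling demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy future: pending for `remaining` polls, then ready with `label`.
struct Countdown {
    remaining: u32,
    label: &'static str,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready(self.label)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Polls both futures in turn and returns the first output that is ready.
fn race<A, B, T>(a: A, b: B) -> T
where
    A: Future<Output = T>,
    B: Future<Output = T>,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    loop {
        if let Poll::Ready(out) = a.as_mut().poll(&mut cx) {
            return out; // `b` is dropped on return, which cancels it
        }
        if let Poll::Ready(out) = b.as_mut().poll(&mut cx) {
            return out; // `a` is dropped on return, which cancels it
        }
    }
}
```

Racing a Countdown that needs one extra poll against one that needs two returns the first one's label; swapping the counts swaps the winner.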
Select with Timeouts
A common pattern is racing an operation against a timeout:
import tokio
import tokio.time.{ sleep, timeout, Duration }

// fetch, Response, and Error are placeholders for your application's
// own function and types.
async fn fetchWithTimeout(url: &str): Result<Response, Error> {
    match await timeout(Duration.fromSecs(5), fetch(url)) {
        Ok(response) -> Ok(response),
        Err(_) -> Err(Error.Timeout),
    }
}

// Or using select! directly:
async fn fetchWithTimeoutSelect(url: &str): Result<Response, Error> {
    await tokio.select! {
        response = fetch(url) -> Ok(response),
        _ = sleep(Duration.fromSecs(5)) -> Err(Error.Timeout),
    }
}
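For comparison, the standard library offers a blocking version of the same idea: run the work on a thread and wait for its result with a deadline. This is a sketch in plain Rust (not Oxide or Tokio), and run_with_timeout and FetchError are hypothetical names. One important difference from the async versions above: when the deadline passes, the worker thread is not cancelled and keeps running in the background.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum FetchError {
    Timeout,
    Disconnected,
}

/// Runs `work` on a new thread and waits at most `limit` for its result.
fn run_with_timeout<T, F>(work: F, limit: Duration) -> Result<T, FetchError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Ignore a send error: the caller may already have given up.
        let _ = tx.send(work());
    });

    match rx.recv_timeout(limit) {
        Ok(value) => Ok(value),
        // The deadline passed before the worker produced a value.
        Err(RecvTimeoutError::Timeout) => Err(FetchError::Timeout),
        // The worker ended (for example by panicking) without sending.
        Err(RecvTimeoutError::Disconnected) => Err(FetchError::Disconnected),
    }
}
```

A closure that returns immediately yields Ok with its value, while one that sleeps longer than the limit yields Err(FetchError::Timeout).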
Fair Scheduling with join!
The join! macro provides fair scheduling between futures:
import tokio

#[tokio.main]
async fn main() {
    await tokio.join!(
        countTo("A", 5),
        countTo("B", 5),
    )
}

async fn countTo(name: &str, n: Int) {
    for i in 1..=n {
        println!("\(name): \(i)")
        await tokio.task.yieldNow() // Let other tasks run
    }
}
Output (interleaved):
A: 1
B: 1
A: 2
B: 2
A: 3
B: 3
...
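The yieldNow() function explicitly yields control to the runtime, giving
other futures and tasks a chance to make progress. Without an await point inside the loop, countTo("A", 5) would print all five of its lines before countTo("B", 5) printed anything.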
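The interleaving can be reproduced without a runtime by polling two futures in round-robin order. The sketch below is plain Rust with only the standard library (not Oxide, so await appears in Rust's postfix position), and it is not how Tokio schedules work internally. YieldNow, count_to, and run_interleaved are hypothetical stand-ins written for this example.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; the loop below simply polls again.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for yieldNow(): pending on the first poll, ready on the second.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

type Log = Rc<RefCell<Vec<String>>>;

async fn count_to(name: &'static str, n: u32, log: Log) {
    for i in 1..=n {
        log.borrow_mut().push(format!("{}: {}", name, i));
        YieldNow { yielded: false }.await; // hand control back to the poller
    }
}

/// Polls two counters in turn until both finish; returns the recorded lines.
fn run_interleaved(n: u32) -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut pending: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
        Box::pin(count_to("A", n, log.clone())),
        Box::pin(count_to("B", n, log.clone())),
    ];

    while !pending.is_empty() {
        // Poll each future once per round and keep the unfinished ones.
        pending.retain_mut(|fut| fut.as_mut().poll(&mut cx).is_pending());
    }

    let lines = log.borrow().clone();
    lines
}
```

Each poll runs a counter up to its next yield point, so the recorded lines alternate between A and B in the same pattern as the output above.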
Handling Multiple Channels
Use select! to handle messages from multiple sources:
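import tokio
import tokio.sync.mpsc
import tokio.time.{ sleep, Duration }

#[tokio.main]
async fn main() {
    let (tx1, mut rx1) = mpsc.channel(10)
    let (tx2, mut rx2) = mpsc.channel(10)

    // Spawn producers
    tokio.spawn(async move {
        for i in 0..3 {
            await tx1.send(format!("From channel 1: \(i)"))
            await sleep(Duration.fromMillis(100))
        }
    })
    tokio.spawn(async move {
        for i in 0..3 {
            await tx2.send(format!("From channel 2: \(i)"))
            await sleep(Duration.fromMillis(150))
        }
    })

    // Handle messages from both channels
    loop {
        await tokio.select! {
            msg = rx1.recv() -> {
                match msg {
                    Some(m) -> println!("RX1: \(m)"),
                    null -> break, // channel 1 closed: stop the loop
                }
            },
            msg = rx2.recv() -> {
                match msg {
                    Some(m) -> println!("RX2: \(m)"),
                    null -> break, // channel 2 closed: stop the loop
                }
            },
        }
    }
}
Note that this loop exits as soon as either channel closes, so any messages still queued on the other channel are never printed. If you need to drain both, track which channels are still open and break only when both have closed.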
Shared State Between Tasks
For shared mutable state, use tokio.sync.Mutex:
import tokio
import tokio.sync.Mutex
import std.sync.Arc

#[tokio.main]
async fn main() {
    let counter = Arc.new(Mutex.new(0))
    var handles = vec![]

    for _ in 0..10 {
        let counter = Arc.clone(&counter)
        let handle = tokio.spawn(async move {
            for _ in 0..100 {
                var lock = await counter.lock()
                *lock += 1
            }
        })
        handles.push(handle)
    }

    for handle in handles {
        await handle.unwrap()
    }

    println!("Counter: \(await counter.lock())") // Prints: Counter: 1000
}
Note: tokio.sync.Mutex is designed for async code. It allows the task to
yield while waiting for the lock, unlike std.sync.Mutex which blocks the
thread.
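For contrast, this is roughly what the blocking counterpart looks like with std.sync.Mutex and OS threads. It is a sketch in plain Rust using only the standard library (not Oxide or Tokio), and count_with_threads is a hypothetical helper name. Here a thread that cannot get the lock is parked by the operating system, which is exactly what you want to avoid inside an async task.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Increments a shared counter from several threads and returns the total.
fn count_with_threads(workers: usize, increments: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let mut handles = Vec::new();

    for _ in 0..workers {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..increments {
                // Blocks the whole OS thread until the lock is free.
                let mut guard = counter.lock().expect("mutex poisoned");
                *guard += 1;
                // The guard is dropped here, releasing the lock.
            }
        }));
    }

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    let total = *counter.lock().expect("mutex poisoned");
    total
}
```

Ten workers with one hundred increments each produce 1000, the same total as the async version.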
Task Cancellation
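Dropping a future before it completes cancels it. A spawned task is different: it keeps running even if its JoinHandle is dropped, so to cancel it you call abort() on the handle: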
import tokio
import tokio.time.{ sleep, Duration }

#[tokio.main]
async fn main() {
    let handle = tokio.spawn(async {
        println!("Task starting...")
        await sleep(Duration.fromSecs(10))
        println!("Task complete!") // Never printed if cancelled
    })

    // Wait a bit then cancel
    await sleep(Duration.fromSecs(1))
    handle.abort() // Cancel the task

    // Check if it was cancelled
    match await handle {
        Ok(_) -> println!("Task completed"),
        Err(e) if e.isCancelled() -> println!("Task was cancelled"),
        Err(e) -> println!("Task failed: \(e)"),
    }
}
Cancellation happens at await points - if a task is in the middle of
non-async code when cancelled, it will continue until the next await.
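The effect of cancelling at an await point can be shown by polling a future once by hand and then dropping it. This is a sketch in plain Rust with only the standard library (not Oxide, so await is postfix here); it models dropping a future rather than Tokio's abort(), and cancel_after_first_poll is a hypothetical name.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// A waker that does nothing; the future is only polled once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a task once, drops it while it is suspended, and returns the log.
fn cancel_after_first_poll() -> Vec<&'static str> {
    let log: Rc<RefCell<Vec<&'static str>>> = Rc::new(RefCell::new(Vec::new()));
    let task_log = log.clone();

    let mut task = Box::pin(async move {
        task_log.borrow_mut().push("Task starting...");
        // Suspends here forever, standing in for a long sleep.
        std::future::pending::<()>().await;
        // Only reached if the future is polled to completion.
        task_log.borrow_mut().push("Task complete!");
    });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // The first poll runs the task up to its first await point.
    let first = task.as_mut().poll(&mut cx);
    assert!(first.is_pending());

    // Dropping the suspended future cancels it; the rest never runs.
    drop(task);
    log.borrow_mut().push("Task was cancelled");

    let lines = log.borrow().clone();
    lines
}
```

The returned log contains "Task starting..." followed by "Task was cancelled", and "Task complete!" never appears because the code after the await point was never resumed.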
Summary
This section covered key concurrency patterns:
- join! runs multiple futures concurrently and waits for all of them
- tokio.spawn creates independent background tasks
- Channels (mpsc) enable message passing between tasks
- select! races futures and returns the first to complete
- yieldNow() explicitly yields control to other tasks
- Async Mutex provides safe shared mutable state
- Cancellation happens when a future is dropped or a spawned task is aborted, and it takes effect at an await point
Remember: all await expressions use prefix syntax in Oxide. The examples
above consistently show await tokio.join!(...), await tx.send(...), and
similar patterns.
In the next section, we'll explore more advanced future patterns including custom timeouts and composing futures in sophisticated ways.