Traits for Async: Under the Hood
Understanding the traits that power async programming helps you write better
async code and debug issues when they arise. In this section, we'll explore
Future, Pin, and Stream: the building blocks of Oxide's async system, which
closely mirrors Rust's.
The Future Trait
Every async operation in Oxide is backed by the Future trait:
trait Future {
    type Output

    mutating fn poll(cx: &mut Context): Poll<Self.Output>
}
Let's break this down:
- Output: The type of value the future produces when complete
- poll: Called by the runtime to check if the future is ready
- Poll: An enum with two variants:
  - Poll.Ready(value): The future completed with a value
  - Poll.Pending: The future isn't ready yet
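Written out as a type, the Poll enum described above is just:

```
// The Poll enum: two variants, one per possible poll outcome
enum Poll<T> {
    Ready(T),   // the future finished and produced a value of type T
    Pending,    // not done yet; the runtime will poll again later
}
```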
How Await Works
When you write await someFuture, the compiler transforms it into code that
repeatedly calls poll() until the future returns Poll.Ready:
// You write:
let result = await someFuture

// Conceptually becomes something like:
loop {
    match someFuture.poll(cx) {
        Poll.Ready(value) -> break value,
        Poll.Pending -> {
            // Yield to runtime, which will poll again later
            suspend()
        },
    }
}
The runtime handles the actual polling loop and decides when to poll each future based on readiness notifications.
State Machines
When you write an async fn, the compiler generates a state machine:
async fn fetchAndProcess(url: &str): Data {
    let response = await fetch(url)   // State 1
    let body = await response.text()  // State 2
    processData(&body)                // State 3
}
The compiler creates something like:
enum FetchAndProcessState {
    Start { url: String },
    WaitingForFetch { fetchFuture: FetchFuture },
    WaitingForBody { bodyFuture: BodyFuture },
    Done,
}
Each await point becomes a state transition. The poll method checks which
state we're in, polls the appropriate inner future, and transitions when ready.
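A sketch of that generated poll logic might look like the following. This is illustrative pseudocode, not the compiler's actual output; the state and field names come from the enum above:

```
// Hypothetical sketch of the compiler-generated poll method
mutating fn poll(cx: &mut Context): Poll<Data> {
    loop {
        match self.state {
            Start { url } -> {
                // Kick off the fetch and transition to the next state
                self.state = WaitingForFetch { fetchFuture: fetch(&url) }
            },
            WaitingForFetch { fetchFuture } -> {
                match fetchFuture.poll(cx) {
                    Poll.Ready(response) -> {
                        self.state = WaitingForBody { bodyFuture: response.text() }
                    },
                    Poll.Pending -> return Poll.Pending,
                }
            },
            WaitingForBody { bodyFuture } -> {
                match bodyFuture.poll(cx) {
                    Poll.Ready(body) -> {
                        let result = processData(&body)
                        self.state = Done
                        return Poll.Ready(result)
                    },
                    Poll.Pending -> return Poll.Pending,
                }
            },
            Done -> panic("polled after completion"),
        }
    }
}
```

Note that Pending propagates outward immediately, while Ready lets the loop continue to the next state without returning to the runtime.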
The Pin Type
The poll signature shown earlier is simplified: in full, poll receives the
future as Pin<&mut Self> rather than a plain mutable reference. Pin is
crucial for async safety but often confusing at first.
The Problem: Self-Referential Futures
State machines can contain references to their own fields:
async fn example() {
    let data = vec![1, 2, 3]
    let reference = &data[0]  // reference points to data
    await someOperation()
    println!("\(reference)")  // use the reference after await
}
The state machine stores both data and reference. But what if the state
machine is moved in memory? The reference would point to the old location!
Pin's Solution
Pin guarantees that pinned data won't move in memory:
import std.pin.Pin
// Once pinned, the value cannot be moved
var boxed = Box.pin(MyFuture.new())
// We can poll it safely
await boxed.asMut().poll(cx)
The Unpin Marker Trait
Most types are Unpin, meaning they're safe to move even when pinned:
// These are all Unpin - can be moved freely
let x: Int = 42
let s: String = "hello".toString()
let v: Vec<Int> = vec![1, 2, 3]
Only self-referential types (like compiler-generated futures) are !Unpin.
In practice, you rarely need to think about Pin unless:
- You're implementing Future manually
- You're working with !Unpin types in collections
- You're writing low-level async utilities
Working with Pin in Practice
When you need to pin a future:
import std.pin.pin
async fn example() {
    // Use the pin! macro for stack pinning
    let future = someFuture()
    pin!(future)

    // Now we can poll it
    await future
}

// Or use Box.pin for heap allocation
async fn heapPinned() {
    let future = Box.pin(someFuture())
    await future
}
The Stream Trait
Stream is to Future what Iterator is to single values:
trait Stream {
    type Item

    mutating fn pollNext(cx: &mut Context): Poll<Self.Item?>
}
The return type Poll<Self.Item?> combines:
- Poll.Ready(Some(item)): An item is available
- Poll.Ready(null): The stream has ended
- Poll.Pending: No item ready yet, try again later
Stream vs Future
| Aspect | Future | Stream |
|---|---|---|
| Produces | One value | Many values |
| Poll returns | Poll<T> | Poll<T?> |
| Method | poll() | pollNext() |
| Analog | Single async result | Async iterator |
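To make the comparison concrete, here is a minimal hand-written stream. Counter is a made-up type for illustration; it yields 1, 2, 3 and then signals the end with null:

```
// A sketch of a finite stream: yields 1, 2, 3, then ends
struct Counter {
    count: UInt32,
}

extension Counter: Stream {
    type Item = UInt32

    mutating fn pollNext(cx: &mut Context): Poll<UInt32?> {
        if self.count < 3 {
            self.count += 1
            Poll.Ready(Some(self.count))  // an item is available
        } else {
            Poll.Ready(null)              // the stream is exhausted
        }
    }
}
```

Unlike the Timer future shown later, this stream is always ready, so it never returns Poll.Pending.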
The StreamExt Trait
Like Iterator has many useful methods, Stream has StreamExt:
trait StreamExt: Stream {
    mutating async fn next(): Self.Item?
    consuming fn map<F, T>(f: F): Map<Self, F>
    consuming fn filter<F>(f: F): Filter<Self, F>
    consuming async fn collect<C>(): C
    // ... many more
}
StreamExt provides the convenient next() method that hides the polling
details:
// Instead of manual polling:
loop {
    match stream.pollNext(cx) {
        Poll.Ready(Some(item)) -> process(item),
        Poll.Ready(null) -> break,
        Poll.Pending -> suspend(),
    }
}

// You can write:
while let Some(item) = await stream.next() {
    process(item)
}
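The combinators compose just like their Iterator counterparts. A hypothetical pipeline (SomeStream and the closure syntax are illustrative assumptions) might look like:

```
// Sketch: keep the even items, double them, and collect the results
async fn pipeline(stream: SomeStream) {
    let results: Vec<Int> = await stream
        .filter(|x| x % 2 == 0)
        .map(|x| x * 2)
        .collect()
    println!("\(results)")
}
```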
Implementing Future Manually
Sometimes you need to implement Future yourself. Here's a simple timer:
import std.future.Future
import std.pin.Pin
import std.task.{ Context, Poll }
import std.time.{ Duration, Instant }
struct Timer {
    deadline: Instant,
}

extension Timer {
    static fn new(duration: Duration): Timer {
        Timer {
            deadline: Instant.now() + duration,
        }
    }
}

extension Timer: Future {
    type Output = ()

    mutating fn poll(cx: &mut Context): Poll<()> {
        if Instant.now() >= self.deadline {
            Poll.Ready(())
        } else {
            // Schedule wakeup (simplified - real impl uses a timer wheel)
            cx.waker().wakeByRef()
            Poll.Pending
        }
    }
}

// Usage
async fn useTimer() {
    await Timer.new(Duration.fromSecs(1))
    println!("Timer fired!")
}
Wakers
The Context contains a Waker that tells the runtime when to poll again:
mutating fn poll(cx: &mut Context): Poll<Output> {
    if self.isReady() {
        Poll.Ready(self.result())
    } else {
        // Store the waker to call later when ready
        self.waker = Some(cx.waker().clone())
        Poll.Pending
    }
}

// Later, when the operation completes:
fn onComplete() {
    if let Some(waker) = &self.waker {
        waker.wake()  // Tell runtime to poll again
    }
}
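Putting both halves together, a future driven by a background event typically shares its waker through synchronized state. This is a sketch; SharedState, EventFuture, and the Mutex locking API are illustrative assumptions:

```
// Hypothetical shared state between a future and the event that completes it
struct SharedState {
    completed: Bool,
    waker: Waker?,
}

extension EventFuture: Future {
    type Output = ()

    mutating fn poll(cx: &mut Context): Poll<()> {
        let state = self.shared.lock()
        if state.completed {
            Poll.Ready(())
        } else {
            // Re-store the waker each poll, in case the task was moved
            state.waker = Some(cx.waker().clone())
            Poll.Pending
        }
    }
}

// Called from the event source when the operation finishes
fn signal(shared: &Mutex<SharedState>) {
    let state = shared.lock()
    state.completed = true
    if let Some(waker) = state.waker.take() {
        waker.wake()  // consume the stored waker and wake the task
    }
}
```

Storing the waker on every poll is important: a task can migrate between executor threads, and an old waker may no longer point at the right place.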
Async Runtimes
Runtimes tie everything together. They:
- Execute the top-level future (from blockOn or #[tokio.main])
- Poll futures when they're ready to make progress
- Manage wakers and notifications
- Schedule tasks across threads (for multi-threaded runtimes)
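At the outermost level, you enter the runtime explicitly. A sketch, assuming a Tokio-style Runtime type with a blockOn entry point (the names are illustrative):

```
fn main() {
    let runtime = Runtime.new()

    // Drive the top-level future to completion on this thread
    runtime.blockOn(async {
        let data = await fetch("https://example.com")
        println!("\(data)")
    })
}
```

The #[tokio.main] attribute shown below is just a convenient macro for this same setup.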
Runtime Comparison
Different runtimes make different trade-offs:
| Runtime | Threads | Use Case |
|---|---|---|
| Tokio (current_thread) | Single | Simple applications |
| Tokio (multi_thread) | Multiple | High-performance servers |
| async-std | Multiple | std-like API |
| smol | Configurable | Minimal footprint |
Choosing a Runtime
For most applications, Tokio is the standard choice:
// Single-threaded (simpler, no Send requirements)
#[tokio.main(flavor = "current_thread")]
async fn main() {
    // ...
}

// Multi-threaded (default; runs tasks in parallel across worker threads)
#[tokio.main]
async fn main() {
    // ...
}
Send and Sync with Async
For multi-threaded runtimes, futures must be Send:
// This works with single-threaded runtime
async fn nonSendExample() {
    let cell = Rc.new(RefCell.new(0))  // Rc is !Send
    await doWork()
    *cell.borrowMut() += 1
}

// For multi-threaded, use Send types
async fn sendExample() {
    let counter = Arc.new(AtomicUInt32.new(0))  // Arc is Send
    await doWork()
    counter.fetchAdd(1, Ordering.SeqCst)
}
If you hold a !Send value across an await point, the entire future becomes
!Send:
async fn problematic() {
    let rc = Rc.new(42)
    await someOperation()  // rc is held across await
    println!("\(rc)")      // Future is !Send
}
Solutions:
- Use Send types (Arc instead of Rc)
- Don't hold !Send values across await
- Use a single-threaded runtime
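The second option often just means restructuring so the !Send value is dropped before the suspension point. A sketch of the earlier problematic function, fixed:

```
async fn fixed() {
    // Scope the Rc so it is dropped before the await
    let value = {
        let rc = Rc.new(42)
        *rc  // copy the inner value out of the Rc
    }
    await someOperation()  // no !Send value is live here
    println!("\(value)")   // the future is Send again
}
```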
Debugging Async Code
Common Issues
1. Future not awaited:
async fn forgetfulExample() {
    fetch(url)  // WARNING: future not awaited!
    // The fetch never happens
}
2. Blocking in async context:
async fn badExample() {
    std.thread.sleep(Duration.fromSecs(1))  // Blocks the runtime!
    // Use tokio.time.sleep instead
}
3. Deadlock with sync Mutex:
async fn deadlockRisk() {
    let lock = syncMutex.lock()  // Holds lock across await
    await someOperation()        // Other tasks can't acquire lock
    drop(lock)
}
// Use tokio.sync.Mutex instead
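With an async-aware mutex, acquiring the lock is itself awaited, so waiting tasks suspend instead of blocking a runtime thread. A sketch, assuming an Oxide-style tokio.sync.Mutex:

```
import tokio.sync.Mutex

async fn noDeadlock(shared: &Mutex<Int>) {
    var guard = await shared.lock()  // suspends instead of blocking
    *guard += 1
    await someOperation()            // other tasks can still be scheduled
    // guard is dropped here, releasing the lock
}
```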
Debugging Tools
- tokio-console: Real-time async debugging
- Tracing: Add instrumentation to async code
- Careful logging: Log at await points to track progress
Summary
The async system is built on three key traits:
- Future: Represents a single async computation
  - poll() checks for completion
  - Returns Poll.Ready(value) or Poll.Pending
- Pin: Ensures self-referential futures don't move
  - Most types are Unpin and can be moved freely
  - Compiler-generated futures may be !Unpin
- Stream: Represents an async sequence
  - pollNext() yields items over time
  - StreamExt provides convenient methods
Understanding these traits helps you:
- Debug mysterious async behavior
- Write custom futures when needed
- Choose appropriate types for concurrent code
- Work effectively with async runtimes
Remember: Oxide uses prefix await throughout. Whether you're calling
await future, await stream.next(), or await customFuture.poll(...), the
await keyword always comes before the expression.
This concludes our tour of async programming in Oxide. You now have the knowledge to write efficient, concurrent applications using futures, streams, and async/await!