Traits for Async: Under the Hood

Understanding the traits that power async programming helps you write better async code and debug issues when they arise. In this section, we'll explore Future, Pin, and Stream: the building blocks of Oxide's async system, which it shares with Rust.

The Future Trait

Every async operation in Oxide is backed by the Future trait:

trait Future {
    type Output

    mutating fn poll(cx: &mut Context): Poll<Self.Output>
}

Let's break this down:

  • Output: The type of value the future produces when complete
  • poll: Called by the runtime to check if the future is ready
  • Poll: An enum with two variants:
    • Poll.Ready(value): The future completed with a value
    • Poll.Pending: The future isn't ready yet
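
To make the trait concrete, here is a minimal hand-written future that completes on its first poll. The Ready type is purely illustrative, not part of the standard library:

struct Ready {
    value: Int,
}

extension Ready: Future {
    type Output = Int

    mutating fn poll(cx: &mut Context): Poll<Int> {
        // The value is already available, so we never return Pending
        Poll.Ready(self.value)
    }
}

// Awaiting it completes on the first poll
let n = await Ready { value: 42 }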

How Await Works

When you write await someFuture, the compiler transforms it into code that repeatedly calls poll() until the future returns Poll.Ready:

// You write:
let result = await someFuture

// Conceptually becomes something like:
loop {
    match someFuture.poll(cx) {
        Poll.Ready(value) -> break value,
        Poll.Pending -> {
            // Yield to runtime, which will poll again later
            suspend()
        },
    }
}

The runtime handles the actual polling loop and decides when to poll each future based on readiness notifications.

State Machines

When you write an async fn, the compiler generates a state machine:

async fn fetchAndProcess(url: &str): Data {
    let response = await fetch(url)      // State 1
    let body = await response.text()     // State 2
    processData(&body)                   // State 3
}

The compiler creates something like:

enum FetchAndProcessState {
    Start { url: String },
    WaitingForFetch { fetchFuture: FetchFuture },
    WaitingForBody { bodyFuture: BodyFuture },
    Done,
}

Each await point becomes a state transition. The poll method checks which state we're in, polls the appropriate inner future, and transitions when ready.
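
A rough sketch of the poll method the compiler might generate for this enum (simplified: real generated code also deals with pinning and with panics mid-state):

extension FetchAndProcessState: Future {
    type Output = Data

    mutating fn poll(cx: &mut Context): Poll<Data> {
        loop {
            match self {
                Start { url } -> {
                    // Kick off the fetch, then loop around to poll it
                    *self = WaitingForFetch { fetchFuture: fetch(url) }
                },
                WaitingForFetch { fetchFuture } -> match fetchFuture.poll(cx) {
                    Poll.Ready(response) ->
                        *self = WaitingForBody { bodyFuture: response.text() },
                    Poll.Pending -> return Poll.Pending,
                },
                WaitingForBody { bodyFuture } -> match bodyFuture.poll(cx) {
                    Poll.Ready(body) -> {
                        *self = Done
                        return Poll.Ready(processData(&body))
                    },
                    Poll.Pending -> return Poll.Pending,
                },
                Done -> panic("future polled after completion"),
            }
        }
    }
}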

The Pin Type

Before a runtime can poll a future, the future must first be pinned in place (in Rust this shows up as Pin<&mut Self> in the poll signature). Pin is crucial for async safety but often confusing at first.

The Problem: Self-Referential Futures

State machines can contain references to their own fields:

async fn example() {
    let data = vec![1, 2, 3]
    let reference = &data[0]  // reference points to data
    await someOperation()
    println!("\(reference)")  // use the reference after await
}

The state machine stores both data and reference. But what if the state machine is moved in memory? The reference would point to the old location!

Pin's Solution

Pin guarantees that pinned data won't move in memory:

import std.pin.Pin

// Once pinned, the value cannot be moved
var boxed = Box.pin(MyFuture.new())

// We can poll it safely through the pinned reference
let state = boxed.asMut().poll(cx)

The Unpin Marker Trait

Most types are Unpin, meaning they're safe to move even when pinned:

// These are all Unpin - can be moved freely
let x: Int = 42
let s: String = "hello".toString()
let v: Vec<Int> = vec![1, 2, 3]

Only self-referential types (like compiler-generated futures) are !Unpin. In practice, you rarely need to think about Pin unless:

  1. You're implementing Future manually
  2. You're working with !Unpin types in collections
  3. You're writing low-level async utilities

Working with Pin in Practice

When you need to pin a future:

import std.pin.pin

async fn example() {
    // Use the pin! macro for stack pinning
    let future = someFuture()
    pin!(future)

    // Now we can poll it
    await future
}

// Or use Box.pin for heap allocation
async fn heapPinned() {
    let future = Box.pin(someFuture())
    await future
}

The Stream Trait

Stream is to Iterator what Future is to a single value: an asynchronous sequence of items, produced over time:

trait Stream {
    type Item

    mutating fn pollNext(cx: &mut Context): Poll<Self.Item?>
}

The return type Poll<Self.Item?> distinguishes three cases:

  • Poll.Ready(Some(item)): An item is available
  • Poll.Ready(null): The stream has ended
  • Poll.Pending: No item ready yet, try again later
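
As an illustration, here is a hand-written stream that yields the numbers 1 through limit. The Counter type is hypothetical:

struct Counter {
    current: Int,
    limit: Int,
}

extension Counter: Stream {
    type Item = Int

    mutating fn pollNext(cx: &mut Context): Poll<Int?> {
        if self.current < self.limit {
            self.current += 1
            Poll.Ready(Some(self.current))  // next item is ready
        } else {
            Poll.Ready(null)                // the stream has ended
        }
    }
}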

Stream vs Future

Aspect        Future                 Stream
Produces      One value              Many values
Poll returns  Poll<T>                Poll<T?>
Method        poll()                 pollNext()
Analog        Single async result    Async iterator

The StreamExt Trait

Like Iterator has many useful methods, Stream has StreamExt:

trait StreamExt: Stream {
    mutating async fn next(): Self.Item?
    consuming fn map<F, T>(f: F): Map<Self, F>
    consuming fn filter<F>(f: F): Filter<Self, F>
    consuming async fn collect<C>(): C
    // ... many more
}

StreamExt provides the convenient next() method that hides the polling details:

// Instead of manual polling:
loop {
    match stream.pollNext(cx) {
        Poll.Ready(Some(item)) -> process(item),
        Poll.Ready(null) -> break,
        Poll.Pending -> suspend(),
    }
}

// You can write:
while let Some(item) = await stream.next() {
    process(item)
}
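
The combinators compose much like their Iterator counterparts. Here is a sketch using named helper functions; the import path, the generic bound syntax, and the shape of the input stream are assumptions:

import std.stream.StreamExt

fn isEven(n: &Int): Bool { *n % 2 == 0 }
fn square(n: Int): Int { n * n }

async fn evenSquares<S: Stream<Item = Int>>(numbers: S): Vec<Int> {
    await numbers
        .filter(isEven)   // keep only even values
        .map(square)      // square each one
        .collect()        // drive the stream to completion
}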

Implementing Future Manually

Sometimes you need to implement Future yourself. Here's a simple timer:

import std.future.Future
import std.pin.Pin
import std.task.{ Context, Poll }
import std.time.{ Duration, Instant }

struct Timer {
    deadline: Instant,
}

extension Timer {
    static fn new(duration: Duration): Timer {
        Timer {
            deadline: Instant.now() + duration,
        }
    }
}

extension Timer: Future {
    type Output = ()

    mutating fn poll(cx: &mut Context): Poll<()> {
        if Instant.now() >= self.deadline {
            Poll.Ready(())
        } else {
            // Simplified: waking immediately asks the runtime to poll us
            // again right away, which busy-polls. A real implementation
            // registers the waker with a timer wheel instead.
            cx.waker().wakeByRef()
            Poll.Pending
        }
    }
}

// Usage
async fn useTimer() {
    await Timer.new(Duration.fromSecs(1))
    println!("Timer fired!")
}

Wakers

The Context contains a Waker that tells the runtime when to poll again:

mutating fn poll(cx: &mut Context): Poll<Output> {
    if self.isReady() {
        Poll.Ready(self.result())
    } else {
        // Store the waker to call later when ready
        self.waker = Some(cx.waker().clone())
        Poll.Pending
    }
}

// Later, when the operation completes (a method on the same type):
mutating fn onComplete() {
    if let Some(waker) = self.waker.take() {
        waker.wake()  // Tell runtime to poll again
    }
}
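
Putting the two halves together: a sketch of a future that is completed from elsewhere (another task or thread) via shared state. SharedState and SignalFuture are illustrative names:

struct SharedState {
    completed: Bool,
    waker: Waker?,
}

struct SignalFuture {
    state: Arc<Mutex<SharedState>>,
}

extension SignalFuture: Future {
    type Output = ()

    mutating fn poll(cx: &mut Context): Poll<()> {
        var guard = self.state.lock()
        if guard.completed {
            Poll.Ready(())
        } else {
            // Store the freshest waker so the right task gets woken
            guard.waker = Some(cx.waker().clone())
            Poll.Pending
        }
    }
}

// Completion side, called from a background thread or another task:
fn complete(state: &Arc<Mutex<SharedState>>) {
    var guard = state.lock()
    guard.completed = true
    if let Some(waker) = guard.waker.take() {
        waker.wake()  // Tell the runtime to poll SignalFuture again
    }
}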

Async Runtimes

Runtimes tie everything together. They:

  1. Execute the top-level future (from blockOn or #[tokio.main])
  2. Poll futures when they're ready to make progress
  3. Manage wakers and notifications
  4. Schedule tasks across threads (for multi-threaded runtimes)
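
A toy single-future executor shows how these pieces fit together. Waker.fromFn, parkThread, and unparkThread are assumed helpers; real runtimes are far more sophisticated:

fn blockOn<F: Future>(future: F): F.Output {
    var future = future
    // Wake by unparking this thread
    let waker = Waker.fromFn(unparkThread)
    var cx = Context.fromWaker(&waker)

    loop {
        match future.poll(&mut cx) {
            Poll.Ready(value) -> return value,
            // Sleep until the waker fires, then poll again
            Poll.Pending -> parkThread(),
        }
    }
}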

Runtime Comparison

Different runtimes make different trade-offs:

Runtime                  Threads        Use Case
Tokio (current_thread)   Single         Simple applications
Tokio (multi_thread)     Multiple       High-performance servers
async-std                Multiple       std-like API
smol                     Configurable   Minimal footprint

Choosing a Runtime

For most applications, Tokio is the standard choice:

// Single-threaded (simpler, no Send requirements)
#[tokio.main(flavor = "current_thread")]
async fn main() {
    // ...
}

// Multi-threaded (default, better for parallel workloads)
#[tokio.main]
async fn main() {
    // ...
}

Send and Sync with Async

For multi-threaded runtimes, futures must be Send:

// This works with single-threaded runtime
async fn nonSendExample() {
    let cell = Rc.new(RefCell.new(0))  // Rc is !Send
    await doWork()
    *cell.borrowMut() += 1
}

// For multi-threaded, use Send types
async fn sendExample() {
    let counter = Arc.new(AtomicUInt32.new(0))  // Arc is Send
    await doWork()
    counter.fetchAdd(1, Ordering.SeqCst)
}

If you hold a !Send value across an await point, the entire future becomes !Send:

async fn problematic() {
    let rc = Rc.new(42)
    await someOperation()  // rc is held across await
    println!("\(rc)")      // Future is !Send
}

Solutions:

  1. Use Send types (Arc instead of Rc)
  2. Don't hold !Send values across await
  3. Use a single-threaded runtime
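
For option 2, ending the !Send value's scope before the await point keeps the future Send:

async fn fixed() {
    {
        let rc = Rc.new(42)
        println!("\(rc)")
        // rc is dropped here, at the end of this block
    }
    await someOperation()  // no !Send value is held across the await
}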

Debugging Async Code

Common Issues

1. Future not awaited:

async fn forgetfulExample() {
    fetch(url)  // WARNING: future not awaited!
    // The fetch never happens
}

2. Blocking in async context:

async fn badExample() {
    std.thread.sleep(Duration.fromSecs(1))  // Blocks the runtime!
    // Use tokio.time.sleep instead
}
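
The fix is to use the runtime's async timer, and to hand genuinely blocking work to a dedicated thread pool. tokio.task.spawnBlocking below is Tokio's spawn_blocking written in this book's Oxide-style naming, and expensiveComputation is a stand-in for your own function:

async fn goodExample() {
    // Async sleep yields to the runtime instead of blocking a thread
    await tokio.time.sleep(Duration.fromSecs(1))

    // Blocking or CPU-heavy work runs on the blocking thread pool
    let result = await tokio.task.spawnBlocking(expensiveComputation)
}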

3. Deadlock with sync Mutex:

async fn deadlockRisk() {
    let lock = syncMutex.lock()  // Holds lock across await
    await someOperation()        // Other tasks can't acquire lock
    drop(lock)
}
// Use tokio.sync.Mutex instead
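
With the async-aware lock, acquiring the mutex awaits instead of blocking, so other tasks keep making progress while one task holds it:

async fn noDeadlock(mutex: &tokio.sync.Mutex<Int>) {
    var guard = await mutex.lock()  // yields while waiting for the lock
    *guard += 1
    await someOperation()           // holding an async lock across await is fine
    drop(guard)                     // still best to release as early as possible
}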

Debugging Tools

  • tokio-console: Real-time async debugging
  • Tracing: Add instrumentation to async code
  • Careful logging: Log at await points to track progress

Summary

The async system is built on three key traits:

  • Future: Represents a single async computation

    • poll() checks for completion
    • Returns Poll.Ready(value) or Poll.Pending
  • Pin: Ensures self-referential futures don't move

    • Most types are Unpin and can be moved freely
    • Compiler-generated futures may be !Unpin
  • Stream: Represents an async sequence

    • pollNext() yields items over time
    • StreamExt provides convenient methods

Understanding these traits helps you:

  • Debug mysterious async behavior
  • Write custom futures when needed
  • Choose appropriate types for concurrent code
  • Work effectively with async runtimes

Remember: Oxide uses prefix await throughout. Whether you're writing await future, await stream.next(), or await on a custom future you implemented yourself, the await keyword always comes before the expression.

This concludes our tour of async programming in Oxide. You now have the knowledge to write efficient, concurrent applications using futures, streams, and async/await!