Converting to a Multithreaded Web Server
The single-threaded server works, but it can only handle one request at a time. Let's add concurrency using a thread pool so multiple clients can be served simultaneously.
The Challenge
We could spawn a new thread for each incoming connection:
for stream in listener.incoming() {
    let stream = stream.expect("Failed to accept connection")
    thread.spawn(move {
        handleConnection(stream)
    })
}
While this works, it's wasteful. Every connection pays the cost of creating a brand-new thread, and the operating system can only sustain a limited number of threads before memory use and context-switching overhead degrade performance.
Thread Pool Design
A better approach is a thread pool: a fixed number of worker threads that wait for jobs.
The workflow:
- Main thread accepts connections
- Main thread puts the work (handling a connection) in a job queue
- Worker threads take jobs from the queue and process them
- When a job is done, the worker waits for the next job
Creating the Library Structure
Create src/lib.ox:
import std.sync.{mpsc, Arc, Mutex}
import std.thread

public struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc.Sender<Message>,
}

struct Worker {
    id: UIntSize,
    thread: thread.JoinHandle<()>?,
}

enum Message {
    NewJob(Job),
    Terminate,
}

type Job = Box<dyn Fn() + Send + 'static>
extension ThreadPool {
    public static fn new(size: UIntSize): ThreadPool {
        assert!(size > 0, "Thread pool size must be greater than 0")
        let (sender, receiver) = mpsc.channel()
        let receiver = Arc.new(Mutex.new(receiver))
        var workers = Vec.new()
        for id in 0..size {
            workers.push(Worker.new(id, Arc.clone(&receiver)))
        }
        ThreadPool { workers, sender }
    }

    public fn execute(f: Job) {
        let message = Message.NewJob(f)
        self.sender.send(message).expect("Failed to send job")
    }
}
extension Worker {
    fn new(id: UIntSize, receiver: Arc<Mutex<mpsc.Receiver<Message>>>): Worker {
        let thread = thread.spawn(move {
            loop {
                let message = receiver.lock().expect("Mutex poisoned").recv().expect("Failed to receive message")
                match message {
                    Message.NewJob(job) -> {
                        println!("Worker \(id) got a job; executing.")
                        job()
                    }
                    Message.Terminate -> {
                        println!("Worker \(id) was told to terminate.")
                        break
                    }
                }
            }
        })
        Worker { id, thread: Some(thread) }
    }
}
extension ThreadPool: Drop {
    mutating fn drop() {
        println!("Sending terminate message to all workers.")
        for _ in &self.workers {
            self.sender.send(Message.Terminate).expect("Failed to send terminate message")
        }
        println!("Shutting down all workers.")
        for worker in &mut self.workers {
            println!("Shutting down worker \(worker.id)")
            if let Some(thread) = worker.thread.take() {
                thread.join().expect("Failed to join worker thread")
            }
        }
    }
}
Understanding the Design
The Job Type
type Job = Box<dyn Fn() + Send + 'static>
This defines a type alias for:
- Box<...> - A boxed value stored on the heap
- dyn Fn() - A dynamic trait object for a function that takes no arguments and returns nothing
- Send - The closure can be sent between threads
- 'static - The closure holds no borrowed data with a limited lifetime
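For instance, any closure that satisfies these bounds can be boxed into a Job directly. This standalone sketch is illustrative (the message text is made up):

let job: Job = Box.new({
    println!("Hello from a boxed job")
})
// The trait object can now be sent through a channel and
// invoked on whichever worker thread receives it:
job()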
The Message Enum
enum Message {
    NewJob(Job),
    Terminate,
}
The job queue sends Message enums:
- NewJob(job) - A new job to execute
- Terminate - A signal to shut down
Thread Pool Creation
public static fn new(size: UIntSize): ThreadPool {
    let (sender, receiver) = mpsc.channel()
    let receiver = Arc.new(Mutex.new(receiver))
    var workers = Vec.new()
    for id in 0..size {
        workers.push(Worker.new(id, Arc.clone(&receiver)))
    }
    ThreadPool { workers, sender }
}
We:
- Create a multi-producer, single-consumer channel
- Wrap the receiver in Arc<Mutex<...>> so multiple threads can share it
- Spawn size worker threads, each with a clone of the receiver
- Return the thread pool with the sender
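Why both wrappers? The channel is single-consumer, so the receiver cannot simply be handed to several threads. A sketch of the reasoning (the variable names are illustrative):

// A receiver has a single owner; moving it into the first worker's
// closure would leave nothing for the remaining workers.
// Arc provides shared ownership across threads, and Mutex
// serializes access so only one worker calls recv() at a time:
let shared = Arc.new(Mutex.new(receiver))
let handle = Arc.clone(&shared)   // one clone per worker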
Worker Thread Loop
let thread = thread.spawn(move {
    loop {
        let message = receiver.lock().expect("Mutex poisoned").recv().expect("Failed to receive message")
        match message {
            Message.NewJob(job) -> {
                job()
            }
            Message.Terminate -> {
                break
            }
        }
    }
})
Each worker:
- Continuously loops waiting for messages
- Locks the mutex to access the receiver
- Blocks until a message arrives
- Either executes the job or terminates
Sending Jobs
public fn execute(f: Job) {
    let message = Message.NewJob(f)
    self.sender.send(message).expect("Failed to send job")
}
The main thread sends jobs through the channel. Thanks to mpsc (multi-producer), multiple threads could send jobs (though in our case only the main thread does).
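As a sketch of that multi-producer side, suppose the pool exposed a clone of its sender (it currently does not; the names here are hypothetical):

// Hypothetical: with access to a clone of the pool's sender,
// any thread could enqueue work.
let sender2 = pool.sender.clone()
thread.spawn(move {
    sender2.send(Message.NewJob(Box.new({
        println!("Job submitted from a second producer")
    }))).expect("Failed to send job")
})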
Graceful Shutdown
extension ThreadPool: Drop {
    mutating fn drop() {
        for _ in &self.workers {
            self.sender.send(Message.Terminate).expect("Failed to send terminate message")
        }
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().expect("Failed to join worker thread")
            }
        }
    }
}
When the thread pool is dropped (goes out of scope):
- Send a Terminate message to each worker
- Wait for each worker thread to finish with join()
This ensures a clean shutdown. Note that all Terminate messages are sent before any join() call: if we sent one message and immediately joined inside the same loop, a different worker might consume that message, leaving us blocked forever on a worker that never receives one.
Using the Thread Pool in main.ox
Update src/main.ox:
import webserver.ThreadPool
import std.io.{BufRead, BufReader, Write}
import std.net.{TcpListener, TcpStream}
import std.fs.readToString
fn main() {
    let listener = TcpListener.bind("127.0.0.1:7878").expect("Failed to bind to port 7878")
    let pool = ThreadPool.new(4)
    println!("Server listening on http://127.0.0.1:7878")

    for stream in listener.incoming() {
        let stream = stream.expect("Failed to accept connection")
        pool.execute {
            handleConnection(stream)
        }
    }
}
fn handleConnection(var stream: TcpStream) {
    let bufReader = BufReader.new(&mut stream)
    let requestLine = bufReader.lines().next().expect("Should have first line")
        .expect("Should read first line")

    let (status, filename) = if requestLine == "GET / HTTP/1.1" {
        ("200 OK", "hello.html")
    } else {
        ("404 NOT FOUND", "404.html")
    }

    let contents = readToString(filename).unwrapOrElse { _ -> "Error reading file".toString() }
    let length = contents.len()
    let response = "HTTP/1.1 \(status)\r\nContent-Length: \(length)\r\n\r\n\(contents)"
    stream.writeAll(response.asBytes()).expect("Failed to write response")
}
Key Changes
- Create a thread pool with 4 workers: let pool = ThreadPool.new(4)
- Execute jobs in the pool instead of spawning threads: pool.execute { handleConnection(stream) }
The closure captures the stream and will be executed by one of the worker threads.
Testing the Multithreaded Server
Compile and run:
cargo run
Open multiple browser tabs to http://127.0.0.1:7878. The server can now handle them concurrently!
You can also test with concurrent curl requests:
curl http://127.0.0.1:7878 &
curl http://127.0.0.1:7878 &
curl http://127.0.0.1:7878 &
curl http://127.0.0.1:7878 &
wait
All four requests should complete without waiting for the previous one to finish.
How the Thread Pool Handles Concurrency
When you make multiple requests:
- The main thread accepts each connection
- Instead of spawning a new thread, it sends the connection to the thread pool
- An available worker thread takes the job from the queue
- The worker processes the request while the main thread can accept more connections
- With 4 workers, up to 4 requests are handled simultaneously
- Additional requests queue up and are processed when workers become free
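A sketch of this queueing behavior, using a made-up sleep helper to simulate slow requests (the sleep API and timings are illustrative):

let pool = ThreadPool.new(4)
for i in 0..8 {
    pool.execute {
        println!("Job \(i) started")
        thread.sleep(Duration.fromSecs(1))   // hypothetical sleep API
        println!("Job \(i) finished")
    }
}
// Jobs 0-3 start immediately; jobs 4-7 sit in the channel until a
// worker frees up, so all eight finish in roughly two seconds
// rather than eight.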
This is much more efficient than creating a new thread per request!
Performance Improvement
With the thread pool approach:
- Fixed resources - Always 4 threads running, not thousands
- Better latency - No thread creation overhead per request
- Predictable performance - System resources are bounded
- Concurrent handling - Multiple requests processed simultaneously
Summary
We've implemented a thread pool that:
- Maintains a fixed number of worker threads
- Uses channels for job distribution
- Provides graceful shutdown
- Enables concurrent request handling
Next, we'll add a mechanism to gracefully stop the server.