
Async Rust with Tokio: Building High-Performance Network Applications#

Asynchronous programming is essential for building scalable network applications, and Tokio is the most widely used async runtime in the Rust ecosystem. In this guide, we'll explore how to use Tokio to build high-performance servers, handle thousands of concurrent connections, and write efficient async code.

Table of Contents#

  1. Understanding Async/Await in Rust
  2. Getting Started with Tokio
  3. Building a TCP Echo Server
  4. Concurrent Task Management
  5. Async HTTP Operations
  6. Working with Channels and Streams
  7. Real-World Examples
  8. Performance Tips and Best Practices
  9. Common Pitfalls and Solutions

Understanding Async/Await in Rust#

Before diving into Tokio, let’s understand how async/await works in Rust.

The Basics#

// An async function returns a Future
async fn fetch_data() -> String {
    // Simulate an async operation
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    "Data fetched!".to_string()
}

// Async functions must be awaited
async fn process() {
    let data = fetch_data().await;
    println!("{}", data);
}

Key Concepts#

  1. Futures: an async function returns a Future, a value representing work that will complete later
  2. Polling: the runtime repeatedly polls futures to drive them toward completion (see the sketch below)
  3. Zero-cost abstractions: async code compiles down to efficient state machines, with no heap-allocated callbacks
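
To make the polling model concrete, here is a minimal hand-written future. This is a sketch for illustration only: the Delay type and its "wake immediately" strategy are simplifications for brevity, not how Tokio's own timers are implemented, and in practice you rarely implement Future by hand.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// A toy future that resolves once a deadline has passed
struct Delay {
    deadline: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.deadline {
            Poll::Ready("done")
        } else {
            // Ask to be polled again; a real timer would register a wake-up
            // with the runtime instead of waking itself immediately
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Usage: Delay { deadline: Instant::now() + Duration::from_millis(10) }.await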

Getting Started with Tokio#

First, add Tokio to your Cargo.toml:

[dependencies]
tokio = { version = "1.43", features = ["full"] }

Basic Tokio Application#

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    println!("Starting async operations...");

    // Run multiple async operations concurrently
    let task1 = async {
        sleep(Duration::from_secs(2)).await;
        println!("Task 1 completed");
    };
    let task2 = async {
        sleep(Duration::from_secs(1)).await;
        println!("Task 2 completed");
    };

    // join! executes both futures concurrently
    tokio::join!(task1, task2);
    println!("All tasks completed!");
}

Building a TCP Echo Server#

Let’s build a production-ready TCP echo server that can handle thousands of concurrent connections.

Simple Echo Server#

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;

async fn handle_connection(mut socket: TcpStream) -> Result<(), Box<dyn Error>> {
    let mut buffer = [0; 1024];
    loop {
        // Read data from the socket
        let n = socket.read(&mut buffer).await?;

        // If we read 0 bytes, the connection is closed
        if n == 0 {
            return Ok(());
        }

        // Echo the data back
        socket.write_all(&buffer[0..n]).await?;
        socket.flush().await?;
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Echo server listening on 127.0.0.1:8080");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("New connection from: {}", addr);

        // Spawn a new task for each connection
        tokio::spawn(async move {
            if let Err(e) = handle_connection(socket).await {
                eprintln!("Error handling connection: {}", e);
            }
        });
    }
}
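
You can exercise the server with any TCP client, for example `nc 127.0.0.1 8080`. The small client below is a sketch of doing the same from Rust; the address and message are arbitrary example values.

use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;

    // Send a message and read the echoed bytes back
    stream.write_all(b"hello echo server").await?;

    let mut buf = [0u8; 1024];
    let n = stream.read(&mut buf).await?;
    println!("echoed: {}", String::from_utf8_lossy(&buf[..n]));
    Ok(())
}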

Advanced Echo Server with Graceful Shutdown#

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::broadcast;
use tokio::select;
use std::error::Error;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Server {
    listener: TcpListener,
    shutdown: broadcast::Sender<()>,
    connection_count: Arc<AtomicUsize>,
}

impl Server {
    async fn new(addr: &str) -> Result<Self, Box<dyn Error>> {
        let listener = TcpListener::bind(addr).await?;
        let (shutdown, _) = broadcast::channel(1);
        Ok(Server {
            listener,
            shutdown,
            connection_count: Arc::new(AtomicUsize::new(0)),
        })
    }

    async fn run(self) -> Result<(), Box<dyn Error>> {
        println!("Server listening on {}", self.listener.local_addr()?);

        loop {
            select! {
                result = self.listener.accept() => {
                    let (socket, addr) = result?;
                    self.handle_new_connection(socket, addr);
                }
                _ = tokio::signal::ctrl_c() => {
                    println!("\nShutting down server...");
                    self.shutdown.send(()).ok();
                    break;
                }
            }
        }

        // Wait for all connections to close
        while self.connection_count.load(Ordering::SeqCst) > 0 {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
        println!("Server shutdown complete");
        Ok(())
    }

    fn handle_new_connection(&self, socket: TcpStream, addr: std::net::SocketAddr) {
        let mut shutdown_rx = self.shutdown.subscribe();
        let count = self.connection_count.clone();
        count.fetch_add(1, Ordering::SeqCst);
        println!("New connection from: {} (total: {})", addr, count.load(Ordering::SeqCst));

        tokio::spawn(async move {
            let result = select! {
                res = handle_connection(socket) => res,
                _ = shutdown_rx.recv() => {
                    println!("Connection {} shutting down", addr);
                    Ok(())
                }
            };
            if let Err(e) = result {
                eprintln!("Error handling connection {}: {}", addr, e);
            }
            count.fetch_sub(1, Ordering::SeqCst);
            println!("Connection {} closed (remaining: {})", addr, count.load(Ordering::SeqCst));
        });
    }
}

async fn handle_connection(mut socket: TcpStream) -> Result<(), Box<dyn Error>> {
    let mut buffer = vec![0; 4096];
    loop {
        let n = socket.read(&mut buffer).await?;
        if n == 0 {
            return Ok(());
        }
        socket.write_all(&buffer[0..n]).await?;
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let server = Server::new("127.0.0.1:8080").await?;
    server.run().await
}

Concurrent Task Management#

Tokio provides several ways to manage concurrent tasks effectively.

Spawning Tasks#

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Spawn multiple concurrent tasks
    let handles: Vec<_> = (0..5)
        .map(|i| {
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_secs(1)).await;
                println!("Task {} completed", i);
                i * 2
            })
        })
        .collect();

    // Wait for all tasks and collect results
    let mut results = Vec::new();
    for handle in handles {
        let result = handle.await.unwrap();
        results.push(result);
    }
    println!("Results: {:?}", results);
}

Task Groups with JoinSet#

use tokio::task::JoinSet;
use std::time::Duration;

async fn process_item(id: u32) -> Result<String, String> {
    tokio::time::sleep(Duration::from_millis(100)).await;
    if id % 3 == 0 {
        Err(format!("Item {} failed", id))
    } else {
        Ok(format!("Item {} processed", id))
    }
}

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    // Spawn multiple tasks
    for i in 0..10 {
        set.spawn(process_item(i));
    }

    // Process results as they complete
    while let Some(result) = set.join_next().await {
        match result {
            Ok(Ok(msg)) => println!("Success: {}", msg),
            Ok(Err(e)) => println!("Task error: {}", e),
            Err(e) => println!("Join error: {}", e),
        }
    }
}

Async HTTP Operations#

HTTP Client with Reqwest#

// Add to Cargo.toml:
//   reqwest = { version = "0.12", features = ["json"] }
//   serde = { version = "1", features = ["derive"] }
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Serialize, Deserialize)]
struct Post {
    id: u32,
    title: String,
    body: String,
    // The API uses camelCase; keep the Rust field snake_case and rename it
    #[serde(rename = "userId")]
    user_id: u32,
}

async fn fetch_posts() -> Result<Vec<Post>, reqwest::Error> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(10))
        .build()?;

    let posts = client
        .get("https://jsonplaceholder.typicode.com/posts")
        .send()
        .await?
        .json::<Vec<Post>>()
        .await?;
    Ok(posts)
}

async fn create_post(post: &Post) -> Result<Post, reqwest::Error> {
    let client = reqwest::Client::new();
    let response = client
        .post("https://jsonplaceholder.typicode.com/posts")
        .json(post)
        .send()
        .await?
        .json::<Post>()
        .await?;
    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Fetch existing posts and create a new one concurrently
    let fetch_task = fetch_posts();

    let new_post = Post {
        id: 0,
        title: "My Async Post".to_string(),
        body: "Created with Tokio and Reqwest".to_string(),
        user_id: 1,
    };
    let create_task = create_post(&new_post);

    // Wait for both operations
    let (posts_result, created_result) = tokio::join!(fetch_task, create_task);

    match posts_result {
        Ok(posts) => println!("Fetched {} posts", posts.len()),
        Err(e) => eprintln!("Failed to fetch posts: {}", e),
    }
    match created_result {
        Ok(post) => println!("Created post with ID: {}", post.id),
        Err(e) => eprintln!("Failed to create post: {}", e),
    }
    Ok(())
}

Parallel HTTP Requests with Rate Limiting#

use tokio::sync::Semaphore;
use std::sync::Arc;
use std::time::Instant;

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    let response = reqwest::get(url).await?;
    Ok(response.text().await?)
}

async fn fetch_urls_with_limit(urls: Vec<String>, max_concurrent: usize) {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let start = Instant::now();

    let tasks: Vec<_> = urls
        .into_iter()
        .map(|url| {
            let semaphore = Arc::clone(&semaphore);
            tokio::spawn(async move {
                // Each task holds a permit while it runs, capping concurrency
                let _permit = semaphore.acquire().await.unwrap();
                println!("Fetching: {}", url);
                match fetch_url(&url).await {
                    Ok(content) => {
                        println!("Fetched {} bytes from {}", content.len(), url);
                        Ok(content.len())
                    }
                    Err(e) => {
                        eprintln!("Error fetching {}: {}", url, e);
                        Err(e)
                    }
                }
            })
        })
        .collect();

    let mut total_bytes = 0;
    let mut errors = 0;
    for task in tasks {
        match task.await.unwrap() {
            Ok(bytes) => total_bytes += bytes,
            Err(_) => errors += 1,
        }
    }

    let elapsed = start.elapsed();
    println!(
        "\nFetched {} bytes with {} errors in {:?}",
        total_bytes, errors, elapsed
    );
}

#[tokio::main]
async fn main() {
    let urls = vec![
        "https://www.rust-lang.org".to_string(),
        "https://tokio.rs".to_string(),
        "https://github.com".to_string(),
        "https://docs.rs".to_string(),
        "https://crates.io".to_string(),
    ];

    // Limit to 3 concurrent requests
    fetch_urls_with_limit(urls, 3).await;
}
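
If you already depend on the futures crate, a stream combinator gives you the same bounded concurrency without an explicit semaphore. The sketch below assumes futures = "0.3" has been added to Cargo.toml; the approach, not the exact numbers, is the point.

use futures::stream::{self, StreamExt};

async fn fetch_urls_buffered(urls: Vec<String>, max_concurrent: usize) {
    // Turn the URL list into a stream of futures and run at most
    // `max_concurrent` of them at a time
    let results: Vec<_> = stream::iter(urls)
        .map(|url| async move {
            let body = reqwest::get(url).await?.text().await?;
            Ok::<usize, reqwest::Error>(body.len())
        })
        .buffer_unordered(max_concurrent)
        .collect()
        .await;

    let total: usize = results.iter().filter_map(|r| r.as_ref().ok()).sum();
    println!("Fetched {} bytes total", total);
}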

Working with Channels and Streams#

Multi-Producer Single-Consumer Channel#

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

async fn producer(id: u32, tx: mpsc::Sender<String>) {
    for i in 0..5 {
        let msg = format!("Producer {} - Message {}", id, i);
        if tx.send(msg.clone()).await.is_err() {
            println!("Receiver dropped, stopping producer {}", id);
            return;
        }
        println!("Sent: {}", msg);
        sleep(Duration::from_millis(100 * id as u64)).await;
    }
}

async fn consumer(mut rx: mpsc::Receiver<String>) {
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
        sleep(Duration::from_millis(50)).await;
    }
    println!("All producers done");
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);

    // Spawn multiple producers
    for i in 1..=3 {
        let tx_clone = tx.clone();
        tokio::spawn(producer(i, tx_clone));
    }

    // Drop the original sender so the channel closes when all producers are done
    drop(tx);

    // Start the consumer
    consumer(rx).await;
}
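
The section title also mentions streams: a channel receiver can be wrapped as a Stream so you can apply combinators to it. This sketch assumes the tokio-stream crate (e.g. tokio-stream = "0.1") has been added to Cargo.toml.

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);

    tokio::spawn(async move {
        for i in 0..5 {
            let _ = tx.send(i).await;
        }
        // tx is dropped here, which ends the stream
    });

    // Treat the receiver as a Stream and apply combinators to it
    let mut stream = ReceiverStream::new(rx).map(|n| n * 2);
    while let Some(n) = stream.next().await {
        println!("Got: {}", n);
    }
}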

Broadcast Channel for Pub/Sub#

use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

#[derive(Debug, Clone)]
enum Event {
    UserJoined(String),
    MessageSent { user: String, message: String },
    UserLeft(String),
}

async fn event_publisher(tx: broadcast::Sender<Event>) {
    let events = vec![
        Event::UserJoined("Alice".to_string()),
        Event::MessageSent {
            user: "Alice".to_string(),
            message: "Hello everyone!".to_string(),
        },
        Event::UserJoined("Bob".to_string()),
        Event::MessageSent {
            user: "Bob".to_string(),
            message: "Hi Alice!".to_string(),
        },
        Event::UserLeft("Alice".to_string()),
    ];

    for event in events {
        println!("Publishing: {:?}", event);
        if tx.send(event).is_err() {
            break;
        }
        sleep(Duration::from_secs(1)).await;
    }
}

async fn event_subscriber(name: &str, mut rx: broadcast::Receiver<Event>) {
    while let Ok(event) = rx.recv().await {
        println!("{} received: {:?}", name, event);
    }
    println!("{} finished", name);
}

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel(16);

    // Create multiple subscribers
    let rx1 = tx.subscribe();
    let rx2 = tx.subscribe();

    // Spawn subscribers
    let sub1 = tokio::spawn(event_subscriber("Subscriber 1", rx1));
    let sub2 = tokio::spawn(event_subscriber("Subscriber 2", rx2));

    // Spawn publisher
    let pub_task = tokio::spawn(event_publisher(tx));

    // Wait for all tasks
    let _ = tokio::join!(pub_task, sub1, sub2);
}
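
One behavior worth knowing: a broadcast channel keeps at most `capacity` messages in its buffer, and a receiver that falls too far behind gets a Lagged error telling it how many messages it missed. The subscriber in the example above simply exits on any error; the sketch below (reusing the Event type from above) handles the lagged case explicitly.

use tokio::sync::broadcast::{self, error::RecvError};

async fn resilient_subscriber(name: &str, mut rx: broadcast::Receiver<Event>) {
    loop {
        match rx.recv().await {
            Ok(event) => println!("{} received: {:?}", name, event),
            // The receiver fell behind; `skipped` messages were dropped
            Err(RecvError::Lagged(skipped)) => {
                eprintln!("{} lagged, missed {} events", name, skipped);
            }
            // All senders are gone, nothing more will arrive
            Err(RecvError::Closed) => break,
        }
    }
}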

Real-World Examples#

Example 1: Web Scraper with Concurrent Downloads#

use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::Semaphore;
use std::sync::Arc;

// Clone is cheap: reqwest::Client and Arc are both reference-counted handles
#[derive(Clone)]
struct Downloader {
    client: reqwest::Client,
    semaphore: Arc<Semaphore>,
}

impl Downloader {
    fn new(max_concurrent: usize) -> Self {
        Self {
            client: reqwest::Client::new(),
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    async fn download_file(&self, url: &str, path: &str) -> Result<(), Box<dyn std::error::Error>> {
        let _permit = self.semaphore.acquire().await?;
        println!("Downloading: {}", url);

        let response = self.client.get(url).send().await?;
        let bytes = response.bytes().await?;

        let mut file = File::create(path).await?;
        file.write_all(&bytes).await?;

        println!("Saved: {} ({} bytes)", path, bytes.len());
        Ok(())
    }

    async fn download_batch(&self, downloads: Vec<(&str, &str)>) {
        let tasks: Vec<_> = downloads
            .into_iter()
            .map(|(url, path)| {
                let url = url.to_string();
                let path = path.to_string();
                let downloader = self.clone();
                tokio::spawn(async move {
                    if let Err(e) = downloader.download_file(&url, &path).await {
                        eprintln!("Failed to download {}: {}", url, e);
                    }
                })
            })
            .collect();

        for task in tasks {
            let _ = task.await;
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let downloader = Downloader::new(3); // Max 3 concurrent downloads

    let downloads = vec![
        ("https://www.rust-lang.org/logos/rust-logo-512x512.png", "rust-logo.png"),
        ("https://tokio.rs/img/tokio-horizontal.svg", "tokio-logo.svg"),
        // Add more URLs as needed
    ];

    downloader.download_batch(downloads).await;
    Ok(())
}

Example 2: Chat Server Skeleton with Broadcast Channels#

This skeleton uses raw TCP plus a broadcast channel for fan-out; a real chat server would speak the WebSocket protocol over the socket (for example via the tokio-tungstenite crate), which is omitted here to keep the focus on the async structure.

use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, RwLock};
use std::collections::HashMap;
use std::sync::Arc;
use std::net::SocketAddr;

#[derive(Debug, Clone)]
struct Message {
    user: String,
    content: String,
}

struct ChatServer {
    users: Arc<RwLock<HashMap<SocketAddr, String>>>,
    broadcast: broadcast::Sender<Message>,
}

impl ChatServer {
    fn new() -> Self {
        let (tx, _) = broadcast::channel(100);
        Self {
            users: Arc::new(RwLock::new(HashMap::new())),
            broadcast: tx,
        }
    }

    async fn handle_client(&self, _socket: TcpStream, addr: SocketAddr) {
        let mut rx = self.broadcast.subscribe();
        let users = self.users.clone();
        let tx = self.broadcast.clone();

        // Register the user
        {
            let mut users_lock = users.write().await;
            users_lock.insert(addr, format!("User_{}", addr.port()));
        }

        // Announce that the user joined
        let join_msg = Message {
            user: "System".to_string(),
            content: format!("User {} joined", addr),
        };
        let _ = tx.send(join_msg);

        // Handle messages (simplified: a real server would write to the
        // socket using the WebSocket protocol instead of only logging)
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    msg = rx.recv() => {
                        match msg {
                            Ok(msg) => {
                                // Send the message to the client
                                println!("Broadcasting to {}: {:?}", addr, msg);
                            }
                            Err(_) => break,
                        }
                    }
                }
            }

            // User disconnected
            let mut users_lock = users.write().await;
            users_lock.remove(&addr);

            let leave_msg = Message {
                user: "System".to_string(),
                content: format!("User {} left", addr),
            };
            let _ = tx.send(leave_msg);
        });
    }

    async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
        let listener = TcpListener::bind(addr).await?;
        println!("Chat server listening on {}", addr);

        loop {
            let (socket, addr) = listener.accept().await?;
            self.handle_client(socket, addr).await;
        }
    }
}
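
The server above has no entry point; a minimal sketch of wiring it up looks like the following, where the bind address is an arbitrary example value.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = ChatServer::new();
    server.run("127.0.0.1:9000").await
}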

Performance Tips and Best Practices#

1. Choose the Right Runtime Features#

# Pick one of these; a manifest has a single [dependencies] table

# Option 1: minimal features for a specific use case
[dependencies]
tokio = { version = "1.43", features = ["rt-multi-thread", "net", "time"] }

# Option 2: everything enabled (convenient during development)
[dependencies]
tokio = { version = "1.43", features = ["full"] }
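
Feature flags control what gets compiled in; the runtime itself can also be tuned. Both forms below are standard Tokio APIs, and the worker-thread count of 4 is just an example value.

// Attribute form: multi-threaded runtime with a fixed worker count
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    // ...
}

// Builder form: useful when you need to construct the runtime yourself
fn run() -> std::io::Result<()> {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()?;
    runtime.block_on(async {
        // async work here
    });
    Ok(())
}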

2. Avoid Blocking Operations#

// Bad - blocks the async runtime's worker thread
async fn bad_example() {
    std::thread::sleep(std::time::Duration::from_secs(1)); // BLOCKING!
    std::fs::read_to_string("file.txt").unwrap(); // BLOCKING!
}

// Good - use async alternatives
async fn good_example() {
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    tokio::fs::read_to_string("file.txt").await.unwrap();
}

// For CPU-intensive work, use spawn_blocking so the computation runs
// on Tokio's dedicated blocking thread pool
async fn cpu_intensive_example() {
    let _result = tokio::task::spawn_blocking(|| {
        // Stand-in for any CPU-bound function
        expensive_computation()
    })
    .await
    .unwrap();
}

// Placeholder so the example compiles; substitute your real work here
fn expensive_computation() -> u64 {
    (0..1_000_000u64).sum()
}

3. Buffer Sizes Matter#

use tokio::sync::mpsc;

// Configure channel buffer sizes based on workload
let (tx, rx) = mpsc::channel::<String>(100); // Bounded: senders wait when the buffer is full

// Unbounded channels never push back on senders (use with caution)
let (tx, rx) = mpsc::unbounded_channel::<String>();

4. Use Select for Complex Control Flow#

use tokio::select;
use tokio::time::Duration;

async fn complex_flow() {
    let mut interval = tokio::time::interval(Duration::from_secs(1));

    // ctrl_c() returns a future that is !Unpin, so pin it before
    // polling it by reference inside select!
    let shutdown = tokio::signal::ctrl_c();
    tokio::pin!(shutdown);

    loop {
        select! {
            _ = interval.tick() => {
                println!("Tick");
            }
            _ = &mut shutdown => {
                println!("Shutting down");
                break;
            }
        }
    }
}

5. Handle Backpressure#

use tokio::sync::mpsc;
use tokio::time::Duration;

async fn handle_backpressure(tx: mpsc::Sender<String>) {
    for i in 0..1000 {
        // capacity() reports the remaining buffer slots; a full channel is a
        // sign the receiver is falling behind
        if tx.capacity() == 0 {
            println!("Channel full, applying backpressure");
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        // send().await itself also waits whenever the buffer is full
        let _ = tx.send(format!("Message {}", i)).await;
    }
}
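
If you prefer to detect a full channel without waiting at all, try_send returns immediately with an error you can act on: drop the message, count it, or fall back to the awaiting send. The helper below is a sketch of that pattern.

use tokio::sync::mpsc::{self, error::TrySendError};

fn try_enqueue(tx: &mpsc::Sender<String>, msg: String) {
    match tx.try_send(msg) {
        Ok(()) => {}
        // Buffer is full: decide what to do instead of blocking
        Err(TrySendError::Full(dropped)) => {
            eprintln!("Dropping message under load: {}", dropped);
        }
        // Receiver is gone; further sends are pointless
        Err(TrySendError::Closed(_)) => {
            eprintln!("Channel closed");
        }
    }
}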

6. Monitor Task Health#

use std::time::{Duration, Instant};

async fn monitored_task(id: u32) {
    let start = Instant::now();

    // Task work here
    tokio::time::sleep(Duration::from_secs(2)).await;

    let duration = start.elapsed();
    if duration > Duration::from_secs(1) {
        eprintln!("Task {} took too long: {:?}", id, duration);
    }
}
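
Measuring after the fact is useful for logging, but you can also enforce an upper bound with tokio::time::timeout, which cancels the future if it does not finish in time. A short sketch, reusing monitored_task from above:

use tokio::time::{timeout, Duration};

async fn bounded_task() {
    // The future is dropped (cancelled) if it exceeds one second
    match timeout(Duration::from_secs(1), monitored_task(1)).await {
        Ok(()) => println!("Finished within the deadline"),
        Err(_) => eprintln!("Task timed out and was cancelled"),
    }
}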

Common Pitfalls and Solutions#

Pitfall 1: Forgetting to Await#

// Wrong - the future is created but never executed
async fn wrong() {
    fetch_data(); // Missing .await: the compiler warns that futures do nothing unless awaited
}

// Correct
async fn correct() {
    fetch_data().await;
}

Pitfall 2: Holding Locks Across Await Points#

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

// Bad - holds the lock across an await point
async fn bad_lock_usage(data: Arc<Mutex<Vec<String>>>) {
    let mut guard = data.lock().await;
    tokio::time::sleep(Duration::from_secs(1)).await; // Lock held for 1 second!
    guard.push("data".to_string());
}

// Good - keep the lock scope as small as possible
async fn good_lock_usage(data: Arc<Mutex<Vec<String>>>) {
    tokio::time::sleep(Duration::from_secs(1)).await;
    let mut guard = data.lock().await;
    guard.push("data".to_string());
    // Lock is released when the guard goes out of scope
}

Pitfall 3: Not Handling Task Panics#

// Handle panics gracefully
async fn safe_task_spawn() {
    let handle = tokio::spawn(async {
        // Task that might panic
        panic!("Oh no!");
    });

    match handle.await {
        Ok(_) => println!("Task completed"),
        Err(e) if e.is_panic() => {
            println!("Task panicked: {:?}", e);
        }
        Err(e) => println!("Task error: {:?}", e),
    }
}

Conclusion#

Tokio provides a powerful foundation for building high-performance async applications in Rust. By understanding its core concepts and following best practices, you can build servers that handle thousands of concurrent connections efficiently.

Key takeaways:

  • Use tokio::spawn for concurrent task execution
  • Leverage channels for inter-task communication
  • Avoid blocking operations in async contexts
  • Use select! for complex control flow
  • Handle errors and panics gracefully
  • Monitor and profile your async applications

Next Steps#

Ready to dive deeper into async Rust? Good next stops are the official Tokio tutorial at tokio.rs and the Asynchronous Programming in Rust book (rust-lang.github.io/async-book).

Happy async coding!

Author: Anubhav Gain
Published: 2025-01-10
License: CC BY-NC-SA 4.0