πŸ¦€ miniFIX Tutorial | Part 07

September 21, 2025 by CryptoPatrick Rust Tutorials

Building High-Performance SOFH TCP Servers with miniFIX and Tokio

Introduction

Simple Open Framing Header (SOFH) is a lightweight framing protocol often used to wrap FIX messages for reliable transport over TCP connections. When building high-throughput financial systems, combining SOFH framing with async Rust provides excellent performance characteristics. This post demonstrates how to build a SOFH-enabled TCP server using miniFIX and Tokio.

What You’ll Learn

  • Understanding SOFH (Simple Open Framing Header) protocol
  • Building async TCP servers with Tokio
  • Using codec patterns for message framing
  • Handling streaming FIX message parsing
  • Creating production-ready financial networking code

Understanding SOFH

SOFH provides a simple length-prefixed framing mechanism that solves common TCP streaming problems:

  • Message Boundaries: Clearly delineates where each message begins and ends
  • Binary Safe: Handles any payload content including binary data
  • Minimal Overhead: Only 6 bytes per message
  • Simple Implementation: Easy to implement and debug

SOFH Frame Structure

+------------------+------------------+------------------+
|    Start of      |    Encoding     |     Message      |
|   Message (2)    |    Type (2)     |    Length (2)    |
+------------------+------------------+------------------+
|                                                       |
|                Message Payload                        |
|                 (Variable Length)                     |
+-------------------------------------------------------+
  • Start of Message: Always 0x0000 (2 bytes)
  • Encoding Type: Message format identifier (2 bytes)
  • Message Length: Payload length in bytes (2 bytes)
  • Payload: The actual message content

Setting Up the Async TCP Server

Let’s create a basic SOFH TCP server that can handle multiple concurrent connections:

use futures_util::stream::StreamExt;
use std::io;
use tokio::net::TcpListener;
use tokio_util::codec::Decoder;

#[tokio::main]
async fn main() -> io::Result<()> {
    println!("πŸš€ Starting SOFH TCP Server on 127.0.0.1:8080");
    
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    
    loop {
        match listener.accept().await {
            Ok((socket, addr)) => {
                println!("πŸ“‘ New connection from: {}", addr);
                
                // Spawn a task to handle each connection
                tokio::spawn(async move {
                    if let Err(e) = handle_connection(socket).await {
                        eprintln!("❌ Connection error: {}", e);
                    }
                });
            }
            Err(e) => {
                eprintln!("⚠️ Failed to accept connection: {}", e);
            }
        }
    }
}

async fn handle_connection(socket: tokio::net::TcpStream) -> io::Result<()> {
    let mut decoder = minisofh::TokioCodec::default().framed(socket);
    
    while let Some(frame_result) = decoder.next().await {
        match frame_result {
            Ok(frame) => {
                process_sofh_frame(frame).await;
            }
            Err(e) => {
                eprintln!("πŸ”§ Frame decode error: {}", e);
                break;
            }
        }
    }
    
    println!("πŸ“΄ Connection closed");
    Ok(())
}

Processing SOFH Frames

Once we receive a SOFH frame, we need to extract and process the payload:

async fn process_sofh_frame(frame: minisofh::Frame) {
    let payload = frame.payload();
    let encoding_type = frame.encoding_type();
    
    println!("πŸ“¨ Received SOFH frame:");
    println!("   Encoding Type: {}", encoding_type);
    println!("   Payload Length: {} bytes", payload.len());
    
    // Convert payload to UTF-8 for display (assuming text content)
    let payload_str = String::from_utf8_lossy(payload);
    println!("   Content: '{}'", payload_str);
    
    // Process based on encoding type
    match encoding_type {
        0x4649 => {
            // "FI" - FIX message
            process_fix_message(payload).await;
        }
        0x4A53 => {
            // "JS" - JSON message  
            process_json_message(payload).await;
        }
        _ => {
            println!("⚠️ Unknown encoding type: {:#06x}", encoding_type);
        }
    }
}

async fn process_fix_message(payload: &[u8]) {
    println!("πŸ”§ Processing FIX message: {}", String::from_utf8_lossy(payload));
    
    // Here you would typically use miniFIX to parse the FIX message
    // let fix_dictionary = Dictionary::fix44();
    // let mut decoder = Decoder::new(fix_dictionary);
    // match decoder.decode(payload) {
    //     Ok(msg) => { /* process FIX message */ }
    //     Err(e) => eprintln!("FIX decode error: {}", e),
    // }
}

async fn process_json_message(payload: &[u8]) {
    println!("πŸ“‹ Processing JSON message: {}", String::from_utf8_lossy(payload));
    
    // Process JSON payload
    // match serde_json::from_slice::<serde_json::Value>(payload) {
    //     Ok(json) => { /* process JSON */ }
    //     Err(e) => eprintln!("JSON parse error: {}", e),
    // }
}

Integrating FIX Message Processing

Let’s integrate miniFIX for proper FIX message handling within SOFH frames:

use minifix::prelude::*;
use minifix::tagvalue::Decoder as FixDecoder;

struct SofhFixServer {
    fix_decoder: FixDecoder,
}

impl SofhFixServer {
    fn new() -> Self {
        let fix_dictionary = Dictionary::fix44();
        let fix_decoder = FixDecoder::new(fix_dictionary);
        
        Self { fix_decoder }
    }
    
    async fn handle_fix_frame(&mut self, payload: &[u8]) -> io::Result<()> {
        match self.fix_decoder.decode(payload) {
            Ok(msg) => {
                self.process_fix_message(msg).await?;
            }
            Err(e) => {
                eprintln!("❌ FIX decode error: {:?}", e);
            }
        }
        
        Ok(())
    }
    
    async fn process_fix_message(&self, msg: impl GetField<u32>) -> io::Result<()> {
        // Extract common fields
        let msg_type = msg.get(fix44::MSG_TYPE)
            .map(|bytes| String::from_utf8_lossy(bytes))
            .unwrap_or_else(|_| "UNKNOWN".into());
            
        let sender_comp_id = msg.get(fix44::SENDER_COMP_ID)
            .map(|bytes| String::from_utf8_lossy(bytes))
            .unwrap_or_else(|_| "UNKNOWN".into());
            
        println!("πŸ“ˆ FIX Message Details:");
        println!("   Type: {}", msg_type);
        println!("   Sender: {}", sender_comp_id);
        
        // Route based on message type
        match msg_type.as_ref() {
            "D" => self.handle_new_order_single(msg).await?,
            "F" => self.handle_order_cancel_request(msg).await?,
            "8" => self.handle_execution_report(msg).await?,
            _ => {
                println!("⚠️ Unhandled message type: {}", msg_type);
            }
        }
        
        Ok(())
    }
    
    async fn handle_new_order_single(&self, msg: impl GetField<u32>) -> io::Result<()> {
        println!("πŸ“ Processing New Order Single");
        
        if let Ok(symbol_bytes) = msg.get(fix44::SYMBOL) {
            let symbol = String::from_utf8_lossy(symbol_bytes);
            println!("   Symbol: {}", symbol);
        }
        
        if let Ok(side_bytes) = msg.get(fix44::SIDE) {
            let side = match side_bytes {
                b"1" => "BUY",
                b"2" => "SELL", 
                _ => "UNKNOWN"
            };
            println!("   Side: {}", side);
        }
        
        if let Ok(qty) = msg.get::<i32>(fix44::ORDER_QTY) {
            println!("   Quantity: {}", qty);
        }
        
        // TODO: Implement order processing logic
        println!("βœ… Order processed successfully");
        
        Ok(())
    }
    
    async fn handle_order_cancel_request(&self, _msg: impl GetField<u32>) -> io::Result<()> {
        println!("❌ Processing Order Cancel Request");
        // TODO: Implement cancel logic
        Ok(())
    }
    
    async fn handle_execution_report(&self, _msg: impl GetField<u32>) -> io::Result<()> {
        println!("πŸ“Š Processing Execution Report");  
        // TODO: Implement execution report processing
        Ok(())
    }
}

Enhanced Connection Handler

Let’s create a more sophisticated connection handler that maintains state per connection:

use std::sync::Arc;
use tokio::sync::Mutex;

async fn handle_connection_enhanced(socket: tokio::net::TcpStream) -> io::Result<()> {
    let peer_addr = socket.peer_addr()?;
    println!("πŸ”— Handling connection from: {}", peer_addr);
    
    let server = Arc::new(Mutex::new(SofhFixServer::new()));
    let mut decoder = minisofh::TokioCodec::default().framed(socket);
    let mut message_count = 0u64;
    
    while let Some(frame_result) = decoder.next().await {
        match frame_result {
            Ok(frame) => {
                message_count += 1;
                
                println!("πŸ“¨ Message #{} from {}", message_count, peer_addr);
                
                let encoding_type = frame.encoding_type();
                let payload = frame.payload();
                
                match encoding_type {
                    0x4649 => { // "FI" - FIX message
                        let mut server_guard = server.lock().await;
                        if let Err(e) = server_guard.handle_fix_frame(payload).await {
                            eprintln!("🚨 FIX processing error: {}", e);
                        }
                    }
                    _ => {
                        println!("⚠️ Unsupported encoding type: {:#06x}", encoding_type);
                    }
                }
            }
            Err(e) => {
                eprintln!("πŸ’₯ Frame decode error from {}: {}", peer_addr, e);
                break;
            }
        }
    }
    
    println!("πŸ‘‹ Connection from {} closed after {} messages", peer_addr, message_count);
    Ok(())
}

Production-Ready Server Features

Connection Pool Management

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

struct ConnectionManager {
    connections: Arc<Mutex<HashMap<String, ConnectionInfo>>>,
    next_connection_id: AtomicU64,
}

struct ConnectionInfo {
    id: u64,
    peer_addr: std::net::SocketAddr,
    connected_at: std::time::Instant,
    message_count: Arc<AtomicU64>,
}

impl ConnectionManager {
    fn new() -> Self {
        Self {
            connections: Arc::new(Mutex::new(HashMap::new())),
            next_connection_id: AtomicU64::new(1),
        }
    }
    
    async fn add_connection(&self, peer_addr: std::net::SocketAddr) -> u64 {
        let connection_id = self.next_connection_id.fetch_add(1, Ordering::SeqCst);
        let key = format!("{}:{}", peer_addr.ip(), peer_addr.port());
        
        let info = ConnectionInfo {
            id: connection_id,
            peer_addr,
            connected_at: std::time::Instant::now(),
            message_count: Arc::new(AtomicU64::new(0)),
        };
        
        let mut connections = self.connections.lock().await;
        connections.insert(key, info);
        
        connection_id
    }
    
    async fn remove_connection(&self, peer_addr: std::net::SocketAddr) {
        let key = format!("{}:{}", peer_addr.ip(), peer_addr.port());
        let mut connections = self.connections.lock().await;
        connections.remove(&key);
    }
    
    async fn get_stats(&self) -> (usize, u64) {
        let connections = self.connections.lock().await;
        let connection_count = connections.len();
        let total_messages: u64 = connections
            .values()
            .map(|info| info.message_count.load(Ordering::SeqCst))
            .sum();
            
        (connection_count, total_messages)
    }
}

Configuration Management

use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
pub struct ServerConfig {
    pub bind_address: String,
    pub bind_port: u16,
    pub max_connections: usize,
    pub message_buffer_size: usize,
    pub fix_version: String,
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            bind_address: "127.0.0.1".to_string(),
            bind_port: 8080,
            max_connections: 1000,
            message_buffer_size: 8192,
            fix_version: "FIX.4.4".to_string(),
        }
    }
}

impl ServerConfig {
    pub fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let contents = std::fs::read_to_string(path)?;
        let config: Self = toml::from_str(&contents)?;
        Ok(config)
    }
}

Complete Production Server

use futures_util::stream::StreamExt;
use std::io;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Mutex;

pub struct SofhFixServer {
    config: ServerConfig,
    connection_manager: ConnectionManager,
    fix_decoder: Arc<Mutex<FixDecoder>>,
}

impl SofhFixServer {
    pub fn new(config: ServerConfig) -> Self {
        let fix_dictionary = Dictionary::fix44();
        let fix_decoder = Arc::new(Mutex::new(FixDecoder::new(fix_dictionary)));
        
        Self {
            config,
            connection_manager: ConnectionManager::new(),
            fix_decoder,
        }
    }
    
    pub async fn start(&self) -> io::Result<()> {
        let bind_addr = format!("{}:{}", self.config.bind_address, self.config.bind_port);
        let listener = TcpListener::bind(&bind_addr).await?;
        
        println!("πŸš€ SOFH FIX Server listening on {}", bind_addr);
        
        // Spawn stats reporting task
        let connection_manager = self.connection_manager.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
            loop {
                interval.tick().await;
                let (connections, messages) = connection_manager.get_stats().await;
                println!("πŸ“Š Stats: {} connections, {} total messages", connections, messages);
            }
        });
        
        // Accept connections
        loop {
            match listener.accept().await {
                Ok((socket, peer_addr)) => {
                    println!("πŸ”— New connection from: {}", peer_addr);
                    
                    let connection_id = self.connection_manager.add_connection(peer_addr).await;
                    let connection_manager = self.connection_manager.clone();
                    let fix_decoder = self.fix_decoder.clone();
                    
                    tokio::spawn(async move {
                        if let Err(e) = Self::handle_connection(
                            socket,
                            connection_id,
                            connection_manager,
                            fix_decoder,
                        ).await {
                            eprintln!("❌ Connection {} error: {}", connection_id, e);
                        }
                    });
                }
                Err(e) => {
                    eprintln!("⚠️ Accept error: {}", e);
                }
            }
        }
    }
    
    async fn handle_connection(
        socket: tokio::net::TcpStream,
        connection_id: u64,
        connection_manager: Arc<ConnectionManager>,
        fix_decoder: Arc<Mutex<FixDecoder>>,
    ) -> io::Result<()> {
        let peer_addr = socket.peer_addr()?;
        let mut decoder = minisofh::TokioCodec::default().framed(socket);
        
        while let Some(frame_result) = decoder.next().await {
            match frame_result {
                Ok(frame) => {
                    if frame.encoding_type() == 0x4649 { // FIX message
                        let mut fix_decoder_guard = fix_decoder.lock().await;
                        match fix_decoder_guard.decode(frame.payload()) {
                            Ok(msg) => {
                                // Process FIX message
                                println!("βœ… Processed FIX message from connection {}", connection_id);
                            }
                            Err(e) => {
                                eprintln!("❌ FIX decode error: {:?}", e);
                            }
                        }
                    }
                }
                Err(e) => {
                    eprintln!("πŸ”§ Frame error: {}", e);
                    break;
                }
            }
        }
        
        connection_manager.remove_connection(peer_addr).await;
        println!("πŸ‘‹ Connection {} closed", connection_id);
        
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ServerConfig::load_from_file("server_config.toml")
        .unwrap_or_else(|_| ServerConfig::default());
    
    let server = SofhFixServer::new(config);
    server.start().await?;
    
    Ok(())
}

Benefits of SOFH with miniFIX

  1. Message Boundaries: SOFH solves TCP streaming issues elegantly
  2. Performance: Tokio’s async I/O handles thousands of concurrent connections
  3. Type Safety: miniFIX provides compile-time FIX message validation
  4. Framing Flexibility: Support multiple message formats in one connection
  5. Production Ready: Built-in connection management and monitoring

Testing the Server

Create a simple client to test your server:

use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;

async fn test_client() -> Result<(), Box<dyn std::error::Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    
    // Create a SOFH-wrapped FIX message
    let fix_message = b"8=FIX.4.4|9=49|35=D|49=CLIENT|56=SERVER|34=1|52=20241201-10:30:15|11=ORDER1|55=AAPL|54=1|38=100|40=2|44=150.25|10=123|";
    
    // SOFH header: Start(0x0000) + Encoding(0x4649="FI") + Length
    let sofh_header = [
        0x00, 0x00,  // Start of message
        0x46, 0x49,  // Encoding type "FI" (FIX)
        ((fix_message.len() >> 8) & 0xFF) as u8,  // Length high byte
        (fix_message.len() & 0xFF) as u8,         // Length low byte
    ];
    
    // Send SOFH frame
    stream.write_all(&sofh_header).await?;
    stream.write_all(fix_message).await?;
    
    println!("πŸ“€ Sent SOFH-wrapped FIX message");
    
    Ok(())
}

This architecture provides a solid foundation for building high-performance FIX gateways, order management systems, and market data servers using modern Rust async patterns.


Next: Part 08 - Testing Strategies

'I write to understand as much as to be understood.' β€”Elie Wiesel
(c) 2024 CryptoPatrick