Building High-Performance SOFH TCP Servers with miniFIX and Tokio
Introduction
Simple Open Framing Header (SOFH) is a lightweight framing protocol often used to wrap FIX messages for reliable transport over TCP connections. When building high-throughput financial systems, combining SOFH framing with async Rust provides excellent performance characteristics. This post demonstrates how to build a SOFH-enabled TCP server using miniFIX and Tokio.
What You’ll Learn
- Understanding SOFH (Simple Open Framing Header) protocol
- Building async TCP servers with Tokio
- Using codec patterns for message framing
- Handling streaming FIX message parsing
- Creating production-ready financial networking code
Understanding SOFH
SOFH provides a simple length-prefixed framing mechanism that solves common TCP streaming problems:
- Message Boundaries: Clearly delineates where each message begins and ends
- Binary Safe: Handles any payload content including binary data
- Minimal Overhead: Only 6 bytes per message
- Simple Implementation: Easy to implement and debug
SOFH Frame Structure
+------------------+------------------+------------------+
| Start of | Encoding | Message |
| Message (2) | Type (2) | Length (2) |
+------------------+------------------+------------------+
| |
| Message Payload |
| (Variable Length) |
+-------------------------------------------------------+
- Start of Message: Always
0x0000(2 bytes) - Encoding Type: Message format identifier (2 bytes)
- Message Length: Payload length in bytes (2 bytes)
- Payload: The actual message content
Setting Up the Async TCP Server
Let’s create a basic SOFH TCP server that can handle multiple concurrent connections:
use futures_util::stream::StreamExt;
use std::io;
use tokio::net::TcpListener;
use tokio_util::codec::Decoder;
#[tokio::main]
async fn main() -> io::Result<()> {
println!("π Starting SOFH TCP Server on 127.0.0.1:8080");
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
match listener.accept().await {
Ok((socket, addr)) => {
println!("π‘ New connection from: {}", addr);
// Spawn a task to handle each connection
tokio::spawn(async move {
if let Err(e) = handle_connection(socket).await {
eprintln!("β Connection error: {}", e);
}
});
}
Err(e) => {
eprintln!("β οΈ Failed to accept connection: {}", e);
}
}
}
}
async fn handle_connection(socket: tokio::net::TcpStream) -> io::Result<()> {
let mut decoder = minisofh::TokioCodec::default().framed(socket);
while let Some(frame_result) = decoder.next().await {
match frame_result {
Ok(frame) => {
process_sofh_frame(frame).await;
}
Err(e) => {
eprintln!("π§ Frame decode error: {}", e);
break;
}
}
}
println!("π΄ Connection closed");
Ok(())
}
Processing SOFH Frames
Once we receive a SOFH frame, we need to extract and process the payload:
async fn process_sofh_frame(frame: minisofh::Frame) {
let payload = frame.payload();
let encoding_type = frame.encoding_type();
println!("π¨ Received SOFH frame:");
println!(" Encoding Type: {}", encoding_type);
println!(" Payload Length: {} bytes", payload.len());
// Convert payload to UTF-8 for display (assuming text content)
let payload_str = String::from_utf8_lossy(payload);
println!(" Content: '{}'", payload_str);
// Process based on encoding type
match encoding_type {
0x4649 => {
// "FI" - FIX message
process_fix_message(payload).await;
}
0x4A53 => {
// "JS" - JSON message
process_json_message(payload).await;
}
_ => {
println!("β οΈ Unknown encoding type: {:#06x}", encoding_type);
}
}
}
async fn process_fix_message(payload: &[u8]) {
println!("π§ Processing FIX message: {}", String::from_utf8_lossy(payload));
// Here you would typically use miniFIX to parse the FIX message
// let fix_dictionary = Dictionary::fix44();
// let mut decoder = Decoder::new(fix_dictionary);
// match decoder.decode(payload) {
// Ok(msg) => { /* process FIX message */ }
// Err(e) => eprintln!("FIX decode error: {}", e),
// }
}
async fn process_json_message(payload: &[u8]) {
println!("π Processing JSON message: {}", String::from_utf8_lossy(payload));
// Process JSON payload
// match serde_json::from_slice::<serde_json::Value>(payload) {
// Ok(json) => { /* process JSON */ }
// Err(e) => eprintln!("JSON parse error: {}", e),
// }
}
Integrating FIX Message Processing
Let’s integrate miniFIX for proper FIX message handling within SOFH frames:
use minifix::prelude::*;
use minifix::tagvalue::Decoder as FixDecoder;
struct SofhFixServer {
fix_decoder: FixDecoder,
}
impl SofhFixServer {
fn new() -> Self {
let fix_dictionary = Dictionary::fix44();
let fix_decoder = FixDecoder::new(fix_dictionary);
Self { fix_decoder }
}
async fn handle_fix_frame(&mut self, payload: &[u8]) -> io::Result<()> {
match self.fix_decoder.decode(payload) {
Ok(msg) => {
self.process_fix_message(msg).await?;
}
Err(e) => {
eprintln!("β FIX decode error: {:?}", e);
}
}
Ok(())
}
async fn process_fix_message(&self, msg: impl GetField<u32>) -> io::Result<()> {
// Extract common fields
let msg_type = msg.get(fix44::MSG_TYPE)
.map(|bytes| String::from_utf8_lossy(bytes))
.unwrap_or_else(|_| "UNKNOWN".into());
let sender_comp_id = msg.get(fix44::SENDER_COMP_ID)
.map(|bytes| String::from_utf8_lossy(bytes))
.unwrap_or_else(|_| "UNKNOWN".into());
println!("π FIX Message Details:");
println!(" Type: {}", msg_type);
println!(" Sender: {}", sender_comp_id);
// Route based on message type
match msg_type.as_ref() {
"D" => self.handle_new_order_single(msg).await?,
"F" => self.handle_order_cancel_request(msg).await?,
"8" => self.handle_execution_report(msg).await?,
_ => {
println!("β οΈ Unhandled message type: {}", msg_type);
}
}
Ok(())
}
async fn handle_new_order_single(&self, msg: impl GetField<u32>) -> io::Result<()> {
println!("π Processing New Order Single");
if let Ok(symbol_bytes) = msg.get(fix44::SYMBOL) {
let symbol = String::from_utf8_lossy(symbol_bytes);
println!(" Symbol: {}", symbol);
}
if let Ok(side_bytes) = msg.get(fix44::SIDE) {
let side = match side_bytes {
b"1" => "BUY",
b"2" => "SELL",
_ => "UNKNOWN"
};
println!(" Side: {}", side);
}
if let Ok(qty) = msg.get::<i32>(fix44::ORDER_QTY) {
println!(" Quantity: {}", qty);
}
// TODO: Implement order processing logic
println!("β
Order processed successfully");
Ok(())
}
async fn handle_order_cancel_request(&self, _msg: impl GetField<u32>) -> io::Result<()> {
println!("β Processing Order Cancel Request");
// TODO: Implement cancel logic
Ok(())
}
async fn handle_execution_report(&self, _msg: impl GetField<u32>) -> io::Result<()> {
println!("π Processing Execution Report");
// TODO: Implement execution report processing
Ok(())
}
}
Enhanced Connection Handler
Let’s create a more sophisticated connection handler that maintains state per connection:
use std::sync::Arc;
use tokio::sync::Mutex;
async fn handle_connection_enhanced(socket: tokio::net::TcpStream) -> io::Result<()> {
let peer_addr = socket.peer_addr()?;
println!("π Handling connection from: {}", peer_addr);
let server = Arc::new(Mutex::new(SofhFixServer::new()));
let mut decoder = minisofh::TokioCodec::default().framed(socket);
let mut message_count = 0u64;
while let Some(frame_result) = decoder.next().await {
match frame_result {
Ok(frame) => {
message_count += 1;
println!("π¨ Message #{} from {}", message_count, peer_addr);
let encoding_type = frame.encoding_type();
let payload = frame.payload();
match encoding_type {
0x4649 => { // "FI" - FIX message
let mut server_guard = server.lock().await;
if let Err(e) = server_guard.handle_fix_frame(payload).await {
eprintln!("π¨ FIX processing error: {}", e);
}
}
_ => {
println!("β οΈ Unsupported encoding type: {:#06x}", encoding_type);
}
}
}
Err(e) => {
eprintln!("π₯ Frame decode error from {}: {}", peer_addr, e);
break;
}
}
}
println!("π Connection from {} closed after {} messages", peer_addr, message_count);
Ok(())
}
Production-Ready Server Features
Connection Pool Management
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
struct ConnectionManager {
connections: Arc<Mutex<HashMap<String, ConnectionInfo>>>,
next_connection_id: AtomicU64,
}
struct ConnectionInfo {
id: u64,
peer_addr: std::net::SocketAddr,
connected_at: std::time::Instant,
message_count: Arc<AtomicU64>,
}
impl ConnectionManager {
fn new() -> Self {
Self {
connections: Arc::new(Mutex::new(HashMap::new())),
next_connection_id: AtomicU64::new(1),
}
}
async fn add_connection(&self, peer_addr: std::net::SocketAddr) -> u64 {
let connection_id = self.next_connection_id.fetch_add(1, Ordering::SeqCst);
let key = format!("{}:{}", peer_addr.ip(), peer_addr.port());
let info = ConnectionInfo {
id: connection_id,
peer_addr,
connected_at: std::time::Instant::now(),
message_count: Arc::new(AtomicU64::new(0)),
};
let mut connections = self.connections.lock().await;
connections.insert(key, info);
connection_id
}
async fn remove_connection(&self, peer_addr: std::net::SocketAddr) {
let key = format!("{}:{}", peer_addr.ip(), peer_addr.port());
let mut connections = self.connections.lock().await;
connections.remove(&key);
}
async fn get_stats(&self) -> (usize, u64) {
let connections = self.connections.lock().await;
let connection_count = connections.len();
let total_messages: u64 = connections
.values()
.map(|info| info.message_count.load(Ordering::SeqCst))
.sum();
(connection_count, total_messages)
}
}
Configuration Management
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct ServerConfig {
pub bind_address: String,
pub bind_port: u16,
pub max_connections: usize,
pub message_buffer_size: usize,
pub fix_version: String,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
bind_address: "127.0.0.1".to_string(),
bind_port: 8080,
max_connections: 1000,
message_buffer_size: 8192,
fix_version: "FIX.4.4".to_string(),
}
}
}
impl ServerConfig {
pub fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
let contents = std::fs::read_to_string(path)?;
let config: Self = toml::from_str(&contents)?;
Ok(config)
}
}
Complete Production Server
use futures_util::stream::StreamExt;
use std::io;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
pub struct SofhFixServer {
config: ServerConfig,
connection_manager: ConnectionManager,
fix_decoder: Arc<Mutex<FixDecoder>>,
}
impl SofhFixServer {
pub fn new(config: ServerConfig) -> Self {
let fix_dictionary = Dictionary::fix44();
let fix_decoder = Arc::new(Mutex::new(FixDecoder::new(fix_dictionary)));
Self {
config,
connection_manager: ConnectionManager::new(),
fix_decoder,
}
}
pub async fn start(&self) -> io::Result<()> {
let bind_addr = format!("{}:{}", self.config.bind_address, self.config.bind_port);
let listener = TcpListener::bind(&bind_addr).await?;
println!("π SOFH FIX Server listening on {}", bind_addr);
// Spawn stats reporting task
let connection_manager = self.connection_manager.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let (connections, messages) = connection_manager.get_stats().await;
println!("π Stats: {} connections, {} total messages", connections, messages);
}
});
// Accept connections
loop {
match listener.accept().await {
Ok((socket, peer_addr)) => {
println!("π New connection from: {}", peer_addr);
let connection_id = self.connection_manager.add_connection(peer_addr).await;
let connection_manager = self.connection_manager.clone();
let fix_decoder = self.fix_decoder.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_connection(
socket,
connection_id,
connection_manager,
fix_decoder,
).await {
eprintln!("β Connection {} error: {}", connection_id, e);
}
});
}
Err(e) => {
eprintln!("β οΈ Accept error: {}", e);
}
}
}
}
async fn handle_connection(
socket: tokio::net::TcpStream,
connection_id: u64,
connection_manager: Arc<ConnectionManager>,
fix_decoder: Arc<Mutex<FixDecoder>>,
) -> io::Result<()> {
let peer_addr = socket.peer_addr()?;
let mut decoder = minisofh::TokioCodec::default().framed(socket);
while let Some(frame_result) = decoder.next().await {
match frame_result {
Ok(frame) => {
if frame.encoding_type() == 0x4649 { // FIX message
let mut fix_decoder_guard = fix_decoder.lock().await;
match fix_decoder_guard.decode(frame.payload()) {
Ok(msg) => {
// Process FIX message
println!("β
Processed FIX message from connection {}", connection_id);
}
Err(e) => {
eprintln!("β FIX decode error: {:?}", e);
}
}
}
}
Err(e) => {
eprintln!("π§ Frame error: {}", e);
break;
}
}
}
connection_manager.remove_connection(peer_addr).await;
println!("π Connection {} closed", connection_id);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = ServerConfig::load_from_file("server_config.toml")
.unwrap_or_else(|_| ServerConfig::default());
let server = SofhFixServer::new(config);
server.start().await?;
Ok(())
}
Benefits of SOFH with miniFIX
- Message Boundaries: SOFH solves TCP streaming issues elegantly
- Performance: Tokio’s async I/O handles thousands of concurrent connections
- Type Safety: miniFIX provides compile-time FIX message validation
- Framing Flexibility: Support multiple message formats in one connection
- Production Ready: Built-in connection management and monitoring
Testing the Server
Create a simple client to test your server:
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
async fn test_client() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// Create a SOFH-wrapped FIX message
let fix_message = b"8=FIX.4.4|9=49|35=D|49=CLIENT|56=SERVER|34=1|52=20241201-10:30:15|11=ORDER1|55=AAPL|54=1|38=100|40=2|44=150.25|10=123|";
// SOFH header: Start(0x0000) + Encoding(0x4649="FI") + Length
let sofh_header = [
0x00, 0x00, // Start of message
0x46, 0x49, // Encoding type "FI" (FIX)
((fix_message.len() >> 8) & 0xFF) as u8, // Length high byte
(fix_message.len() & 0xFF) as u8, // Length low byte
];
// Send SOFH frame
stream.write_all(&sofh_header).await?;
stream.write_all(fix_message).await?;
println!("π€ Sent SOFH-wrapped FIX message");
Ok(())
}
This architecture provides a solid foundation for building high-performance FIX gateways, order management systems, and market data servers using modern Rust async patterns.