πŸ¦€ miniFIX Tutorial | Part 02

September 16, 2025 by CryptoPatrick Rust Tutorials

Streaming FIX Message Decoding with miniFIX

Introduction

In real-world trading applications, FIX messages often arrive as continuous streams over network connections. Unlike processing individual complete messages, streaming scenarios require handling partial messages, buffering incomplete data, and parsing messages as they arrive. miniFIX provides excellent support for streaming FIX message decoding.

What You’ll Learn

  • How to set up a streaming FIX decoder
  • Managing message buffers and partial reads
  • Parsing messages from continuous streams
  • Best practices for production streaming scenarios

The Challenge of Streaming

When FIX messages arrive over a network connection, they may come in chunks smaller than a complete message. You might receive:

  1. Part of a message
  2. Multiple messages in one chunk
  3. A complete message plus part of the next one

miniFIX’s streaming decoder handles these scenarios gracefully.

Setting Up a Streaming Decoder

Let’s create a streaming decoder that can handle multiple messages from a continuous stream:

use minifix::prelude::*;
use minifix::tagvalue::Decoder;
use std::io::{Cursor, Read};

fn create_streaming_decoder(fix_dictionary: Dictionary) -> Decoder {
    let mut decoder = Decoder::new(fix_dictionary);
    decoder.config_mut().separator = b'|';
    decoder
}

Sample Stream Data

For this example, we’ll simulate a stream containing multiple FIX messages:

const FIX_MESSAGES: &[&[u8]] = &[
    b"8=FIX.4.2|9=97|35=6|49=BKR|56=IM|34=14|52=20100204-09:18:42|23=115685|28=N|55=SPMI.MI|54=2|44=2200.75|27=S|25=H|10=248|",
    b"8=FIX.4.2|9=40|35=D|49=AFUNDMGR|56=ABROKER|15=USD|59=0|10=091|",
];

fn create_fix_stream() -> Vec<u8> {
    FIX_MESSAGES.iter().copied().flatten().copied().collect()
}

The Streaming Parse Loop

The core of streaming FIX message parsing involves a careful loop that reads data and attempts to parse complete messages:

fn main() {
    let fix_dictionary = Dictionary::fix42();
    let mut fix_decoder = create_streaming_decoder(fix_dictionary).streaming(vec![]);
    let mut stream = Cursor::new(create_fix_stream());
    
    for message_num in 0..FIX_MESSAGES.len() {
        loop {
            // Fill the decoder's buffer with available data
            stream.read_exact(fix_decoder.fillable()).unwrap();
            
            // Try to parse a complete message
            match fix_decoder.try_parse().unwrap() {
                Some(_) => {
                    // Successfully parsed a complete message
                    let msg = fix_decoder.message();
                    process_message(&msg, message_num);
                    
                    // Clear decoder to prepare for next message
                    fix_decoder.clear();
                    break;
                }
                None => {
                    // Still parsing - need more data
                    println!("Partial message received, waiting for more data...");
                }
            }
        }
    }
}

fn process_message(msg: &impl GetField<u32>, message_num: usize) {
    println!("Message {}: {}", message_num, 
        String::from_utf8_lossy(msg.get(fix42::BEGIN_STRING).unwrap()));
        
    // Process the message based on its type
    match msg.get(fix42::MSG_TYPE) {
        Ok(b"6") => println!("  Indication of Interest"),
        Ok(b"D") => println!("  New Order Single"),
        _ => println!("  Other message type"),
    }
}

Key Concepts

1. Buffer Management

The streaming decoder maintains an internal buffer that you can fill with incoming data:

// Get the fillable portion of the buffer
let fillable_buffer = fix_decoder.fillable();

// Read data into the buffer (must use read_exact)
stream.read_exact(fillable_buffer).unwrap();

2. Partial Parsing

The try_parse() method attempts to parse a complete message from the current buffer:

  • Some(_): A complete message was successfully parsed
  • None: More data is needed to complete the current message

3. Message Clearing

After processing a message, call clear() to prepare the decoder for the next message:

fix_decoder.clear();

Production Considerations

Error Handling

In production, you’ll want robust error handling:

match fix_decoder.try_parse() {
    Ok(Some(_)) => {
        // Process message
        let msg = fix_decoder.message();
        process_message(&msg);
        fix_decoder.clear();
    },
    Ok(None) => {
        // Need more data
        continue;
    },
    Err(e) => {
        // Handle parsing error
        eprintln!("FIX parsing error: {:?}", e);
        // Decide whether to reset decoder or handle gracefully
    }
}

Network Integration

With real network streams, you might use this pattern:

use tokio::net::TcpStream;
use tokio::io::AsyncReadExt;

async fn process_fix_stream(mut stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    let fix_dictionary = Dictionary::fix42();
    let mut fix_decoder = Decoder::new(fix_dictionary).streaming(vec![]);
    
    loop {
        // Read data from network
        let bytes_read = stream.read(fix_decoder.fillable()).await?;
        if bytes_read == 0 {
            break; // Connection closed
        }
        
        // Try to parse messages
        while let Some(_) = fix_decoder.try_parse()? {
            let msg = fix_decoder.message();
            process_message(&msg);
            fix_decoder.clear();
        }
    }
    
    Ok(())
}

Complete Streaming Example

use minifix::prelude::*;
use minifix::tagvalue::Decoder;
use std::io::{Cursor, Read};

const FIX_MESSAGES: &[&[u8]] = &[
    b"8=FIX.4.2|9=97|35=6|49=BKR|56=IM|34=14|52=20100204-09:18:42|23=115685|28=N|55=SPMI.MI|54=2|44=2200.75|27=S|25=H|10=248|",
    b"8=FIX.4.2|9=40|35=D|49=AFUNDMGR|56=ABROKER|15=USD|59=0|10=091|",
];

fn create_fix_stream() -> Vec<u8> {
    FIX_MESSAGES.iter().copied().flatten().copied().collect()
}

fn main() {
    let fix_dictionary = Dictionary::fix42();
    let mut fix_decoder = Decoder::new(fix_dictionary).streaming(vec![]);
    fix_decoder.config_mut().separator = b'|';
    
    let mut stream = Cursor::new(create_fix_stream());
    let mut message_count = 0;
    
    for _expected_message in 0..FIX_MESSAGES.len() {
        loop {
            // Read data into decoder buffer
            if stream.read_exact(fix_decoder.fillable()).is_err() {
                break;
            }
            
            match fix_decoder.try_parse() {
                Ok(Some(_)) => {
                    message_count += 1;
                    let msg = fix_decoder.message();
                    
                    println!("πŸ“¨ Message #{}: {}", 
                        message_count,
                        String::from_utf8_lossy(msg.get(fix42::BEGIN_STRING).unwrap())
                    );
                    
                    println!("   Sender: {}", 
                        String::from_utf8_lossy(msg.get(fix42::SENDER_COMP_ID).unwrap())
                    );
                    
                    fix_decoder.clear();
                    break;
                }
                Ok(None) => {
                    println!("⏳ Waiting for more data...");
                }
                Err(e) => {
                    eprintln!("❌ Parse error: {:?}", e);
                    break;
                }
            }
        }
    }
    
    println!("βœ… Processed {} messages", message_count);
}

Benefits of Streaming Decoding

  1. Memory Efficiency: Only allocates buffers as needed
  2. Real-time Processing: Messages processed as soon as they’re complete
  3. Robust Handling: Gracefully handles partial messages and network chunking
  4. Performance: Minimal copying and efficient buffer management

Best Practices

  1. Always use read_exact(): This ensures the buffer is properly filled
  2. Handle partial reads: Network connections may not deliver complete messages at once
  3. Clear after processing: Always call clear() after processing each message
  4. Error recovery: Implement strategies for handling malformed messages
  5. Buffer sizing: Consider the typical message sizes in your application

Next Steps

Streaming FIX decoding opens up possibilities for:

  • Real-time market data processing
  • Order management systems
  • FIX gateway implementations
  • High-frequency trading applications

Combined with Rust’s async capabilities and miniFIX’s performance, you can build robust, high-performance FIX processing systems.


Next: Part 03 - Encoding FIX Messages

'I write to understand as much as to be understood.' β€”Elie Wiesel
(c) 2024 CryptoPatrick