Skip to main content

Publisher & Subscriber

The Publisher and Subscriber are the core messaging APIs in Cerulion Core. This guide shows you how to create, configure, and use them effectively.

Creating a Publisher

Basic Publisher

Create a publisher for a topic:
use cerulion_core::prelude::*;

#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct SensorData {
    temperature: f32,
    timestamp: u64,
}

let publisher = Publisher::<SensorData>::create("sensors/temperature")?;
The publisher automatically enables both local (iceoryx2) and network (Zenoh) transport. Local transport is always available, while network transport runs asynchronously on a background thread.

Publisher with Network Control

Control network transport explicitly:
// Create with network disabled initially
let publisher = Publisher::<SensorData>::create_with_session(
    "sensors/temperature",
    false,  // Start with network disabled
    None    // No shared session
)?;

// Enable network later (e.g., when remote subscriber discovered)
// This is typically handled by TopicManager

Sending Messages

Basic Send

Send a message to the topic:
let data = SensorData {
    temperature: 23.5,
    timestamp: 1234567890,
};

publisher.send(data)?;
The send() method returns immediately. For local subscribers, the message is available in < 1 μs. For network subscribers, serialization and network send happen asynchronously on a background thread.

Send Behavior

When you call send(), here’s what happens:
  1. Local path: Message is written directly to shared memory (synchronous, fast)
  2. Network path: Message is queued to background thread (non-blocking)
  3. Background thread: Serializes and sends over Zenoh (asynchronous)
The network path uses latest-message semantics. If the network is slow and multiple messages arrive, only the most recent message is sent. This ensures network subscribers always get the latest data.

Creating a Subscriber

Let the subscriber automatically choose the best transport:
let subscriber = Subscriber::<SensorData>::create(
    "sensors/temperature",
    None  // Auto-detect: tries local first, falls back to network
)?;
Auto-detection logic:
  1. Check if local publisher exists (iceoryx2)
  2. If yes, use local transport (fast, zero-copy)
  3. If no, use network transport (Zenoh)

Force Local Transport

Force local transport only:
let subscriber = Subscriber::<SensorData>::create(
    "sensors/temperature",
    Some(false)  // Force local transport
)?;
If you force local transport but no local publisher exists, receive() will always return None. Use this only when you’re certain a local publisher is running.

Force Network Transport

Force network transport (useful for remote subscribers):
let subscriber = Subscriber::<SensorData>::create(
    "sensors/temperature",
    Some(true)  // Force network transport
)?;
Network transport is useful for:
  • Cross-process communication
  • Remote machines
  • When you want to ensure network discovery works

Receiving Messages

Basic Receive

Receive a message from the topic:
match subscriber.receive() {
    Ok(Some(data)) => {
        println!("Received: temp={}°C", data.temperature);
    }
    Ok(None) => {
        // No message available (non-blocking)
    }
    Err(e) => {
        eprintln!("Error receiving: {}", e);
    }
}
The receive() method is non-blocking. It returns immediately with Some(data) if a message is available, or None if no message is ready.

Polling Pattern

For continuous message reception, use a polling loop:
loop {
    if let Ok(Some(data)) = subscriber.receive() {
        // Process message
        process_sensor_data(data);
    }
    
    // Small sleep to avoid busy-waiting
    std::thread::sleep(std::time::Duration::from_millis(1));
}
For high-frequency data, consider removing the sleep. The receive() call is very fast (< 1 μs for local transport), so busy-waiting may be acceptable for real-time applications.

Complete Example

Here’s a complete example with both publisher and subscriber:
use cerulion_core::prelude::*;

#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct SensorData {
    temperature: f32,
    timestamp: u64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create publisher
    let publisher = Publisher::<SensorData>::create("sensors/temperature")?;
    
    // Create subscriber (auto-detects local transport)
    let subscriber = Subscriber::<SensorData>::create("sensors/temperature", None)?;
    
    // Send multiple messages
    for i in 0..10 {
        let data = SensorData {
            temperature: 20.0 + (i as f32) * 0.5,
            timestamp: i as u64,
        };
        publisher.send(data)?;
        
        // Small delay to see messages arrive
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
    
    // Receive messages
    let mut count = 0;
    loop {
        if let Ok(Some(data)) = subscriber.receive() {
            println!("Message {}: temp={}°C, time={}", 
                     count, data.temperature, data.timestamp);
            count += 1;
            
            if count >= 10 {
                break;
            }
        }
    }
    
    Ok(())
}

Transport Modes Comparison

ModeWhen to UseLatencySerialization
Auto-detectDefault choiceLocal: < 1 μs
Network: 1-10 ms
Local: None
Network: Yes
Force localSame-machine only< 1 μsNone
Force networkRemote or cross-process1-10 msYes

Using with TopicManager

For centralized management and automatic discovery, use TopicManager:
use cerulion_core::prelude::*;

let manager = TopicManager::create()?;

// Register publisher
let publisher = manager.register_publisher::<SensorData>(
    "sensors/temperature",
    false  // Start with network disabled
)?;

// Register subscriber (triggers discovery)
let subscriber = manager.register_subscriber::<SensorData>(
    "sensors/temperature",
    None  // Auto-detect
)?;

// Publisher's network will auto-enable when subscriber connects
See the Topic Manager guide for details.

Type Safety

Cerulion Core ensures type safety at multiple levels:

Compile-Time (Rust)

let publisher = Publisher::<SensorData>::create("topic")?;
let subscriber = Subscriber::<DifferentType>::create("topic", None)?;
// ❌ Compile error: type mismatch

Runtime (Python/C++)

Type validation occurs when messages are deserialized. Mismatched types result in deserialization errors:
# Type mismatch will cause deserialization error
try:
    data = WrongType.from_bytes(sample.payload)
except ValueError as e:
    print(f"Type mismatch: {e}")
Type safety prevents common errors like sending the wrong message type or mismatched field layouts. This is especially important for cross-language communication.

Error Handling

Common Errors

Topic already exists with different type:
// First publisher establishes type
let pub1 = Publisher::<SensorData>::create("topic")?;

// This will fail if DifferentType != SensorData
let pub2 = Publisher::<DifferentType>::create("topic")?;
// Error: Type mismatch for topic 'topic'
Network transport unavailable:
// If network transport fails to initialize
let publisher = Publisher::<SensorData>::create("topic")?;
// Network errors are logged but don't affect local publishing
Network failures are logged but don’t propagate to the caller. This ensures local communication continues even if the network is down. Check logs if network communication isn’t working.

Best Practices

1. Use Auto-Detection

Unless you have a specific reason, use auto-detection:
// ✅ Good: Auto-detects best transport
let subscriber = Subscriber::<T>::create("topic", None)?;

// ⚠️ Only if you need specific behavior
let subscriber = Subscriber::<T>::create("topic", Some(true))?;

2. Handle None Gracefully

Handle cases when no message is available:
// ✅ Good: Handle None case
if let Ok(Some(data)) = subscriber.receive() {
    process(data);
}

// ❌ Bad: Assumes message always available
let data = subscriber.receive()?.unwrap();

3. Use TopicManager for Multiple Topics

If you have multiple publishers/subscribers, use TopicManager for centralized management:
// ✅ Good: Centralized management
let manager = TopicManager::create()?;
let pub1 = manager.register_publisher::<T1>("topic1", false)?;
let pub2 = manager.register_publisher::<T2>("topic2", false)?;

4. Keep Publishers Alive

Publishers must remain in scope for subscribers to receive messages:
// ✅ Good: Publisher stays alive
let publisher = Publisher::<T>::create("topic")?;
// ... use publisher ...

// ❌ Bad: Publisher dropped too early
{
    let publisher = Publisher::<T>::create("topic")?;
} // Publisher dropped here
let subscriber = Subscriber::<T>::create("topic", None)?;
// Subscriber won't receive messages

Next Steps