
Topic Manager

TopicManager provides centralized management for publishers and subscribers with automatic discovery, shared session management, and type safety validation.

What is TopicManager?

TopicManager is a centralized manager that coordinates publishers and subscribers across your application. It provides:
  • Shared Zenoh Session: Single session for all network operations (reduces overhead)
  • Automatic Discovery: Subscribers announce themselves, publishers auto-enable network
  • Type Safety: Validates that topics use consistent message types
  • Keep-Alive Tracking: Automatically manages subscriber connections
TopicManager is especially useful when you have multiple publishers and subscribers. It reduces resource overhead and simplifies connection management.

Why Use TopicManager?

Shared Session

Single Zenoh session for all pub/sub pairs reduces memory and connection overhead

Automatic Discovery

Publishers automatically enable network when remote subscribers connect

Type Safety

Validates that all publishers/subscribers for a topic use the same message type

Connection Management

Automatic keep-alive tracking and cleanup of disconnected subscribers

Basic Usage

Creating a TopicManager

use cerulion_core::prelude::*;

let manager = TopicManager::create()?;
TopicManager creates a shared Zenoh session that will be used by all publishers and subscribers registered with it.

Registering a Publisher

#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct SensorData {
    temperature: f32,
    timestamp: u64,
}

// Register publisher (network disabled initially)
let publisher = manager.register_publisher::<SensorData>(
    "sensors/temperature",
    false  // Start with local-only
)?;
Starting with network disabled is recommended. The network will automatically enable when a remote subscriber is discovered.

Registering a Subscriber

// Register subscriber (auto-detects local vs network)
let subscriber = manager.register_subscriber::<SensorData>(
    "sensors/temperature",
    None  // Auto-detect: checks local availability first
)?;
Auto-detection logic:
  1. Check if local publisher exists (iceoryx2)
  2. If yes, use local transport (fast, zero-copy)
  3. If no, use network transport (Zenoh)
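The selection logic above, combined with the `network_mode` values documented in the API reference (`None`, `Some(true)`, `Some(false)`), can be sketched as a small pure function. This is an illustration only: `Transport` and `choose_transport` are hypothetical names, not part of the Cerulion API.

```rust
/// Transport a subscriber ends up using (hypothetical type for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transport {
    /// Local zero-copy transport (iceoryx2).
    Local,
    /// Network transport (Zenoh).
    Network,
}

/// Hypothetical helper mirroring the documented `network_mode` semantics:
/// `None` auto-detects, `Some(true)` forces network, `Some(false)` forces local.
pub fn choose_transport(network_mode: Option<bool>, local_publisher_exists: bool) -> Transport {
    match network_mode {
        Some(true) => Transport::Network,
        Some(false) => Transport::Local,
        // Auto-detect: prefer the local transport when a local publisher exists.
        None if local_publisher_exists => Transport::Local,
        None => Transport::Network,
    }
}
```

With `None`, the result depends only on whether a local publisher was found, which is the value `check_local_availability` would report.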

Complete Example

use cerulion_core::prelude::*;

#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct SensorData {
    temperature: f32,
    timestamp: u64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create manager
    let manager = TopicManager::create()?;
    
    // Register publisher (network disabled initially)
    let publisher = manager.register_publisher::<SensorData>(
        "sensors/temperature",
        false  // Start with local-only
    )?;
    
    // Register subscriber (auto-detects local transport)
    let subscriber = manager.register_subscriber::<SensorData>(
        "sensors/temperature",
        None  // Auto-detect
    )?;
    
    // Publish data
    let data = SensorData {
        temperature: 23.5,
        timestamp: 1234567890,
    };
    publisher.send(data)?;
    
    // Receive data
    if let Ok(Some(received)) = subscriber.receive() {
        println!("Received: temp={}°C", received.temperature);
    }
    
    Ok(())
}

Discovery Protocol

How Discovery Works

  1. Subscriber Connects: When a network subscriber is created, it sends a DiscoveryMessage on the _discovery channel
  2. Publisher Responds: TopicManager’s discovery thread receives the message and matches it to registered publishers
  3. Network Enable: Publisher’s network transport is automatically enabled
  4. Keep-Alive: Subscriber sends keep-alive messages every 20 seconds
  5. Cleanup: If keep-alive expires (60s timeout), network is automatically disabled
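Steps 2 and 3 can be pictured as a lookup from topic hash to a shared flag that the publisher consults before sending over the network. The sketch below is a simplified model under that assumption; `PublisherFlags`, `register`, and `on_discovery` are hypothetical names, and the real discovery thread may be organized differently.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Same fields as the DiscoveryMessage described in this document.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct DiscoveryMessage {
    pub topic_hash: u64,
    pub keep_alive_timeout_secs: u32,
    pub subscriber_id: u64,
}

/// Hypothetical registry: topic hash -> "network enabled" flag shared with a publisher.
#[derive(Default)]
pub struct PublisherFlags {
    flags: HashMap<u64, Arc<AtomicBool>>,
}

impl PublisherFlags {
    /// Register a topic and return its flag; the network starts disabled.
    pub fn register(&mut self, topic_hash: u64) -> Arc<AtomicBool> {
        self.flags
            .entry(topic_hash)
            .or_insert_with(|| Arc::new(AtomicBool::new(false)))
            .clone()
    }

    /// Handle one discovery message. Returns true if a registered publisher matched.
    pub fn on_discovery(&self, msg: &DiscoveryMessage) -> bool {
        match self.flags.get(&msg.topic_hash) {
            Some(flag) => {
                // Enable the publisher's network transport.
                flag.store(true, Ordering::Release);
                true
            }
            None => false,
        }
    }
}
```

Messages for topics with no registered publisher are simply ignored in this model.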

Discovery Message

The discovery protocol uses a simple message format:
#[repr(C)]
pub struct DiscoveryMessage {
    pub topic_hash: u64,               // Fast topic matching
    pub keep_alive_timeout_secs: u32,  // Default 60s
    pub subscriber_id: u64,            // Unique identifier
}
Topic hashing enables fast topic matching without string comparisons. The hash is computed from the topic name.
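This page does not say which hash function is used. As a minimal sketch, assuming a 64-bit FNV-1a hash as a stand-in (the actual function in Cerulion may differ), a topic name could be reduced to a `u64` like this:

```rust
/// 64-bit FNV-1a hash of a topic name.
/// Assumption: FNV-1a is used here only for illustration; the hash Cerulion
/// actually computes is not specified in this document.
pub fn topic_hash(topic: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    topic
        .bytes()
        .fold(OFFSET_BASIS, |hash, byte| (hash ^ u64::from(byte)).wrapping_mul(PRIME))
}
```

Both sides only need to agree on the function: the subscriber hashes the topic name once when it builds its `DiscoveryMessage`, and the manager compares integers rather than strings.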

Force Network Mode

To force network transport (useful for remote subscribers):
// Force network mode to simulate remote subscriber
let remote_subscriber = manager.register_subscriber::<SensorData>(
    "sensors/temperature",
    Some(true)  // Force network
)?;

// Publisher's network will auto-enable via discovery protocol
// Wait briefly for discovery to propagate
std::thread::sleep(std::time::Duration::from_secs(1));

// Network should now be enabled
// (In real code, check publisher.is_network_enabled())
There’s a brief delay (typically < 1 second) between subscriber creation and network enable. This is normal and allows the discovery protocol to propagate.
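Instead of a fixed sleep, the wait can be written as a bounded poll. The helper below is a generic sketch using only the standard library; `wait_until` is a hypothetical name, not a Cerulion function.

```rust
use std::time::{Duration, Instant};

/// Poll `condition` until it returns true or `timeout` elapses.
/// Returns whether the condition became true in time.
pub fn wait_until(
    mut condition: impl FnMut() -> bool,
    timeout: Duration,
    poll_interval: Duration,
) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if condition() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        std::thread::sleep(poll_interval);
    }
}
```

Assuming the `is_network_enabled()` method mentioned in the comment above, a call might look like `wait_until(|| publisher.is_network_enabled(), Duration::from_secs(2), Duration::from_millis(50))`.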

Type Safety

TopicManager ensures type safety by validating that all publishers and subscribers for a topic use the same message type:
#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct SensorData {
    temperature: f32,
}

#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct DifferentType {
    value: u64,
}

// First registration establishes the type
let pub1 = manager.register_publisher::<SensorData>("topic", false)?;

// This will fail - type mismatch!
match manager.register_publisher::<DifferentType>("topic", false) {
    Ok(_) => println!("Success"),
    Err(e) => println!("Error: {}", e),  // "Type mismatch for topic 'topic'"
}
Type safety prevents common errors like sending the wrong message type or mismatched field layouts. This is especially important for cross-language communication.
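A check of this kind can be built on `std::any::TypeId`. The sketch below shows the idea in isolation; `TopicTypes` and `check` are hypothetical names, and the error text only imitates the message shown above.

```rust
use std::any::{type_name, TypeId};
use std::collections::HashMap;

/// Hypothetical per-topic type registry.
#[derive(Default)]
pub struct TopicTypes {
    // topic name -> (type id, readable type name for error messages)
    types: HashMap<String, (TypeId, &'static str)>,
}

impl TopicTypes {
    /// The first registration fixes the topic's type; later ones must match it.
    pub fn check<T: 'static>(&mut self, topic: &str) -> Result<(), String> {
        let incoming = (TypeId::of::<T>(), type_name::<T>());
        let existing = *self.types.entry(topic.to_string()).or_insert(incoming);
        if existing.0 == incoming.0 {
            Ok(())
        } else {
            Err(format!(
                "Type mismatch for topic '{}': registered as {}, requested {}",
                topic, existing.1, incoming.1
            ))
        }
    }
}
```

Note that `TypeId` values are only meaningful inside one compiled program, so a check like this covers publishers and subscribers registered with the same manager.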

Shared Session Benefits

Single Session vs Multiple Sessions

Single Shared Session (via TopicManager):
  • One Zenoh session for all pub/sub pairs
  • Lower memory footprint
  • Faster startup time
  • Centralized configuration
Multiple Independent Sessions:
  • Each publisher/subscriber creates its own session
  • Higher memory usage (each session has overhead)
  • Slower initialization (each session connects independently)

Performance Comparison

Metric            | Single Session   | Multiple Sessions
Session Creation  | ~100ms           | ~300ms+ (for 3 topics)
Memory Usage      | Lower per topic  | Higher per topic
Message Latency   | Similar          | Similar
The performance difference is most noticeable when you have many topics. For a single topic, the difference is minimal.

Keep-Alive Management

How Keep-Alive Works

  1. Initial Discovery: Subscriber sends discovery message on creation
  2. Periodic Updates: Subscriber sends keep-alive every 20 seconds
  3. Timeout: If no keep-alive received for 60 seconds, subscriber is considered disconnected
  4. Cleanup: TopicManager’s cleanup thread runs every 10 seconds to remove expired subscribers
  5. Network Disable: When no active subscribers remain, publisher’s network is automatically disabled
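The bookkeeping behind these steps can be modeled with a map from subscriber ID to the time of its last keep-alive. This is a simplified sketch, not the actual implementation; `KeepAliveTracker`, `touch`, and `cleanup` are hypothetical names, and time is passed in explicitly so the logic is easy to test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical tracker for the remote subscribers of one topic.
#[derive(Default)]
pub struct KeepAliveTracker {
    // subscriber_id -> (last message seen, timeout requested by that subscriber)
    last_seen: HashMap<u64, (Instant, Duration)>,
}

impl KeepAliveTracker {
    /// Record a discovery or keep-alive message (steps 1 and 2).
    pub fn touch(&mut self, subscriber_id: u64, timeout: Duration, now: Instant) {
        self.last_seen.insert(subscriber_id, (now, timeout));
    }

    /// Remove expired subscribers (steps 3 and 4).
    /// Returns true while at least one subscriber is active, i.e. while the
    /// publisher's network should stay enabled (step 5).
    pub fn cleanup(&mut self, now: Instant) -> bool {
        self.last_seen
            .retain(|_, (seen, timeout)| now.duration_since(*seen) < *timeout);
        !self.last_seen.is_empty()
    }
}
```

In this model a cleanup thread would call `cleanup(Instant::now())` every 10 seconds and disable the network when it returns false.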

Keep-Alive Configuration

Keep-alive timeout is configurable per subscriber:
// DiscoveryMessage includes keep_alive_timeout_secs
// Default is 60 seconds
// This is set automatically by the subscriber
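The 60-second timeout with 20-second keep-alive intervals means a single lost keep-alive does not drop a subscriber, while keep-alive traffic stays low. Because expired subscribers are only removed when the cleanup thread runs (every 10 seconds), a disconnected subscriber is detected roughly 60 to 70 seconds after its last keep-alive.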

API Reference

TopicManager

impl TopicManager {
    /// Create a new TopicManager with shared Zenoh session
    pub fn create() -> Result<Self, Box<dyn std::error::Error>>;
    
    /// Register a publisher with the manager
    pub fn register_publisher<T>(
        &self,
        topic: &str,
        enable_network_initial: bool,
    ) -> Result<Arc<Publisher<T>>, Box<dyn std::error::Error>>
    where
        T: SerializableMessage + Copy + Send + Sync + 'static;
    
    /// Register a subscriber with the manager
    /// network_mode: None = auto-detect, Some(true) = force network, Some(false) = force local
    pub fn register_subscriber<T>(
        &self,
        topic: &str,
        network_mode: Option<bool>,
    ) -> Result<Subscriber<T>, Box<dyn std::error::Error>>
    where
        T: SerializableMessage + Copy + Send + Sync + 'static;
    
    /// Check if a local IPC service exists for the topic
    pub fn check_local_availability(&self, topic: &str) -> bool;
    
    /// Get the shared Zenoh session
    pub fn session(&self) -> &Arc<zenoh::Session>;
}

Limitations

1. iceoryx2 Publisher Thread Safety

Publishers cannot be moved across threads due to iceoryx2’s internal single-threaded reference counting. Publishers must be used on the thread where they’re created.
If you need multi-threaded publishing, create separate publishers per thread or use synchronization primitives to coordinate access.
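One way to coordinate access without moving the publisher is to keep it on a single owner thread and have worker threads hand their data over through a channel. The sketch below uses only the standard library; `publish_from_many_threads` is a hypothetical helper, and the `publish` closure stands in for a call such as `publisher.send(value)`.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawn `producers` worker threads that send readings to the calling thread,
/// which is the only thread that ever touches the publisher.
/// `publish` stands in for something like `publisher.send(value)`.
pub fn publish_from_many_threads(producers: u32, mut publish: impl FnMut(f32)) {
    let (tx, rx) = mpsc::channel::<f32>();

    let workers: Vec<_> = (0..producers)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Each worker produces one reading and hands it to the owner thread.
                tx.send(20.0 + id as f32).expect("owner thread is still receiving");
            })
        })
        .collect();

    // Drop the original sender so the receive loop ends once all workers finish.
    drop(tx);

    // Owner thread: the only place where publishing happens.
    for reading in rx {
        publish(reading);
    }

    for worker in workers {
        worker.join().expect("worker thread panicked");
    }
}
```

Because `publish` is never sent to another thread, it does not need to be `Send`, which matches the constraint described above.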

2. Keep-Alive Delay

Cleanup runs every 10 seconds, so the network can stay enabled for up to 10s after the last subscriber's keep-alive timeout expires.
This delay is intentional to avoid rapid enable/disable cycles when subscribers reconnect quickly. The 10-second cleanup interval provides a good balance.

3. Type Erasure

TopicManager uses TypeId to validate message types, but it cannot store typed references to publishers and subscribers directly. Instead, each publisher/subscriber is managed via atomic flags.

Best Practices

1. Start with Network Disabled

// ✅ Good: Start local-only, enable network on demand
let publisher = manager.register_publisher::<T>("topic", false)?;

// ⚠️ Only if you know remote subscribers exist
let publisher = manager.register_publisher::<T>("topic", true)?;

2. Use Auto-Detection

// ✅ Good: Let TopicManager choose best transport
let subscriber = manager.register_subscriber::<T>("topic", None)?;

// ⚠️ Only if you need specific behavior
let subscriber = manager.register_subscriber::<T>("topic", Some(true))?;

3. Keep Manager Alive

// ✅ Good: Manager stays alive for all pub/sub
let manager = TopicManager::create()?;
let pub1 = manager.register_publisher::<T1>("topic1", false)?;
let pub2 = manager.register_publisher::<T2>("topic2", false)?;

// ❌ Bad: Manager dropped too early
{
    let manager = TopicManager::create()?;
    let publisher = manager.register_publisher::<T>("topic", false)?;  // `pub` is a reserved keyword
} // Manager dropped here
// Publisher may not work correctly

Next Steps