Skip to main content

Overview

TopicManager provides a centralized API for managing publishers and subscribers with automatic discovery, shared session management, and type safety. This reference documents all available methods organized by functionality.
For a quick start guide with examples, see the Quickstart page.

Quick Reference

MethodDescriptionCategory
create()Create a new TopicManager instanceInitialization
register_publisher()Register a publisher for a topicPublishing
send_message()Send a typed message to a topicPublishing
send()Send raw bytes to a topicPublishing
register_subscriber()Register a subscriber for a topicSubscribing
pull()Pull the latest message (non-blocking)Subscribing
check_local_availability()Check if local IPC is availableAdvanced
get_listener()Get iceoryx2 listener for a topicAdvanced
subscribe_to_topic()Subscribe with callback functionAdvanced
request_to_broadcast()Request topic broadcast from networkAdvanced
session()Get shared Zenoh sessionAdvanced

Initialization

Signature

pub fn create() -> Result<Self, Box<dyn std::error::Error>>;

Description

Creates a new TopicManager instance with a shared Zenoh session and iceoryx2 node. This is the entry point for all TopicManager operations.The manager automatically:
  • Initializes a shared Zenoh session for network operations
  • Creates a shared iceoryx2 node for local IPC
  • Starts the discovery thread for automatic subscriber detection
  • Initializes the schema registry if schema directories exist

Returns

  • Ok(TopicManager) - Successfully created manager
  • Err(e) - Error during initialization (e.g., Zenoh connection failed)

Example

use cerulion_core::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = TopicManager::create()?;
    // Use manager for pub/sub operations
    Ok(())
}

Publishing Messages

Signature

pub fn register_publisher(
    &self,
    topic: &str,
    enable_network_initial: bool,
    is_variable_size: bool,
) -> Result<Arc<Publisher>, Box<dyn std::error::Error>>;

Description

Registers a publisher for a topic. Returns a shared publisher handle that can be cloned for use across different parts of your application.

Parameters

ParameterTypeDescription
topic&strTopic name to publish to
enable_network_initialboolIf true, enables network publishing initially. If false, network is enabled automatically when remote subscribers are discovered
is_variable_sizeboolIf true, uses variable-size allocation for large messages. If false, uses fixed-size allocation (faster but limited)

Returns

  • Ok(Arc<Publisher>) - Publisher handle for shared ownership
  • Err(e) - Error during publisher creation

Example

// Create publisher with network disabled initially
let publisher = manager.register_publisher("sensors/temperature", false, false)?;

// For large messages, use variable-size allocation
let large_publisher = manager.register_publisher("large_data", false, true)?;

Notes

Start with enable_network_initial: false to save resources. Network will be automatically enabled when remote subscribers are discovered via the discovery protocol.
The is_variable_size parameter affects memory allocation:
  • false (fixed-size): Faster allocation, but limited message size. Use for small, frequent messages.
  • true (variable-size): Slower allocation, but supports larger messages. Use when messages may exceed the fixed buffer size.

Signature

pub fn send_message<T>(
    &self,
    topic: &str,
    message: &T,
) -> Result<(), Box<dyn std::error::Error>>
where
    T: SerializableMessage;

Description

Sends a typed message to a topic. The message is automatically serialized using its to_bytes() implementation. This method provides compile-time type checking and automatic schema tracking.The method will:
  • Get or create a publisher for the topic
  • Serialize the message to bytes
  • Extract and track the schema name from the type
  • Send the message via the publisher

Parameters

ParameterTypeDescription
topic&strTopic name to send to
message&TReference to a message that implements SerializableMessage

Returns

  • Ok(()) - Message sent successfully
  • Err(e) - Error during send (e.g., serialization failed, publisher error)

Example

struct SensorData {
    temperature: f32,
    timestamp: u64,
}

let data = SensorData {
    temperature: 23.5,
    timestamp: 1234567890,
};

manager.send_message("sensors/temperature", &data)?;

See Also

  • send() - For sending raw bytes or pre-serialized data

Signature

pub fn send(
    &self,
    topic: &str,
    data: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>>;

Description

Sends raw bytes to a topic. Gets or creates a publisher for the topic and sends the data as-is. Use this when you already have serialized bytes or need to send data that doesn’t implement SerializableMessage.The schema name is automatically inferred from the message format if the data is already wrapped in a relocatable message.

Parameters

ParameterTypeDescription
topic&strTopic name to send to
dataVec<u8>Raw bytes to send

Returns

  • Ok(()) - Message sent successfully
  • Err(e) - Error during send

Example

// Send pre-serialized data
let raw_bytes = vec![0x01, 0x02, 0x03, 0x04];
manager.send("raw_topic", raw_bytes)?;

// Or send data from another serialization format
let protobuf_bytes = serialize_to_protobuf(&my_data)?;
manager.send("protobuf_topic", protobuf_bytes)?;

When to Use

Prefer send_message() for typed messages as it provides compile-time type checking and automatic schema tracking. Use send() only when:
  • You have raw bytes from another source
  • You need to send data that doesn’t implement SerializableMessage
  • You’re working with pre-serialized data

Receiving Messages

Signature

pub fn register_subscriber(
    &self,
    topic: &str,
    network_mode: Option<bool>,
) -> Result<Arc<Mutex<Subscriber>>, Box<dyn std::error::Error>>;

Description

Registers a subscriber for a topic. Returns a thread-safe shared subscriber handle that can be cloned and used across threads.The network_mode parameter controls transport selection:
  • None - Auto-detect: tries local first if available, falls back to network
  • Some(true) - Force network: always uses network transport
  • Some(false) - Force local: only uses local IPC (fails if no local publisher exists)

Parameters

ParameterTypeDescription
topic&strTopic name to subscribe to
network_modeOption<bool>Transport mode: None = auto-detect, Some(true) = force network, Some(false) = force local

Returns

  • Ok(Arc<Mutex<Subscriber>>) - Thread-safe shared subscriber handle
  • Err(e) - Error during subscriber creation

Example

// Auto-detect transport (recommended)
let subscriber = manager.register_subscriber("sensors/temperature", None)?;

// Force network mode for remote subscribers
let remote_sub = manager.register_subscriber("remote_topic", Some(true))?;

// Force local mode (only if local publisher exists)
let local_sub = manager.register_subscriber("local_topic", Some(false))?;

Thread Safety

The returned Arc<Mutex<Subscriber>> can be safely shared across threads:
let subscriber = manager.register_subscriber("topic", None)?;
let sub1 = Arc::clone(&subscriber);
thread::spawn(move || {
    let sub = sub1.lock().unwrap();
    // Use subscriber
});
Always lock the Mutex before accessing the subscriber.

Important

The subscriber handle must be kept alive for the subscription to remain active. If the handle is dropped, the subscription is automatically closed.

Signature

pub fn pull(
    &self,
    topic: &str,
) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>>;

Description

Pulls the latest message from a topic’s subscriber. This method is always non-blocking - it returns immediately whether a message is available or not.Gets or creates a subscriber for the topic if it doesn’t exist, then returns the latest available message.

Parameters

ParameterTypeDescription
topic&strTopic name to pull from

Returns

  • Ok(Some(bytes)) - Message available, returns the message bytes
  • Ok(None) - No message currently available
  • Err(e) - Error occurred (e.g., subscriber error)

Example

// Non-blocking poll
match manager.pull("sensors/temperature") {
    Ok(Some(bytes)) => {
        // Deserialize the message
        match TemperatureReading::from_bytes(&bytes) {
            Ok(reading) => println!("Received: {}°C", reading.temperature),
            Err(e) => eprintln!("Deserialization error: {}", e),
        }
    }
    Ok(None) => {
        // No message available, continue polling
    }
    Err(e) => eprintln!("Error: {}", e),
}

// For blocking behavior, use a loop
loop {
    match manager.pull("topic") {
        Ok(Some(bytes)) => {
            // Process message
            break;
        }
        Ok(None) => {
            thread::sleep(Duration::from_millis(10));
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}

Behavior

pull() never blocks. It always returns immediately with:
  • Ok(Some(bytes)) if a message is available
  • Ok(None) if no message is available
  • Err(e) if an error occurred
For blocking behavior, use a loop with std::thread::sleep or access the subscriber’s blocking receive methods directly.

Advanced Operations

Signature

pub fn check_local_availability(&self, topic: &str) -> bool;

Description

Checks whether an iceoryx2 service is available for the given topic, which indicates that a local publisher exists. This is useful for determining if local IPC transport is available before creating a subscriber.

Parameters

ParameterTypeDescription
topic&strTopic name to check

Returns

  • true - Local IPC service exists for the topic
  • false - No local service available

Example

if manager.check_local_availability("sensors/temperature") {
    // Local publisher exists, can use local transport
    let subscriber = manager.register_subscriber("sensors/temperature", Some(false))?;
} else {
    // No local publisher, will use network
    let subscriber = manager.register_subscriber("sensors/temperature", None)?;
}

Signature

pub fn get_listener(
    &self,
    topic: &str,
) -> Result<Option<IceoryxListener<ipc::Service>>, Box<dyn std::error::Error>>;

Description

Gets or creates an iceoryx2 listener for a topic. This will register a subscriber for the topic if it doesn’t exist. Returns None if the subscriber is network-only (no local listener available).This method provides low-level access to iceoryx2’s event-driven API for advanced use cases.

Parameters

ParameterTypeDescription
topic&strTopic name to get listener for

Returns

  • Ok(Some(listener)) - Local listener available
  • Ok(None) - Subscriber is network-only or local IPC not available
  • Err(e) - Error during listener creation

Example

match manager.get_listener("sensors/temperature") {
    Ok(Some(listener)) => {
        // Use iceoryx2 listener for event-driven operations
        // This is for advanced use cases
    }
    Ok(None) => {
        // No local listener available (network-only subscriber)
    }
    Err(e) => eprintln!("Error: {}", e),
}

When to Use

Most users should use register_subscriber() and pull() instead. Only use get_listener() if you need direct access to iceoryx2’s event-driven API for advanced use cases.

Signature

pub fn subscribe_to_topic<F>(
    &self,
    callback: F,
    io_dict: IODict,
    trigger_topic: &str,
) -> Result<SubscriptionHandle, Box<dyn std::error::Error>>
where
    F: Fn(Vec<Vec<u8>>) -> Result<Vec<Vec<u8>>, Box<dyn std::error::Error>>
        + Send + Sync + 'static;

Description

Subscribes to topics with a callback function. When the trigger topic receives data, the callback is invoked with data collected from all input topics. The callback’s return value is published to the output topics.This is useful for creating data processing pipelines where one topic triggers processing of multiple input topics.

Parameters

ParameterTypeDescription
callbackFFunction that accepts flatbuffer bytes from input topics and returns bytes for output topics
io_dictIODictDictionary containing input and output topic name vectors
trigger_topic&strTopic name that triggers the callback when it receives data

Returns

  • Ok(SubscriptionHandle) - Handle to manage the subscription lifecycle
  • Err(e) - Error during subscription creation

Example

use cerulion_core::prelude::*;

let io_dict = IODict {
    input: vec!["sensor1".to_string(), "sensor2".to_string()],
    output: vec!["processed".to_string()],
};

let callback = |inputs: Vec<Vec<u8>>| -> Result<Vec<Vec<u8>>, Box<dyn std::error::Error>> {
    // Process inputs and return output
    let processed = process_data(&inputs)?;
    Ok(vec![processed])
};

let handle = manager.subscribe_to_topic(callback, io_dict, "trigger")?;
// Keep handle alive for subscription to remain active

Important

The SubscriptionHandle must be kept alive for the subscription to remain active. Dropping the handle will unsubscribe and clean up resources.

Signature

pub fn request_to_broadcast(&self) -> Result<(), Box<dyn std::error::Error>>;

Description

Requests a broadcast of all registered topics from other TopicManagers on the network. Subscribes to the broadcast channel and prints all received topic information.This is useful for discovering what topics are available on the network from other processes.

Returns

  • Ok(()) - Broadcast request sent and response received
  • Err(e) - Error during broadcast (e.g., timeout waiting for response)

Example

// Request topic broadcast from network
match manager.request_to_broadcast() {
    Ok(()) => {
        // Topic information printed to console
    }
    Err(e) => eprintln!("Failed to get broadcast: {}", e),
}

Use Case

This method is useful for debugging and discovery. It helps you see what topics are available on the network from other TopicManager instances.

Signature

pub fn session(&self) -> &Arc<zenoh::Session>;

Description

Returns a reference to the shared Zenoh session used by all network operations. This provides direct access to Zenoh functionality if you need to perform operations not covered by TopicManager’s API.

Returns

  • &Arc<zenoh::Session> - Reference to the shared Zenoh session

Example

// Get direct access to Zenoh session
let zenoh_session = manager.session();

// Use Zenoh API directly for advanced operations
// (e.g., custom queries, storage operations, etc.)

When to Use

Most users should use TopicManager’s methods instead. Only use session() if you need direct access to Zenoh functionality for advanced use cases not covered by TopicManager.

Next Steps