Skip to main content

Overview

Publisher and Subscriber provide direct, low-level APIs for messaging in Cerulion Core. Unlike TopicManager, these APIs require manual session management, transport control, and serialization handling.
For a quick start guide with examples, see the Quickstart page.

Quick Reference

MethodDescriptionCategory
Publisher::create()Create a new publisherInitialization
Publisher::create_with_session()Create publisher with shared session/nodeInitialization
publisher.send()Send raw bytes to topicPublishing
publisher.send_message()Send typed message (convenience)Publishing
publisher.enable_network()Enable network publishingNetwork Control
publisher.disable_network()Disable network publishingNetwork Control
publisher.is_network_enabled()Check if network is enabledNetwork Control
publisher.start_network_thread()Start network thread manuallyNetwork Control
publisher.stop_network_thread()Stop network thread manuallyNetwork Control
Subscriber::create()Create a new subscriberInitialization
Subscriber::create_with_session()Create subscriber with shared session/nodeInitialization
subscriber.receive()Receive message (non-blocking)Subscribing
subscriber.enable_local_subscriber()Enable local transport after creationSubscribing
subscriber.id()Get subscriber IDUtilities
subscriber.take_listener()Get iceoryx2 listenerUtilities

Publisher API

Signature

pub fn create(topic: &str, enable_network: bool) -> Result<Self, Box<dyn std::error::Error>>;

Description

Creates a new publisher for a topic. The publisher always uses local (iceoryx2) transport. Network (Zenoh) transport can be enabled or disabled via the enable_network parameter.

Parameters

ParameterTypeDescription
topic&strTopic name to publish to
enable_networkboolIf true, enables network publishing initially. If false, network can be enabled later with enable_network()

Returns

  • Ok(Publisher) - Successfully created publisher
  • Err(e) - Error during publisher creation

Example

use cerulion_core::prelude::*;

// Create publisher with network enabled
let publisher = Publisher::create("sensors/temperature", true)?;

// Create publisher with network disabled initially
let publisher = Publisher::create("sensors/temperature", false)?;
The publisher handle must remain alive for subscribers to receive messages. If the publisher is dropped, local subscribers will stop receiving messages.

Signature

pub fn create_with_session(
    topic: &str,
    enable_network: bool,
    zenoh_session: Option<Arc<zenoh::Session>>,
    variable_size: bool,
    shared_node: Option<Arc<Node<ipc::Service>>>,
) -> Result<Self, Box<dyn std::error::Error>>;

Description

Creates a publisher with an optional shared Zenoh session and iceoryx2 node. This allows you to reuse sessions and nodes across multiple publishers to reduce resource overhead.

Parameters

ParameterTypeDescription
topic&strTopic name to publish to
enable_networkboolIf true, enables network publishing initially
zenoh_sessionOption<Arc<zenoh::Session>>Optional shared Zenoh session. If None, creates a new session
variable_sizeboolIf true, uses variable-size allocation for large messages
shared_nodeOption<Arc<Node<ipc::Service>>>Optional shared iceoryx2 node. If None, creates a new node

Returns

  • Ok(Publisher) - Successfully created publisher
  • Err(e) - Error during publisher creation

Example

use cerulion_core::prelude::*;
use std::sync::Arc;

// Create shared session and node
let config = zenoh::Config::default();
let session = Arc::new(zenoh::open(config).wait()?);
let node = Arc::new(NodeBuilder::new().create::<ipc::Service>()?);

// Create multiple publishers sharing the same session and node
let pub1 = Publisher::create_with_session("topic1", true, Some(session.clone()), false, Some(node.clone()))?;
let pub2 = Publisher::create_with_session("topic2", true, Some(session.clone()), false, Some(node.clone()))?;

Signature

pub fn send(&self, data: &[u8]) -> Result<(), Box<dyn std::error::Error>>;

Description

Sends raw bytes to the topic. The bytes are automatically wrapped in a relocatable message format (unless already in relocatable format from ROS2 hooks).

Parameters

ParameterTypeDescription
data&[u8]Raw bytes to send

Returns

  • Ok(()) - Message sent successfully
  • Err(e) - Error during send

Behavior

When you call send(), here’s what happens:
  1. Relocatable wrapping: Raw bytes are wrapped in a relocatable message format (unless already in relocatable format)
  2. Local path: Message is written directly to shared memory (synchronous, fast, < 1 μs)
  3. Network path (if enabled): Message is queued to background thread (non-blocking)
  4. Background thread: Converts relocatable to protobuf and sends over Zenoh (asynchronous)
The network path uses latest-message semantics. If the network is slow and multiple messages arrive, only the most recent message is sent. This ensures network subscribers always get the latest data.

Example

use cerulion_core::prelude::*;

let publisher = Publisher::create("sensors/temperature", true)?;
let bytes = vec![0x01, 0x02, 0x03];
publisher.send(&bytes)?;

Signature

pub fn send_message<T>(&self, message: &T) -> Result<(), Box<dyn std::error::Error>>
where
    T: SerializableMessage;

Description

Convenience method that automatically serializes a typed message before sending. The message type must implement SerializableMessage.

Parameters

ParameterTypeDescription
message&TReference to a message that implements SerializableMessage

Returns

  • Ok(()) - Message sent successfully
  • Err(e) - Error during serialization or send

Example

use cerulion_core::prelude::*;

#[derive(Copy, Clone)]
struct SensorData {
    temperature: f32,
    timestamp: u64,
}

impl SerializableMessage for SensorData {
    fn to_bytes(&self) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        // Serialization logic
        Ok(vec![])
    }
}

let publisher = Publisher::create("sensors/temperature", true)?;
let data = SensorData { temperature: 23.5, timestamp: 1234567890 };
publisher.send_message(&data)?;

Signature

pub fn enable_network(&self);

Description

Enables network publishing. This sets an atomic flag that the background network thread checks. Network publishing will begin as soon as the thread processes the next message.

Example

use cerulion_core::prelude::*;

let publisher = Publisher::create("sensors/temperature", false)?;
// Later, enable network when remote subscriber discovered
publisher.enable_network();

Signature

pub fn disable_network(&self);

Description

Disables network publishing. The background thread will stop sending messages over the network, but local publishing continues unaffected.

Example

use cerulion_core::prelude::*;

let publisher = Publisher::create("sensors/temperature", true)?;
// Disable network to save resources
publisher.disable_network();

Signature

pub fn is_network_enabled(&self) -> bool;

Description

Checks if network publishing is currently enabled.

Returns

  • true - Network publishing is enabled
  • false - Network publishing is disabled

Example

use cerulion_core::prelude::*;

let publisher = Publisher::create("sensors/temperature", true)?;
if publisher.is_network_enabled() {
    println!("Network publishing is active");
}

Signature

pub fn start_network_thread(&self);

Description

Manually starts the network publishing thread. This is typically called automatically when the publisher is created with enable_network: true, but can be called manually if needed.
This is an advanced method. Most users should use enable_network() instead, which handles thread management automatically.

Signature

pub fn stop_network_thread(&self);

Description

Manually stops the network publishing thread. This is typically called automatically when the publisher is dropped, but can be called manually if needed.
This is an advanced method. Most users should use disable_network() instead, which handles thread management automatically.

Subscriber API

Signature

pub fn create(topic: &str, network_only: Option<bool>) -> Result<Self, Box<dyn std::error::Error>>;

Description

Creates a new subscriber for a topic. The subscriber automatically chooses the best transport based on the network_only parameter.

Parameters

ParameterTypeDescription
topic&strTopic name to subscribe to
network_onlyOption<bool>Transport selection: None (auto-detect), Some(true) (network only), Some(false) (local only)

Returns

  • Ok(Subscriber) - Successfully created subscriber
  • Err(e) - Error during subscriber creation

Auto-Detection Logic

When network_only is None:
  1. Check if local publisher exists (iceoryx2)
  2. If yes, use local transport (fast, zero-copy)
  3. If no, use network transport (Zenoh)

Example

use cerulion_core::prelude::*;

// Auto-detect: tries local first, falls back to network
let subscriber = Subscriber::create("sensors/temperature", None)?;

// Force network transport only
let subscriber = Subscriber::create("sensors/temperature", Some(true))?;

// Force local transport only
let subscriber = Subscriber::create("sensors/temperature", Some(false))?;

Signature

pub fn create_with_session(
    topic: &str,
    network_only: Option<bool>,
    zenoh_session: Option<Arc<zenoh::Session>>,
    shared_node: Option<Arc<Node<ipc::Service>>>,
) -> Result<Self, Box<dyn std::error::Error>>;

Description

Creates a subscriber with an optional shared Zenoh session and iceoryx2 node. This allows you to reuse sessions and nodes across multiple subscribers to reduce resource overhead.

Parameters

ParameterTypeDescription
topic&strTopic name to subscribe to
network_onlyOption<bool>Transport selection: None (auto-detect), Some(true) (network only), Some(false) (local only)
zenoh_sessionOption<Arc<zenoh::Session>>Optional shared Zenoh session. If None, creates a new session
shared_nodeOption<Arc<Node<ipc::Service>>>Optional shared iceoryx2 node. If None, creates a new node

Returns

  • Ok(Subscriber) - Successfully created subscriber
  • Err(e) - Error during subscriber creation

Example

use cerulion_core::prelude::*;
use std::sync::Arc;

// Create shared session and node
let config = zenoh::Config::default();
let session = Arc::new(zenoh::open(config).wait()?);
let node = Arc::new(NodeBuilder::new().create::<ipc::Service>()?);

// Create multiple subscribers sharing the same session and node
let sub1 = Subscriber::create_with_session("topic1", None, Some(session.clone()), Some(node.clone()))?;
let sub2 = Subscriber::create_with_session("topic2", None, Some(session.clone()), Some(node.clone()))?;

Signature

pub fn receive(&self) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>>;

Description

Receives a message from the topic. This method is non-blocking and returns raw bytes that you must deserialize yourself.

Returns

  • Ok(Some(bytes)) - Message received (raw bytes)
  • Ok(None) - No message available
  • Err(e) - Error during receive
The receive() method never blocks. It returns immediately with Some(Vec<u8>) if a message is available, or None if no message is ready.

Behavior

The method checks transport in this order:
  1. Local transport (if available): Checks iceoryx2 shared memory first (fastest)
  2. Network transport (if available): Checks Zenoh network messages

Example

use cerulion_core::prelude::*;

let subscriber = Subscriber::create("sensors/temperature", None)?;

match subscriber.receive() {
    Ok(Some(bytes)) => {
        // Deserialize the raw bytes
        if let Ok(data) = SensorData::from_bytes(&bytes) {
            println!("Received: {:?}", data);
        }
    }
    Ok(None) => {
        // No message available (non-blocking)
    }
    Err(e) => {
        eprintln!("Error receiving: {}", e);
    }
}

Signature

pub fn enable_local_subscriber(
    &mut self,
    shared_node: Option<Arc<Node<ipc::Service>>>,
) -> Result<(), Box<dyn std::error::Error>>;

Description

Enables local subscriber transport after creation. This is useful if you created a network-only subscriber but later want to also receive from local publishers.

Parameters

ParameterTypeDescription
shared_nodeOption<Arc<Node<ipc::Service>>>Optional shared iceoryx2 node. If None, creates a new node

Returns

  • Ok(()) - Local subscriber enabled successfully
  • Err(e) - Error during local subscriber creation (e.g., publisher doesn’t exist yet)

Example

use cerulion_core::prelude::*;

// Create network-only subscriber
let mut subscriber = Subscriber::create("sensors/temperature", Some(true))?;

// Later, enable local subscriber when local publisher becomes available
subscriber.enable_local_subscriber(None)?;
// Now subscriber can receive from both local and network publishers

Signature

pub fn id(&self) -> u64;

Description

Returns the subscriber ID. This ID is automatically generated and used for discovery messages when the subscriber is managed by TopicManager.

Returns

  • u64 - Subscriber ID

Example

use cerulion_core::prelude::*;

let subscriber = Subscriber::create("sensors/temperature", None)?;
let subscriber_id = subscriber.id();
println!("Subscriber ID: {}", subscriber_id);

Signature

pub fn take_listener(&mut self) -> Option<IceoryxListener<ipc::Service>>;

Description

Takes the local listener from the subscriber. This consumes the listener from the subscriber, so it can only be called once. The listener can be used for event-based notifications from the local publisher.

Returns

  • Some(listener) - Local listener if available
  • None - No local listener available (network-only subscriber)
This consumes the listener from the subscriber. After calling this, the subscriber will no longer have access to the listener.

Example

use cerulion_core::prelude::*;

let mut subscriber = Subscriber::create("sensors/temperature", None)?;

// Get the local listener (if available)
if let Some(listener) = subscriber.take_listener() {
    // Use listener for event-based notifications
    // ...
}

Next Steps