
Examples

Complete, runnable examples demonstrating common patterns and use cases for Cerulion Graph Editor. Each example includes step-by-step instructions, code samples, and expected output.

Prerequisites

Graph Editor Installed

Cerulion Graph Editor installed and running. See Installation.

Basic Knowledge

Understanding of nodes, topics, and schemas. See Quickstart if needed.

Language Familiarity

Basic knowledge of Rust, Python, or C++ (examples show multiple languages).

Time Available

Each example takes 20-30 minutes to complete.

Temperature Pipeline

A complete end-to-end example building a temperature monitoring pipeline from scratch.

Problem Statement

Build a system that:
  1. Reads temperature data from a sensor
  2. Converts temperature from Celsius to Fahrenheit
  3. Logs the converted temperature
  4. Runs continuously, processing new readings as they arrive

Solution

Create a simple pipeline: Publisher → Processor → Subscriber

Step-by-Step

Step 1: Create schema

Create a TemperatureReading schema:
schemas:
  TemperatureReading:
    fields:
      float32 temperature:
      uint64 timestamp:

Step 2: Create publisher node

Create a Temperature Publisher node with one output port of type TemperatureReading. Write the code:
import time
from dataclasses import dataclass

@dataclass
class TemperatureReading:
    temperature: float
    timestamp: int

def main(temperature_out):
    while True:
        reading = TemperatureReading(
            temperature=22.5,
            timestamp=int(time.time() * 1000)
        )
        temperature_out.send(reading)
        time.sleep(1)

Step 3: Create processor node

Create a Celsius to Fahrenheit node with input and output ports of type TemperatureReading. Write the conversion code:
# TemperatureReading is the same dataclass defined in the publisher node
def main(temperature_in, temperature_out):
    reading = temperature_in.receive()
    fahrenheit = (reading.temperature * 9.0 / 5.0) + 32.0
    output = TemperatureReading(temperature=fahrenheit, timestamp=reading.timestamp)
    temperature_out.send(output)
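
To sanity-check the conversion outside the editor, you can stub the ports. The sketch below assumes only that a port exposes receive() or send(); the StubInput and StubOutput classes are hypothetical test helpers, not part of the framework API:

# Hypothetical stubs for local testing -- not part of the framework API
class StubInput:
    def __init__(self, reading):
        self.reading = reading
    def receive(self):
        return self.reading

class StubOutput:
    def __init__(self):
        self.sent = []
    def send(self, msg):
        self.sent.append(msg)

# 22.5°C should convert to 72.5°F
out = StubOutput()
main(StubInput(TemperatureReading(temperature=22.5, timestamp=0)), out)
assert abs(out.sent[0].temperature - 72.5) < 1e-6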

Step 4: Create subscriber node

Create a Temperature Logger node with one input port of type TemperatureReading. Write the logging code:
def main(temperature_in):
    reading = temperature_in.receive()
    print(f"Temperature: {reading.temperature:.2f}°F (timestamp: {reading.timestamp})")

Step 5: Connect and run

  1. Connect Temperature Publisher → Celsius to Fahrenheit → Temperature Logger
  2. Generate and run
You should see: Temperature: 72.50°F (timestamp: ...)

Expected Output

Temperature: 72.50°F (timestamp: 1234567890)
Temperature: 72.50°F (timestamp: 1234568890)
...

Key Takeaways

  • Schemas define the data structure for type safety
  • Nodes process data with a single responsibility
  • Topics are created automatically when nodes are connected
  • The framework handles all communication details

Fan-Out Pattern

Distribute data from one publisher to multiple subscribers.

Problem Statement

You have one temperature sensor and need to send data to:
  1. A logger
  2. An alert system
  3. A statistics calculator
All three systems need the same data simultaneously.

Solution

Connect one output port to multiple input ports:
Temperature Publisher

    [Topic]
    ↙  ↓  ↘
Logger  Alert  Statistics

Step-by-Step

Step 1: Create schema and publisher

Create TemperatureReading schema and Temperature Publisher node (same as temperature pipeline example).

Step 2: Create three subscriber nodes

Create three nodes:
  • Temperature Logger - Logs readings
  • Temperature Alert - Alerts on high temperature
  • Temperature Statistics - Calculates averages
Each has one input port of type TemperatureReading.

Step 3: Write subscriber code

Each snippet below belongs to a separate node:

# Logger
def main(temperature_in):
    reading = temperature_in.receive()
    print(f"[LOG] Temperature: {reading.temperature}°C")

# Alert
def main(temperature_in):
    reading = temperature_in.receive()
    if reading.temperature > 30.0:
        print(f"[ALERT] High temperature: {reading.temperature}°C")

# Statistics
readings = []  # module-level state persists across calls to main()

def main(temperature_in):
    reading = temperature_in.receive()
    readings.append(reading.temperature)
    if len(readings) >= 10:
        avg = sum(readings) / len(readings)
        print(f"[STATS] Average: {avg:.2f}°C")
        readings.clear()

Step 4: Connect in fan-out pattern

Connect Temperature Publisher output to all three subscriber inputs.
You should see three connections from one output port.

Understanding Fan-Out

  • One-to-many - One publisher, many subscribers
  • Independent processing - Subscribers don’t affect each other
  • Same data - All subscribers receive identical data
  • Automatic distribution - Framework handles distribution

Use Cases

  • Broadcasting data to multiple consumers
  • Parallel processing of the same data
  • Multiple systems monitoring the same stream
  • Redundancy and backup systems

Fan-In Join Pattern

Combine data from multiple publishers into a single subscriber using a join node.

Problem Statement

You have multiple temperature sensors and need to:
  1. Collect readings from Sensor A, B, and C
  2. Combine them into a single average temperature
  3. Process the combined result

Solution

Use multiple publishers connected to a join node:
Sensor A → Topic A ↘
Sensor B → Topic B → Join Node → Processor
Sensor C → Topic C ↗

Step-by-Step

Step 1: Create schema with sensor ID

schemas:
  TemperatureReading:
    fields:
      float32 temperature:
      uint64 timestamp:
      string sensor_id:

Step 2: Create three sensor publishers

Create Sensor A, Sensor B, and Sensor C nodes, each with one output port of type TemperatureReading. Write the code for each:
# Sensor A
import time
from dataclasses import dataclass

@dataclass
class TemperatureReading:
    temperature: float
    timestamp: int
    sensor_id: str

def main(temperature_out):
    while True:
        reading = TemperatureReading(
            temperature=20.0,
            timestamp=int(time.time() * 1000),
            sensor_id="sensor_a"
        )
        temperature_out.send(reading)
        time.sleep(1)
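
Sensor B and Sensor C follow the same pattern; only sensor_id (and, in this sketch, an assumed placeholder reading) changes:

# Sensor B -- same structure as Sensor A
def main(temperature_out):
    while True:
        reading = TemperatureReading(
            temperature=21.0,  # placeholder value assumed for this example
            timestamp=int(time.time() * 1000),
            sensor_id="sensor_b"
        )
        temperature_out.send(reading)
        time.sleep(1)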

Step 3: Create join node

Create a Temperature Join node with:
  • Three input ports: sensor_a_in, sensor_b_in, sensor_c_in
  • One output port: combined_out
Write join logic:
# Uses the same imports and TemperatureReading type as the sensor nodes
def main(sensor_a_in, sensor_b_in, sensor_c_in, combined_out):
    reading_a = sensor_a_in.receive()
    reading_b = sensor_b_in.receive()
    reading_c = sensor_c_in.receive()
    
    avg_temp = (reading_a.temperature + reading_b.temperature + reading_c.temperature) / 3.0
    
    combined = TemperatureReading(
        temperature=avg_temp,
        timestamp=int(time.time() * 1000),
        sensor_id="combined"
    )
    combined_out.send(combined)
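
As in the temperature pipeline, the join logic can be checked locally with stubbed ports (reusing the hypothetical StubInput/StubOutput helpers sketched there):

# Readings of 20°C, 22°C, and 24°C should average to 22°C
a = StubInput(TemperatureReading(20.0, 0, "sensor_a"))
b = StubInput(TemperatureReading(22.0, 0, "sensor_b"))
c = StubInput(TemperatureReading(24.0, 0, "sensor_c"))
out = StubOutput()
main(a, b, c, out)
assert abs(out.sent[0].temperature - 22.0) < 1e-6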

Step 4: Connect in fan-in pattern

Connect all sensors to the join node, then join to processor.
You should see three connections converging into the join node.

Understanding Fan-In Join

  • Many-to-one - Many publishers, one subscriber (via join)
  • Synchronization - Join node coordinates multiple inputs
  • Combination logic - Join node merges data appropriately
  • Order independence - Handles data arriving in any order

Note: Always use a join node for fan-in. Don’t connect multiple outputs directly to one input, as this can cause race conditions.

Use Cases

  • Data aggregation from multiple sources
  • Sensor fusion (merging multiple sensor readings)
  • Load balancing (collecting results from parallel workers)
  • Synchronizing multiple data streams

Multi-Language Pipeline

Build pipelines using Rust, Python, and C++ nodes in the same graph.

Problem Statement

Build a system that:
  1. Reads data in Python (easy data manipulation)
  2. Processes in Rust (high performance)
  3. Logs in C++ (system integration)
  4. All nodes work together seamlessly

Solution

Build a pipeline where each language handles what it’s best at:
Python (Parsing) → Rust (Processing) → C++ (Logging)

Step-by-Step

Step 1: Create schemas

schemas:
  RawData:
    fields:
      string data:
      uint64 timestamp:

  ProcessedData:
    fields:
      float32 result:
      uint64 timestamp:
      string source:

Step 2: Create Python reader node

Create a Data Reader node (Python) with one output port of type RawData. Write the Python code:
import time
import json
from dataclasses import dataclass

@dataclass
class RawData:
    data: str
    timestamp: int

def main(raw_data_out):
    while True:
        data_dict = {"value": 42, "status": "ok"}
        data_json = json.dumps(data_dict)
        raw_data = RawData(data=data_json, timestamp=int(time.time() * 1000))
        raw_data_out.send(raw_data)
        time.sleep(1)

Step 3: Create Rust processor node

Create a Data Processor node (Rust) with an input port of type RawData and an output port of type ProcessedData. Write the Rust code:
use serde_json::Value;

// Mirror of the generated schema types; string fields cross the FFI
// boundary as a pointer plus length.
#[repr(C)]
struct RawData {
    data: *const u8,
    data_len: usize,
    timestamp: u64,
}

#[repr(C)]
struct ProcessedData {
    result: f32,
    timestamp: u64,
    source: *const u8,
    source_len: usize,
}
// Node entry point; InputPort and OutputPort are provided by the framework.
fn main(
    raw_data_in: &mut InputPort<RawData>,
    processed_data_out: &mut OutputPort<ProcessedData>
) {
    let raw = raw_data_in.receive().unwrap();
    let data_str = unsafe {
        std::str::from_utf8(std::slice::from_raw_parts(raw.data, raw.data_len)).unwrap()
    };
    let json: Value = serde_json::from_str(data_str).unwrap();
    let value = json["value"].as_f64().unwrap() as f32;
    let result = value * 2.0 + 10.0;
    
    let processed = ProcessedData {
        result,
        timestamp: raw.timestamp,
        source: b"rust_processor\0".as_ptr(),
        source_len: 14,
    };
    processed_data_out.send(processed).unwrap();
}

Step 4: Create C++ logger node

Create a System Logger node (C++) with one input port of type ProcessedData. Write the C++ code:
#include <iostream>
#include <fstream>
#include <ctime>
#include <iomanip>

struct ProcessedData {
    float result;
    uint64_t timestamp;
    const char* source;
    size_t source_len;
};

// Node entry point; the framework supplies the typed input port.
void main(InputPort<ProcessedData>& processed_data_in) {
    ProcessedData data = processed_data_in.receive();
    
    std::ofstream log_file("/var/log/cerulion.log", std::ios::app);
    std::time_t time_t = static_cast<std::time_t>(data.timestamp / 1000);  // ms -> s
    
    log_file << std::put_time(std::localtime(&time_t), "%Y-%m-%d %H:%M:%S")
             << " [CERULION] Result: " << data.result << std::endl;
    log_file.close();
    
    std::cout << "Logged: " << data.result << std::endl;
}

Step 5: Connect nodes

Connect: Data Reader (Python) → Data Processor (Rust) → System Logger (C++)
The framework automatically handles serialization and type conversion between languages. No glue code needed.

Understanding Multi-Language Support

  • Schemas are language-agnostic - Defined in YAML, work for all languages
  • Automatic code generation - Framework generates types for each language
  • Binary compatibility - Data structures are compatible across languages
  • Framework handles communication - Serialization and transport are automatic
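
For intuition, the ProcessedData schema above might map to a Python type along these lines (a sketch only; the actual generated code may differ):

# Hypothetical sketch of what code generation could produce for ProcessedData
from dataclasses import dataclass

@dataclass
class ProcessedData:
    result: float   # float32 in the schema
    timestamp: int  # uint64 in the schema
    source: str     # string in the schema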

Language Selection Guide

  • Python - Data parsing, string manipulation, rapid prototyping
  • Rust - High performance, memory safety, concurrent processing
  • C++ - System integration, low-level control, existing C++ libraries

Key Benefits

Use Best Language

Use each language for what it’s best at. No need to compromise.

Reuse Existing Code

Integrate existing libraries from any language without rewriting.

Team Flexibility

Team members can work in their preferred language.

Performance

Use Rust/C++ for performance-critical parts, Python for rapid development.

Next Steps

Now that you’ve seen these examples, explore more: