
Stream Processing (RSP-QL)

Kolibrie implements RSP-QL (RDF Stream Processing Query Language), extending SPARQL to continuous queries over timestamped RDF data. Instead of querying a static dataset, you define a sliding window and Kolibrie evaluates your query against triples that arrived within that window — continuously as new data arrives.

Table of Contents

  1. Core Concepts
  2. RULE-Based RSP Syntax
  3. RSP-QL Query Syntax
  4. Python Streaming API
  5. HTTP Server: Persistent RSP Sessions
  6. Combining Streams with Static Data

Core Concepts

Sliding Windows

A sliding window defines how much historical data is visible at query time:

  • Width — how many time units of data the window holds
  • Slide — how often the window advances

Example: SLIDING 10 SLIDE 2 means the window holds the last 10 time units and re-evaluates every 2 units.
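The width/slide arithmetic can be modelled without Kolibrie. The plain-Python sketch below makes two assumptions that may not match Kolibrie's internals: windows close at multiples of the slide, and a window closing at time t covers the half-open interval [t - width, t). Under those assumptions, with SLIDING 10 SLIDE 2 every event is seen by 10 / 2 = 5 successive evaluations.

```python
def window_bounds(close, width):
    """Interval [open, close) covered by the window that closes at `close`.

    Assumption: half-open intervals; Kolibrie's boundary convention may differ.
    """
    return close - width, close


def windows_containing(ts, width, slide):
    """Close times of all windows (assumed to close at multiples of `slide`) containing `ts`."""
    close = (ts // slide + 1) * slide  # first close time strictly after ts
    closes = []
    while close - width <= ts:  # this window still reaches back to ts
        closes.append(close)
        close += slide
    return closes


def window_contents(events, close, width):
    """Items from (timestamp, item) pairs that fall inside one window."""
    start, end = window_bounds(close, width)
    return [item for ts, item in events if start <= ts < end]


events = [(1, "Alice knows Bob"), (3, "Bob knows Carol"), (11, "Carol knows Dave")]
print(windows_containing(3, width=10, slide=2))     # [4, 6, 8, 10, 12]
print(window_contents(events, close=12, width=10))  # ['Bob knows Carol', 'Carol knows Dave']
```

The event at timestamp 1 has already expired from the window that closes at 12, which is exactly the behaviour the Width setting controls.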

Stream Operators

Operator   Behavior
RSTREAM    Emit all triples currently in the window at each evaluation
ISTREAM    Emit only triples that are new since the last window evaluation
DSTREAM    Emit triples that have left the window since the last evaluation

Choose RSTREAM when you need the full current state, ISTREAM for change detection, and DSTREAM for expiry events.
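The three operators can be read as set operations between two consecutive window evaluations. The sketch below is a plain-Python model of that reading (the classic CQL-style definitions), not Kolibrie's implementation; the triples are illustrative.

```python
def stream_operators(previous, current):
    """Model RSTREAM/ISTREAM/DSTREAM over two consecutive window evaluations.

    `previous` and `current` are sets of triples seen by the earlier and the
    latest evaluation. Illustrative model only, not Kolibrie's code.
    """
    previous, current = set(previous), set(current)
    return {
        "RSTREAM": current,             # full current state
        "ISTREAM": current - previous,  # triples that entered the window
        "DSTREAM": previous - current,  # triples that expired from the window
    }


prev = {("Alice", "knows", "Bob"), ("Bob", "knows", "Carol")}
curr = {("Bob", "knows", "Carol"), ("Carol", "knows", "Dave")}
ops = stream_operators(prev, curr)
print(sorted(ops["ISTREAM"]))  # [('Carol', 'knows', 'Dave')]
print(sorted(ops["DSTREAM"]))  # [('Alice', 'knows', 'Bob')]
```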


RULE-Based RSP Syntax

Kolibrie supports defining RSP-QL queries as RULE bodies. The rule fires whenever the window closes and asserts CONSTRUCT triples when the WHERE clause matches.

PREFIX ex: <http://example.org#>
PREFIX sensor: <http://sensor.org/>

RULE :TemperatureAlert :-
RSTREAM
FROM NAMED WINDOW <http://example.org/window1>
    ON <http://example.org/temperatureStream>
    [SLIDING 10 SLIDE 2 REPORT ON_WINDOW_CLOSE TICK TIME_DRIVEN]
CONSTRUCT { ?room ex:hasAlert "high_temperature" . }
WHERE {
    ?reading sensor:room ?room ;
             sensor:temperature ?temp .
    FILTER (?temp > 90)
}

Process the rule string in Rust:

use kolibrie::rsp::process_rule_definition;
use kolibrie::SparqlDatabase;

fn main() {
    let mut db = SparqlDatabase::new();

    // Every prefix used in the rule body must be declared, including sensor:.
    let rule_str = r#"
        PREFIX ex: <http://example.org#>
        PREFIX sensor: <http://sensor.org/>
        RULE :TemperatureAlert :-
        RSTREAM
        FROM NAMED WINDOW <http://example.org/window1>
            ON <http://example.org/temperatureStream>
            [SLIDING 10 SLIDE 2 REPORT ON_WINDOW_CLOSE TICK TIME_DRIVEN]
        CONSTRUCT { ?room ex:hasAlert "high_temperature" . }
        WHERE { ?reading sensor:room ?room ; sensor:temperature ?temp . FILTER(?temp > 90) }
    "#;

    // Process the rule definition against `db`.
    process_rule_definition(rule_str, &mut db);
}
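What the rule computes on each window close can be spelled out by hand. The function below is a hypothetical, simplified equivalent of :TemperatureAlert for a single window's triples, written as prefixed-name strings and assuming one room and one temperature per reading; it only illustrates the rule's semantics, since Kolibrie evaluates the parsed rule itself.

```python
def temperature_alert(window_triples, threshold=90):
    """Hand-written equivalent of :TemperatureAlert for one window evaluation.

    Joins sensor:room and sensor:temperature on the shared ?reading subject,
    applies FILTER(?temp > threshold) and returns the CONSTRUCT triples.
    Assumes at most one room and one temperature per reading.
    """
    rooms, temps = {}, {}
    for s, p, o in window_triples:
        if p == "sensor:room":
            rooms[s] = o
        elif p == "sensor:temperature":
            temps[s] = float(o)
    alerts = set()
    for reading, room in rooms.items():
        temp = temps.get(reading)
        if temp is not None and temp > threshold:  # the FILTER clause
            alerts.add((room, "ex:hasAlert", "high_temperature"))
    return alerts


window = [
    ("ex:r1", "sensor:room", "ex:lab"), ("ex:r1", "sensor:temperature", "95"),
    ("ex:r2", "sensor:room", "ex:office"), ("ex:r2", "sensor:temperature", "70"),
]
print(temperature_alert(window))  # {('ex:lab', 'ex:hasAlert', 'high_temperature')}
```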

RSP-QL Query Syntax

For more complex scenarios, Kolibrie supports full RSP-QL RETRIEVE/REGISTER syntax with named windows:

RETRIEVE SOME ACTIVE STREAM ?s FROM <http://my.org/catalog>
WITH {
  ?s a :Stream .
  ?s :hasDescriptor ?descriptor .
}
REGISTER RSTREAM <http://output/stream> AS
SELECT *
FROM NAMED WINDOW :wind ON ?s [RANGE PT10M STEP PT1M]
WHERE {
  WINDOW :wind {
    ?obs a ssn:Observation .
    ?obs ssn:hasSimpleResult ?value .
  }
}

Parse and execute in Rust:

use kolibrie::rsp::parse_combined_query;
use kolibrie::SparqlDatabase;

fn main() {
    let mut db = SparqlDatabase::new();

    // A minimal REGISTER query over a single named window :w on stream :s.
    let query_str = r#"
        REGISTER RSTREAM <http://output/stream> AS
        SELECT *
        FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M]
        WHERE { WINDOW :w { ?s ?p ?o } }
    "#;

    // Parse the combined query and execute it against `db`.
    let results = parse_combined_query(query_str, &mut db);
}
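RANGE and STEP take ISO 8601 durations: PT10M is ten minutes and PT1M is one minute, so the window covers the last 10 minutes and advances every minute. The helper below is a hypothetical utility, not part of Kolibrie, that converts time-only durations with integer values to seconds; it shows that with aligned windows each instant falls into 600 / 60 = 10 successive windows.

```python
import re

# Time-only ISO 8601 durations: PT[nH][nM][nS] with integer values.
_DURATION = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def duration_seconds(text):
    """Convert a duration such as 'PT10M' or 'PT1H30M' to seconds."""
    m = _DURATION.match(text)
    if not m or not any(m.groups()):
        raise ValueError(f"unsupported duration: {text!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in m.groups())
    return hours * 3600 + minutes * 60 + seconds


range_s = duration_seconds("PT10M")  # 600
step_s = duration_seconds("PT1M")    # 60
print(range_s // step_s)             # 10
```

Date components such as P1D are deliberately rejected here rather than guessed at.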

Multiple Windows

You can join data across multiple streams in a single query:

REGISTER RSTREAM <http://output/combined> AS
SELECT ?room ?temp ?humidity
FROM NAMED WINDOW :tempWin  ON :temperatureStream [RANGE PT5M STEP PT1M]
FROM NAMED WINDOW :humidWin ON :humidityStream    [RANGE PT5M STEP PT1M]
WHERE {
  WINDOW :tempWin  { ?room sensor:temperature ?temp . }
  WINDOW :humidWin { ?room sensor:humidity ?humidity . }
}
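The shared variable ?room is what connects the two windows: a row is produced only when a room appears in both. As a plain-Python illustration, here is a hash join over hypothetical per-window bindings; Kolibrie's own join strategy is not described here and may differ.

```python
def join_windows(temp_window, humid_window):
    """Join two windows' bindings on the shared ?room variable.

    temp_window:  (room, temp) pairs bound inside :tempWin
    humid_window: (room, humidity) pairs bound inside :humidWin
    Returns (room, temp, humidity) rows, like SELECT ?room ?temp ?humidity.
    """
    humid_by_room = {}
    for room, humidity in humid_window:  # build side: index humidity by room
        humid_by_room.setdefault(room, []).append(humidity)
    rows = []
    for room, temp in temp_window:       # probe side: look up matching rooms
        for humidity in humid_by_room.get(room, []):
            rows.append((room, temp, humidity))
    return rows


temps = [("ex:lab", 21.5), ("ex:office", 23.0)]
humids = [("ex:lab", 40), ("ex:kitchen", 55)]
print(join_windows(temps, humids))  # [('ex:lab', 21.5, 40)]
```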

Python Streaming API

The Python bindings expose a fluent streaming API on top of QueryBuilder.

RSTREAM — Full Window Contents

from py_kolibrie import PySparqlDatabase, PyStreamOperator

db = PySparqlDatabase()

stream = (db.query()
            .window(size=10, step=2)
            .with_predicate("http://example.org/knows")
            .with_stream_operator(PyStreamOperator.RSTREAM)
            .as_stream())

# Add timestamped events
stream.add_stream_triple("http://example.org/Alice",
                         "http://example.org/knows",
                         "http://example.org/Bob",
                         timestamp=1)
stream.add_stream_triple("http://example.org/Bob",
                         "http://example.org/knows",
                         "http://example.org/Carol",
                         timestamp=3)

# Retrieve all results accumulated so far
batches = stream.get_stream_results()
for batch in batches:
    for subject, predicate, obj in batch:
        print(f"{subject} -> {predicate} -> {obj}")

stream.stop_stream()

ISTREAM — Incremental Changes

ISTREAM emits only triples that are new since the last call to get_stream_results():

from py_kolibrie import PySparqlDatabase, PyStreamOperator

db = PySparqlDatabase()

istream = (db.query()
             .window(size=10, step=2)
             .with_subject_starting("http://example.org/Alice")
             .with_stream_operator(PyStreamOperator.ISTREAM)
             .as_stream())

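# Use full IRIs so the subject matches the with_subject_starting() filter
istream.add_stream_triple("http://example.org/Alice", "http://example.org/knows",
                          "http://example.org/Bob", timestamp=1)
results1 = istream.get_stream_results()   # contains Alice->knows->Bob

istream.add_stream_triple("http://example.org/Alice", "http://example.org/knows",
                          "http://example.org/Charlie", timestamp=5)
results2 = istream.get_stream_results()   # contains only Alice->knows->Charlie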

istream.stop_stream()

Available Operators

PyStreamOperator.RSTREAM   # all triples in the current window
PyStreamOperator.ISTREAM   # new triples since the last window evaluation
PyStreamOperator.DSTREAM   # triples that have left the window

Retrieving All Accumulated Results

all_batches = stream.get_all_stream_results()

HTTP Server: Persistent RSP Sessions

When running cargo run --bin kolibrie-http-server (or a Docker deployment), Kolibrie exposes a stateful RSP session API.

Session Workflow

  1. Register — POST your RSP-QL query, receive a session_id
  2. Push — POST timestamped N-Triples to a named stream in that session
  3. Subscribe — open a Server-Sent Events (SSE) connection to receive results continuously

Endpoint Reference

Method  Path                        Description
POST    /rsp/register               Register an RSP-QL session
POST    /rsp/push                   Push N-Triples to a session stream
GET     /rsp/stream/{session_id}    Subscribe to the SSE result stream
POST    /rsp/query                  Stateless single-shot RSP query

Registering a Session

curl -X POST http://localhost:8080/rsp/register \
  -H "Content-Type: application/json" \
  -d '{
    "query": "REGISTER RSTREAM <http://out/stream> AS SELECT * FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M] WHERE { WINDOW :w { ?s ?p ?o } }",
    "static_rdf": ""
  }'

Response:

{
  "session_id": "session_1",
  "streams": [":s"]
}

Pushing Events

curl -X POST http://localhost:8080/rsp/push \
  -H "Content-Type: application/json" \
  -d '{
    "session_id": "session_1",
    "stream": ":s",
    "timestamp": 1000,
    "ntriples": "<http://ex.org/sensor1> <http://ex.org/temperature> \"72.5\" ."
  }'
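The register/push steps can also be scripted. The sketch below uses only Python's standard library (urllib.request, json); the helper names are hypothetical, the base URL is the one from the curl examples, and the JSON fields are exactly those shown above. Building the body with json.dumps also takes care of escaping the quotes inside the N-Triples literal.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # address used in the curl examples above


def _post_json(path, payload):
    """Build (but do not send) a JSON POST request to one of the /rsp endpoints."""
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),  # json.dumps escapes quotes for us
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_request(query, static_rdf=""):
    """Build the POST /rsp/register request."""
    return _post_json("/rsp/register", {"query": query, "static_rdf": static_rdf})


def push_request(session_id, stream, timestamp, ntriples):
    """Build the POST /rsp/push request."""
    return _post_json("/rsp/push", {
        "session_id": session_id,
        "stream": stream,
        "timestamp": timestamp,
        "ntriples": ntriples,
    })


def send(request):
    """Send a request to a running server and return the response body as text."""
    with urllib.request.urlopen(request) as resp:
        return resp.read().decode("utf-8")


def run_example(query):
    """Register a session, then push one event (needs the HTTP server running)."""
    session_id = json.loads(send(register_request(query)))["session_id"]
    send(push_request(
        session_id, ":s", 1000,  # ":s" is the stream named in the example query
        '<http://ex.org/sensor1> <http://ex.org/temperature> "72.5" .',
    ))
    return session_id
```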

Subscribing to Results

curl -N http://localhost:8080/rsp/stream/session_1

Results arrive as SSE events each time the window closes.
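An SSE stream is a sequence of field lines, with each event ended by a blank line. To consume it from Python instead of curl, a minimal parser for that standard line format is enough. The helper below is hypothetical; because the structure of Kolibrie's event payload is not described here, it returns the raw data string without interpreting it.

```python
import urllib.request


def iter_sse(lines):
    """Yield (event_type, data) pairs from an iterable of SSE lines.

    Minimal subset of the SSE format: 'data:' lines are joined with newlines,
    'event:' sets the type, ':' lines are comments, a blank line ends the
    event. 'id:' and 'retry:' fields are ignored.
    """
    event_type, data = "message", []
    for raw in lines:
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        line = line.rstrip("\r\n")
        if not line:  # blank line: dispatch the buffered event, if any
            if data:
                yield event_type, "\n".join(data)
            event_type, data = "message", []
            continue
        if line.startswith(":"):  # comment / keep-alive
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "data":
            data.append(value)
        elif field == "event":
            event_type = value


def follow_session(session_id, base_url="http://localhost:8080"):
    """Print every event from GET /rsp/stream/{session_id} (needs a running server)."""
    with urllib.request.urlopen(f"{base_url}/rsp/stream/{session_id}") as resp:
        for event_type, data in iter_sse(resp):  # HTTPResponse iterates line by line
            print(event_type, data)
```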


Combining Streams with Static Data

The static_rdf field in a registration request loads background knowledge that is joined with every window evaluation. This lets you enrich streaming sensor readings with ontology or reference data:

curl -X POST http://localhost:8080/rsp/register \
  -H "Content-Type: application/json" \
  -d '{
    "query": "...",
    "static_rdf": "<rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\">...</rdf:RDF>"
  }'

The static_rdf content is parsed once at session creation and remains available across all window evaluations.
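Conceptually, the static data behaves like a lookup table that is built once and consulted on every window evaluation. The plain-Python model below illustrates that idea with a hypothetical sensor-to-room mapping standing in for the parsed static_rdf; it is not how Kolibrie stores or joins the data.

```python
def enrich(window_readings, static_facts):
    """Join one window's readings with background knowledge loaded once.

    window_readings: (sensor, temperature) pairs from the current window
    static_facts:    sensor -> room mapping, built once (like static_rdf)
    Readings from sensors without a static entry are dropped (inner join).
    """
    return [(sensor, static_facts[sensor], temp)
            for sensor, temp in window_readings if sensor in static_facts]


STATIC = {"ex:sensor1": "ex:lab"}  # hypothetical facts, parsed once per session
for window in ([("ex:sensor1", 72.5)], [("ex:sensor1", 73.0), ("ex:sensor2", 60.1)]):
    print(enrich(window, STATIC))
# [('ex:sensor1', 'ex:lab', 72.5)]
# [('ex:sensor1', 'ex:lab', 73.0)]
```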

