Stream Processing (RSP-QL)
Kolibrie implements RSP-QL (RDF Stream Processing Query Language), which extends SPARQL with continuous queries over timestamped RDF data. Instead of querying a static dataset, you define a sliding window, and Kolibrie re-evaluates your query against the triples that arrived within that window as new data comes in.
Table of Contents
- Core Concepts
- RULE-Based RSP Syntax
- RSP-QL Query Syntax
- Python Streaming API
- HTTP Server: Persistent RSP Sessions
- Combining Streams with Static Data
Core Concepts
Sliding Windows
A sliding window defines how much historical data is visible at query time:
- Width — how many time units of data the window holds
- Slide — how often the window advances
Example: SLIDING 10 SLIDE 2 means the window holds the last 10 time units and re-evaluates every 2 units.
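To make the width/slide arithmetic concrete, here is a minimal Python sketch of a time-based sliding window. It is an illustrative model only, not Kolibrie's implementation: the function name `windows_containing` is hypothetical, and it assumes half-open windows `[start, start + width)` whose start is a multiple of the slide.

```python
def windows_containing(timestamp, width=10, slide=2):
    """Return the (start, end) bounds of every window that contains `timestamp`.

    Assumption: windows are half-open intervals [start, end) and every
    window start is a non-negative multiple of `slide`.
    """
    # The latest window that can contain the timestamp starts at the
    # largest multiple of `slide` that is <= timestamp.
    start = (timestamp // slide) * slide
    bounds = []
    # Walk backwards while the window still covers the timestamp.
    while start > timestamp - width:
        if start >= 0:
            bounds.append((start, start + width))
        start -= slide
    return sorted(bounds)


# An event at t=11 is visible in width / slide = 5 overlapping windows.
print(windows_containing(11))
```

Under these assumptions, each event is visible in up to `width / slide` consecutive evaluations, which is why a small slide relative to the width produces heavily overlapping results.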
Stream Operators
| Operator | Behavior |
|---|---|
| RSTREAM | Emit all triples currently in the window at each evaluation |
| ISTREAM | Emit only triples that are new since the last window evaluation |
| DSTREAM | Emit triples that have left the window since the last evaluation |
Choose RSTREAM when you need the full current state, ISTREAM for change detection, and DSTREAM for expiry events.
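The three operators can be read as set operations over the contents of two consecutive window evaluations. The Python sketch below models that reading; `apply_operator` and the sample triples are hypothetical and only illustrate the semantics described in the table, not how Kolibrie computes them internally.

```python
def apply_operator(operator, previous, current):
    """Model the stream operators over two consecutive window contents.

    `previous` and `current` are sets of (subject, predicate, object)
    triples visible at the previous and the current evaluation.
    """
    if operator == "RSTREAM":
        return set(current)        # everything currently in the window
    if operator == "ISTREAM":
        return current - previous  # triples that entered the window
    if operator == "DSTREAM":
        return previous - current  # triples that left the window
    raise ValueError(f"unknown stream operator: {operator}")


# Hypothetical window contents at two consecutive evaluations.
previous = {("ex:r1", "sensor:temperature", "91"),
            ("ex:r2", "sensor:temperature", "70")}
current = {("ex:r2", "sensor:temperature", "70"),
           ("ex:r3", "sensor:temperature", "95")}

for name in ("RSTREAM", "ISTREAM", "DSTREAM"):
    print(name, sorted(apply_operator(name, previous, current)))
```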
RULE-Based RSP Syntax
Kolibrie supports defining RSP-QL queries as RULE bodies. The rule fires whenever the window closes and asserts CONSTRUCT triples when the WHERE clause matches.
PREFIX ex: <http://example.org#>
PREFIX sensor: <http://sensor.org/>
RULE :TemperatureAlert :-
    RSTREAM
    FROM NAMED WINDOW <http://example.org/window1>
        ON <http://example.org/temperatureStream>
        [SLIDING 10 SLIDE 2 REPORT ON_WINDOW_CLOSE TICK TIME_DRIVEN]
    CONSTRUCT { ?room ex:hasAlert "high_temperature" . }
    WHERE {
        ?reading sensor:room ?room ;
                 sensor:temperature ?temp .
        FILTER (?temp > 90)
    }
Process the rule string in Rust:
use kolibrie::rsp::process_rule_definition;
use kolibrie::SparqlDatabase;
let mut db = SparqlDatabase::new();
let rule_str = r#"
PREFIX ex: <http://example.org#>
PREFIX sensor: <http://sensor.org/>
RULE :TemperatureAlert :-
    RSTREAM
    FROM NAMED WINDOW <http://example.org/window1>
        ON <http://example.org/temperatureStream>
        [SLIDING 10 SLIDE 2 REPORT ON_WINDOW_CLOSE TICK TIME_DRIVEN]
    CONSTRUCT { ?room ex:hasAlert "high_temperature" . }
    WHERE { ?reading sensor:room ?room ; sensor:temperature ?temp . FILTER(?temp > 90) }
"#;
process_rule_definition(rule_str, &mut db);
RSP-QL Query Syntax
For more complex scenarios, Kolibrie supports full RSP-QL RETRIEVE/REGISTER syntax with named windows:
RETRIEVE SOME ACTIVE STREAM ?s FROM <http://my.org/catalog>
WITH {
    ?s a :Stream .
    ?s :hasDescriptor ?descriptor .
}
REGISTER RSTREAM <http://output/stream> AS
SELECT *
FROM NAMED WINDOW :wind ON ?s [RANGE PT10M STEP PT1M]
WHERE {
    WINDOW :wind {
        ?obs a ssn:Observation .
        ?obs ssn:hasSimpleResult ?value .
    }
}
Parse and execute in Rust:
use kolibrie::rsp::parse_combined_query;
use kolibrie::SparqlDatabase;
let mut db = SparqlDatabase::new();
let query_str = r#"
REGISTER RSTREAM <http://output/stream> AS
SELECT *
FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M]
WHERE { WINDOW :w { ?s ?p ?o } }
"#;
let results = parse_combined_query(query_str, &mut db);
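The `RANGE` and `STEP` values in these queries (`PT10M`, `PT1M`) follow the ISO 8601 duration notation. As a rough aid for reasoning about window sizes, the Python sketch below converts the time-only subset (`PT…H…M…S`) to seconds. The helper `duration_to_seconds` is hypothetical and not part of Kolibrie; which duration forms Kolibrie's own parser accepts is not covered here.

```python
import re

# Time-only ISO 8601 durations such as PT10M, PT1H30M or PT45S.
_DURATION = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def duration_to_seconds(text):
    """Convert a time-only ISO 8601 duration to a number of seconds."""
    match = _DURATION.match(text)
    # Reject strings that do not match, and a bare "PT" with no components.
    if match is None or not any(match.groups()):
        raise ValueError(f"unsupported duration: {text!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds


window_range = duration_to_seconds("PT10M")  # window width in seconds
window_step = duration_to_seconds("PT1M")    # slide in seconds
print(window_range, window_step)
```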
Multiple Windows
You can join data across multiple streams in a single query:
REGISTER RSTREAM <http://output/combined> AS
SELECT ?room ?temp ?humidity
FROM NAMED WINDOW :tempWin ON :temperatureStream [RANGE PT5M STEP PT1M]
FROM NAMED WINDOW :humidWin ON :humidityStream [RANGE PT5M STEP PT1M]
WHERE {
    WINDOW :tempWin { ?room sensor:temperature ?temp . }
    WINDOW :humidWin { ?room sensor:humidity ?humidity . }
}
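Conceptually, the two `WINDOW` blocks are matched independently and then joined on the shared `?room` variable. The following Python sketch shows that join on plain in-memory data; `join_on_room` and the sample bindings are hypothetical and only illustrate the expected shape of the result, not Kolibrie's join strategy.

```python
def join_on_room(temp_window, humid_window):
    """Join two window contents on the shared ?room variable.

    Each argument is a list of (room, value) bindings already matched by
    its WINDOW block; the rows mirror SELECT ?room ?temp ?humidity.
    """
    # Index the humidity bindings by room so each lookup is O(1).
    humidity_by_room = {}
    for room, humidity in humid_window:
        humidity_by_room.setdefault(room, []).append(humidity)

    rows = []
    for room, temp in temp_window:
        # Rooms without a humidity reading in the window produce no row.
        for humidity in humidity_by_room.get(room, []):
            rows.append({"room": room, "temp": temp, "humidity": humidity})
    return rows


temp_window = [("ex:room1", 91.0), ("ex:room2", 70.5)]
humid_window = [("ex:room1", 40), ("ex:room3", 55)]
print(join_on_room(temp_window, humid_window))
```

Only rooms that appear in both windows at evaluation time produce a row, so windows with different ranges or steps can yield fewer matches than either stream alone.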
Python Streaming API
The Python bindings expose a fluent streaming API on top of QueryBuilder.
RSTREAM — Full Window Contents
from py_kolibrie import PySparqlDatabase, PyStreamOperator
db = PySparqlDatabase()
stream = (db.query()
    .window(size=10, step=2)
    .with_predicate("http://example.org/knows")
    .with_stream_operator(PyStreamOperator.RSTREAM)
    .as_stream())
# Add timestamped events
stream.add_stream_triple("http://example.org/Alice",
                         "http://example.org/knows",
                         "http://example.org/Bob",
                         timestamp=1)
stream.add_stream_triple("http://example.org/Bob",
                         "http://example.org/knows",
                         "http://example.org/Carol",
                         timestamp=3)
# Retrieve all results accumulated so far
batches = stream.get_stream_results()
for batch in batches:
    for subject, predicate, obj in batch:
        print(f"{subject} -> {predicate} -> {obj}")
stream.stop_stream()
ISTREAM — Incremental Changes
ISTREAM emits only triples that are new since the last call to get_stream_results():
from py_kolibrie import PySparqlDatabase, PyStreamOperator
db = PySparqlDatabase()
istream = (db.query()
    .window(size=10, step=2)
    .with_subject_starting("http://example.org/Alice")
    .with_stream_operator(PyStreamOperator.ISTREAM)
    .as_stream())
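# Use full IRIs so the subject matches the with_subject_starting filter above
istream.add_stream_triple("http://example.org/Alice", "http://example.org/knows", "http://example.org/Bob", timestamp=1)
results1 = istream.get_stream_results()  # contains Alice -> knows -> Bob
istream.add_stream_triple("http://example.org/Alice", "http://example.org/knows", "http://example.org/Charlie", timestamp=5)
results2 = istream.get_stream_results()  # contains only Alice -> knows -> Charlie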
istream.stop_stream()
Available Operators
PyStreamOperator.RSTREAM # all triples in current window
PyStreamOperator.ISTREAM # new triples since last window
PyStreamOperator.DSTREAM # triples that left the window
Retrieving All Accumulated Results
all_batches = stream.get_all_stream_results()
HTTP Server: Persistent RSP Sessions
When started with cargo run --bin kolibrie-http-server (or from a Docker deployment), Kolibrie exposes a stateful RSP session API over HTTP.
Session Workflow
- Register: POST your RSP-QL query and receive a session_id
- Push: POST timestamped N-Triples to a named stream in that session
- Subscribe: open a Server-Sent Events (SSE) connection to receive results continuously
Endpoint Reference
| Method | Path | Description |
|---|---|---|
| POST | /rsp/register | Register an RSP-QL session |
| POST | /rsp/push | Push N-Triples to a session stream |
| GET | /rsp/stream/{session_id} | Subscribe to SSE result stream |
| POST | /rsp/query | Stateless single-shot RSP query |
Registering a Session
curl -X POST http://localhost:8080/rsp/register \
  -H "Content-Type: application/json" \
  -d '{
    "query": "REGISTER RSTREAM <http://out/stream> AS SELECT * FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M] WHERE { WINDOW :w { ?s ?p ?o } }",
    "static_rdf": ""
  }'
Response:
{
  "session_id": "session_1",
  "streams": [":s"]
}
Pushing Events
curl -X POST http://localhost:8080/rsp/push \
  -H "Content-Type: application/json" \
  -d '{
    "session_id": "session_1",
    "stream": ":s",
    "timestamp": 1000,
    "ntriples": "<http://ex.org/sensor1> <http://ex.org/temperature> \"72.5\" ."
  }'
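The same push can be issued from a script. The Python sketch below builds the request with the standard library only; the helper names `build_request` and `push_request` are hypothetical, and the field names and URL are taken from the curl examples. The request is constructed but not sent, so the block runs without a server.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # same host and port as the curl examples


def build_request(path, payload):
    """Build (but do not send) a JSON POST request for an RSP endpoint."""
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def push_request(session_id, stream, timestamp, ntriples):
    """Build the body for /rsp/push, mirroring the curl example's fields."""
    return build_request("/rsp/push", {
        "session_id": session_id,
        "stream": stream,
        "timestamp": timestamp,
        "ntriples": ntriples,
    })


request = push_request(
    "session_1", ":s", 1000,
    '<http://ex.org/sensor1> <http://ex.org/temperature> "72.5" .',
)
print(request.get_method(), request.full_url)
```

With the server running, passing `request` to `urllib.request.urlopen` would send it. Letting `json.dumps` serialize the body also takes care of escaping the quotes inside the N-Triples literal.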
Subscribing to Results
curl -N http://localhost:8080/rsp/stream/session_1
Results arrive as SSE events each time the window closes.
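On the client side, an SSE connection is a long-lived text response in which each event consists of one or more `data:` lines terminated by a blank line. The Python sketch below parses that framing from any iterable of text lines. The function `parse_sse` is hypothetical, and because the payload format of Kolibrie's events is not specified here, each payload is treated as opaque text.

```python
def parse_sse(lines):
    """Yield the data payload of each complete Server-Sent Event in `lines`."""
    data = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data:
                yield "\n".join(data)
                data = []
        elif line.startswith(":"):
            # Comment line, often used as a keep-alive; ignore it.
            continue
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            data.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.


# Hypothetical event stream with two events and a keep-alive comment.
sample = ["data: first\n", "\n", ": keep-alive\n",
          "data: line one\n", "data: line two\n", "\n"]
print(list(parse_sse(sample)))
```

To consume a live session, the lines could come from the response object returned by `urllib.request.urlopen` on the `/rsp/stream/{session_id}` URL, decoded from bytes to text before being passed in.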
Combining Streams with Static Data
The static_rdf field in a registration request loads background knowledge that is joined with every window evaluation. This lets you enrich streaming sensor readings with ontology or reference data:
curl -X POST http://localhost:8080/rsp/register \
  -H "Content-Type: application/json" \
  -d '{
    "query": "...",
    "static_rdf": "<rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\">...</rdf:RDF>"
  }'
The static_rdf content is parsed once at session creation and remains available across all window evaluations.
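The "parse once, reuse on every evaluation" behavior can be pictured with a small Python model. The class `EnrichedSession` and the room-to-building data are hypothetical; the sketch only illustrates joining each window against background data that was indexed a single time, not Kolibrie's internals.

```python
class EnrichedSession:
    """Toy model of a session whose static data is loaded once."""

    def __init__(self, static_facts):
        # Index the background data a single time, at session creation.
        self.building_of = dict(static_facts)

    def evaluate(self, window):
        """Join one window of (room, temperature) readings with static data."""
        return [
            (room, temp, self.building_of[room])
            for room, temp in window
            if room in self.building_of  # readings without static data are dropped
        ]


session = EnrichedSession([("ex:room1", "ex:buildingA"),
                           ("ex:room2", "ex:buildingB")])

# Two consecutive window evaluations reuse the same static index.
first = session.evaluate([("ex:room1", 95), ("ex:room9", 99)])
second = session.evaluate([("ex:room2", 71)])
print(first, second)
```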