API Reference
SparqlDatabase
SparqlDatabase is the core type in Kolibrie. It holds your RDF data and is the entry point for parsing, querying, and modification.
Initialization
Rust:
use kolibrie::SparqlDatabase;
let mut db = SparqlDatabase::new();
Python:
from py_kolibrie import PySparqlDatabase
db = PySparqlDatabase()
Parsing Methods
| Method | Input | Description |
|---|---|---|
| parse_rdf(str) | RDF/XML string | Parse and ingest RDF/XML |
| parse_rdf_from_file(path) | File path | Multi-threaded file-based parse |
| parse_turtle(str) | Turtle string | Parse Turtle format |
| parse_n3(str) | N3 string | Parse N3 format (supports rules) |
| parse_ntriples_and_add(str) | N-Triples string | Parse N-Triples format |
All formats support RDF-star (quoted triples).
Rust Examples:
db.parse_rdf("<rdf:RDF ...>...</rdf:RDF>");
db.parse_rdf_from_file("dataset.rdf"); // parallelized
db.parse_turtle("@prefix ex: <http://ex.org/> . ex:Alice ex:knows ex:Bob .");
db.parse_n3("{ ?X ex:knows ?Y . } => { ?Y ex:knownBy ?X . } .");
db.parse_ntriples_and_add("<http://ex.org/s> <http://ex.org/p> <http://ex.org/o> .");
Data Manipulation
add_triple_parts(subject, predicate, object)
Add a single triple directly, without parsing RDF syntax:
db.add_triple_parts(
"http://example.org/Alice",
"http://example.org/knows",
"http://example.org/Bob",
);
delete_triple_parts(subject, predicate, object) -> bool
Remove a specific triple. Returns true if the triple existed and was removed:
let removed = db.delete_triple_parts(
"http://example.org/Alice",
"http://example.org/knows",
"http://example.org/Bob",
);
build_all_indexes()
Explicitly rebuild all indexes after bulk data changes:
db.build_all_indexes();
Query Execution
execute_query(sparql, &mut db) -> Vec<Vec<String>>
Run a SPARQL query string and get results as a 2D vector of decoded strings. Each inner Vec<String> corresponds to one result row, with values in the order of your SELECT variables.
use kolibrie::execute_query::execute_query;
let results = execute_query(
"PREFIX ex: <http://example.org/> SELECT ?s WHERE { ?s ex:knows ex:Bob }",
&mut db,
);
for row in results {
println!("{}", row[0]);
}
execute_query_rayon_parallel2_volcano(sparql, &mut db) -> Vec<Vec<String>>
Multi-threaded query execution with the Streamertail cost-based optimizer:
use kolibrie::execute_query::execute_query_rayon_parallel2_volcano;
let results = execute_query_rayon_parallel2_volcano(sparql_str, &mut db);
query(&self) -> QueryBuilder
Start a programmatic query using the fluent QueryBuilder API (see QueryBuilder):
let results = db.query()
.with_predicate("http://example.org/knows")
.distinct()
.get_decoded_triples();
Optimization Hooks
get_or_build_stats() -> Arc<DatabaseStats>
Pre-build the cost-based optimizer’s statistics. Call once after loading data, before running many queries:
db.get_or_build_stats();
invalidate_stats_cache()
Reset cached statistics. Call after bulk INSERT or DELETE operations:
db.invalidate_stats_cache();
User-Defined Functions
register_udf(name, fn)
Register a named function for use in SPARQL BIND() expressions:
db.register_udf("toUpperCase", |args: Vec<String>| -> String {
args.into_iter().map(|s| s.to_uppercase()).collect::<Vec<_>>().join("")
});
Then use in SPARQL:
SELECT ?upper WHERE {
?p ex:name ?name .
BIND(toUpperCase(?name) AS ?upper)
}
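Conceptually, a registered UDF is applied once per result row during BIND evaluation. The following plain-Python sketch is an illustration of that model, not Kolibrie's implementation; the registry and row dictionaries are hypothetical stand-ins:

```python
# Illustrative sketch (not Kolibrie internals): a named-UDF registry and
# how BIND(toUpperCase(?name) AS ?upper) applies it to each result row.
udfs = {}

def register_udf(name, fn):
    """Store a named function for later use in BIND expressions."""
    udfs[name] = fn

# Mirrors the Rust example above: uppercase and concatenate all arguments.
register_udf("toUpperCase", lambda args: "".join(s.upper() for s in args))

# One dict per binding of ?name; BIND adds the ?upper column.
rows = [{"name": "alice"}, {"name": "bob"}]
for row in rows:
    row["upper"] = udfs["toUpperCase"]([row["name"]])
```

After the loop, every row carries an extra `upper` binding alongside `name`, which is exactly what the `AS ?upper` clause contributes to the result set.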
Export
generate_rdf_xml() -> String
Serialize the entire database to an RDF/XML string:
let rdf_xml = db.generate_rdf_xml();
std::fs::write("output.rdf", rdf_xml).unwrap();
HTTP Server REST API
Start the HTTP server:
cargo run --bin kolibrie-http-server
# Server available at http://localhost:8080
Or via Docker:
docker compose up --build
POST /query — Execute a SPARQL Query
Run a SPARQL query against RDF data provided in the request body.
Request body:
{
"sparql": "SELECT ?s ?p ?o WHERE { ?s ?p ?o }",
"rdf": "<optional RDF/XML data to load>",
"format": "rdfxml",
"rule": "optional SPARQL RULE definition string",
"n3logic": "optional N3 logic rules",
"queries": ["optional array of SPARQL strings for batch execution"]
}
The format field accepts: rdfxml, turtle, n3, ntriples.
Response:
{
"results": [
{
"query_index": 0,
"query": "SELECT ...",
"data": [["Alice", "knows", "Bob"]],
"execution_time_ms": 1.23
}
]
}
Example:
curl -X POST http://localhost:8080/query \
-H "Content-Type: application/json" \
-d '{
"rdf": "@prefix ex: <http://example.org/> . ex:Alice ex:knows ex:Bob .",
"format": "turtle",
"sparql": "PREFIX ex: <http://example.org/> SELECT ?s ?o WHERE { ?s ex:knows ?o }"
}'
POST /rsp/register — Register an RSP-QL Session
Create a persistent streaming session. Returns a session_id for subsequent push and subscribe calls.
Request body:
{
"query": "REGISTER RSTREAM <http://output/stream> AS SELECT * FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M] WHERE { WINDOW :w { ?s ?p ?o } }",
"static_rdf": "<optional background RDF/XML joined with every window>",
"n3logic": "optional N3 rules",
"sparql_rules": ["optional SPARQL RULE strings"]
}
Response:
{
"session_id": "session_1",
"streams": [":s"]
}
POST /rsp/push — Push Events to a Session
Push timestamped N-Triples to a named stream within a session:
{
"session_id": "session_1",
"stream": ":s",
"timestamp": 1000,
"ntriples": "<http://ex.org/sensor1> <http://ex.org/temperature> \"72.5\" ."
}
Example:
curl -X POST http://localhost:8080/rsp/push \
-H "Content-Type: application/json" \
-d '{
"session_id": "session_1",
"stream": ":s",
"timestamp": 1000,
"ntriples": "<http://ex.org/r1> <http://ex.org/value> \"42\" ."
}'
GET /rsp/stream/{session_id} — Subscribe to SSE Results
Open a long-lived Server-Sent Events connection to receive results each time a window closes:
curl -N http://localhost:8080/rsp/stream/session_1
Results are delivered as SSE events in JSON format.
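On the client side, each window result arrives as a standard SSE `data:` line carrying a JSON payload. The sketch below parses that wire format; the exact payload shape shown is an assumption for illustration:

```python
import json

def parse_sse(chunk):
    """Extract JSON payloads from an SSE stream chunk.

    SSE delivers each event as one or more "data: ..." lines
    terminated by a blank line.
    """
    events = []
    for block in chunk.strip().split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

# Hypothetical payload shape for a closed-window result.
raw = 'data: {"results": [["Alice", "knows", "Bob"]]}\n\n'
events = parse_sse(raw)
```

In practice you would feed `parse_sse` the chunks read from the long-lived HTTP response opened by `curl -N` or an equivalent streaming HTTP client.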
POST /rsp/query — Stateless RSP Query
Execute a single-shot RSP-QL query with inline events (no persistent session):
{
"query": "SELECT * FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M] WHERE { WINDOW :w { ?s ?p ?o } }",
"events": [
{ "stream": ":s", "timestamp": 100, "ntriples": "<http://ex.org/r1> <http://ex.org/p> <http://ex.org/v> ." }
],
"static_rdf": ""
}
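The `[RANGE PT10M STEP PT1M]` clause describes sliding windows: a window closes every STEP, and each window covers the last RANGE units of time. The sketch below models that semantics in plain Python (an illustration of the window model, not Kolibrie's engine):

```python
# Sliding-window membership, assuming a window closing at `close_time`
# covers the half-open interval (close_time - RANGE, close_time].
RANGE = 10  # e.g. minutes, standing in for PT10M

def window_contents(events, close_time):
    """Events whose timestamp falls inside the window closing at close_time."""
    return [e for e in events if close_time - RANGE < e["timestamp"] <= close_time]

events = [
    {"timestamp": 5,  "triple": ("ex:r1", "ex:p", "ex:v")},
    {"timestamp": 11, "triple": ("ex:r2", "ex:p", "ex:v")},
]

first = window_contents(events, 10)   # only the t=5 event
second = window_contents(events, 12)  # both events fall in (2, 12]
```

With STEP PT1M, a new such window closes every minute, so consecutive windows overlap and the same event can appear in several of them.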
Python Streaming API
The Python streaming API is available via QueryBuilder.as_stream().
Creating a Stream
from py_kolibrie import PySparqlDatabase, PyStreamOperator
db = PySparqlDatabase()
stream = (db.query()
.window(size=10, step=2)
.with_predicate("http://example.org/knows")
.with_stream_operator(PyStreamOperator.RSTREAM)
.as_stream())
Stream Methods
| Method | Description |
|---|---|
| add_stream_triple(s, p, o, timestamp) | Add a timestamped triple to the stream |
| get_stream_results() | Get results since the last call (ISTREAM-style for incremental use) |
| get_all_stream_results() | Get all accumulated result batches |
| stop_stream() | Shut down the stream and release resources |
Stream Operators
| Operator | Behavior |
|---|---|
| PyStreamOperator.RSTREAM | All triples in the current window |
| PyStreamOperator.ISTREAM | Only triples new since the last window |
| PyStreamOperator.DSTREAM | Triples that left the window |
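The three operators can be understood as set operations over consecutive window contents. This sketch is a conceptual model, not Kolibrie's implementation:

```python
# R/I/DSTREAM expressed over the previous and current window contents.
def rstream(current, previous):
    return set(current)                  # everything in the current window

def istream(current, previous):
    return set(current) - set(previous)  # newly arrived triples

def dstream(current, previous):
    return set(previous) - set(current)  # triples that expired out

prev_window = {("Alice", "knows", "Bob")}
curr_window = {("Alice", "knows", "Bob"), ("Bob", "knows", "Carol")}

new_triples = istream(curr_window, prev_window)
```

Here only `("Bob", "knows", "Carol")` is new, so ISTREAM emits just that triple while RSTREAM would re-emit both.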
Example
stream.add_stream_triple("http://example.org/Alice",
"http://example.org/knows",
"http://example.org/Bob",
timestamp=1)
stream.add_stream_triple("http://example.org/Bob",
"http://example.org/knows",
"http://example.org/Carol",
timestamp=3)
batches = stream.get_stream_results()
for batch in batches:
for subject, predicate, obj in batch:
print(f"{subject} -> {obj}")
stream.stop_stream()
Reasoner API
The Reasoner provides rule-based inference over ABox (instance-level) facts.
Rust
use datalog::reasoning::Reasoner;
| Method | Return | Description |
|---|---|---|
| Reasoner::new() | Reasoner | Create an empty reasoner |
| add_abox_triple(s, p, o) | — | Add a fact triple |
| query_abox(s?, p?, o?) | Vec<Triple> | Query facts with optional filters |
| add_rule(rule) | — | Add an inference rule |
| add_constraint(rule) | — | Add an integrity constraint |
| infer_new_facts() | Vec<Triple> | Forward chaining (naive) |
| infer_new_facts_semi_naive() | Vec<Triple> | Forward chaining (semi-naive, efficient) |
| infer_new_facts_semi_naive_parallel() | Vec<Triple> | Parallel forward chaining |
| backward_chaining(pattern) | Vec<HashMap<String, Term>> | Goal-directed proof search |
| infer_new_facts_semi_naive_with_repairs() | Vec<Triple> | Inference with inconsistency repair |
| query_with_repairs(pattern) | Vec<HashMap<String, u32>> | IAR-semantics inconsistency-tolerant query |
Example:
use datalog::reasoning::Reasoner;
use shared::terms::Term;
use shared::rule::Rule;
let mut kg = Reasoner::new();
kg.add_abox_triple("Alice", "hasParent", "Bob");
kg.add_abox_triple("Bob", "hasParent", "Charlie");
let parent_id = kg.dictionary.write().unwrap().encode("hasParent");
let grandparent_id = kg.dictionary.write().unwrap().encode("hasGrandparent");
kg.add_rule(Rule {
premise: vec![
(Term::Variable("X".into()), Term::Constant(parent_id), Term::Variable("Y".into())),
(Term::Variable("Y".into()), Term::Constant(parent_id), Term::Variable("Z".into())),
],
negative_premise: vec![],
conclusion: vec![(
Term::Variable("X".into()),
Term::Constant(grandparent_id),
Term::Variable("Z".into()),
)],
filters: vec![],
});
let inferred = kg.infer_new_facts_semi_naive();
// ["Alice hasGrandparent Charlie"]
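The advantage of semi-naive over naive evaluation is that each round joins only the facts derived in the previous round (the "delta") rather than re-joining the whole fact set. The sketch below illustrates the idea with a hypothetical transitive-ancestor rule, chosen because (unlike the grandparent rule above) its conclusions feed back into its own premise; this is a model of the strategy, not the datalog crate's code:

```python
def semi_naive_ancestors(parent):
    """Compute the ancestor closure of a set of (child, parent) pairs.

    Rule: ancestor(X, Z) :- ancestor(X, Y), parent(Y, Z).
    Semi-naive: each round extends only last round's new facts.
    """
    ancestor = set(parent)   # base case: every parent is an ancestor
    delta = set(parent)
    while delta:
        # Join only the delta against the base relation.
        new = {(x, z) for (x, y) in delta for (y2, z) in parent if y == y2}
        delta = new - ancestor   # keep only genuinely new facts
        ancestor |= delta
    return ancestor

closure = semi_naive_ancestors({("Alice", "Bob"), ("Bob", "Charlie")})
```

Because already-known facts never re-enter the delta, each derivation is attempted once, which is what makes `infer_new_facts_semi_naive` cheaper than the naive variant on recursive rules.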
Python
import py_kolibrie
| Method | Description |
|---|---|
| PyKnowledgeGraph() | Create a knowledge graph with Reasoner |
| add_abox_triple(s, p, o) | Add a fact |
| encode_term(str) -> int | Encode a term for use in PyTerm.Constant() |
| query_abox() | Return all (subject, predicate, object) tuples |
| add_rule(PyRule) | Add an inference rule |
| add_constraint(PyRule) | Add an integrity constraint |
| infer_new_facts() | Run forward chaining |
| infer_new_facts_semi_naive_with_repairs() | Inference with inconsistency repair |
| query_with_repairs(PyTriplePattern) | Inconsistency-tolerant query |
Example:
import py_kolibrie
graph = py_kolibrie.PyKnowledgeGraph()
graph.add_abox_triple("Alice", "hasParent", "Bob")
graph.add_abox_triple("Bob", "hasParent", "Charlie")
has_parent = graph.encode_term("hasParent")
has_grandparent = graph.encode_term("hasGrandparent")
rule = py_kolibrie.PyRule(
premise=[
py_kolibrie.PyTriplePattern(
py_kolibrie.PyTerm.Variable("X"),
py_kolibrie.PyTerm.Constant(has_parent),
py_kolibrie.PyTerm.Variable("Y")),
py_kolibrie.PyTriplePattern(
py_kolibrie.PyTerm.Variable("Y"),
py_kolibrie.PyTerm.Constant(has_parent),
py_kolibrie.PyTerm.Variable("Z")),
],
filters=[],
conclusion=[py_kolibrie.PyTriplePattern(
py_kolibrie.PyTerm.Variable("X"),
py_kolibrie.PyTerm.Constant(has_grandparent),
py_kolibrie.PyTerm.Variable("Z"),
)],
)
graph.add_rule(rule)
inferred = graph.infer_new_facts()
for subject, predicate, obj in inferred:
print(f"{subject} {predicate} {obj}")
QueryBuilder Overview
The QueryBuilder provides a fluent interface for constructing and executing queries against RDF triple stores. It supports filtering, joining, sorting, and various result formats.
Creating a QueryBuilder
Rust:
use kolibrie::SparqlDatabase;
use kolibrie::query_builder::QueryBuilder;
let db = SparqlDatabase::new();
let query = QueryBuilder::new(&db);
Python:
from py_kolibrie import PySparqlDatabase
db = PySparqlDatabase()
query = db.query()
Basic Filtering
Subject Filtering
with_subject(subject: &str)
Filter triples by exact subject value.
What it does: Returns only triples where the subject exactly matches the provided string.
Rust Example:
let results = QueryBuilder::new(&db)
.with_subject("http://example.org/Alice")
.get_decoded_triples();
Python Example:
results = (db.query()
.with_subject("http://example.org/Alice")
.get_decoded_triples())
with_subject_like(pattern: &str)
Filter triples by subject containing a substring.
What it does: Returns triples where the subject contains the specified pattern as a substring.
Rust Example:
let results = QueryBuilder::new(&db)
.with_subject_like("example.org")
.get_decoded_triples();
with_subject_starting(prefix: &str)
Filter triples by subject starting with a prefix.
What it does: Returns triples where the subject begins with the specified prefix.
Rust Example:
let results = QueryBuilder::new(&db)
.with_subject_starting("http://example.org/")
.get_decoded_triples();
with_subject_ending(suffix: &str)
Filter triples by subject ending with a suffix.
What it does: Returns triples where the subject ends with the specified suffix.
Rust Example:
let results = QueryBuilder::new(&db)
.with_subject_ending("Alice")
.get_decoded_triples();
Predicate Filtering
with_predicate(predicate: &str)
Filter triples by exact predicate value.
What it does: Returns only triples where the predicate exactly matches the provided string.
Rust Example:
let results = QueryBuilder::new(&db)
.with_predicate("http://example.org/knows")
.get_decoded_triples();
Python Example:
results = (db.query()
.with_predicate("http://example.org/knows")
.get_decoded_triples())
with_predicate_like(pattern: &str)
Filter triples by predicate containing a substring.
What it does: Returns triples where the predicate contains the specified pattern.
with_predicate_starting(prefix: &str)
Filter triples by predicate starting with a prefix.
with_predicate_ending(suffix: &str)
Filter triples by predicate ending with a suffix.
Object Filtering
with_object(object: &str)
Filter triples by exact object value.
What it does: Returns only triples where the object exactly matches the provided string.
Rust Example:
let results = QueryBuilder::new(&db)
.with_object("http://example.org/Bob")
.get_decoded_triples();
with_object_like(pattern: &str)
Filter triples by object containing a substring.
with_object_starting(prefix: &str)
Filter triples by object starting with a prefix.
with_object_ending(suffix: &str)
Filter triples by object ending with a suffix.
Advanced Filtering
filter<F>(predicate: F)
Apply a custom filter function to all triples.
What it does: Applies a user-defined function to filter triples based on custom logic.
Rust Example:
let results = QueryBuilder::new(&db)
.filter(|triple| {
db.dictionary.decode(triple.subject)
.map(|s| s.contains("Alice"))
.unwrap_or(false)
})
.get_decoded_triples();
Joining Databases
join(other: &SparqlDatabase)
Join with another SPARQL database.
What it does: Prepares to join the current query results with triples from another database.
join_on_subject()
Specify join condition on subject.
What it does: Joins triples where the subject values match between databases.
Rust Example:
let other_db = SparqlDatabase::new();
// ... populate other_db ...
let results = QueryBuilder::new(&db)
.join(&other_db)
.join_on_subject()
.get_decoded_triples();
join_on_predicate()
Specify join condition on predicate.
What it does: Joins triples where the predicate values match between databases.
join_on_object()
Specify join condition on object.
What it does: Joins triples where the object values match between databases.
join_with<F>(condition: F)
Specify a custom join condition.
What it does: Joins triples based on a user-defined condition function.
Rust Example:
let results = QueryBuilder::new(&db)
.join(&other_db)
.join_with(|left, right| {
left.subject == right.object
})
.get_decoded_triples();
Sorting and Ordering
order_by<F>(key: F)
Order results by a specified key function.
What it does: Sorts the results based on a key extracted from each triple.
Rust Example:
let results = QueryBuilder::new(&db)
.order_by(|triple| {
db.dictionary.decode(triple.subject).unwrap_or("").to_string()
})
.get_decoded_triples();
asc()
Set sort direction to ascending (default).
desc()
Set sort direction to descending.
Rust Example:
let results = QueryBuilder::new(&db)
.order_by(|triple| {
db.dictionary.decode(triple.subject).unwrap_or("").to_string()
})
.desc()
.get_decoded_triples();
Result Retrieval
distinct()
Return only distinct results.
What it does: Removes duplicate triples from the result set.
Rust Example:
let results = QueryBuilder::new(&db)
.with_predicate_like("knows")
.distinct()
.get_decoded_triples();
Python Example:
results = (db.query()
.with_predicate_like("knows")
.distinct()
.get_decoded_triples())
limit(n: usize)
Limit the number of results.
What it does: Returns at most n results from the query.
Rust Example:
let results = QueryBuilder::new(&db)
.limit(10)
.get_decoded_triples();
Python Example:
results = (db.query()
.limit(10)
.get_decoded_triples())
offset(n: usize)
Skip the first n results.
What it does: Skips the first n results, useful for pagination.
Rust Example:
let results = QueryBuilder::new(&db)
.offset(20)
.limit(10) // Get results 21-30
.get_decoded_triples();
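Pagination with offset and limit reduces to a slice: skip `offset` results, then take at most `limit`. A minimal sketch of that arithmetic (page N of size K is `offset = N * K`, `limit = K`), assuming offset is applied before limit as in the example above:

```python
def paginate(results, offset, limit):
    """Skip `offset` results, then return at most `limit` of the rest."""
    return results[offset:offset + limit]

all_results = [f"triple-{i}" for i in range(100)]

# Page 3 with a page size of 10: results 21-30 (indices 20-29).
page = paginate(all_results, offset=20, limit=10)
```

If `offset` runs past the end of the result set, the slice is simply empty rather than an error.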
get_decoded_triples()
Get results as decoded (subject, predicate, object) tuples.
Return Type: Vec<(String, String, String)>
get_subjects()
Get only the subjects from the results.
Return Type: Vec<String>
Rust Example:
let subjects = QueryBuilder::new(&db)
.with_predicate("http://example.org/knows")
.distinct()
.get_subjects();
get_predicates()
Get only the predicates from the results.
Return Type: Vec<String>
get_objects()
Get only the objects from the results.
Return Type: Vec<String>
get_triples()
Get the raw triple results.
Return Type: BTreeSet<Triple>
Aggregation Functions
count()
Count the number of results without retrieving them.
Return Type: usize
Rust Example:
let count = QueryBuilder::new(&db)
.with_predicate("http://example.org/knows")
.count();
println!("Found {} relationships", count);
Python Example:
count = (db.query()
.with_predicate("http://example.org/knows")
.count())
print(f"Found {count} relationships")
group_by<F, K>(key_fn: F)
Group results by a key function.
Return Type: BTreeMap<K, Vec<Triple>>
Rust Example:
let groups = QueryBuilder::new(&db)
.group_by(|triple| triple.predicate);
for (predicate_id, triples) in groups {
println!("Predicate {}: {} triples", predicate_id, triples.len());
}
Python API
The Python API provides the same QueryBuilder functionality through a Pythonic interface.
Complete Python Example
from py_kolibrie import PySparqlDatabase
def main():
db = PySparqlDatabase()
db.add_triple("http://example.org/Alice", "http://example.org/knows", "http://example.org/Bob")
db.add_triple("http://example.org/Bob", "http://example.org/knows", "http://example.org/Carol")
db.add_triple("http://example.org/Alice", "http://example.org/likes", "http://example.org/IceCream")
query = (db.query()
.with_subject("http://example.org/Alice")
.distinct()
.limit(20))
triples = query.get_decoded_triples()
print("Decoded triples:")
for s, p, o in triples:
print(f" {s} -- {p} --> {o}")
subjects = query.get_subjects()
print("\nDistinct subjects:")
for s in subjects:
print(" ", s)
count = query.count()
print(f"\nTotal matching triples: {count}")
if __name__ == "__main__":
main()
Method Chaining
Both Rust and Python APIs support fluent method chaining:
Python:
results = (db.query()
.with_subject_like("example.org")
.with_predicate("http://example.org/knows")
.distinct()
.limit(50)
.get_decoded_triples())
Rust:
let results = QueryBuilder::new(&db)
.with_subject_like("example.org")
.with_predicate("http://example.org/knows")
.distinct()
.limit(50)
.get_decoded_triples();