API Reference

Table of Contents

  1. SparqlDatabase

  2. HTTP Server REST API

  3. Python Streaming API

  4. Reasoner API

  5. QueryBuilder

  6. Python QueryBuilder API


SparqlDatabase

SparqlDatabase is the core type in Kolibrie. It holds your RDF data and is the entry point for parsing, querying, and modification.

Initialization

Rust:

use kolibrie::SparqlDatabase;

let mut db = SparqlDatabase::new();

Python:

from py_kolibrie import PySparqlDatabase

db = PySparqlDatabase()

Parsing Methods

| Method | Input | Description |
| --- | --- | --- |
| parse_rdf(str) | RDF/XML string | Parse and ingest RDF/XML |
| parse_rdf_from_file(path) | File path | Multi-threaded file-based parse |
| parse_turtle(str) | Turtle string | Parse Turtle format |
| parse_n3(str) | N3 string | Parse N3 format (supports rules) |
| parse_ntriples_and_add(str) | N-Triples string | Parse N-Triples format |

All formats support RDF-star (quoted triples).

Rust Examples:

db.parse_rdf("<rdf:RDF ...>...</rdf:RDF>");
db.parse_rdf_from_file("dataset.rdf");        // parallelized
db.parse_turtle("@prefix ex: <http://ex.org/> . ex:Alice ex:knows ex:Bob .");
db.parse_n3("{ ?X ex:knows ?Y . } => { ?Y ex:knownBy ?X . } .");
db.parse_ntriples_and_add("<http://ex.org/s> <http://ex.org/p> <http://ex.org/o> .");

Data Manipulation

add_triple_parts(subject, predicate, object)

Add a single triple directly, without parsing RDF syntax:

db.add_triple_parts(
    "http://example.org/Alice",
    "http://example.org/knows",
    "http://example.org/Bob",
);

delete_triple_parts(subject, predicate, object) -> bool

Remove a specific triple. Returns true if the triple existed and was removed:

let removed = db.delete_triple_parts(
    "http://example.org/Alice",
    "http://example.org/knows",
    "http://example.org/Bob",
);

build_all_indexes()

Explicitly rebuild all indexes after bulk data changes:

db.build_all_indexes();

Query Execution

execute_query(sparql, &mut db) -> Vec<Vec<String>>

Run a SPARQL query string and get results as a 2D vector of decoded strings. Each inner Vec<String> corresponds to one result row, with values in the order of your SELECT variables.

use kolibrie::execute_query::execute_query;

let results = execute_query(
    "PREFIX ex: <http://example.org/> SELECT ?s WHERE { ?s ex:knows ex:Bob }",
    &mut db,
);
for row in results {
    println!("{}", row[0]);
}
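Because each row is positional, it can help to pair the values with the SELECT variable names before handing them to other code. The sketch below is plain Python with no Kolibrie dependency: rows_to_dicts is a hypothetical helper, and the sample rows are made up to mirror the shape described above.

```python
def rows_to_dicts(variables, rows):
    """Pair each positional row with the SELECT variable names, in order."""
    records = []
    for row in rows:
        if len(row) != len(variables):
            raise ValueError(f"expected {len(variables)} values, got {len(row)}")
        records.append(dict(zip(variables, row)))
    return records


# Illustrative rows shaped like the result of:
#   SELECT ?s ?o WHERE { ?s ex:knows ?o }
rows = [
    ["http://example.org/Alice", "http://example.org/Bob"],
    ["http://example.org/Bob", "http://example.org/Carol"],
]
records = rows_to_dicts(["s", "o"], rows)
print(records[0]["s"])
```

The length check is a design choice for this sketch: a mismatch between the variable list and a row usually means the wrong variable order was passed in, so failing early is safer than silently truncating.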

execute_query_rayon_parallel2_volcano(sparql, &mut db) -> Vec<Vec<String>>

Multi-threaded query execution with the Streamertail cost-based optimizer:

use kolibrie::execute_query::execute_query_rayon_parallel2_volcano;

let sparql_str = "PREFIX ex: <http://example.org/> SELECT ?s WHERE { ?s ex:knows ex:Bob }";
let results = execute_query_rayon_parallel2_volcano(sparql_str, &mut db);

query(&self) -> QueryBuilder

Start a programmatic query using the fluent QueryBuilder API (see QueryBuilder):

let results = db.query()
    .with_predicate("http://example.org/knows")
    .distinct()
    .get_decoded_triples();

Optimization Hooks

get_or_build_stats() -> Arc<DatabaseStats>

Pre-build the cost-based optimizer’s statistics. Call once after loading data, before running many queries:

db.get_or_build_stats();

invalidate_stats_cache()

Reset cached statistics. Call after bulk INSERT or DELETE operations:

db.invalidate_stats_cache();

User-Defined Functions

register_udf(name, fn)

Register a named function for use in SPARQL BIND() expressions:

db.register_udf("toUpperCase", |args: Vec<String>| -> String {
    args.into_iter().map(|s| s.to_uppercase()).collect::<Vec<_>>().join("")
});

Then use in SPARQL:

PREFIX ex: <http://example.org/>
SELECT ?upper WHERE {
  ?p ex:name ?name .
  BIND(toUpperCase(?name) AS ?upper)
}

Export

generate_rdf_xml() -> String

Serialize the entire database to an RDF/XML string:

let rdf_xml = db.generate_rdf_xml();
std::fs::write("output.rdf", rdf_xml).unwrap();

HTTP Server REST API

Start the HTTP server:

cargo run --bin kolibrie-http-server
# Server available at http://localhost:8080

Or via Docker:

docker compose up --build

POST /query — Execute a SPARQL Query

Run a SPARQL query against RDF data provided in the request body.

Request body:

{
  "sparql": "SELECT ?s ?p ?o WHERE { ?s ?p ?o }",
  "rdf": "<optional RDF data to load, in the syntax named by format>",
  "format": "rdfxml",
  "rule": "optional SPARQL RULE definition string",
  "n3logic": "optional N3 logic rules",
  "queries": ["optional array of SPARQL strings for batch execution"]
}

The format field accepts: rdfxml, turtle, n3, ntriples.
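A client can assemble this body programmatically and reject unsupported format values before anything goes over the wire. The following is a minimal sketch using only the Python standard library. build_query_request is a hypothetical helper, and the choice to omit optional fields that were not supplied is an assumption rather than documented server behaviour. The request is built but not sent.

```python
import json
import urllib.request

# Values the format field accepts, per the list above.
ACCEPTED_FORMATS = {"rdfxml", "turtle", "n3", "ntriples"}


def build_query_request(sparql, rdf=None, fmt=None,
                        base_url="http://localhost:8080"):
    """Build (but do not send) a POST /query request."""
    body = {"sparql": sparql}
    if rdf is not None:
        body["rdf"] = rdf
    if fmt is not None:
        if fmt not in ACCEPTED_FORMATS:
            raise ValueError(f"unsupported format: {fmt!r}")
        body["format"] = fmt
    return urllib.request.Request(
        base_url + "/query",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request(
    "PREFIX ex: <http://example.org/> SELECT ?s ?o WHERE { ?s ex:knows ?o }",
    rdf="@prefix ex: <http://example.org/> . ex:Alice ex:knows ex:Bob .",
    fmt="turtle",
)
print(request.get_method(), request.full_url)
# Against a running server, urllib.request.urlopen(request) would send it.
```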

Response:

{
  "results": [
    {
      "query_index": 0,
      "query": "SELECT ...",
      "data": [["Alice", "knows", "Bob"]],
      "execution_time_ms": 1.23
    }
  ]
}
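Since results is a list with one entry per executed query, a client typically walks both levels to reach individual rows. The sketch below decodes a body shaped like the response above. iter_result_rows is a hypothetical helper, and treating a missing results or data field as empty is an assumption made for robustness, not documented behaviour.

```python
import json


def iter_result_rows(response_text):
    """Yield (query_index, row) for every row in a /query response body."""
    payload = json.loads(response_text)
    for result in payload.get("results", []):
        for row in result.get("data", []):
            yield result["query_index"], row


# Sample body copied from the response shape shown above.
sample = """
{
  "results": [
    {
      "query_index": 0,
      "query": "SELECT ...",
      "data": [["Alice", "knows", "Bob"]],
      "execution_time_ms": 1.23
    }
  ]
}
"""
rows = list(iter_result_rows(sample))
print(rows)  # [(0, ['Alice', 'knows', 'Bob'])]
```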

Example:

curl -X POST http://localhost:8080/query \
  -H "Content-Type: application/json" \
  -d '{
    "rdf": "@prefix ex: <http://example.org/> . ex:Alice ex:knows ex:Bob .",
    "format": "turtle",
    "sparql": "PREFIX ex: <http://example.org/> SELECT ?s ?o WHERE { ?s ex:knows ?o }"
  }'

POST /rsp/register — Register an RSP-QL Session

Create a persistent streaming session. Returns a session_id for subsequent push and subscribe calls.

Request body:

{
  "query": "REGISTER RSTREAM <http://output/stream> AS SELECT * FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M] WHERE { WINDOW :w { ?s ?p ?o } }",
  "static_rdf": "<optional background RDF/XML joined with every window>",
  "n3logic": "optional N3 rules",
  "sparql_rules": ["optional SPARQL RULE strings"]
}

Response:

{
  "session_id": "session_1",
  "streams": [":s"]
}

POST /rsp/push — Push Events to a Session

Push timestamped N-Triples to a named stream within a session:

{
  "session_id": "session_1",
  "stream": ":s",
  "timestamp": 1000,
  "ntriples": "<http://ex.org/sensor1> <http://ex.org/temperature> \"72.5\" ."
}

Example:

curl -X POST http://localhost:8080/rsp/push \
  -H "Content-Type: application/json" \
  -d '{
    "session_id": "session_1",
    "stream": ":s",
    "timestamp": 1000,
    "ntriples": "<http://ex.org/r1> <http://ex.org/value> \"42\" ."
  }'
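The ntriples value is escaped twice: once as an N-Triples literal and once more as a JSON string, which is why the quotes appear as \" in the bodies above. A small sketch of building the push body from raw values follows. ntriples_literal and build_push_body are hypothetical helpers, and only the most common N-Triples escapes (backslash, double quote, newline, carriage return) are handled.

```python
import json


def ntriples_literal(value):
    """Quote a string as an N-Triples literal, escaping \\, ", newline and CR."""
    escaped = (value.replace("\\", "\\\\")
                    .replace('"', '\\"')
                    .replace("\n", "\\n")
                    .replace("\r", "\\r"))
    return f'"{escaped}"'


def build_push_body(session_id, stream, timestamp, subject, predicate, value):
    """Return the JSON text for a POST /rsp/push request carrying one triple."""
    line = f"<{subject}> <{predicate}> {ntriples_literal(str(value))} ."
    return json.dumps({
        "session_id": session_id,
        "stream": stream,
        "timestamp": timestamp,
        "ntriples": line,
    })


body = build_push_body("session_1", ":s", 1000,
                       "http://ex.org/sensor1",
                       "http://ex.org/temperature", 72.5)
print(body)
```

json.dumps takes care of the JSON layer, so the helper only has to get the N-Triples layer right.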

GET /rsp/stream/{session_id} — Subscribe to SSE Results

Open a long-lived Server-Sent Events connection to receive results each time a window closes:

curl -N http://localhost:8080/rsp/stream/session_1

Results are delivered as SSE events in JSON format.
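On the client side, an SSE stream is a sequence of text lines in which data: lines accumulate and a blank line ends the event. The sketch below parses such lines with the standard library only. parse_sse is a hypothetical helper, and the "rows" key in the sample payload is invented for illustration, since the exact JSON shape of an event is not specified here.

```python
import json


def parse_sse(lines):
    """Yield the decoded JSON payload of each complete SSE event."""
    data = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far.
            if data:
                yield json.loads("\n".join(data))
                data = []
        elif line.startswith(":"):
            # Comment line, often used as a keep-alive; ignore it.
            continue
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the data.
            data.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.


# Hypothetical stream content; the payload shape is an assumption.
sample = [
    'data: {"rows": [["a", "b", "c"]]}\n',
    "\n",
    ": keep-alive\n",
    'data: {"rows": []}\n',
    "\n",
]
events = list(parse_sse(sample))
print(len(events))
```

An event that is still open when the input ends is dropped, which matches how SSE clients treat an incomplete final event.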

POST /rsp/query — Stateless RSP Query

Execute a single-shot RSP-QL query with inline events (no persistent session):

{
  "query": "SELECT * FROM NAMED WINDOW :w ON :s [RANGE PT10M STEP PT1M] WHERE { WINDOW :w { ?s ?p ?o } }",
  "events": [
    { "stream": ":s", "timestamp": 100, "ntriples": "<http://ex.org/r1> <http://ex.org/p> <http://ex.org/v> ." }
  ],
  "static_rdf": ""
}

Python Streaming API

The Python streaming API is available via QueryBuilder.as_stream().

Creating a Stream

from py_kolibrie import PySparqlDatabase, PyStreamOperator

db = PySparqlDatabase()

stream = (db.query()
            .window(size=10, step=2)
            .with_predicate("http://example.org/knows")
            .with_stream_operator(PyStreamOperator.RSTREAM)
            .as_stream())

Stream Methods

| Method | Description |
| --- | --- |
| add_stream_triple(s, p, o, timestamp) | Add a timestamped triple to the stream |
| get_stream_results() | Get results since the last call (ISTREAM-style for incremental use) |
| get_all_stream_results() | Get all accumulated result batches |
| stop_stream() | Shut down the stream and release resources |

Stream Operators

| Operator | Behavior |
| --- | --- |
| PyStreamOperator.RSTREAM | All triples in the current window |
| PyStreamOperator.ISTREAM | Only triples new since the last window |
| PyStreamOperator.DSTREAM | Triples that left the window |
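The three operators can be read as set operations over the previous and the current window contents. The following plain-Python model illustrates that reading. It does not use py_kolibrie, apply_operator is a hypothetical function, and the triples are made-up placeholders.

```python
def apply_operator(operator, previous, current):
    """Model the stream operators as set operations on two window snapshots."""
    previous, current = set(previous), set(current)
    if operator == "RSTREAM":
        return current                # everything in the current window
    if operator == "ISTREAM":
        return current - previous     # entered since the last window
    if operator == "DSTREAM":
        return previous - current     # present last time, gone now
    raise ValueError(f"unknown operator: {operator!r}")


previous = {("a", "p", "b"), ("b", "p", "c")}
current = {("b", "p", "c"), ("c", "p", "d")}

for name in ("RSTREAM", "ISTREAM", "DSTREAM"):
    print(name, sorted(apply_operator(name, previous, current)))
```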

Example

stream.add_stream_triple("http://example.org/Alice",
                         "http://example.org/knows",
                         "http://example.org/Bob",
                         timestamp=1)
stream.add_stream_triple("http://example.org/Bob",
                         "http://example.org/knows",
                         "http://example.org/Carol",
                         timestamp=3)

batches = stream.get_stream_results()
for batch in batches:
    for subject, predicate, obj in batch:
        print(f"{subject} -> {obj}")

stream.stop_stream()

Reasoner API

The Reasoner provides rule-based inference over ABox (instance-level) facts.

Rust

use datalog::reasoning::Reasoner;

| Method | Return | Description |
| --- | --- | --- |
| Reasoner::new() | Reasoner | Create an empty reasoner |
| add_abox_triple(s, p, o) | | Add a fact triple |
| query_abox(s?, p?, o?) | Vec<Triple> | Query facts with optional filters |
| add_rule(rule) | | Add an inference rule |
| add_constraint(rule) | | Add an integrity constraint |
| infer_new_facts() | Vec<Triple> | Forward chaining (naive) |
| infer_new_facts_semi_naive() | Vec<Triple> | Forward chaining (semi-naive, efficient) |
| infer_new_facts_semi_naive_parallel() | Vec<Triple> | Parallel forward chaining |
| backward_chaining(pattern) | Vec<HashMap<String, Term>> | Goal-directed proof search |
| infer_new_facts_semi_naive_with_repairs() | Vec<Triple> | Inference with inconsistency repair |
| query_with_repairs(pattern) | Vec<HashMap<String, u32>> | IAR-semantics inconsistency-tolerant query |

Example:

use datalog::reasoning::Reasoner;
use shared::terms::Term;
use shared::rule::Rule;

let mut kg = Reasoner::new();
kg.add_abox_triple("Alice", "hasParent", "Bob");
kg.add_abox_triple("Bob",   "hasParent", "Charlie");

let parent_id      = kg.dictionary.write().unwrap().encode("hasParent");
let grandparent_id = kg.dictionary.write().unwrap().encode("hasGrandparent");

kg.add_rule(Rule {
    premise: vec![
        (Term::Variable("X".into()), Term::Constant(parent_id), Term::Variable("Y".into())),
        (Term::Variable("Y".into()), Term::Constant(parent_id), Term::Variable("Z".into())),
    ],
    negative_premise: vec![],
    conclusion: vec![(
        Term::Variable("X".into()),
        Term::Constant(grandparent_id),
        Term::Variable("Z".into()),
    )],
    filters: vec![],
});

let inferred = kg.infer_new_facts_semi_naive();
// ["Alice hasGrandparent Charlie"]
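To make the idea behind semi-naive forward chaining concrete, here is a small model in plain Python. It is not Kolibrie's implementation. It is a sketch under simplifying assumptions: terms are strings, variables start with "?", and each rule has a single conclusion pattern. The key point is that every round only considers derivations in which at least one premise matches a fact that was new in the previous round (the delta).

```python
def match(pattern, fact, binding):
    """Extend binding so that pattern matches fact, or return None."""
    new = dict(binding)
    for term, value in zip(pattern, fact):
        if term.startswith("?"):
            if new.setdefault(term, value) != value:
                return None
        elif term != value:
            return None
    return new


def derive(premise, conclusion, facts, delta):
    """Derive conclusions where at least one premise matches a delta fact."""
    out = set()
    for i in range(len(premise)):
        bindings = [{}]
        for j, pattern in enumerate(premise):
            source = delta if j == i else facts
            bindings = [b2 for b in bindings for f in source
                        if (b2 := match(pattern, f, b)) is not None]
        for b in bindings:
            out.add(tuple(b.get(t, t) for t in conclusion))
    return out


def infer_semi_naive(facts, rules):
    """Return the set of newly inferred facts (the input set is not modified)."""
    facts = set(facts)
    delta = set(facts)
    inferred = set()
    while delta:
        new = set()
        for premise, conclusion in rules:
            new |= derive(premise, conclusion, facts, delta)
        delta = new - facts      # keep only facts not seen before
        facts |= delta
        inferred |= delta
    return inferred


facts = {("Alice", "hasParent", "Bob"), ("Bob", "hasParent", "Charlie")}
rules = [(
    [("?X", "hasParent", "?Y"), ("?Y", "hasParent", "?Z")],
    ("?X", "hasGrandparent", "?Z"),
)]
print(infer_semi_naive(facts, rules))
# {('Alice', 'hasGrandparent', 'Charlie')}
```

A naive evaluator would instead re-run every rule against all facts in every round, so the saving grows with the number of rounds needed to reach the fixpoint.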

Python

import py_kolibrie

| Method | Description |
| --- | --- |
| PyKnowledgeGraph() | Create a knowledge graph with Reasoner |
| add_abox_triple(s, p, o) | Add a fact |
| encode_term(str) -> int | Encode a term for use in PyTerm.Constant() |
| query_abox() | Return all (subject, predicate, object) tuples |
| add_rule(PyRule) | Add an inference rule |
| add_constraint(PyRule) | Add an integrity constraint |
| infer_new_facts() | Run forward chaining |
| infer_new_facts_semi_naive_with_repairs() | Inference with inconsistency repair |
| query_with_repairs(PyTriplePattern) | Inconsistency-tolerant query |

Example:

import py_kolibrie

graph = py_kolibrie.PyKnowledgeGraph()
graph.add_abox_triple("Alice", "hasParent", "Bob")
graph.add_abox_triple("Bob",   "hasParent", "Charlie")

has_parent      = graph.encode_term("hasParent")
has_grandparent = graph.encode_term("hasGrandparent")

rule = py_kolibrie.PyRule(
    premise=[
        py_kolibrie.PyTriplePattern(
            py_kolibrie.PyTerm.Variable("X"),
            py_kolibrie.PyTerm.Constant(has_parent),
            py_kolibrie.PyTerm.Variable("Y")),
        py_kolibrie.PyTriplePattern(
            py_kolibrie.PyTerm.Variable("Y"),
            py_kolibrie.PyTerm.Constant(has_parent),
            py_kolibrie.PyTerm.Variable("Z")),
    ],
    filters=[],
    conclusion=[py_kolibrie.PyTriplePattern(
        py_kolibrie.PyTerm.Variable("X"),
        py_kolibrie.PyTerm.Constant(has_grandparent),
        py_kolibrie.PyTerm.Variable("Z"),
    )],
)
graph.add_rule(rule)

inferred = graph.infer_new_facts()
for subject, predicate, obj in inferred:
    print(f"{subject} {predicate} {obj}")

QueryBuilder Overview

The QueryBuilder provides a fluent interface for constructing and executing queries against RDF triple stores. It supports filtering, joining, sorting, and various result formats.

Creating a QueryBuilder

Rust:

use kolibrie::SparqlDatabase;
use kolibrie::query_builder::QueryBuilder;

let db = SparqlDatabase::new();
let query = QueryBuilder::new(&db);

Python:

from py_kolibrie import PySparqlDatabase

db = PySparqlDatabase()
query = db.query()

Basic Filtering

Subject Filtering

with_subject(subject: &str)

Filter triples by exact subject value.

What it does: Returns only triples where the subject exactly matches the provided string.

Rust Example:

let results = QueryBuilder::new(&db)
    .with_subject("http://example.org/Alice")
    .get_decoded_triples();

Python Example:

results = (db.query()
    .with_subject("http://example.org/Alice")
    .get_decoded_triples())

with_subject_like(pattern: &str)

Filter triples by subject containing a substring.

What it does: Returns triples where the subject contains the specified pattern as a substring.

Rust Example:

let results = QueryBuilder::new(&db)
    .with_subject_like("example.org")
    .get_decoded_triples();

with_subject_starting(prefix: &str)

Filter triples by subject starting with a prefix.

What it does: Returns triples where the subject begins with the specified prefix.

Rust Example:

let results = QueryBuilder::new(&db)
    .with_subject_starting("http://example.org/")
    .get_decoded_triples();

with_subject_ending(suffix: &str)

Filter triples by subject ending with a suffix.

What it does: Returns triples where the subject ends with the specified suffix.

Rust Example:

let results = QueryBuilder::new(&db)
    .with_subject_ending("Alice")
    .get_decoded_triples();

Predicate Filtering

with_predicate(predicate: &str)

Filter triples by exact predicate value.

What it does: Returns only triples where the predicate exactly matches the provided string.

Rust Example:

let results = QueryBuilder::new(&db)
    .with_predicate("http://example.org/knows")
    .get_decoded_triples();

Python Example:

results = (db.query()
    .with_predicate("http://example.org/knows")
    .get_decoded_triples())

with_predicate_like(pattern: &str)

Filter triples by predicate containing a substring.

What it does: Returns triples where the predicate contains the specified pattern.

with_predicate_starting(prefix: &str)

Filter triples by predicate starting with a prefix.

with_predicate_ending(suffix: &str)

Filter triples by predicate ending with a suffix.

Object Filtering

with_object(object: &str)

Filter triples by exact object value.

What it does: Returns only triples where the object exactly matches the provided string.

Rust Example:

let results = QueryBuilder::new(&db)
    .with_object("http://example.org/Bob")
    .get_decoded_triples();

with_object_like(pattern: &str)

Filter triples by object containing a substring.

with_object_starting(prefix: &str)

Filter triples by object starting with a prefix.

with_object_ending(suffix: &str)

Filter triples by object ending with a suffix.
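The four matching modes follow the same pattern for subject, predicate, and object. As a rough mental model (plain Python, not the library's code), they correspond to equality, substring, prefix, and suffix tests on the decoded term. The helper names below are hypothetical, and case-sensitive comparison is an assumption.

```python
POSITIONS = {"subject": 0, "predicate": 1, "object": 2}


def matches(value, mode, needle):
    """Apply one of the four matching modes to a decoded term."""
    if mode == "exact":      # with_subject / with_predicate / with_object
        return value == needle
    if mode == "like":       # with_*_like
        return needle in value
    if mode == "starting":   # with_*_starting
        return value.startswith(needle)
    if mode == "ending":     # with_*_ending
        return value.endswith(needle)
    raise ValueError(f"unknown mode: {mode!r}")


def filter_triples(triples, position, mode, needle):
    """Keep the triples whose term at the given position matches."""
    index = POSITIONS[position]
    return [t for t in triples if matches(t[index], mode, needle)]


triples = [
    ("http://example.org/Alice", "http://example.org/knows", "http://example.org/Bob"),
    ("http://example.org/Alice", "http://example.org/likes", "http://example.org/IceCream"),
    ("http://example.org/Bob", "http://example.org/knows", "http://example.org/Carol"),
]
print(filter_triples(triples, "predicate", "ending", "knows"))
print(filter_triples(triples, "object", "like", "Ice"))
```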


Advanced Filtering

filter<F>(predicate: F)

Apply a custom filter function to all triples.

What it does: Applies a user-defined function to filter triples based on custom logic.

Rust Example:

let results = QueryBuilder::new(&db)
    .filter(|triple| {
        db.dictionary.decode(triple.subject)
            .map(|s| s.contains("Alice"))
            .unwrap_or(false)
    })
    .get_decoded_triples();

Joining Databases

join(other: &SparqlDatabase)

Join with another SPARQL database.

What it does: Prepares to join the current query results with triples from another database.

join_on_subject()

Specify join condition on subject.

What it does: Joins triples where the subject values match between databases.

Rust Example:

let mut other_db = SparqlDatabase::new();
// ... populate other_db ...

let results = QueryBuilder::new(&db)
    .join(&other_db)
    .join_on_subject()
    .get_decoded_triples();

join_on_predicate()

Specify join condition on predicate.

What it does: Joins triples where the predicate values match between databases.

join_on_object()

Specify join condition on object.

What it does: Joins triples where the object values match between databases.

join_with<F>(condition: F)

Specify a custom join condition.

What it does: Joins triples based on a user-defined condition function.

Rust Example:

let results = QueryBuilder::new(&db)
    .join(&other_db)
    .join_with(|left, right| {
        left.subject == right.object
    })
    .get_decoded_triples();

Sorting and Ordering

order_by<F>(key: F)

Order results by a specified key function.

What it does: Sorts the results based on a key extracted from each triple.

Rust Example:

let results = QueryBuilder::new(&db)
    .order_by(|triple| {
        db.dictionary.decode(triple.subject).unwrap_or("").to_string()
    })
    .get_decoded_triples();

asc()

Set sort direction to ascending (default).

desc()

Set sort direction to descending.

Rust Example:

let results = QueryBuilder::new(&db)
    .order_by(|triple| {
        db.dictionary.decode(triple.subject).unwrap_or("").to_string()
    })
    .desc()
    .get_decoded_triples();

Result Retrieval

distinct()

Return only distinct results.

What it does: Removes duplicate triples from the result set.

Rust Example:

let results = QueryBuilder::new(&db)
    .with_predicate_like("knows")
    .distinct()
    .get_decoded_triples();

Python Example:

results = (db.query()
    .with_predicate_like("knows")
    .distinct()
    .get_decoded_triples())

limit(n: usize)

Limit the number of results.

What it does: Returns at most n results from the query.

Rust Example:

let results = QueryBuilder::new(&db)
    .limit(10)
    .get_decoded_triples();

Python Example:

results = (db.query()
    .limit(10)
    .get_decoded_triples())

offset(n: usize)

Skip the first n results.

What it does: Skips the first n results, useful for pagination.

Rust Example:

let results = QueryBuilder::new(&db)
    .offset(20)
    .limit(10)  // Get results 21-30
    .get_decoded_triples();
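For pagination, the offset is usually computed from a page number and a page size. The sketch below models the skip-then-take behaviour on a plain Python list. page_bounds and paginate are hypothetical helpers, and the integer list stands in for an ordered result set.

```python
def page_bounds(page, page_size):
    """Return (offset, limit) for a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be at least 1")
    return (page - 1) * page_size, page_size


def paginate(results, offset, limit):
    """Skip the first `offset` items, then keep at most `limit` items."""
    return results[offset:offset + limit]


results = list(range(1, 101))        # stand-in for 100 ordered results
offset, limit = page_bounds(3, 10)   # third page of ten
page = paginate(results, offset, limit)
print(offset, limit, page[0], page[-1])  # 20 10 21 30
```

Stable pages depend on a deterministic result order, so in practice pagination is best combined with an explicit ordering.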

get_decoded_triples()

Get results as decoded (subject, predicate, object) tuples.

Return Type: Vec<(String, String, String)>

get_subjects()

Get only the subjects from the results.

Return Type: Vec<String>

Rust Example:

let subjects = QueryBuilder::new(&db)
    .with_predicate("http://example.org/knows")
    .distinct()
    .get_subjects();

get_predicates()

Get only the predicates from the results.

Return Type: Vec<String>

get_objects()

Get only the objects from the results.

Return Type: Vec<String>

get_triples()

Get the raw triple results.

Return Type: BTreeSet<Triple>


Aggregation Functions

count()

Count the number of results without retrieving them.

Return Type: usize

Rust Example:

let count = QueryBuilder::new(&db)
    .with_predicate("http://example.org/knows")
    .count();
println!("Found {} relationships", count);

Python Example:

count = (db.query()
    .with_predicate("http://example.org/knows")
    .count())
print(f"Found {count} relationships")

group_by<F, K>(key_fn: F)

Group results by a key function.

Return Type: BTreeMap<K, Vec<Triple>>

Rust Example:

let groups = QueryBuilder::new(&db)
    .group_by(|triple| triple.predicate);

for (predicate_id, triples) in groups {
    println!("Predicate {}: {} triples", predicate_id, triples.len());
}
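For readers more familiar with Python, the grouping semantics can be modelled with a dictionary of lists. This is a sketch of the idea only, not the Python binding: the group_by function below is a local helper, and sorting the keys is an assumption chosen to mimic the ordered iteration of a BTreeMap.

```python
from collections import defaultdict


def group_by(triples, key_fn):
    """Group triples by key_fn and return a dict ordered by key."""
    groups = defaultdict(list)
    for triple in triples:
        groups[key_fn(triple)].append(triple)
    return dict(sorted(groups.items()))


triples = [
    ("Alice", "knows", "Bob"),
    ("Alice", "likes", "IceCream"),
    ("Bob", "knows", "Carol"),
]
groups = group_by(triples, lambda t: t[1])   # group by predicate
for predicate, members in groups.items():
    print(f"Predicate {predicate}: {len(members)} triples")
```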

Python API

The Python API provides the same QueryBuilder functionality through a Pythonic interface.

Complete Python Example

from py_kolibrie import PySparqlDatabase

def main():
    db = PySparqlDatabase()
    db.add_triple("http://example.org/Alice", "http://example.org/knows", "http://example.org/Bob")
    db.add_triple("http://example.org/Bob", "http://example.org/knows", "http://example.org/Carol")
    db.add_triple("http://example.org/Alice", "http://example.org/likes", "http://example.org/IceCream")

    query = (db.query()
        .with_subject("http://example.org/Alice")
        .distinct()
        .limit(20))

    triples = query.get_decoded_triples()
    print("Decoded triples:")
    for s, p, o in triples:
        print(f"  {s} -- {p} --> {o}")

    subjects = query.get_subjects()
    print("\nDistinct subjects:")
    for s in subjects:
        print(" ", s)

    count = query.count()
    print(f"\nTotal matching triples: {count}")

if __name__ == "__main__":
    main()

Method Chaining

Both Rust and Python APIs support fluent method chaining:

Python:

results = (db.query()
    .with_subject_like("example.org")
    .with_predicate("http://example.org/knows")
    .distinct()
    .limit(50)
    .get_decoded_triples())

Rust:

let results = QueryBuilder::new(&db)
    .with_subject_like("example.org")
    .with_predicate("http://example.org/knows")
    .distinct()
    .limit(50)
    .get_decoded_triples();