sparql_handles

SPARQL query execution with first-class result handles

Overview

This module implements Stage 3 from the trajectory: SPARQL query execution with first-class result handles.

Result Handle Pattern

Every SPARQL execution produces a SPARQLResultHandle with: - meta: query, endpoint/local, timestamp, row count, columns - rows: stored internally as list of dicts (SELECT) or Graph (CONSTRUCT/DESCRIBE) - Bounded view operations: res_head(), res_where(), res_group(), res_sample()

Progressive Disclosure

Result handles enable the root model to refine queries by inspecting metadata and small slices, not rerunning blind queries.

Dataset Integration

SPARQL results can optionally be stored in dataset work graphs with full provenance tracking.

Imports

SPARQLResultHandle

Unified wrapper for all SPARQL result types with metadata and bounded view operations.


SPARQLResultHandle


def SPARQLResultHandle(
    rows:list | rdflib.graph.Graph, result_type:str, query:str, endpoint:str, timestamp:str=<factory>,
    columns:list=None, total_rows:int=0, triple_count:int=0, total_triples:int=0
)->None:

Wrapper for SPARQL results with metadata and bounded view operations.

Test SPARQLResultHandle with different result types:

# Test SELECT result
select_handle = SPARQLResultHandle(
    rows=[{'s': 'http://ex.org/alice', 'age': '30'}],
    result_type='select',
    query='SELECT ?s ?age WHERE { ?s :age ?age }',
    endpoint='local',
    columns=['s', 'age'],
    total_rows=1
)
assert select_handle.summary() == "SELECT: 1 rows, columns=['s', 'age']"
assert len(select_handle) == 1
print(f"✓ SELECT handle: {select_handle}")

# Test SELECT with truncation
truncated_select = SPARQLResultHandle(
    rows=[{'s': 'http://ex.org/alice', 'age': '30'}],
    result_type='select',
    query='SELECT ?s ?age WHERE { ?s :age ?age }',
    endpoint='local',
    columns=['s', 'age'],
    total_rows=100  # More than stored
)
assert '(of 100 total)' in truncated_select.summary()
print(f"✓ Truncated SELECT handle: {truncated_select}")

# Test ASK result
ask_handle = SPARQLResultHandle(
    rows=True,
    result_type='ask',
    query='ASK { ?s ?p ?o }',
    endpoint='local'
)
assert ask_handle.summary() == "ASK: True"
print(f"✓ ASK handle: {ask_handle}")

# Test CONSTRUCT result
g = Graph()
g.add((URIRef('http://ex.org/alice'), URIRef('http://ex.org/age'), Literal('30')))
construct_handle = SPARQLResultHandle(
    rows=g,
    result_type='construct',
    query='CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }',
    endpoint='local',
    triple_count=1,
    total_triples=1
)
assert construct_handle.summary() == "CONSTRUCT: 1 triples"
print(f"✓ CONSTRUCT handle: {construct_handle}")

# Test CONSTRUCT with truncation
truncated_construct = SPARQLResultHandle(
    rows=g,
    result_type='construct',
    query='CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }',
    endpoint='local',
    triple_count=1,
    total_triples=500  # More than stored
)
assert '(of 500 total)' in truncated_construct.summary()
print(f"✓ Truncated CONSTRUCT handle: {truncated_construct}")
✓ SELECT handle: SPARQLResultHandle(SELECT: 1 rows, columns=['s', 'age'])
✓ Truncated SELECT handle: SPARQLResultHandle(SELECT: 1 rows (of 100 total), columns=['s', 'age'])
✓ ASK handle: SPARQLResultHandle(ASK: True)
✓ CONSTRUCT handle: SPARQLResultHandle(CONSTRUCT: 1 triples)
✓ Truncated CONSTRUCT handle: SPARQLResultHandle(CONSTRUCT: 1 triples (of 500 total))
DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
  timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + 'Z')

Remote SPARQL Query

Execute SPARQL queries against remote endpoints and return result handles.

Query Rewriting Helper

Helper to inject LIMIT clauses into SELECT queries to bound server-side work.

# Test LIMIT injection
q1 = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }"
modified, injected = _inject_limit(q1, 100)
assert injected == True
assert 'LIMIT 100' in modified
print(f"✓ Basic injection: {modified}")

# Test with existing LIMIT (should not modify)
q2 = "SELECT ?s WHERE { ?s ?p ?o } LIMIT 50"
modified, injected = _inject_limit(q2, 100)
assert injected == False
assert modified == q2
print(f"✓ Existing LIMIT preserved: {modified}")

# Test with ORDER BY (inject before it)
q3 = "SELECT ?s ?o WHERE { ?s ?p ?o } ORDER BY ?s"
modified, injected = _inject_limit(q3, 100)
assert injected == True
assert 'LIMIT 100' in modified
assert modified.index('LIMIT') < modified.index('ORDER')
print(f"✓ Injection before ORDER BY: {modified}")

# Test CONSTRUCT (should not inject)
q4 = "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"
modified, injected = _inject_limit(q4, 100)
assert injected == False
print(f"✓ CONSTRUCT not modified: {modified}")
✓ Basic injection: SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 100
✓ Existing LIMIT preserved: SELECT ?s WHERE { ?s ?p ?o } LIMIT 50
✓ Injection before ORDER BY: SELECT ?s ?o WHERE { ?s ?p ?o }  LIMIT 100 ORDER BY ?s
✓ CONSTRUCT not modified: CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }

sparql_query


def sparql_query(
    query:str, endpoint:str='https://query.wikidata.org/sparql', max_results:int=100, name:str='res', ns:dict=None,
    timeout:float=30.0, ds_meta:NoneType=None, # Dataset integration
    store_in_work:bool=False, work_task_id:str=None
)->str:

Execute SPARQL query, store SPARQLResultHandle in namespace.

For SELECT: Stores SPARQLResultHandle with rows as list of dicts For CONSTRUCT/DESCRIBE: Stores SPARQLResultHandle with rdflib.Graph For ASK: Stores SPARQLResultHandle with boolean result

IMPORTANT - Work Bounds: - For SELECT: Automatically injects LIMIT clause to bound server-side work - For CONSTRUCT/DESCRIBE: max_results only truncates locally; full results still fetched from endpoint (SPARQL 1.1 has no standard LIMIT for graphs)

If ds_meta provided and store_in_work=True: - CONSTRUCT results stored in work/ graph - Query logged to prov graph

Args: query: SPARQL query string endpoint: SPARQL endpoint URL max_results: Maximum results to return (for SELECT/CONSTRUCT) name: Variable name to store result handle ns: Namespace dict (defaults to globals()) timeout: Query timeout in seconds ds_meta: Optional DatasetMeta for dataset integration store_in_work: If True and ds_meta provided, store CONSTRUCT results in work graph work_task_id: Task ID for work graph (auto-generated if None)

Returns: Summary string describing the result

Test against Wikidata:

# Test SELECT query against Wikidata
test_ns = {}
result = sparql_query(
    "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 5",
    ns=test_ns,
    name='wikidata_test'
)
print(result)
assert 'wikidata_test' in test_ns
assert isinstance(test_ns['wikidata_test'], SPARQLResultHandle)
assert test_ns['wikidata_test'].result_type == 'select'
assert len(test_ns['wikidata_test'].rows) == 5
print(f"✓ SELECT query works: {test_ns['wikidata_test'].summary()}")

# Test CONSTRUCT query
result = sparql_query(
    "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o } LIMIT 3",
    ns=test_ns,
    name='graph_test'
)
print(result)
assert test_ns['graph_test'].result_type == 'construct'
assert isinstance(test_ns['graph_test'].rows, Graph)
print(f"✓ CONSTRUCT query works: {test_ns['graph_test'].summary()}")

Local Graph Query

Execute SPARQL queries against local rdflib graphs (mounted ontologies or work graphs).


sparql_local


def sparql_local(
    query:str, graph:rdflib.graph.Graph | str, max_results:int=100, name:str='res', ns:dict=None
)->str:

Execute SPARQL query on local rdflib Graph.

Useful for querying mounted ontologies or work graphs. Returns SPARQLResultHandle same as sparql_query().

IMPORTANT - Work Bounds: - max_results is output truncation only; full result set is materialized - For large local graphs, consider filtering in the SPARQL query itself

Args: query: SPARQL query string graph: rdflib.Graph object or name of graph in namespace max_results: Maximum results to return name: Variable name to store result handle ns: Namespace dict (defaults to globals())

Returns: Summary string describing the result

Test with local graph:

# Create test graph
test_graph = Graph()
test_graph.add((URIRef('http://ex.org/alice'), URIRef('http://ex.org/age'), Literal('30')))
test_graph.add((URIRef('http://ex.org/bob'), URIRef('http://ex.org/age'), Literal('25')))
test_graph.add((URIRef('http://ex.org/alice'), URIRef('http://ex.org/city'), Literal('Boston')))

test_ns = {'my_graph': test_graph}

# Test SELECT query on local graph
result = sparql_local(
    "SELECT ?s ?age WHERE { ?s <http://ex.org/age> ?age }",
    'my_graph',
    ns=test_ns,
    name='local_res'
)
print(result)
assert 'local_res' in test_ns
assert test_ns['local_res'].result_type == 'select'
assert len(test_ns['local_res'].rows) == 2
assert test_ns['local_res'].total_rows == 2
print(f"✓ Local SELECT query works: {test_ns['local_res'].rows}")

# Test CONSTRUCT on local graph
result = sparql_local(
    "CONSTRUCT { ?s <http://ex.org/age> ?age } WHERE { ?s <http://ex.org/age> ?age }",
    test_graph,
    ns=test_ns,
    name='local_graph'
)
print(result)
assert test_ns['local_graph'].result_type == 'construct'
assert len(test_ns['local_graph'].rows) == 2
assert test_ns['local_graph'].triple_count == 2
assert test_ns['local_graph'].total_triples == 2
print(f"✓ Local CONSTRUCT query works")

# Test truncation
result = sparql_local(
    "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }",
    test_graph,
    max_results=2,
    ns=test_ns,
    name='truncated'
)
print(result)
assert len(test_ns['truncated'].rows) == 2
assert test_ns['truncated'].triple_count == 2
assert test_ns['truncated'].total_triples == 3  # Original had 3
assert '(of 3 total)' in test_ns['truncated'].summary()
print(f"✓ Truncation works correctly: {test_ns['truncated'].summary()}")
SELECT result with 2 rows, columns: ['s', 'age'], stored in 'local_res'
✓ Local SELECT query works: [{'s': rdflib.term.URIRef('http://ex.org/alice'), 'age': rdflib.term.Literal('30')}, {'s': rdflib.term.URIRef('http://ex.org/bob'), 'age': rdflib.term.Literal('25')}]
Graph with 2 triples stored in 'local_graph'
✓ Local CONSTRUCT query works
Graph with 2 triples stored in 'truncated' (of 3 total)
✓ Truncation works correctly: CONSTRUCT: 2 triples (of 3 total)
DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
  timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + 'Z')

View Operations

Bounded view functions for progressive disclosure over result sets.

These functions work with SPARQLResultHandle, ResultTable, or plain lists.


res_sample


def res_sample(
    result, n:int=10, seed:int=None
)->list:

Get random sample of N rows from result.

Args: result: SPARQLResultHandle, ResultTable, or list n: Number of rows to sample seed: Optional random seed for reproducibility

Returns: List of sampled rows

Test res_sample:

# Test with list
test_list = [{'x': i} for i in range(20)]
sample = res_sample(test_list, n=5, seed=42)
assert len(sample) == 5
assert all(isinstance(item, dict) for item in sample)
print(f"✓ res_sample works with list: {len(sample)} items")

# Test with SPARQLResultHandle
handle = SPARQLResultHandle(
    rows=[{'s': f'http://ex.org/item{i}'} for i in range(15)],
    result_type='select',
    query='SELECT ?s WHERE { ?s ?p ?o }',
    endpoint='local',
    columns=['s'],
    total_rows=15
)
sample = res_sample(handle, n=3, seed=42)
assert len(sample) == 3
print(f"✓ res_sample works with SPARQLResultHandle")

# Test with small result (no sampling needed)
small_list = [1, 2, 3]
sample = res_sample(small_list, n=10)
assert len(sample) == 3
print(f"✓ res_sample handles small results correctly")
✓ res_sample works with list: 5 items
✓ res_sample works with SPARQLResultHandle
✓ res_sample handles small results correctly
DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
  timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + 'Z')

Setup Function

Initialize SPARQL tools in namespace for RLM sessions.


setup_sparql_context


def setup_sparql_context(
    ns:dict, default_endpoint:str='https://query.wikidata.org/sparql', ds_meta:NoneType=None
)->str:

Initialize SPARQL tools in namespace.

Binds: - sparql_query() with default endpoint - sparql_local() if ds_meta provided - res_head(), res_where(), res_group(), res_distinct(), res_sample()

Args: ns: Namespace dict where functions will be bound default_endpoint: Default SPARQL endpoint URL ds_meta: Optional DatasetMeta for dataset integration

Returns: Status message

Test setup function:

# Test basic setup
test_ns = {}
result = setup_sparql_context(test_ns)
print(result)
assert 'sparql_query' in test_ns
assert 'sparql_local' in test_ns
assert 'res_sample' in test_ns
print(f"✓ Setup function works")

# Test with dataset integration
try:
    from rlm.dataset import DatasetMeta
    from rdflib import Dataset
    
    ds = Dataset()
    ds_meta = DatasetMeta(ds, name='test')
    
    test_ns2 = {}
    result = setup_sparql_context(test_ns2, ds_meta=ds_meta)
    print(result)
    assert 'session:' in result
    print(f"✓ Setup with dataset integration works")
except ImportError:
    print("⊘ Dataset module not available, skipping integration test")
SPARQL context initialized with endpoint: https://query.wikidata.org/sparql
Bound functions: sparql_query, sparql_local, res_sample, res_head, res_where, res_group, res_distinct
✓ Setup function works
SPARQL context initialized with endpoint: https://query.wikidata.org/sparql
Dataset integration enabled (session: d6379b48)
Bound functions: sparql_query, sparql_local, res_sample, res_head, res_where, res_group, res_distinct
✓ Setup with dataset integration works

Usage Examples

End-to-end examples showing SPARQL handles in RLM context.

# Example 1: Basic SPARQL workflow
ns = {}
setup_sparql_context(ns)

# Execute query (LLM would do this)
ns['sparql_query']('SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10', name='results')

# Inspect results
print(ns['results'].summary())
print(ns['res_head'](ns['results'], 5))
print(ns['res_sample'](ns['results'], 3))
# Example 2: Dataset integration
from rlm.dataset import setup_dataset_context

ns = {}
setup_dataset_context(ns)
setup_sparql_context(ns, ds_meta=ns['ds_meta'])

# Query and store in work graph
ns['sparql_query'](
    'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o } LIMIT 5',
    name='discovered_triples',
    store_in_work=True,
    work_task_id='discovery_1'
)

# Check provenance
print(ns['dataset_stats']())
# Example 3: Local graph queries
from rlm.ontology import setup_ontology_context

ns = {}
setup_sparql_context(ns)
setup_ontology_context('ontology/prov.ttl', ns, name='prov')

# Query mounted ontology
ns['sparql_local'](
    'SELECT ?c WHERE { ?c a <http://www.w3.org/2002/07/owl#Class> }',
    'prov',
    name='classes'
)

print(f"Found {len(ns['classes'].rows)} classes")
print(ns['res_head'](ns['classes'], 10))