dataset

RDF Dataset-based session memory for RLM

Overview

This module implements RDF Dataset-based memory for RLM sessions using named graphs:

  • onto/<name> - Read-only ontology graphs
  • mem - Mutable working memory for current session
  • prov - Provenance/audit trail
  • work/<task_id> - Scratch graphs for intermediate results

Design Principles

  • Session-scoped: mem is working memory for current RLM run
  • Handle-based access: Model sees bounded views, never raw quads
  • Provenance tracking: All mem changes recorded with timestamp/source/reason
  • Lazy indexing: Caches invalidated on mutation
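
The lazy-indexing principle can be illustrated with a version counter: each mutation bumps the counter, and a cached index is rebuilt only when it was built at an older version. This is a minimal standard-library sketch, not the module's implementation; `VersionedStore` and `predicate_counts` are hypothetical names, and only the `_version` attribute mirrors something the tests on this page check.

```python
from typing import Dict, List, Tuple

Triple = Tuple[str, str, str]


class VersionedStore:
    """Hypothetical stand-in for a store with one lazily cached index."""

    def __init__(self) -> None:
        self.triples: List[Triple] = []
        self._version = 0          # bumped on every mutation
        self._cache_version = -1   # version at which the cache was last built
        self._predicate_counts: Dict[str, int] = {}

    def add(self, triple: Triple) -> None:
        self.triples.append(triple)
        self._version += 1         # any cached index is now stale

    @property
    def predicate_counts(self) -> Dict[str, int]:
        # Rebuild only if the store changed since the last build.
        if self._cache_version != self._version:
            counts: Dict[str, int] = {}
            for _, predicate, _ in self.triples:
                counts[predicate] = counts.get(predicate, 0) + 1
            self._predicate_counts = counts
            self._cache_version = self._version
        return self._predicate_counts


store = VersionedStore()
store.add(("http://ex.org/alice", "http://ex.org/age", "30"))
print(store.predicate_counts)
```

The cost of rebuilding is paid on the first read after a write, so a burst of writes followed by one read triggers a single rebuild.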

Imports

DatasetMeta

Meta-graph navigation for RDF Dataset with lazy-cached indexes.


DatasetMeta


def DatasetMeta(
    dataset:Dataset, name:str='ds', session_id:str=<factory>
)->None:

Meta-graph navigation for RDF Dataset.

Provides lazy-cached indexes and bounded views over named graphs. Indexes are invalidated on any mutation to the mem graph.

Setup Function

Memory Operations


mem_add


def mem_add(
    ds_meta:DatasetMeta, subject, predicate, obj, source:str='agent', reason:str=None
)->str:

Add fact to mem with provenance tracking.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • subject: Subject URI or literal
  • predicate: Predicate URI
  • obj: Object URI or literal
  • source: Source of this fact (default: 'agent')
  • reason: Optional reason for adding

Returns: Summary string
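
To make the provenance idea concrete, the sketch below builds the kind of record (timestamp, source, reason) that the design principles describe. It is an assumption-laden illustration: the real module writes RDF triples into the prov graph, whereas this hypothetical `sketch_prov_event` helper returns a plain dict. It uses a timezone-aware UTC timestamp, which is the approach Python recommends in place of `datetime.utcnow()`.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def sketch_prov_event(
    triple: Tuple[str, str, str],
    source: str = "agent",
    reason: Optional[str] = None,
    action: str = "add",
) -> dict:
    """Hypothetical provenance record; field names are illustrative only."""
    # Timezone-aware UTC timestamp, rendered with a trailing 'Z'.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return {
        "action": action,
        "triple": triple,
        "timestamp": timestamp,
        "source": source,
        "reason": reason,
    }


event = sketch_prov_event(
    ("http://ex.org/alice", "http://ex.org/knows", "http://ex.org/bob"),
    source="test",
    reason="Testing",
)
print(event)
```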


mem_query


def mem_query(
    ds_meta:DatasetMeta, sparql:str, limit:int=100
)->list:

Query mem graph, return bounded results.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • sparql: SPARQL query string
  • limit: Maximum results to return

Returns: List of result rows (as dicts)


mem_retract


def mem_retract(
    ds_meta:DatasetMeta, subject:NoneType=None, predicate:NoneType=None, obj:NoneType=None, source:str='agent',
    reason:str=None
)->str:

Remove triples with provenance.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • subject: Subject URI or None (wildcard)
  • predicate: Predicate URI or None (wildcard)
  • obj: Object URI/literal or None (wildcard)
  • source: Source of this retraction
  • reason: Optional reason for removing

Returns: Summary string
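
The wildcard semantics (None matches anything) can be sketched over a plain set of string triples. This is only an illustration of the matching rule, under the assumption that triples are compared by equality; `sketch_retract` is a hypothetical helper and does not record provenance as the real function does.

```python
from typing import Optional, Set, Tuple

Triple = Tuple[str, str, str]


def sketch_retract(
    triples: Set[Triple],
    subject: Optional[str] = None,
    predicate: Optional[str] = None,
    obj: Optional[str] = None,
) -> int:
    """Remove matching triples in place and return how many were removed."""
    pattern = (subject, predicate, obj)
    # A position matches when the pattern is None (wildcard) or equal.
    matched = {
        t for t in triples
        if all(want is None or want == got for want, got in zip(pattern, t))
    }
    triples.difference_update(matched)
    return len(matched)


mem = {
    ("http://ex.org/alice", "http://ex.org/age", "30"),
    ("http://ex.org/bob", "http://ex.org/age", "25"),
    ("http://ex.org/alice", "http://ex.org/city", "Boston"),
}
removed = sketch_retract(mem, predicate="http://ex.org/age")
print(f"Removed {removed} triples, {len(mem)} left")
```

Note that with all three positions left as None, this rule matches every triple, so a caller that wants a safety check would need to add one.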


mem_describe


def mem_describe(
    ds_meta:DatasetMeta, uri:str, limit:int=20
)->dict:

Get bounded entity description from mem.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • uri: URI of entity to describe
  • limit: Maximum triples to include

Returns: Dict with 'as_subject' and 'as_object' triple lists
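
The shape of the returned description can be sketched over a list of string triples. The dict keys follow the output shown in the tests on this page; whether `limit` applies to each list separately or to the total is not stated, so the per-list cap below is an assumption, and `sketch_describe` is a hypothetical name.

```python
from typing import Dict, List, Tuple

Triple = Tuple[str, str, str]


def sketch_describe(triples: List[Triple], uri: str, limit: int = 20) -> Dict:
    """Collect triples where `uri` is the subject or the object."""
    # Assumption: the limit caps each list independently.
    as_subject = [t for t in triples if t[0] == uri][:limit]
    as_object = [t for t in triples if t[2] == uri][:limit]
    return {"uri": uri, "as_subject": as_subject, "as_object": as_object}


triples = [
    ("http://ex.org/alice", "http://ex.org/knows", "http://ex.org/bob"),
    ("http://ex.org/alice", "http://ex.org/age", "30"),
]
print(sketch_describe(triples, "http://ex.org/bob"))
```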

Scratch Graph Operations


work_create


def work_create(
    ds_meta:DatasetMeta, task_id:str=None
)->tuple:

Create a scratch graph for intermediate results.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • task_id: Task identifier (default: auto-generated)

Returns: (graph_uri, graph) tuple
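
For orientation, here is one way a scratch-graph URI with an auto-generated task id could be built. The tests on this page show that the mem graph URI has the form `urn:rlm:<name>:mem` and that work URIs contain `work/<task_id>`; the full work URI layout and the use of a short UUID fragment are assumptions, and `sketch_work_uri` is a hypothetical helper.

```python
import uuid
from typing import Optional


def sketch_work_uri(dataset_name: str, task_id: Optional[str] = None) -> str:
    """Build a scratch-graph URI, generating a task id when none is given."""
    if task_id is None:
        # Assumption: a short random hex string is unique enough per session.
        task_id = uuid.uuid4().hex[:8]
    return f"urn:rlm:{dataset_name}:work/{task_id}"


print(sketch_work_uri("ds", "test_task"))
print(sketch_work_uri("ds"))
```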


work_cleanup


def work_cleanup(
    ds_meta:DatasetMeta, task_id:str=None, all:bool=False
)->str:

Remove scratch graph(s).

Args:

  • ds_meta: DatasetMeta containing the dataset
  • task_id: Specific task to clean up, or None
  • all: If True, remove all work/* graphs

Returns: Summary string


work_to_mem


def work_to_mem(
    ds_meta:DatasetMeta, task_id:str, source:str='work', reason:str=None
)->str:

Promote triples from scratch graph to mem with provenance.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • task_id: Task identifier for work graph
  • source: Source label for provenance
  • reason: Optional reason for promotion

Returns: Summary string

Snapshot Functions


snapshot_dataset


def snapshot_dataset(
    ds_meta:DatasetMeta, path:str=None, format:str='trig'
)->str:

Serialize dataset to TriG/N-Quads for debugging.

Args:

  • ds_meta: DatasetMeta to snapshot
  • path: Output path (default: auto-generated with timestamp)
  • format: 'trig' or 'nquads'

Returns: Path to snapshot file


load_snapshot


def load_snapshot(
    path:str, ns:dict, name:str='ds'
)->str:

Load dataset from TriG/N-Quads snapshot.

Useful for debugging and replay. Note: the snapshot preserves the original dataset name in its graph URIs, so to reuse that name, extract it from the graph URIs.

Args:

  • path: Path to snapshot file
  • ns: Namespace dict where Dataset will be stored
  • name: Variable name for the Dataset handle

Returns: Summary string

Bounded View Functions


res_distinct


def res_distinct(
    result, column:str, limit:int=50
)->list:

Get distinct values in a column.

Args:

  • result: ResultTable or list of dicts
  • column: Column to get distinct values from
  • limit: Maximum distinct values to return

Returns: List of distinct values
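
A bounded distinct-values view over a list of dicts might look like the following. The sorted ordering is an assumption (it happens to be consistent with the `['25', '30']` output shown in the tests on this page, but the source does not state the ordering), and `sketch_distinct` is a hypothetical name.

```python
from typing import Dict, List


def sketch_distinct(rows: List[Dict[str, str]], column: str, limit: int = 50) -> List[str]:
    """Return up to `limit` distinct values of `column`, sorted."""
    # Rows that lack the column are skipped rather than raising.
    values = {row[column] for row in rows if column in row}
    return sorted(values)[:limit]


rows = [
    {"s": "http://ex.org/alice", "age": "30"},
    {"s": "http://ex.org/bob", "age": "25"},
    {"s": "http://ex.org/charlie", "age": "30"},
]
print(sketch_distinct(rows, "age"))
```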


res_group


def res_group(
    result, column:str, limit:int=20
)->list:

Get counts grouped by column value.

Args:

  • result: ResultTable or list of dicts
  • column: Column to group by
  • limit: Maximum groups to return

Returns: List of (value, count) tuples, sorted by count descending
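
The grouping behavior described above maps naturally onto `collections.Counter`. This is a sketch of the technique over a plain list of dicts, not the module's code; `sketch_group` is a hypothetical name.

```python
from collections import Counter
from typing import Dict, List, Tuple


def sketch_group(rows: List[Dict[str, str]], column: str, limit: int = 20) -> List[Tuple[str, int]]:
    """Count rows per value of `column`, most frequent first."""
    counts = Counter(row[column] for row in rows if column in row)
    # most_common sorts by count descending and truncates to `limit`.
    return counts.most_common(limit)


rows = [
    {"s": "http://ex.org/alice", "age": "30"},
    {"s": "http://ex.org/bob", "age": "25"},
    {"s": "http://ex.org/charlie", "age": "30"},
]
print(sketch_group(rows, "age"))
```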


res_where


def res_where(
    result, column:str, pattern:str=None, value:str=None, limit:int=100
)->list:

Filter result rows by column value or regex pattern.

Args:

  • result: ResultTable or list of dicts
  • column: Column name to filter on
  • pattern: Optional regex pattern to match
  • value: Optional exact value to match
  • limit: Maximum matching rows to return (default: 100)

Returns: List of matching rows
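
A filter of this kind can be sketched with the standard `re` module. The tests on this page match `pattern='alice'` against a full URI, which suggests search (anywhere in the string) rather than anchored matching; that reading, and the rule that both `value` and `pattern` must hold when both are given, are assumptions. `sketch_where` is a hypothetical name.

```python
import re
from typing import Dict, List, Optional


def sketch_where(
    rows: List[Dict[str, str]],
    column: str,
    pattern: Optional[str] = None,
    value: Optional[str] = None,
    limit: int = 100,
) -> List[Dict[str, str]]:
    """Keep rows whose `column` equals `value` and/or matches `pattern`."""
    regex = re.compile(pattern) if pattern is not None else None
    matches: List[Dict[str, str]] = []
    for row in rows:
        if len(matches) >= limit:
            break  # bounded view: stop once the cap is reached
        cell = str(row.get(column, ""))
        if value is not None and cell != value:
            continue
        if regex is not None and not regex.search(cell):
            continue
        matches.append(row)
    return matches


rows = [
    {"s": "http://ex.org/alice", "age": "30"},
    {"s": "http://ex.org/bob", "age": "25"},
    {"s": "http://ex.org/charlie", "age": "30"},
]
print(sketch_where(rows, "age", value="30"))
print(sketch_where(rows, "s", pattern="alice"))
```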


res_head


def res_head(
    result, n:int=10
)->list:

Get first N rows of a result set.

Args:

  • result: ResultTable, list of dicts, or list of tuples
  • n: Number of rows to return

Returns: List of rows (same format as input)


ResultTable


def ResultTable(
    rows:list, columns:list, query:str, total_rows:int
)->None:

Wrapper for SPARQL query results with bounded view operations.

Result Table Views (Stage 2)

Bounded view operations over SPARQL query results enable iterative exploration without overwhelming context:

  • res_head(): Preview first N rows
  • res_where(): Filter by column value or regex
  • res_group(): Aggregate and count by column
  • res_distinct(): Find unique values

These work with a ResultTable wrapper or with the plain list of dicts returned by mem_query().

Use Cases

  • Previewing large result sets: res_head(results, 10)
  • Finding specific entities: res_where(results, 'name', pattern='Alice')
  • Understanding data distribution: res_group(results, 'category')
  • Exploring unique values: res_distinct(results, 'author')

dataset_stats


def dataset_stats(
    ds_meta:DatasetMeta
)->str:

Get dataset statistics summary.


list_graphs


def list_graphs(
    ds_meta:DatasetMeta, pattern:str=None
)->list:

List named graphs, optionally filtered.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • pattern: Optional substring to filter graph URIs

Returns: List of (graph_uri, triple_count) tuples
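
The substring filter can be illustrated over a plain mapping from graph URI to triple count. The mapping stands in for the real dataset, the example URIs follow the `urn:rlm:<name>:...` form seen in the tests on this page (the work URI layout is assumed), and `sketch_list_graphs` is a hypothetical name.

```python
from typing import Dict, List, Optional, Tuple


def sketch_list_graphs(
    graph_sizes: Dict[str, int], pattern: Optional[str] = None
) -> List[Tuple[str, int]]:
    """Return (graph_uri, triple_count) pairs, optionally filtered by substring."""
    return [
        (uri, count)
        for uri, count in graph_sizes.items()
        if pattern is None or pattern in uri
    ]


sizes = {
    "urn:rlm:ds:mem": 1,
    "urn:rlm:ds:prov": 4,
    "urn:rlm:ds:work/task1": 0,
    "urn:rlm:ds:work/task2": 0,
}
print(sketch_list_graphs(sizes, pattern="work/"))
```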


graph_sample


def graph_sample(
    ds_meta:DatasetMeta, graph_uri:str, limit:int=10
)->list:

Get sample triples from a graph.

Args:

  • ds_meta: DatasetMeta containing the dataset
  • graph_uri: URI of graph to sample
  • limit: Maximum triples to return

Returns: List of (s, p, o) tuples as strings

Ontology Integration


mount_ontology


def mount_ontology(
    ds_meta:DatasetMeta, ns:dict, path:str, ont_name:str, index_shacl:bool=True, index_queries:bool=True
)->str:

Mount ontology into dataset as read-only onto/ graph.

If index_shacl=True and SHACL content is detected, also builds a SHACLIndex and stores it in ns['{ont_name}_shacl'].

If index_queries=True and sh:SPARQLExecutable is detected, also builds a QueryIndex and stores it in ns['{ont_name}_queries'].

Args:

  • ds_meta: DatasetMeta containing the dataset
  • ns: Namespace dict (for compatibility with setup_ontology_context)
  • path: Path to ontology file
  • ont_name: Name for the ontology
  • index_shacl: Whether to detect and index SHACL shapes (default: True)
  • index_queries: Whether to detect and index query templates (default: True)

Returns: Summary string


setup_dataset_context


def setup_dataset_context(
    ns:dict, name:str='ds'
)->str:

Initialize Dataset with mem/prov graphs, bind helper functions.

Args:

  • ns: Namespace dict where Dataset will be stored
  • name: Variable name for the Dataset handle

Returns: Summary string describing what was created

# Test result table views
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Add test data
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
mem_add(ds_meta, 'http://ex.org/bob', 'http://ex.org/age', '25')
mem_add(ds_meta, 'http://ex.org/charlie', 'http://ex.org/age', '30')
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/city', 'Boston')
mem_add(ds_meta, 'http://ex.org/bob', 'http://ex.org/city', 'NYC')

# Query and get results as list
results = mem_query(ds_meta, 'SELECT ?s ?age WHERE { ?s <http://ex.org/age> ?age }')

# Test res_head
head = res_head(results, n=2)
assert len(head) == 2
print(f"✓ res_head works: {len(head)} rows")

# Test res_where with exact value
filtered = res_where(results, 'age', value='30')
assert len(filtered) == 2
print(f"✓ res_where (exact) works: {len(filtered)} rows with age=30")

# Test res_where with pattern
filtered_pattern = res_where(results, 's', pattern='alice')
assert len(filtered_pattern) == 1
print(f"✓ res_where (pattern) works: {len(filtered_pattern)} rows matching 'alice'")

# Test res_group
groups = res_group(results, 'age')
assert len(groups) == 2  # Two distinct ages
assert groups[0][1] == 2  # Age '30' appears twice
print(f"✓ res_group works: {groups}")

# Test res_distinct
distinct_ages = res_distinct(results, 'age')
assert len(distinct_ages) == 2
assert '25' in distinct_ages and '30' in distinct_ages
print(f"✓ res_distinct works: {distinct_ages}")

# Test ResultTable wrapper
result_table = ResultTable(
    rows=results,
    columns=['s', 'age'],
    query='SELECT ?s ?age WHERE { ?s <http://ex.org/age> ?age }',
    total_rows=len(results)
)
assert len(result_table) == 3
print(f"✓ ResultTable works: {result_table}")

# Test result table views work with ResultTable
head_from_table = res_head(result_table, n=2)
assert len(head_from_table) == 2
print("✓ res_head works with ResultTable")
✓ res_head works: 2 rows
✓ res_where (exact) works: 2 rows with age=30
✓ res_where (pattern) works: 1 rows matching 'alice'
✓ res_group works: [('30', 2), ('25', 1)]
✓ res_distinct works: ['25', '30']
✓ ResultTable works: ResultTable(3 rows, columns=['s', 'age'])
✓ res_head works with ResultTable
DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
  ds_meta.prov.add((event_uri, RLM_PROV.timestamp, Literal(datetime.utcnow().isoformat() + 'Z', datatype=XSD.dateTime)))
# Test dataset creation
test_ns = {}
result = setup_dataset_context(test_ns, name='test_ds')
assert 'test_ds' in test_ns
assert 'test_ds_meta' in test_ns
assert len(test_ns['test_ds_meta'].session_id) == 8
print("✓ Dataset creation works")
print(result)
✓ Dataset creation works
Created dataset 'test_ds' with session_id=cb576fb5
# Test mem_add with provenance
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

result = mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/knows', 'http://ex.org/bob', 
                 source='test', reason='Testing')
assert len(ds_meta.mem) == 1
assert len(ds_meta.prov) > 0
print("✓ mem_add works")
print(result)
✓ mem_add works
Added triple to mem: (http://ex.org/alice, http://ex.org/knows, http://ex.org/bob)
# Test mem_query
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
mem_add(ds_meta, 'http://ex.org/bob', 'http://ex.org/age', '25')

results = mem_query(ds_meta, 'SELECT ?s ?age WHERE { ?s <http://ex.org/age> ?age }')
assert len(results) == 2
assert all('s' in r and 'age' in r for r in results)
print("✓ mem_query works")
print(results)
✓ mem_query works
[{'s': 'http://ex.org/alice', 'age': '30'}, {'s': 'http://ex.org/bob', 'age': '25'}]
# Test mem_retract
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
assert len(ds_meta.mem) == 1

result = mem_retract(ds_meta, predicate='http://ex.org/age', source='test', reason='Correction')
assert len(ds_meta.mem) == 0
assert 'Removed 1 triples' in result
print("✓ mem_retract works")
print(result)
✓ mem_retract works
Removed 1 triples from mem
# Test mem_describe
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/knows', 'http://ex.org/bob')
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')

desc = mem_describe(ds_meta, 'http://ex.org/alice')
assert 'as_subject' in desc
assert 'as_object' in desc
assert len(desc['as_subject']) == 2
print("✓ mem_describe works")
print(desc)
✓ mem_describe works
{'uri': 'http://ex.org/alice', 'as_subject': [('http://ex.org/alice', 'http://ex.org/knows', 'http://ex.org/bob'), ('http://ex.org/alice', 'http://ex.org/age', '30')], 'as_object': []}
# Test index invalidation
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Access cached property
initial_version = ds_meta._version
_ = ds_meta.graph_stats

# Mutate
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')

# Check version incremented
assert ds_meta._version > initial_version
print("✓ Index invalidation works")
✓ Index invalidation works
DeprecationWarning: Dataset.contexts is deprecated, use Dataset.graphs instead.
  for ctx in self.dataset.contexts():
# Test work graph lifecycle
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Create work graph
uri, graph = work_create(ds_meta, task_id='test_task')
assert 'work/test_task' in uri
assert len(ds_meta.work_graphs) == 1

# Add some triples to work graph
graph.add((URIRef('http://ex.org/alice'), URIRef('http://ex.org/temp'), Literal('value')))
assert len(graph) == 1

# Promote to mem
result = work_to_mem(ds_meta, 'test_task', reason='Test promotion')
assert len(ds_meta.mem) == 1
assert 'Promoted 1 triples' in result

# Cleanup
result = work_cleanup(ds_meta, task_id='test_task')
assert 'Removed 1 work' in result
assert len(ds_meta.work_graphs) == 0

print("✓ Work graph lifecycle works")
✓ Work graph lifecycle works
# Test snapshot
import tempfile
import os

test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Add some data
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')

# Take snapshot
with tempfile.NamedTemporaryFile(mode='w', suffix='.trig', delete=False) as f:
    snapshot_path = f.name

result = snapshot_dataset(ds_meta, path=snapshot_path)
assert os.path.exists(snapshot_path)
assert 'Snapshot saved' in result

# Load snapshot (let it auto-detect the name 'ds' from graph URIs)
test_ns2 = {}
result = load_snapshot(snapshot_path, test_ns2, name='restored')
assert 'restored' in test_ns2
assert 'restored_meta' in test_ns2
# Should auto-detect original name 'ds' and use it for URIs
assert len(test_ns2['restored_meta'].mem) == 1

# Also test loading with same name
test_ns3 = {}
result = load_snapshot(snapshot_path, test_ns3, name='ds')
assert 'ds' in test_ns3
assert 'ds_meta' in test_ns3
assert len(test_ns3['ds_meta'].mem) == 1

# Cleanup
os.unlink(snapshot_path)

print("✓ Snapshot roundtrip works")
✓ Snapshot roundtrip works
# Test bounded view functions
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Add some data
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
work_create(ds_meta, 'task1')
work_create(ds_meta, 'task2')

# Test dataset_stats
stats = dataset_stats(ds_meta)
assert 'mem: 1 triples' in stats
assert 'work graphs: 2' in stats

# Test list_graphs
graphs = list_graphs(ds_meta)
assert len(graphs) >= 4  # mem, prov, work/task1, work/task2

# Test list_graphs with pattern
work_graphs = list_graphs(ds_meta, pattern='work/')
assert len(work_graphs) == 2

# Test graph_sample
mem_uri = f'urn:rlm:{ds_meta.name}:mem'
sample = graph_sample(ds_meta, mem_uri)
assert len(sample) == 1

print("✓ Bounded view functions work")
✓ Bounded view functions work

Tests

Usage Examples

# Basic usage in RLM context
ns = {}
setup_dataset_context(ns)

# RLM can now use: mem_add, mem_query, mem_describe, etc.
ns['mem_add']('http://ex.org/alice', 'http://ex.org/knows', 'http://ex.org/bob')
results = ns['mem_query']('SELECT ?s ?p ?o WHERE { ?s ?p ?o }')
print(results)
# Integration with ontology
from rlm.ontology import setup_ontology_context

ns = {}
setup_dataset_context(ns)
setup_ontology_context('ontology/prov.ttl', ns, name='prov')

# Mount ontology into dataset
ns['mount_ontology']('ontology/prov.ttl', 'prov')

# Now ontology is in dataset as onto/prov graph
graphs = ns['list_graphs']()
print(graphs)