Module clingox.reify
This module provides functions to reify programs.
This includes a Reifier
implementing clingo's Observer
interface that can be registered with a Control
object.
Additionally, the module provides a ReifiedTheory
class that provides a
similar interface as clingo's theory atoms but uses the reified symbols.
Examples
The following example uses the reify_program()
function to reify a program:
>>> from clingox.reify import reify_program
>>> prg = 'b :- a. {a}.'
>>> symbols = reify_program(prg)
>>> print([str(sym) for sym in symbols])
['tag(incremental)', 'atom_tuple(0)', 'atom_tuple(0,1)', 'literal_tuple(0)',
'rule(choice(0),normal(0))', 'atom_tuple(1)', 'atom_tuple(1,2)',
'literal_tuple(1)', 'literal_tuple(1,1)', 'rule(disjunction(1),normal(1))',
'output(a,1)', 'literal_tuple(2)', 'literal_tuple(2,2)', 'output(b,2)']
The last example shows how to use the ReifiedTheory
class.
>>> from clingox.reify import ReifiedTheory, reify_program
>>> prg = '#theory theory { t { }; &p/0 : t, any }. &p { t }.'
>>> thy = ReifiedTheory(reify_program(prg))
>>> print([str(atm) for atm in thy])
['&p { t: literal_tuple(0) }']
>>> from clingox.theory import evaluate
>>> evaluate(next(iter(thy)).term)
Function('p', [], True)
Expand source code
"""
This module provides functions to reify programs.
This includes a `Reifier` implementing clingo's `clingo.backend.Observer`
interface that can be registered with a `clingo.control.Control` object.
Additionally, the module provides a `ReifiedTheory` class that provides a
similar interface as clingo's theory atoms but uses the reified symbols.
Examples
--------
The following example uses the `reify_program` function to reify a program:
```python-repl
>>> from clingox.reify import reify_program
>>> prg = 'b :- a. {a}.'
>>> symbols = reify_program(prg)
>>> print([str(sym) for sym in symbols])
['tag(incremental)', 'atom_tuple(0)', 'atom_tuple(0,1)', 'literal_tuple(0)',
'rule(choice(0),normal(0))', 'atom_tuple(1)', 'atom_tuple(1,2)',
'literal_tuple(1)', 'literal_tuple(1,1)', 'rule(disjunction(1),normal(1))',
'output(a,1)', 'literal_tuple(2)', 'literal_tuple(2,2)', 'output(b,2)']
```
The last example shows how to use the `ReifiedTheory` class.
```python-repl
>>> from clingox.reify import ReifiedTheory, reify_program
>>> prg = '#theory theory { t { }; &p/0 : t, any }. &p { t }.'
>>> thy = ReifiedTheory(reify_program(prg))
>>> print([str(atm) for atm in thy])
['&p { t: literal_tuple(0) }']
>>> from clingox.theory import evaluate
>>> evaluate(next(iter(thy)).term)
Function('p', [], True)
```
"""
from dataclasses import dataclass, field
from typing import (
Callable,
Dict,
Generic,
Iterator,
List,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
)
from clingo.backend import HeuristicType, Observer, TruthValue
from clingo.control import Control
from clingo.symbol import Function, Number, String, Symbol
from clingo.theory_atoms import TheoryTermType
from .theory import is_operator
__all__ = [
"Reifier",
"ReifiedTheory",
"ReifiedTheoryAtom",
"ReifiedTheoryElement",
"ReifiedTheoryTerm",
"ReifiedTheory",
"reify_program",
]
T = TypeVar("T") # pylint: disable=invalid-name
U = TypeVar("U", int, Tuple[int, int]) # pylint: disable=invalid-name
@dataclass
class _Vertex(Generic[T]):
"""
Vertex data to calculate SCCs of a graph.
"""
name: T
visited: int
index: int = 0
edges: List[int] = field(default_factory=list)
class _Graph(Generic[T]):
"""
Simple class to compute strongly connected components using Tarjan's
algorithm.
"""
_names: Dict[T, int]
_vertices: List[_Vertex]
_phase: bool
def __init__(self):
self._names = {}
self._vertices = []
self._phase = True
def _visited(self, key_u: int) -> bool:
return self._vertices[key_u].visited != int(not self._phase)
def _active(self, key_u: int) -> bool:
return self._vertices[key_u].visited != int(self._phase)
def _add_vertex(self, val_u: T) -> int:
n = len(self._vertices)
key_u = self._names.setdefault(val_u, n)
if n == key_u:
self._vertices.append(_Vertex(val_u, int(not self._phase)))
return key_u
def add_edge(self, val_u: T, val_v: T) -> None:
"""
Add an edge to the graph.
"""
key_u = self._add_vertex(val_u)
key_v = self._add_vertex(val_v)
self._vertices[key_u].edges.append(key_v)
def tarjan(self) -> List[List[T]]:
"""
Returns the strictly connected components of the graph.
"""
sccs: List[List[T]] = []
stack = []
trail = []
index = 1
def push(key_u: int):
nonlocal index
index += 1
vtx_u = self._vertices[key_u]
vtx_u.visited = index
vtx_u.index = 0
stack.append(key_u)
trail.append(key_u)
for key_u in range(len(self._vertices)):
if self._visited(key_u):
continue
index = 1
push(key_u)
while stack:
key_v = stack[-1]
vtx_v = self._vertices[key_v]
len_v = len(vtx_v.edges)
while vtx_v.index < len_v:
key_w = vtx_v.edges[vtx_v.index]
vtx_v.index += 1
if not self._visited(key_w):
push(key_w)
break
else:
stack.pop()
root = True
for key_w in vtx_v.edges:
vtx_w = self._vertices[key_w]
if self._active(key_w) and vtx_w.visited < vtx_v.visited:
root = False
vtx_v.visited = vtx_w.visited
if root:
key_last = None
sccs.append([])
while key_last != key_v:
key_last = trail[-1]
vtx_last = self._vertices[key_last]
sccs[-1].append(vtx_last.name)
vtx_last.visited = int(self._phase)
trail.pop()
if len(sccs[-1]) == 1:
sccs.pop()
self._phase = not self._phase
return sccs
@dataclass
class _StepData:
atom_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
lit_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
wlit_tuples: Dict[Sequence[Tuple[int, int]], int] = field(default_factory=dict)
theory_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
theory_element_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
graph: _Graph = field(default_factory=_Graph)
def _theory(i: Symbol, pos: int, lit: int) -> Sequence[Symbol]:
return [i, Number(pos), Number(lit)]
def _lit(i: Symbol, pos: int, lit: int) -> Sequence[Symbol]:
# pylint: disable=unused-argument
return [i, Number(lit)]
def _wlit(i: Symbol, pos: int, wlit: Tuple[int, int]) -> Sequence[Symbol]:
# pylint: disable=unused-argument
return [i, Number(wlit[0]), Number(wlit[1])]
class Reifier(Observer):
"""
An observer that will gather the symbols of the reification, in the same way as `clingo --output=reify`.
Parameters
----------
cb
A callback function that will be called with each symbol of the reification
calculate_sccs
Flag to calculate the SCCs
reify_steps
Flag to add a number as the last argument of all reification symbols for the corresponding step
"""
# pylint:disable=too-many-public-methods
_step: int
# Bug in mypy???
# _cb: Callable[[Symbol], None]
_calculate_sccs: bool
_reify_steps: bool
_step_data: _StepData
def __init__(
self,
cb: Callable[[Symbol], None],
calculate_sccs: bool = False,
reify_steps: bool = False,
):
self._step = 0
self._cb = cb
self._calculate_sccs = calculate_sccs
self._reify_steps = reify_steps
self._step_data = _StepData()
def calculate_sccs(self) -> None:
"""
Trigger computation of SCCs.
SCCs can only be computed if the Reifier has been initialized with
`calculate_sccs=True`, This function is called automatically if
`reify_steps=True` has been set when initializing the Reifier.
"""
for idx, scc in enumerate(self._step_data.graph.tarjan()):
for atm in scc:
self._output("scc", [Number(idx), Number(atm)])
def _add_edges(self, head: Sequence[int], body: Sequence[int]):
if self._calculate_sccs:
for u in head:
for v in body:
if v > 0:
self._step_data.graph.add_edge(u, v)
def _output(self, name: str, args: Sequence[Symbol]):
if self._reify_steps:
args = list(args) + [Number(self._step)]
self._cb(Function(name, args))
def _tuple(
self,
name: str,
snmap: Dict[Sequence[U], int],
elems: Sequence[U],
afun: Callable[[Symbol, int, U], Sequence[Symbol]],
ordered: bool = False,
) -> Symbol:
pruned: Sequence[U]
if ordered:
pruned = elems
ident = tuple(elems)
else:
seen: Set[U] = set()
pruned = []
for elem in elems:
if elem not in seen:
seen.add(elem)
pruned.append(elem)
ident = tuple(sorted(pruned))
n = len(snmap)
i = Number(snmap.setdefault(ident, n))
if n == i.number:
self._output(name, [i])
for idx, atm in enumerate(pruned):
self._output(name, afun(i, idx, atm))
return i
def _atom_tuple(self, atoms: Sequence[int]):
return self._tuple("atom_tuple", self._step_data.atom_tuples, atoms, _lit)
def _lit_tuple(self, lits: Sequence[int]):
return self._tuple("literal_tuple", self._step_data.lit_tuples, lits, _lit)
def _wlit_tuple(self, wlits: Sequence[Tuple[int, int]]):
return self._tuple(
"weighted_literal_tuple", self._step_data.wlit_tuples, wlits, _wlit
)
def init_program(self, incremental: bool) -> None:
if incremental:
self._cb(Function("tag", [Function("incremental")]))
def begin_step(self) -> None:
pass
def rule(self, choice: bool, head: Sequence[int], body: Sequence[int]) -> None:
hn = "choice" if choice else "disjunction"
hd = Function(hn, [self._atom_tuple(head)])
bd = Function("normal", [self._lit_tuple(body)])
self._output("rule", [hd, bd])
self._add_edges(head, body)
def weight_rule(
self,
choice: bool,
head: Sequence[int],
lower_bound: int,
body: Sequence[Tuple[int, int]],
) -> None:
hn = "choice" if choice else "disjunction"
hd = Function(hn, [self._atom_tuple(head)])
bd = Function("sum", [self._wlit_tuple(body), Number(lower_bound)])
self._output("rule", [hd, bd])
self._add_edges(head, [lit for lit, w in body])
def minimize(self, priority: int, literals: Sequence[Tuple[int, int]]) -> None:
self._output("minimize", [Number(priority), self._wlit_tuple(literals)])
def project(self, atoms: Sequence[int]) -> None:
for atom in atoms:
self._output("project", [Number(atom)])
def output_atom(self, symbol: Symbol, atom: int) -> None:
self._output("output", [symbol, self._lit_tuple([] if atom == 0 else [atom])])
def output_term(self, symbol: Symbol, condition: Sequence[int]) -> None:
self._output("output", [symbol, self._lit_tuple(condition)])
def external(self, atom: int, value: TruthValue) -> None:
value_name = str(value).replace("TruthValue.", "").lower().rstrip("_")
self._output("external", [Number(atom), Function(value_name)])
def assume(self, literals: Sequence[int]) -> None:
for lit in literals:
self._output("assume", [Number(lit)])
def heuristic(
self,
atom: int,
type_: HeuristicType,
bias: int,
priority: int,
condition: Sequence[int],
) -> None:
type_name = str(type_).replace("HeuristicType.", "").lower().rstrip("_")
condition_lit = self._lit_tuple(condition)
self._output(
"heuristic",
[
Number(atom),
Function(type_name),
Number(bias),
Number(priority),
condition_lit,
],
)
def acyc_edge(self, node_u: int, node_v: int, condition: Sequence[int]) -> None:
self._output(
"edge", [Number(node_u), Number(node_v), self._lit_tuple(condition)]
)
def theory_term_number(self, term_id: int, number: int) -> None:
self._output("theory_number", [Number(term_id), Number(number)])
def theory_term_string(self, term_id: int, name: str) -> None:
self._output("theory_string", [Number(term_id), String(name)])
def theory_term_compound(
self, term_id: int, name_id_or_type: int, arguments: Sequence[int]
) -> None:
names = {-1: "tuple", -2: "set", -3: "list"}
if name_id_or_type in names:
name = "theory_sequence"
value = Function(names[name_id_or_type])
else:
name = "theory_function"
value = Number(name_id_or_type)
tuple_id = self._tuple(
"theory_tuple", self._step_data.theory_tuples, arguments, _theory, True
)
self._output(name, [Number(term_id), value, tuple_id])
def theory_element(
self, element_id: int, terms: Sequence[int], condition: Sequence[int]
) -> None:
tuple_id = self._tuple(
"theory_tuple", self._step_data.theory_tuples, terms, _theory, True
)
condition_id = self._tuple(
"literal_tuple", self._step_data.lit_tuples, condition, _lit
)
self._output("theory_element", [Number(element_id), tuple_id, condition_id])
def theory_atom(
self, atom_id_or_zero: int, term_id: int, elements: Sequence[int]
) -> None:
tuple_e_id = self._tuple(
"theory_element_tuple",
self._step_data.theory_element_tuples,
elements,
_lit,
)
self._output(
"theory_atom", [Number(atom_id_or_zero), Number(term_id), tuple_e_id]
)
def theory_atom_with_guard(
self,
atom_id_or_zero: int,
term_id: int,
elements: Sequence[int],
operator_id: int,
right_hand_side_id: int,
) -> None:
tuple_id = self._tuple(
"theory_element_tuple",
self._step_data.theory_element_tuples,
elements,
_lit,
)
self._output(
"theory_atom",
[
Number(atom_id_or_zero),
Number(term_id),
tuple_id,
Number(operator_id),
Number(right_hand_side_id),
],
)
def end_step(self) -> None:
if self._reify_steps:
self.calculate_sccs()
self._step += 1
self._step_data = _StepData()
def _set(
matches: Sequence[Tuple[str, int]],
lst: List[Symbol],
sym,
append: bool = False,
default: Symbol = Number(0),
) -> bool:
for match in matches:
if not sym.match(*match):
continue
idx = len(lst) if append else sym.arguments[0].number
while len(lst) <= idx:
lst.append(default)
lst[idx] = sym
return True
return False
def _ensure(name: str, lst: List[List[int]], sym: Symbol, ordered=False) -> bool:
empty = sym.match(name, 1)
if empty or sym.match(name, 3 if ordered else 2):
idx = sym.arguments[0].number
while len(lst) <= idx:
lst.append([])
if not empty:
if ordered:
tup = lst[idx]
jdx = sym.arguments[1].number
while len(tup) <= jdx:
tup.append(0)
tup[jdx] = sym.arguments[2].number
else:
lst[idx].append(sym.arguments[1].number)
return True
return False
class ReifiedTheory:
"""
Class indexing the symbols related to a theory.
The `ReifiedTheoryTerm`, `ReifiedTheoryElement`, and `ReifiedTheoryElement`
classes provide views on this data that behave as the corresponding classes
in clingo's `clingo.theory_atoms` module.
"""
terms: List[Symbol]
elements: List[Symbol]
atoms: List[Symbol]
term_tuples: List[List[int]]
element_tuples: List[List[int]]
def __init__(self, symbols: Sequence[Symbol]):
self.terms = []
self.elements = []
self.atoms = []
self.term_tuples = []
self.element_tuples = []
for sym in symbols:
_ = (
_set((("theory_atom", 3), ("theory_atom", 5)), self.atoms, sym, True)
or _set((("theory_element", 3),), self.elements, sym)
or _set(
(
("theory_sequence", 3),
("theory_string", 2),
("theory_number", 2),
("theory_function", 3),
),
self.terms,
sym,
)
or _ensure("theory_tuple", self.term_tuples, sym, True)
or _ensure("theory_element_tuple", self.element_tuples, sym)
)
def __iter__(self) -> Iterator["ReifiedTheoryAtom"]:
for idx in range(len(self.atoms)):
yield ReifiedTheoryAtom(idx, self)
class ReifiedTheoryTerm:
"""
Class to represent theory terms.
ReifiedTheory terms have a readable string representation, implement Python's rich
comparison operators, and can be used as dictionary keys.
"""
_idx: int
_theory: ReifiedTheory
def __init__(self, idx: int, theory: ReifiedTheory):
self._idx = idx
self._theory = theory
assert self.index < len(theory.terms)
@property
def index(self) -> int:
"""
The index of the corresponding reified fact.
"""
return self._idx
@property
def _args(self) -> Sequence[Symbol]:
return self._theory.terms[self._idx].arguments
@property
def arguments(self) -> List["ReifiedTheoryTerm"]:
"""
The arguments of the term (for functions, tuples, list, and sets).
"""
assert self.type in (
TheoryTermType.List,
TheoryTermType.Set,
TheoryTermType.Tuple,
TheoryTermType.Function,
)
term_ids = self._theory.term_tuples[self._args[2].number]
return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]
@property
def name(self) -> str:
"""
The name of the term (for symbols and functions).
"""
assert self.type in (TheoryTermType.Symbol, TheoryTermType.Function)
if self.type == TheoryTermType.Function:
return self._theory.terms[self._args[1].number].arguments[1].string
return self._args[1].string
@property
def number(self) -> int:
"""
The numeric representation of the term (for numbers).
"""
assert self.type == TheoryTermType.Number
return self._args[1].number
@property
def type(self) -> TheoryTermType:
"""
The type of the theory term.
"""
name = self._theory.terms[self._idx].name
if name == "theory_number":
return TheoryTermType.Number
if name == "theory_string":
return TheoryTermType.Symbol
if name == "theory_function":
return TheoryTermType.Function
assert name == "theory_sequence"
type_ = self._args[1].name
if type_ == "tuple":
return TheoryTermType.Tuple
if type_ == "set":
return TheoryTermType.Set
assert type_ == "list"
return TheoryTermType.List
def __hash__(self):
return self._idx
def __eq__(self, other):
return self._idx == other._idx
def __lt__(self, other):
return self._idx < other._idx
def __str__(self):
type_ = self.type
if type_ == TheoryTermType.Number:
return f"{self.number}"
if type_ == TheoryTermType.Symbol:
return f"{self.name}"
if type_ == TheoryTermType.Function:
args = self.arguments
name = self.name
if len(args) == 1 and is_operator(name):
return f"{name}({args[0]})"
if len(args) == 2 and is_operator(name):
return f"({args[0]}){name}({args[1]})"
return f'{name}({",".join(str(arg) for arg in args)})'
if type_ == TheoryTermType.Tuple:
lhs, rhs = "(", ")"
elif type_ == TheoryTermType.List:
lhs, rhs = "[", "]"
else:
lhs, rhs = "{", "}"
return f'{lhs}{",".join(str(arg) for arg in self.arguments)}{rhs}'
class ReifiedTheoryElement:
"""
Class to represent theory elements.
ReifiedTheory elements have a readable string representation, implement Python's
rich comparison operators, and can be used as dictionary keys.
"""
_idx: int
_theory: ReifiedTheory
def __init__(self, idx: int, theory: ReifiedTheory):
self._idx = idx
self._theory = theory
assert self.index < len(theory.elements)
@property
def index(self) -> int:
"""
The index of the corresponding reified fact.
"""
return self._idx
@property
def _args(self) -> Sequence[Symbol]:
return self._theory.elements[self._idx].arguments
@property
def condition_id(self) -> int:
"""
The id of the literal tuple of the condition.
"""
return self._args[2].number
@property
def terms(self) -> List[ReifiedTheoryTerm]:
"""
The tuple of the element.
"""
term_ids = self._theory.term_tuples[self._args[1].number]
return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]
def __hash__(self):
return self._idx
def __eq__(self, other):
return self._idx == other._idx
def __lt__(self, other):
return self._idx < other._idx
def __str__(self):
return f'{",".join(str(term) for term in self.terms)}: literal_tuple({self.condition_id})'
class ReifiedTheoryAtom:
"""
Class to represent theory atoms.
Theory atoms have a readable string representation, implement Python's rich
comparison operators, and can be used as dictionary keys.
"""
_idx: int
_theory: ReifiedTheory
def __init__(self, idx: int, theory: ReifiedTheory):
self._idx = idx
self._theory = theory
assert self.index < len(theory.atoms)
@property
def index(self) -> int:
"""
The index of the corresponding reified fact.
"""
return self._idx
@property
def _args(self) -> Sequence[Symbol]:
return self._theory.atoms[self._idx].arguments
@property
def elements(self) -> List[ReifiedTheoryElement]:
"""
The elements of the atom.
"""
tuple_id = self._args[2].number
return [
ReifiedTheoryElement(elem_id, self._theory)
for elem_id in self._theory.element_tuples[tuple_id]
]
@property
def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]:
"""
The guard of the atom or None if the atom has no guard.
"""
args = self._args
if len(args) <= 3:
return None
op = self._theory.terms[args[3].number].arguments[1].string
return (op, ReifiedTheoryTerm(args[4].number, self._theory))
@property
def literal(self) -> int:
"""
The reified literal associated with the atom.
"""
return self._args[0].number
@property
def term(self) -> ReifiedTheoryTerm:
"""
The term of the atom.
"""
return ReifiedTheoryTerm(self._args[1].number, self._theory)
def __hash__(self):
return self._idx
def __eq__(self, other):
return self._idx == other._idx
def __lt__(self, other):
return self._idx < other._idx
def __str__(self):
name = f"&{self.term}"
elems = self.elements
if elems:
estr = f' {{ {"; ".join(str(elem) for elem in elems)} }}'
else:
estr = ""
guard = self.guard
if guard:
gstr = f" {guard[0]} {guard[1]}"
else:
gstr = ""
return f"{name}{estr}{gstr}"
def reify_program(
prg: str, calculate_sccs: bool = False, reify_steps: bool = False
) -> List[Symbol]:
"""
Reify the given program and return the reified symbols.
Parameters
----------
prg
The program to reify in form of a string.
calculate_sccs
Whether to calculate SCCs of the reified program.
reify_steps
Whether to add a step number to the reified facts.
Returns
-------
A list of symbols containing the reified facts.
"""
ret: List[Symbol] = []
ctl = Control()
reifier = Reifier(ret.append, calculate_sccs, reify_steps)
ctl.register_observer(reifier)
ctl.add("base", [], prg)
ctl.ground([("base", [])])
if calculate_sccs and not reify_steps:
reifier.calculate_sccs()
return ret
Functions
def reify_program(prg: str, calculate_sccs: bool = False, reify_steps: bool = False) ‑> List[Symbol]
-
Reify the given program and return the reified symbols.
Parameters
prg
- The program to reify in form of a string.
calculate_sccs
- Whether to calculate SCCs of the reified program.
reify_steps
- Whether to add a step number to the reified facts.
Returns
A list of symbols containing the reified facts.
Expand source code
def reify_program( prg: str, calculate_sccs: bool = False, reify_steps: bool = False ) -> List[Symbol]: """ Reify the given program and return the reified symbols. Parameters ---------- prg The program to reify in form of a string. calculate_sccs Whether to calculate SCCs of the reified program. reify_steps Whether to add a step number to the reified facts. Returns ------- A list of symbols containing the reified facts. """ ret: List[Symbol] = [] ctl = Control() reifier = Reifier(ret.append, calculate_sccs, reify_steps) ctl.register_observer(reifier) ctl.add("base", [], prg) ctl.ground([("base", [])]) if calculate_sccs and not reify_steps: reifier.calculate_sccs() return ret
Classes
class ReifiedTheory (symbols: Sequence[Symbol])
-
Class indexing the symbols related to a theory.
The
ReifiedTheoryTerm
,ReifiedTheoryElement
, andReifiedTheoryElement
classes provide views on this data that behave as the corresponding classes in clingo'sclingo.theory_atoms
module.Expand source code
class ReifiedTheory: """ Class indexing the symbols related to a theory. The `ReifiedTheoryTerm`, `ReifiedTheoryElement`, and `ReifiedTheoryElement` classes provide views on this data that behave as the corresponding classes in clingo's `clingo.theory_atoms` module. """ terms: List[Symbol] elements: List[Symbol] atoms: List[Symbol] term_tuples: List[List[int]] element_tuples: List[List[int]] def __init__(self, symbols: Sequence[Symbol]): self.terms = [] self.elements = [] self.atoms = [] self.term_tuples = [] self.element_tuples = [] for sym in symbols: _ = ( _set((("theory_atom", 3), ("theory_atom", 5)), self.atoms, sym, True) or _set((("theory_element", 3),), self.elements, sym) or _set( ( ("theory_sequence", 3), ("theory_string", 2), ("theory_number", 2), ("theory_function", 3), ), self.terms, sym, ) or _ensure("theory_tuple", self.term_tuples, sym, True) or _ensure("theory_element_tuple", self.element_tuples, sym) ) def __iter__(self) -> Iterator["ReifiedTheoryAtom"]: for idx in range(len(self.atoms)): yield ReifiedTheoryAtom(idx, self)
Class variables
var atoms : List[Symbol]
var element_tuples : List[List[int]]
var elements : List[Symbol]
var term_tuples : List[List[int]]
var terms : List[Symbol]
class ReifiedTheoryAtom (idx: int, theory: ReifiedTheory)
-
Class to represent theory atoms.
Theory atoms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.
Expand source code
class ReifiedTheoryAtom: """ Class to represent theory atoms. Theory atoms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys. """ _idx: int _theory: ReifiedTheory def __init__(self, idx: int, theory: ReifiedTheory): self._idx = idx self._theory = theory assert self.index < len(theory.atoms) @property def index(self) -> int: """ The index of the corresponding reified fact. """ return self._idx @property def _args(self) -> Sequence[Symbol]: return self._theory.atoms[self._idx].arguments @property def elements(self) -> List[ReifiedTheoryElement]: """ The elements of the atom. """ tuple_id = self._args[2].number return [ ReifiedTheoryElement(elem_id, self._theory) for elem_id in self._theory.element_tuples[tuple_id] ] @property def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]: """ The guard of the atom or None if the atom has no guard. """ args = self._args if len(args) <= 3: return None op = self._theory.terms[args[3].number].arguments[1].string return (op, ReifiedTheoryTerm(args[4].number, self._theory)) @property def literal(self) -> int: """ The reified literal associated with the atom. """ return self._args[0].number @property def term(self) -> ReifiedTheoryTerm: """ The term of the atom. """ return ReifiedTheoryTerm(self._args[1].number, self._theory) def __hash__(self): return self._idx def __eq__(self, other): return self._idx == other._idx def __lt__(self, other): return self._idx < other._idx def __str__(self): name = f"&{self.term}" elems = self.elements if elems: estr = f' {{ {"; ".join(str(elem) for elem in elems)} }}' else: estr = "" guard = self.guard if guard: gstr = f" {guard[0]} {guard[1]}" else: gstr = "" return f"{name}{estr}{gstr}"
Instance variables
var elements : List[ReifiedTheoryElement]
-
The elements of the atom.
Expand source code
@property def elements(self) -> List[ReifiedTheoryElement]: """ The elements of the atom. """ tuple_id = self._args[2].number return [ ReifiedTheoryElement(elem_id, self._theory) for elem_id in self._theory.element_tuples[tuple_id] ]
var guard : Optional[Tuple[str, ReifiedTheoryTerm]]
-
The guard of the atom or None if the atom has no guard.
Expand source code
@property def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]: """ The guard of the atom or None if the atom has no guard. """ args = self._args if len(args) <= 3: return None op = self._theory.terms[args[3].number].arguments[1].string return (op, ReifiedTheoryTerm(args[4].number, self._theory))
var index : int
-
The index of the corresponding reified fact.
Expand source code
@property def index(self) -> int: """ The index of the corresponding reified fact. """ return self._idx
var literal : int
-
The reified literal associated with the atom.
Expand source code
@property def literal(self) -> int: """ The reified literal associated with the atom. """ return self._args[0].number
var term : ReifiedTheoryTerm
-
The term of the atom.
Expand source code
@property def term(self) -> ReifiedTheoryTerm: """ The term of the atom. """ return ReifiedTheoryTerm(self._args[1].number, self._theory)
class ReifiedTheoryElement (idx: int, theory: ReifiedTheory)
-
Class to represent theory elements.
ReifiedTheory elements have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.
Expand source code
class ReifiedTheoryElement: """ Class to represent theory elements. ReifiedTheory elements have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys. """ _idx: int _theory: ReifiedTheory def __init__(self, idx: int, theory: ReifiedTheory): self._idx = idx self._theory = theory assert self.index < len(theory.elements) @property def index(self) -> int: """ The index of the corresponding reified fact. """ return self._idx @property def _args(self) -> Sequence[Symbol]: return self._theory.elements[self._idx].arguments @property def condition_id(self) -> int: """ The id of the literal tuple of the condition. """ return self._args[2].number @property def terms(self) -> List[ReifiedTheoryTerm]: """ The tuple of the element. """ term_ids = self._theory.term_tuples[self._args[1].number] return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids] def __hash__(self): return self._idx def __eq__(self, other): return self._idx == other._idx def __lt__(self, other): return self._idx < other._idx def __str__(self): return f'{",".join(str(term) for term in self.terms)}: literal_tuple({self.condition_id})'
Instance variables
var condition_id : int
-
The id of the literal tuple of the condition.
Expand source code
@property def condition_id(self) -> int: """ The id of the literal tuple of the condition. """ return self._args[2].number
var index : int
-
The index of the corresponding reified fact.
Expand source code
@property def index(self) -> int: """ The index of the corresponding reified fact. """ return self._idx
var terms : List[ReifiedTheoryTerm]
-
The tuple of the element.
Expand source code
@property def terms(self) -> List[ReifiedTheoryTerm]: """ The tuple of the element. """ term_ids = self._theory.term_tuples[self._args[1].number] return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]
class ReifiedTheoryTerm (idx: int, theory: ReifiedTheory)
-
Class to represent theory terms.
ReifiedTheory terms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.
Expand source code
class ReifiedTheoryTerm: """ Class to represent theory terms. ReifiedTheory terms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys. """ _idx: int _theory: ReifiedTheory def __init__(self, idx: int, theory: ReifiedTheory): self._idx = idx self._theory = theory assert self.index < len(theory.terms) @property def index(self) -> int: """ The index of the corresponding reified fact. """ return self._idx @property def _args(self) -> Sequence[Symbol]: return self._theory.terms[self._idx].arguments @property def arguments(self) -> List["ReifiedTheoryTerm"]: """ The arguments of the term (for functions, tuples, list, and sets). """ assert self.type in ( TheoryTermType.List, TheoryTermType.Set, TheoryTermType.Tuple, TheoryTermType.Function, ) term_ids = self._theory.term_tuples[self._args[2].number] return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids] @property def name(self) -> str: """ The name of the term (for symbols and functions). """ assert self.type in (TheoryTermType.Symbol, TheoryTermType.Function) if self.type == TheoryTermType.Function: return self._theory.terms[self._args[1].number].arguments[1].string return self._args[1].string @property def number(self) -> int: """ The numeric representation of the term (for numbers). """ assert self.type == TheoryTermType.Number return self._args[1].number @property def type(self) -> TheoryTermType: """ The type of the theory term. """ name = self._theory.terms[self._idx].name if name == "theory_number": return TheoryTermType.Number if name == "theory_string": return TheoryTermType.Symbol if name == "theory_function": return TheoryTermType.Function assert name == "theory_sequence" type_ = self._args[1].name if type_ == "tuple": return TheoryTermType.Tuple if type_ == "set": return TheoryTermType.Set assert type_ == "list" return TheoryTermType.List def __hash__(self): return self._idx def __eq__(self, other): return self._idx == other._idx def __lt__(self, other): return self._idx < other._idx def __str__(self): type_ = self.type if type_ == TheoryTermType.Number: return f"{self.number}" if type_ == TheoryTermType.Symbol: return f"{self.name}" if type_ == TheoryTermType.Function: args = self.arguments name = self.name if len(args) == 1 and is_operator(name): return f"{name}({args[0]})" if len(args) == 2 and is_operator(name): return f"({args[0]}){name}({args[1]})" return f'{name}({",".join(str(arg) for arg in args)})' if type_ == TheoryTermType.Tuple: lhs, rhs = "(", ")" elif type_ == TheoryTermType.List: lhs, rhs = "[", "]" else: lhs, rhs = "{", "}" return f'{lhs}{",".join(str(arg) for arg in self.arguments)}{rhs}'
Instance variables
var arguments : List[ReifiedTheoryTerm]
-
The arguments of the term (for functions, tuples, list, and sets).
Expand source code
@property def arguments(self) -> List["ReifiedTheoryTerm"]: """ The arguments of the term (for functions, tuples, list, and sets). """ assert self.type in ( TheoryTermType.List, TheoryTermType.Set, TheoryTermType.Tuple, TheoryTermType.Function, ) term_ids = self._theory.term_tuples[self._args[2].number] return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]
var index : int
-
The index of the corresponding reified fact.
Expand source code
@property def index(self) -> int: """ The index of the corresponding reified fact. """ return self._idx
var name : str
-
The name of the term (for symbols and functions).
Expand source code
@property def name(self) -> str: """ The name of the term (for symbols and functions). """ assert self.type in (TheoryTermType.Symbol, TheoryTermType.Function) if self.type == TheoryTermType.Function: return self._theory.terms[self._args[1].number].arguments[1].string return self._args[1].string
var number : int
-
The numeric representation of the term (for numbers).
Expand source code
@property def number(self) -> int: """ The numeric representation of the term (for numbers). """ assert self.type == TheoryTermType.Number return self._args[1].number
var type : TheoryTermType
-
The type of the theory term.
Expand source code
@property def type(self) -> TheoryTermType: """ The type of the theory term. """ name = self._theory.terms[self._idx].name if name == "theory_number": return TheoryTermType.Number if name == "theory_string": return TheoryTermType.Symbol if name == "theory_function": return TheoryTermType.Function assert name == "theory_sequence" type_ = self._args[1].name if type_ == "tuple": return TheoryTermType.Tuple if type_ == "set": return TheoryTermType.Set assert type_ == "list" return TheoryTermType.List
class Reifier (cb: Callable[[Symbol], None], calculate_sccs: bool = False, reify_steps: bool = False)
-
An observer that will gather the symbols of the reification, in the same way as
clingo --output=reify
.Parameters
cb
- A callback function that will be called with each symbol of the reification
calculate_sccs
- Flag to calculate the SCCs
reify_steps
- Flag to add a number as the last argument of all reification symbols for the corresponding step
Expand source code
class Reifier(Observer): """ An observer that will gather the symbols of the reification, in the same way as `clingo --output=reify`. Parameters ---------- cb A callback function that will be called with each symbol of the reification calculate_sccs Flag to calculate the SCCs reify_steps Flag to add a number as the last argument of all reification symbols for the corresponding step """ # pylint:disable=too-many-public-methods _step: int # Bug in mypy??? # _cb: Callable[[Symbol], None] _calculate_sccs: bool _reify_steps: bool _step_data: _StepData def __init__( self, cb: Callable[[Symbol], None], calculate_sccs: bool = False, reify_steps: bool = False, ): self._step = 0 self._cb = cb self._calculate_sccs = calculate_sccs self._reify_steps = reify_steps self._step_data = _StepData() def calculate_sccs(self) -> None: """ Trigger computation of SCCs. SCCs can only be computed if the Reifier has been initialized with `calculate_sccs=True`, This function is called automatically if `reify_steps=True` has been set when initializing the Reifier. """ for idx, scc in enumerate(self._step_data.graph.tarjan()): for atm in scc: self._output("scc", [Number(idx), Number(atm)]) def _add_edges(self, head: Sequence[int], body: Sequence[int]): if self._calculate_sccs: for u in head: for v in body: if v > 0: self._step_data.graph.add_edge(u, v) def _output(self, name: str, args: Sequence[Symbol]): if self._reify_steps: args = list(args) + [Number(self._step)] self._cb(Function(name, args)) def _tuple( self, name: str, snmap: Dict[Sequence[U], int], elems: Sequence[U], afun: Callable[[Symbol, int, U], Sequence[Symbol]], ordered: bool = False, ) -> Symbol: pruned: Sequence[U] if ordered: pruned = elems ident = tuple(elems) else: seen: Set[U] = set() pruned = [] for elem in elems: if elem not in seen: seen.add(elem) pruned.append(elem) ident = tuple(sorted(pruned)) n = len(snmap) i = Number(snmap.setdefault(ident, n)) if n == i.number: self._output(name, [i]) for idx, atm in enumerate(pruned): self._output(name, afun(i, idx, atm)) return i def _atom_tuple(self, atoms: Sequence[int]): return self._tuple("atom_tuple", self._step_data.atom_tuples, atoms, _lit) def _lit_tuple(self, lits: Sequence[int]): return self._tuple("literal_tuple", self._step_data.lit_tuples, lits, _lit) def _wlit_tuple(self, wlits: Sequence[Tuple[int, int]]): return self._tuple( "weighted_literal_tuple", self._step_data.wlit_tuples, wlits, _wlit ) def init_program(self, incremental: bool) -> None: if incremental: self._cb(Function("tag", [Function("incremental")])) def begin_step(self) -> None: pass def rule(self, choice: bool, head: Sequence[int], body: Sequence[int]) -> None: hn = "choice" if choice else "disjunction" hd = Function(hn, [self._atom_tuple(head)]) bd = Function("normal", [self._lit_tuple(body)]) self._output("rule", [hd, bd]) self._add_edges(head, body) def weight_rule( self, choice: bool, head: Sequence[int], lower_bound: int, body: Sequence[Tuple[int, int]], ) -> None: hn = "choice" if choice else "disjunction" hd = Function(hn, [self._atom_tuple(head)]) bd = Function("sum", [self._wlit_tuple(body), Number(lower_bound)]) self._output("rule", [hd, bd]) self._add_edges(head, [lit for lit, w in body]) def minimize(self, priority: int, literals: Sequence[Tuple[int, int]]) -> None: self._output("minimize", [Number(priority), self._wlit_tuple(literals)]) def project(self, atoms: Sequence[int]) -> None: for atom in atoms: self._output("project", [Number(atom)]) def output_atom(self, symbol: Symbol, atom: int) -> None: self._output("output", [symbol, self._lit_tuple([] if atom == 0 else [atom])]) def output_term(self, symbol: Symbol, condition: Sequence[int]) -> None: self._output("output", [symbol, self._lit_tuple(condition)]) def external(self, atom: int, value: TruthValue) -> None: value_name = str(value).replace("TruthValue.", "").lower().rstrip("_") self._output("external", [Number(atom), Function(value_name)]) def assume(self, literals: Sequence[int]) -> None: for lit in literals: self._output("assume", [Number(lit)]) def heuristic( self, atom: int, type_: HeuristicType, bias: int, priority: int, condition: Sequence[int], ) -> None: type_name = str(type_).replace("HeuristicType.", "").lower().rstrip("_") condition_lit = self._lit_tuple(condition) self._output( "heuristic", [ Number(atom), Function(type_name), Number(bias), Number(priority), condition_lit, ], ) def acyc_edge(self, node_u: int, node_v: int, condition: Sequence[int]) -> None: self._output( "edge", [Number(node_u), Number(node_v), self._lit_tuple(condition)] ) def theory_term_number(self, term_id: int, number: int) -> None: self._output("theory_number", [Number(term_id), Number(number)]) def theory_term_string(self, term_id: int, name: str) -> None: self._output("theory_string", [Number(term_id), String(name)]) def theory_term_compound( self, term_id: int, name_id_or_type: int, arguments: Sequence[int] ) -> None: names = {-1: "tuple", -2: "set", -3: "list"} if name_id_or_type in names: name = "theory_sequence" value = Function(names[name_id_or_type]) else: name = "theory_function" value = Number(name_id_or_type) tuple_id = self._tuple( "theory_tuple", self._step_data.theory_tuples, arguments, _theory, True ) self._output(name, [Number(term_id), value, tuple_id]) def theory_element( self, element_id: int, terms: Sequence[int], condition: Sequence[int] ) -> None: tuple_id = self._tuple( "theory_tuple", self._step_data.theory_tuples, terms, _theory, True ) condition_id = self._tuple( "literal_tuple", self._step_data.lit_tuples, condition, _lit ) self._output("theory_element", [Number(element_id), tuple_id, condition_id]) def theory_atom( self, atom_id_or_zero: int, term_id: int, elements: Sequence[int] ) -> None: tuple_e_id = self._tuple( "theory_element_tuple", self._step_data.theory_element_tuples, elements, _lit, ) self._output( "theory_atom", [Number(atom_id_or_zero), Number(term_id), tuple_e_id] ) def theory_atom_with_guard( self, atom_id_or_zero: int, term_id: int, elements: Sequence[int], operator_id: int, right_hand_side_id: int, ) -> None: tuple_id = self._tuple( "theory_element_tuple", self._step_data.theory_element_tuples, elements, _lit, ) self._output( "theory_atom", [ Number(atom_id_or_zero), Number(term_id), tuple_id, Number(operator_id), Number(right_hand_side_id), ], ) def end_step(self) -> None: if self._reify_steps: self.calculate_sccs() self._step += 1 self._step_data = _StepData()
Ancestors
Methods
def calculate_sccs(self) ‑> None
-
Trigger computation of SCCs.
SCCs can only be computed if the Reifier has been initialized with
calculate_sccs=True
, This function is called automatically ifreify_steps=True
has been set when initializing the Reifier.Expand source code
def calculate_sccs(self) -> None: """ Trigger computation of SCCs. SCCs can only be computed if the Reifier has been initialized with `calculate_sccs=True`, This function is called automatically if `reify_steps=True` has been set when initializing the Reifier. """ for idx, scc in enumerate(self._step_data.graph.tarjan()): for atm in scc: self._output("scc", [Number(idx), Number(atm)])
Inherited members