Module clingox.reify

This module provides functions to reify programs.

This includes a Reifier implementing clingo's Observer interface that can be registered with a Control object.

Additionally, the module provides a ReifiedTheory class that offers an interface similar to clingo's theory atoms but operates on the reified symbols.

Examples

The following example uses the reify_program() function to reify a program:

>>> from clingox.reify import reify_program
>>> prg = 'b :- a. {a}.'
>>> symbols = reify_program(prg)
>>> print([str(sym) for sym in symbols])
['tag(incremental)', 'atom_tuple(0)', 'atom_tuple(0,1)', 'literal_tuple(0)',
'rule(choice(0),normal(0))', 'atom_tuple(1)', 'atom_tuple(1,2)',
'literal_tuple(1)', 'literal_tuple(1,1)', 'rule(disjunction(1),normal(1))',
'output(a,1)', 'literal_tuple(2)', 'literal_tuple(2,2)', 'output(b,2)']

The last example shows how to use the ReifiedTheory class.

>>> from clingox.reify import ReifiedTheory, reify_program
>>> prg = '#theory theory { t { }; &p/0 : t, any }. &p { t }.'
>>> thy = ReifiedTheory(reify_program(prg))
>>> print([str(atm) for atm in thy])
['&p { t: literal_tuple(0) }']
>>> from clingox.theory import evaluate
>>> evaluate(next(iter(thy)).term)
Function('p', [], True)
Expand source code
"""
This module provides functions to reify programs.

This includes a `Reifier` implementing clingo's `clingo.backend.Observer`
interface that can be registered with a `clingo.control.Control` object.

Additionally, the module provides a `ReifiedTheory` class that provides a
similar interface as clingo's theory atoms but uses the reified symbols.

Examples
--------

The following example uses the `reify_program` function to reify a program:

```python-repl
>>> from clingox.reify import reify_program
>>> prg = 'b :- a. {a}.'
>>> symbols = reify_program(prg)
>>> print([str(sym) for sym in symbols])
['tag(incremental)', 'atom_tuple(0)', 'atom_tuple(0,1)', 'literal_tuple(0)',
'rule(choice(0),normal(0))', 'atom_tuple(1)', 'atom_tuple(1,2)',
'literal_tuple(1)', 'literal_tuple(1,1)', 'rule(disjunction(1),normal(1))',
'output(a,1)', 'literal_tuple(2)', 'literal_tuple(2,2)', 'output(b,2)']
```

The last example shows how to use the `ReifiedTheory` class.

```python-repl
>>> from clingox.reify import ReifiedTheory, reify_program
>>> prg = '#theory theory { t { }; &p/0 : t, any }. &p { t }.'
>>> thy = ReifiedTheory(reify_program(prg))
>>> print([str(atm) for atm in thy])
['&p { t: literal_tuple(0) }']
>>> from clingox.theory import evaluate
>>> evaluate(next(iter(thy)).term)
Function('p', [], True)
```
"""

from dataclasses import dataclass, field
from typing import (
    Callable,
    Dict,
    Generic,
    Iterator,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    TypeVar,
)

from clingo.backend import HeuristicType, Observer, TruthValue
from clingo.control import Control
from clingo.symbol import Function, Number, String, Symbol
from clingo.theory_atoms import TheoryTermType

from .theory import is_operator

# Public API of the module; every name is listed exactly once.
__all__ = [
    "Reifier",
    "ReifiedTheory",
    "ReifiedTheoryAtom",
    "ReifiedTheoryElement",
    "ReifiedTheoryTerm",
    "reify_program",
]

T = TypeVar("T")  # pylint: disable=invalid-name
U = TypeVar("U", int, Tuple[int, int])  # pylint: disable=invalid-name


@dataclass
class _Vertex(Generic[T]):
    """
    Vertex data to calculate SCCs of a graph.
    """

    # Value identifying the vertex, as passed to `_Graph.add_edge`.
    name: T
    # Traversal marker managed by `_Graph`: either one of the two phase
    # markers (0 or 1) or the visit number assigned while the vertex is being
    # explored.
    visited: int
    # Position of the next entry of `edges` to look at during the traversal.
    index: int = 0
    # Keys (positions in the graph's vertex list) of the successor vertices.
    edges: List[int] = field(default_factory=list)


class _Graph(Generic[T]):
    """
    Simple class to compute strongly connected components using Tarjan's
    algorithm.

    Vertices are created implicitly by `add_edge` and are addressed
    internally by their position (key) in the vertex list.
    """

    # Maps a vertex name to its key, i.e., its position in `_vertices`.
    _names: Dict[T, int]
    # All vertices in order of creation.
    _vertices: List[_Vertex]
    # Selects which of the markers 0 and 1 stands for "unvisited" and which
    # for "finished"; flipped at the end of every `tarjan` run.
    _phase: bool

    def __init__(self):
        self._names = {}
        self._vertices = []
        self._phase = True

    def _visited(self, key_u: int) -> bool:
        """
        Check that the vertex no longer carries the "unvisited" marker of the
        current phase.
        """
        return self._vertices[key_u].visited != int(not self._phase)

    def _active(self, key_u: int) -> bool:
        """
        Check that the vertex does not carry the "finished" marker of the
        current phase, i.e., it has not been assigned to a component yet.
        """
        return self._vertices[key_u].visited != int(self._phase)

    def _add_vertex(self, val_u: T) -> int:
        """
        Return the key of the vertex named `val_u`, creating the vertex (marked
        as unvisited) if it does not exist yet.
        """
        n = len(self._vertices)
        key_u = self._names.setdefault(val_u, n)
        # setdefault returned the fresh key, so the name was unknown before
        if n == key_u:
            self._vertices.append(_Vertex(val_u, int(not self._phase)))
        return key_u

    def add_edge(self, val_u: T, val_v: T) -> None:
        """
        Add an edge to the graph.

        Both end points are added as vertices if necessary; the edge is
        directed from `val_u` to `val_v`.
        """
        key_u = self._add_vertex(val_u)
        key_v = self._add_vertex(val_v)
        self._vertices[key_u].edges.append(key_v)

    def tarjan(self) -> List[List[T]]:
        """
        Returns the strongly connected components of the graph.

        Each component is given as a list of vertex names. Components that
        consist of a single vertex are not part of the result.
        """
        sccs: List[List[T]] = []
        # keys of the vertices on the current depth-first search path
        stack = []
        # keys of visited vertices that are not yet assigned to a component
        trail = []
        # visit numbers start above the phase markers 0 and 1
        index = 1

        def push(key_u: int):
            # start exploring a vertex: assign the next visit number and reset
            # the cursor into its edge list
            nonlocal index
            index += 1
            vtx_u = self._vertices[key_u]
            vtx_u.visited = index
            vtx_u.index = 0
            stack.append(key_u)
            trail.append(key_u)

        for key_u in range(len(self._vertices)):
            if self._visited(key_u):
                continue
            index = 1
            push(key_u)
            while stack:
                key_v = stack[-1]
                vtx_v = self._vertices[key_v]
                len_v = len(vtx_v.edges)
                # advance the edge cursor until an unvisited successor is
                # found, which is explored first
                while vtx_v.index < len_v:
                    key_w = vtx_v.edges[vtx_v.index]
                    vtx_v.index += 1
                    if not self._visited(key_w):
                        push(key_w)
                        break
                else:
                    # all successors are explored: take over the smallest
                    # visit number among successors without a component
                    stack.pop()
                    root = True
                    for key_w in vtx_v.edges:
                        vtx_w = self._vertices[key_w]
                        if self._active(key_w) and vtx_w.visited < vtx_v.visited:
                            root = False
                            vtx_v.visited = vtx_w.visited
                    if root:
                        # the vertex is the root of a component consisting of
                        # everything on the trail down to the vertex itself
                        key_last = None
                        sccs.append([])
                        while key_last != key_v:
                            key_last = trail[-1]
                            vtx_last = self._vertices[key_last]
                            sccs[-1].append(vtx_last.name)
                            vtx_last.visited = int(self._phase)
                            trail.pop()
                        # components with just one vertex are dropped
                        if len(sccs[-1]) == 1:
                            sccs.pop()

        # all vertices now carry the "finished" marker; flipping the phase
        # makes this marker read as "unvisited" in the next run
        self._phase = not self._phase
        return sccs


@dataclass
class _StepData:
    """
    State the reifier keeps for one step: tables mapping the content of
    already emitted tuples to their ids and the graph used to compute SCCs.
    """

    # ids of emitted `atom_tuple` facts
    atom_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    # ids of emitted `literal_tuple` facts
    lit_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    # ids of emitted `weighted_literal_tuple` facts
    wlit_tuples: Dict[Sequence[Tuple[int, int]], int] = field(default_factory=dict)
    # ids of emitted `theory_tuple` facts
    theory_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    # ids of emitted `theory_element_tuple` facts
    theory_element_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    # edges from head atoms to positive body literals of the observed rules
    graph: _Graph = field(default_factory=_Graph)


def _theory(i: Symbol, pos: int, lit: int) -> Sequence[Symbol]:
    """
    Build the arguments of one entry of an ordered tuple: the tuple id, the
    position of the entry, and its value.
    """
    position, value = Number(pos), Number(lit)
    return [i, position, value]


def _lit(i: Symbol, pos: int, lit: int) -> Sequence[Symbol]:
    """
    Build the arguments of one entry of an unordered tuple: the tuple id and
    the value. The position is accepted but not used.
    """
    # pylint: disable=unused-argument
    value = Number(lit)
    return [i, value]


def _wlit(i: Symbol, pos: int, wlit: Tuple[int, int]) -> Sequence[Symbol]:
    """
    Build the arguments of one entry of a weighted literal tuple: the tuple
    id followed by the two components of the pair. The position is accepted
    but not used.
    """
    # pylint: disable=unused-argument
    first = Number(wlit[0])
    second = Number(wlit[1])
    return [i, first, second]


class Reifier(Observer):
    """
    An observer that will gather the symbols of the reification, in the same way as `clingo --output=reify`.

    Parameters
    ----------
    cb
        A callback function that will be called with each symbol of the reification
    calculate_sccs
        Flag to calculate the SCCs
    reify_steps
        Flag to add a number as the last argument of all reification symbols for the corresponding step

    """

    # pylint:disable=too-many-public-methods
    # Number of the current step; only advanced when steps are reified.
    _step: int
    # Bug in mypy???
    # _cb: Callable[[Symbol], None]
    _calculate_sccs: bool
    _reify_steps: bool
    # Tuple tables and dependency graph of the current step.
    _step_data: _StepData

    def __init__(
        self,
        cb: Callable[[Symbol], None],
        calculate_sccs: bool = False,
        reify_steps: bool = False,
    ):
        self._step = 0
        self._cb = cb
        self._calculate_sccs = calculate_sccs
        self._reify_steps = reify_steps
        self._step_data = _StepData()

    def calculate_sccs(self) -> None:
        """
        Trigger computation of SCCs.

        SCCs can only be computed if the Reifier has been initialized with
        `calculate_sccs=True`. This function is called automatically if
        `reify_steps=True` has been set when initializing the Reifier.

        For each atom of each component, a fact `scc` with the index of the
        component and the atom is emitted.
        """
        for idx, scc in enumerate(self._step_data.graph.tarjan()):
            for atm in scc:
                self._output("scc", [Number(idx), Number(atm)])

    def _add_edges(self, head: Sequence[int], body: Sequence[int]):
        """
        Record edges from each head atom to each positive body literal.

        Nothing is recorded unless SCC calculation has been enabled.
        """
        if self._calculate_sccs:
            for u in head:
                for v in body:
                    if v > 0:
                        self._step_data.graph.add_edge(u, v)

    def _output(self, name: str, args: Sequence[Symbol]):
        """
        Pass the fact with the given name and arguments to the callback.

        If steps are reified, the current step number is appended as last
        argument.
        """
        if self._reify_steps:
            args = list(args) + [Number(self._step)]
        self._cb(Function(name, args))

    def _tuple(
        self,
        name: str,
        snmap: Dict[Sequence[U], int],
        elems: Sequence[U],
        afun: Callable[[Symbol, int, U], Sequence[Symbol]],
        ordered: bool = False,
    ) -> Symbol:
        """
        Return the id of the tuple with the given elements and emit the facts
        describing the tuple when it is seen for the first time.

        Parameters
        ----------
        name
            Name of the facts to emit.
        snmap
            Table mapping tuple contents to the ids assigned so far.
        elems
            Elements of the tuple.
        afun
            Function building the arguments of the fact for one element from
            the tuple id, the position of the element, and the element.
        ordered
            If set, the elements are taken as is and the tuple is identified
            by the exact sequence. Otherwise, repeated elements are dropped
            and the tuple is identified by its sorted elements.

        Returns
        -------
        The id of the tuple as a number symbol.
        """
        pruned: Sequence[U]
        if ordered:
            pruned = elems
            ident = tuple(elems)
        else:
            seen: Set[U] = set()
            pruned = []
            for elem in elems:
                if elem not in seen:
                    seen.add(elem)
                    pruned.append(elem)
            ident = tuple(sorted(pruned))

        # new tuples get the next free id, which equals the table size
        n = len(snmap)
        i = Number(snmap.setdefault(ident, n))
        if n == i.number:
            # first occurrence: emit the tuple itself followed by its elements
            self._output(name, [i])
            for idx, atm in enumerate(pruned):
                self._output(name, afun(i, idx, atm))
        return i

    def _atom_tuple(self, atoms: Sequence[int]):
        """
        Return the id of the `atom_tuple` with the given atoms.
        """
        return self._tuple("atom_tuple", self._step_data.atom_tuples, atoms, _lit)

    def _lit_tuple(self, lits: Sequence[int]):
        """
        Return the id of the `literal_tuple` with the given literals.
        """
        return self._tuple("literal_tuple", self._step_data.lit_tuples, lits, _lit)

    def _wlit_tuple(self, wlits: Sequence[Tuple[int, int]]):
        """
        Return the id of the `weighted_literal_tuple` with the given pairs.
        """
        return self._tuple(
            "weighted_literal_tuple", self._step_data.wlit_tuples, wlits, _wlit
        )

    def init_program(self, incremental: bool) -> None:
        """
        Emit `tag(incremental)` for incremental programs.

        The fact is passed to the callback directly and thus never carries a
        step number.
        """
        if incremental:
            self._cb(Function("tag", [Function("incremental")]))

    def begin_step(self) -> None:
        """
        Nothing has to be done at the beginning of a step.
        """
        pass

    def rule(self, choice: bool, head: Sequence[int], body: Sequence[int]) -> None:
        """
        Emit a `rule` fact with a `choice` or `disjunction` head over an atom
        tuple and a `normal` body over a literal tuple.
        """
        hn = "choice" if choice else "disjunction"
        hd = Function(hn, [self._atom_tuple(head)])
        bd = Function("normal", [self._lit_tuple(body)])
        self._output("rule", [hd, bd])
        self._add_edges(head, body)

    def weight_rule(
        self,
        choice: bool,
        head: Sequence[int],
        lower_bound: int,
        body: Sequence[Tuple[int, int]],
    ) -> None:
        """
        Emit a `rule` fact with a `choice` or `disjunction` head over an atom
        tuple and a `sum` body over a weighted literal tuple and the lower
        bound.
        """
        hn = "choice" if choice else "disjunction"
        hd = Function(hn, [self._atom_tuple(head)])
        bd = Function("sum", [self._wlit_tuple(body), Number(lower_bound)])
        self._output("rule", [hd, bd])
        # only the literals matter for the dependency graph
        self._add_edges(head, [lit for lit, w in body])

    def minimize(self, priority: int, literals: Sequence[Tuple[int, int]]) -> None:
        """
        Emit a `minimize` fact with the priority and a weighted literal tuple.
        """
        self._output("minimize", [Number(priority), self._wlit_tuple(literals)])

    def project(self, atoms: Sequence[int]) -> None:
        """
        Emit one `project` fact per atom.
        """
        for atom in atoms:
            self._output("project", [Number(atom)])

    def output_atom(self, symbol: Symbol, atom: int) -> None:
        """
        Emit an `output` fact with the symbol and a literal tuple, which is
        empty if the atom is zero and contains just the atom otherwise.
        """
        self._output("output", [symbol, self._lit_tuple([] if atom == 0 else [atom])])

    def output_term(self, symbol: Symbol, condition: Sequence[int]) -> None:
        """
        Emit an `output` fact with the symbol and the literal tuple of the
        condition.
        """
        self._output("output", [symbol, self._lit_tuple(condition)])

    def external(self, atom: int, value: TruthValue) -> None:
        """
        Emit an `external` fact with the atom and the name of the truth value.

        The name is derived from the string representation of the value by
        removing the `TruthValue.` prefix, lower-casing, and stripping
        trailing underscores.
        """
        value_name = str(value).replace("TruthValue.", "").lower().rstrip("_")
        self._output("external", [Number(atom), Function(value_name)])

    def assume(self, literals: Sequence[int]) -> None:
        """
        Emit one `assume` fact per literal.
        """
        for lit in literals:
            self._output("assume", [Number(lit)])

    def heuristic(
        self,
        atom: int,
        type_: HeuristicType,
        bias: int,
        priority: int,
        condition: Sequence[int],
    ) -> None:
        """
        Emit a `heuristic` fact with the atom, the name of the heuristic type,
        the bias, the priority, and the literal tuple of the condition.

        The type name is derived from the string representation of the type
        by removing the `HeuristicType.` prefix, lower-casing, and stripping
        trailing underscores.
        """
        type_name = str(type_).replace("HeuristicType.", "").lower().rstrip("_")
        condition_lit = self._lit_tuple(condition)
        self._output(
            "heuristic",
            [
                Number(atom),
                Function(type_name),
                Number(bias),
                Number(priority),
                condition_lit,
            ],
        )

    def acyc_edge(self, node_u: int, node_v: int, condition: Sequence[int]) -> None:
        """
        Emit an `edge` fact with both nodes and the literal tuple of the
        condition.
        """
        self._output(
            "edge", [Number(node_u), Number(node_v), self._lit_tuple(condition)]
        )

    def theory_term_number(self, term_id: int, number: int) -> None:
        """
        Emit a `theory_number` fact with the term id and the number.
        """
        self._output("theory_number", [Number(term_id), Number(number)])

    def theory_term_string(self, term_id: int, name: str) -> None:
        """
        Emit a `theory_string` fact with the term id and the name as string.
        """
        self._output("theory_string", [Number(term_id), String(name)])

    def theory_term_compound(
        self, term_id: int, name_id_or_type: int, arguments: Sequence[int]
    ) -> None:
        """
        Emit a `theory_sequence` or `theory_function` fact.

        The values -1, -2, and -3 of `name_id_or_type` select the sequence
        types `tuple`, `set`, and `list`; any other value is emitted as the
        number identifying the function name. The arguments are emitted as
        an ordered `theory_tuple`.
        """
        names = {-1: "tuple", -2: "set", -3: "list"}
        if name_id_or_type in names:
            name = "theory_sequence"
            value = Function(names[name_id_or_type])
        else:
            name = "theory_function"
            value = Number(name_id_or_type)
        tuple_id = self._tuple(
            "theory_tuple", self._step_data.theory_tuples, arguments, _theory, True
        )
        self._output(name, [Number(term_id), value, tuple_id])

    def theory_element(
        self, element_id: int, terms: Sequence[int], condition: Sequence[int]
    ) -> None:
        """
        Emit a `theory_element` fact with the element id, the ordered
        `theory_tuple` of the terms, and the literal tuple of the condition.
        """
        tuple_id = self._tuple(
            "theory_tuple", self._step_data.theory_tuples, terms, _theory, True
        )
        condition_id = self._tuple(
            "literal_tuple", self._step_data.lit_tuples, condition, _lit
        )
        self._output("theory_element", [Number(element_id), tuple_id, condition_id])

    def theory_atom(
        self, atom_id_or_zero: int, term_id: int, elements: Sequence[int]
    ) -> None:
        """
        Emit a ternary `theory_atom` fact with the atom id (or zero), the term
        id, and the `theory_element_tuple` of the elements.
        """
        tuple_e_id = self._tuple(
            "theory_element_tuple",
            self._step_data.theory_element_tuples,
            elements,
            _lit,
        )
        self._output(
            "theory_atom", [Number(atom_id_or_zero), Number(term_id), tuple_e_id]
        )

    def theory_atom_with_guard(
        self,
        atom_id_or_zero: int,
        term_id: int,
        elements: Sequence[int],
        operator_id: int,
        right_hand_side_id: int,
    ) -> None:
        """
        Emit a `theory_atom` fact with five arguments: the atom id (or zero),
        the term id, the `theory_element_tuple` of the elements, the operator
        id, and the id of the right-hand side of the guard.
        """
        tuple_id = self._tuple(
            "theory_element_tuple",
            self._step_data.theory_element_tuples,
            elements,
            _lit,
        )
        self._output(
            "theory_atom",
            [
                Number(atom_id_or_zero),
                Number(term_id),
                tuple_id,
                Number(operator_id),
                Number(right_hand_side_id),
            ],
        )

    def end_step(self) -> None:
        """
        Finish the current step if steps are reified.

        In this case, the SCC facts of the step are emitted, the step counter
        is advanced, and the tuple tables and the graph are reset.
        """
        if self._reify_steps:
            self.calculate_sccs()
            self._step += 1
            self._step_data = _StepData()


def _set(
    matches: Sequence[Tuple[str, int]],
    lst: List[Symbol],
    sym,
    append: bool = False,
    default: Symbol = Number(0),
) -> bool:
    """
    Store the symbol in the list if it matches one of the given signatures.

    Parameters
    ----------
    matches
        Pairs of name and arity the symbol is tested against.
    lst
        The list to store the symbol in.
    sym
        The symbol to store.
    append
        If set, the symbol is stored at the end of the list. Otherwise, it is
        stored at the position given by its first argument.
    default
        The value used to fill up the list if it is too short.

    Returns
    -------
    Whether the symbol matched and has been stored.
    """
    if not any(sym.match(*signature) for signature in matches):
        return False

    idx = len(lst) if append else sym.arguments[0].number
    # grow the list so that the target position exists
    lst.extend([default] * (idx + 1 - len(lst)))
    lst[idx] = sym
    return True


def _ensure(name: str, lst: List[List[int]], sym: Symbol, ordered=False) -> bool:
    """
    Record the tuple or tuple entry described by the symbol.

    A symbol with one argument just makes sure that the tuple with the given
    index exists. A symbol with two arguments (three if `ordered` is set)
    additionally adds a value to that tuple: unordered tuples are appended
    to, while ordered tuples store the value at the position given by the
    second argument, padding with zeros if necessary.

    Returns
    -------
    Whether the symbol has the given name and a suitable arity.
    """
    declares_tuple = sym.match(name, 1)
    if not (declares_tuple or sym.match(name, 3 if ordered else 2)):
        return False

    args = sym.arguments
    idx = args[0].number
    # make sure the tuple with the given index exists
    lst.extend([] for _ in range(idx + 1 - len(lst)))
    if declares_tuple:
        return True

    if ordered:
        tup = lst[idx]
        jdx = args[1].number
        tup.extend([0] * (jdx + 1 - len(tup)))
        tup[jdx] = args[2].number
    else:
        lst[idx].append(args[1].number)
    return True


class ReifiedTheory:
    """
    Index over the reified symbols describing a theory.

    The `ReifiedTheoryTerm`, `ReifiedTheoryElement`, and `ReifiedTheoryAtom`
    classes provide views on this data that behave as the corresponding
    classes in clingo's `clingo.theory_atoms` module.
    """

    terms: List[Symbol]
    elements: List[Symbol]
    atoms: List[Symbol]
    term_tuples: List[List[int]]
    element_tuples: List[List[int]]

    def __init__(self, symbols: Sequence[Symbol]):
        self.terms, self.elements, self.atoms = [], [], []
        self.term_tuples, self.element_tuples = [], []

        atom_signatures = (("theory_atom", 3), ("theory_atom", 5))
        element_signatures = (("theory_element", 3),)
        term_signatures = (
            ("theory_sequence", 3),
            ("theory_string", 2),
            ("theory_number", 2),
            ("theory_function", 3),
        )

        # each symbol is consumed by the first index accepting it; symbols
        # accepted by none of them are ignored
        for sym in symbols:
            if _set(atom_signatures, self.atoms, sym, True):
                continue
            if _set(element_signatures, self.elements, sym):
                continue
            if _set(term_signatures, self.terms, sym):
                continue
            if _ensure("theory_tuple", self.term_tuples, sym, True):
                continue
            _ensure("theory_element_tuple", self.element_tuples, sym)

    def __iter__(self) -> Iterator["ReifiedTheoryAtom"]:
        yield from (ReifiedTheoryAtom(idx, self) for idx in range(len(self.atoms)))


class ReifiedTheoryTerm:
    """
    Class to represent theory terms.

    ReifiedTheory terms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.

    Terms are compared and hashed by their index only. Comparing a term with
    an object of another type yields `NotImplemented`, so that `==` evaluates
    to `False` and ordering comparisons raise a `TypeError`.
    """

    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.terms)

    @property
    def index(self) -> int:
        """
        The index of the corresponding reified fact.
        """
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        # arguments of the reified fact describing the term
        return self._theory.terms[self._idx].arguments

    @property
    def arguments(self) -> List["ReifiedTheoryTerm"]:
        """
        The arguments of the term (for functions, tuples, list, and sets).
        """
        assert self.type in (
            TheoryTermType.List,
            TheoryTermType.Set,
            TheoryTermType.Tuple,
            TheoryTermType.Function,
        )
        # the third argument of the fact refers to the tuple of argument ids
        term_ids = self._theory.term_tuples[self._args[2].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    @property
    def name(self) -> str:
        """
        The name of the term (for symbols and functions).
        """
        assert self.type in (TheoryTermType.Symbol, TheoryTermType.Function)
        if self.type == TheoryTermType.Function:
            # functions refer to the term holding their name
            return self._theory.terms[self._args[1].number].arguments[1].string
        return self._args[1].string

    @property
    def number(self) -> int:
        """
        The numeric representation of the term (for numbers).
        """
        assert self.type == TheoryTermType.Number
        return self._args[1].number

    @property
    def type(self) -> TheoryTermType:
        """
        The type of the theory term.
        """
        name = self._theory.terms[self._idx].name
        if name == "theory_number":
            return TheoryTermType.Number
        if name == "theory_string":
            return TheoryTermType.Symbol
        if name == "theory_function":
            return TheoryTermType.Function
        assert name == "theory_sequence"
        type_ = self._args[1].name
        if type_ == "tuple":
            return TheoryTermType.Tuple
        if type_ == "set":
            return TheoryTermType.Set
        assert type_ == "list"
        return TheoryTermType.List

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx == other._idx

    def __lt__(self, other):
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx < other._idx

    def __le__(self, other):
        # together with __lt__ and __eq__ this covers all six comparison
        # operators because Python falls back to the reflected operations
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx <= other._idx

    def __str__(self):
        type_ = self.type

        if type_ == TheoryTermType.Number:
            return f"{self.number}"

        if type_ == TheoryTermType.Symbol:
            return f"{self.name}"

        if type_ == TheoryTermType.Function:
            args = self.arguments
            name = self.name
            # unary and binary operators are printed in prefix/infix notation
            if len(args) == 1 and is_operator(name):
                return f"{name}({args[0]})"
            if len(args) == 2 and is_operator(name):
                return f"({args[0]}){name}({args[1]})"
            return f'{name}({",".join(str(arg) for arg in args)})'

        if type_ == TheoryTermType.Tuple:
            lhs, rhs = "(", ")"
        elif type_ == TheoryTermType.List:
            lhs, rhs = "[", "]"
        else:
            lhs, rhs = "{", "}"
        return f'{lhs}{",".join(str(arg) for arg in self.arguments)}{rhs}'


class ReifiedTheoryElement:
    """
    Class to represent theory elements.

    ReifiedTheory elements have a readable string representation, implement Python's
    rich comparison operators, and can be used as dictionary keys.

    Elements are compared and hashed by their index only. Comparing an element
    with an object of another type yields `NotImplemented`, so that `==`
    evaluates to `False` and ordering comparisons raise a `TypeError`.
    """

    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.elements)

    @property
    def index(self) -> int:
        """
        The index of the corresponding reified fact.
        """
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        # arguments of the reified fact describing the element
        return self._theory.elements[self._idx].arguments

    @property
    def condition_id(self) -> int:
        """
        The id of the literal tuple of the condition.
        """
        return self._args[2].number

    @property
    def terms(self) -> List[ReifiedTheoryTerm]:
        """
        The tuple of the element.
        """
        # the second argument of the fact refers to the tuple of term ids
        term_ids = self._theory.term_tuples[self._args[1].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx == other._idx

    def __lt__(self, other):
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx < other._idx

    def __le__(self, other):
        # together with __lt__ and __eq__ this covers all six comparison
        # operators because Python falls back to the reflected operations
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx <= other._idx

    def __str__(self):
        return f'{",".join(str(term) for term in self.terms)}: literal_tuple({self.condition_id})'


class ReifiedTheoryAtom:
    """
    Class to represent theory atoms.

    Theory atoms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.

    Atoms are compared and hashed by their index only. Comparing an atom with
    an object of another type yields `NotImplemented`, so that `==` evaluates
    to `False` and ordering comparisons raise a `TypeError`.
    """

    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.atoms)

    @property
    def index(self) -> int:
        """
        The index of the corresponding reified fact.
        """
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        # arguments of the reified fact describing the atom
        return self._theory.atoms[self._idx].arguments

    @property
    def elements(self) -> List[ReifiedTheoryElement]:
        """
        The elements of the atom.
        """
        # the third argument of the fact refers to the tuple of element ids
        tuple_id = self._args[2].number
        return [
            ReifiedTheoryElement(elem_id, self._theory)
            for elem_id in self._theory.element_tuples[tuple_id]
        ]

    @property
    def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]:
        """
        The guard of the atom or None if the atom has no guard.
        """
        args = self._args
        # facts without guard have just three arguments
        if len(args) <= 3:
            return None

        # the fourth argument refers to the term holding the operator name
        op = self._theory.terms[args[3].number].arguments[1].string
        return (op, ReifiedTheoryTerm(args[4].number, self._theory))

    @property
    def literal(self) -> int:
        """
        The reified literal associated with the atom.
        """
        return self._args[0].number

    @property
    def term(self) -> ReifiedTheoryTerm:
        """
        The term of the atom.
        """
        return ReifiedTheoryTerm(self._args[1].number, self._theory)

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        if not isinstance(other, ReifiedTheoryAtom):
            return NotImplemented
        return self._idx == other._idx

    def __lt__(self, other):
        if not isinstance(other, ReifiedTheoryAtom):
            return NotImplemented
        return self._idx < other._idx

    def __le__(self, other):
        # together with __lt__ and __eq__ this covers all six comparison
        # operators because Python falls back to the reflected operations
        if not isinstance(other, ReifiedTheoryAtom):
            return NotImplemented
        return self._idx <= other._idx

    def __str__(self):
        name = f"&{self.term}"

        elems = self.elements
        estr = f' {{ {"; ".join(str(elem) for elem in elems)} }}' if elems else ""

        guard = self.guard
        gstr = f" {guard[0]} {guard[1]}" if guard else ""

        return f"{name}{estr}{gstr}"


def reify_program(
    prg: str, calculate_sccs: bool = False, reify_steps: bool = False
) -> List[Symbol]:
    """
    Ground the given program and collect the symbols of its reification.

    Parameters
    ----------
    prg
        The program to reify in form of a string.
    calculate_sccs
        Whether to calculate SCCs of the reified program.
    reify_steps
        Whether to add a step number to the reified facts.

    Returns
    -------
    A list of symbols containing the reified facts.
    """
    symbols: List[Symbol] = []
    control = Control()
    observer = Reifier(symbols.append, calculate_sccs, reify_steps)

    control.register_observer(observer)
    control.add("base", [], prg)
    control.ground([("base", [])])

    # with step reification, the reifier emits the SCCs itself at the end of
    # each step
    sccs_pending = calculate_sccs and not reify_steps
    if sccs_pending:
        observer.calculate_sccs()

    return symbols

Functions

def reify_program(prg: str, calculate_sccs: bool = False, reify_steps: bool = False) ‑> List[Symbol]

Reify the given program and return the reified symbols.

Parameters

prg
The program to reify in form of a string.
calculate_sccs
Whether to calculate SCCs of the reified program.
reify_steps
Whether to add a step number to the reified facts.

Returns

A list of symbols containing the reified facts.

Expand source code
def reify_program(
    prg: str, calculate_sccs: bool = False, reify_steps: bool = False
) -> List[Symbol]:
    """
    Ground the given program and collect the symbols of its reification.

    Parameters
    ----------
    prg
        The program to reify in form of a string.
    calculate_sccs
        Whether to calculate SCCs of the reified program.
    reify_steps
        Whether to add a step number to the reified facts.

    Returns
    -------
    A list of symbols containing the reified facts.
    """
    symbols: List[Symbol] = []
    control = Control()
    observer = Reifier(symbols.append, calculate_sccs, reify_steps)

    control.register_observer(observer)
    control.add("base", [], prg)
    control.ground([("base", [])])

    # with step reification, the reifier emits the SCCs itself at the end of
    # each step
    sccs_pending = calculate_sccs and not reify_steps
    if sccs_pending:
        observer.calculate_sccs()

    return symbols

Classes

class ReifiedTheory (symbols: Sequence[Symbol])

Class indexing the symbols related to a theory.

The ReifiedTheoryTerm, ReifiedTheoryElement, and ReifiedTheoryAtom classes provide views on this data that behave as the corresponding classes in clingo's clingo.theory_atoms module.

Expand source code
class ReifiedTheory:
    """
    Index over the reified symbols describing a theory.

    The `ReifiedTheoryTerm`, `ReifiedTheoryElement`, and `ReifiedTheoryAtom`
    classes provide views on this data that behave as the corresponding
    classes in clingo's `clingo.theory_atoms` module.
    """

    terms: List[Symbol]
    elements: List[Symbol]
    atoms: List[Symbol]
    term_tuples: List[List[int]]
    element_tuples: List[List[int]]

    def __init__(self, symbols: Sequence[Symbol]):
        self.terms, self.elements, self.atoms = [], [], []
        self.term_tuples, self.element_tuples = [], []

        atom_signatures = (("theory_atom", 3), ("theory_atom", 5))
        element_signatures = (("theory_element", 3),)
        term_signatures = (
            ("theory_sequence", 3),
            ("theory_string", 2),
            ("theory_number", 2),
            ("theory_function", 3),
        )

        # each symbol is consumed by the first index accepting it; symbols
        # accepted by none of them are ignored
        for sym in symbols:
            if _set(atom_signatures, self.atoms, sym, True):
                continue
            if _set(element_signatures, self.elements, sym):
                continue
            if _set(term_signatures, self.terms, sym):
                continue
            if _ensure("theory_tuple", self.term_tuples, sym, True):
                continue
            _ensure("theory_element_tuple", self.element_tuples, sym)

    def __iter__(self) -> Iterator["ReifiedTheoryAtom"]:
        yield from (ReifiedTheoryAtom(idx, self) for idx in range(len(self.atoms)))

Class variables

var atoms : List[Symbol]
var element_tuples : List[List[int]]
var elements : List[Symbol]
var term_tuples : List[List[int]]
var terms : List[Symbol]
class ReifiedTheoryAtom (idx: int, theory: ReifiedTheory)

Class to represent theory atoms.

Theory atoms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.

Expand source code
class ReifiedTheoryAtom:
    """
    Class to represent theory atoms.

    Theory atoms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.

    Atoms are compared and hashed by their index only. Comparing an atom with
    an object of another type yields `NotImplemented`, so that `==` evaluates
    to `False` and ordering comparisons raise a `TypeError`.
    """

    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.atoms)

    @property
    def index(self) -> int:
        """
        The index of the corresponding reified fact.
        """
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        # arguments of the reified fact describing the atom
        return self._theory.atoms[self._idx].arguments

    @property
    def elements(self) -> List[ReifiedTheoryElement]:
        """
        The elements of the atom.
        """
        # the third argument of the fact refers to the tuple of element ids
        tuple_id = self._args[2].number
        return [
            ReifiedTheoryElement(elem_id, self._theory)
            for elem_id in self._theory.element_tuples[tuple_id]
        ]

    @property
    def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]:
        """
        The guard of the atom or None if the atom has no guard.
        """
        args = self._args
        # facts without guard have just three arguments
        if len(args) <= 3:
            return None

        # the fourth argument refers to the term holding the operator name
        op = self._theory.terms[args[3].number].arguments[1].string
        return (op, ReifiedTheoryTerm(args[4].number, self._theory))

    @property
    def literal(self) -> int:
        """
        The reified literal associated with the atom.
        """
        return self._args[0].number

    @property
    def term(self) -> ReifiedTheoryTerm:
        """
        The term of the atom.
        """
        return ReifiedTheoryTerm(self._args[1].number, self._theory)

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        if not isinstance(other, ReifiedTheoryAtom):
            return NotImplemented
        return self._idx == other._idx

    def __lt__(self, other):
        if not isinstance(other, ReifiedTheoryAtom):
            return NotImplemented
        return self._idx < other._idx

    def __le__(self, other):
        # together with __lt__ and __eq__ this covers all six comparison
        # operators because Python falls back to the reflected operations
        if not isinstance(other, ReifiedTheoryAtom):
            return NotImplemented
        return self._idx <= other._idx

    def __str__(self):
        name = f"&{self.term}"

        elems = self.elements
        estr = f' {{ {"; ".join(str(elem) for elem in elems)} }}' if elems else ""

        guard = self.guard
        gstr = f" {guard[0]} {guard[1]}" if guard else ""

        return f"{name}{estr}{gstr}"

Instance variables

var elements : List[ReifiedTheoryElement]

The elements of the atom.

Expand source code
@property
def elements(self) -> List[ReifiedTheoryElement]:
    """
    The theory elements of the atom.

    The third argument of the fact selects the element tuple; every id in
    that tuple is wrapped in a `ReifiedTheoryElement`.
    """
    tuple_id = self._args[2].number
    element_ids = self._theory.element_tuples[tuple_id]
    wrapped = []
    for element_id in element_ids:
        wrapped.append(ReifiedTheoryElement(element_id, self._theory))
    return wrapped
var guard : Optional[Tuple[str, ReifiedTheoryTerm]]

The guard of the atom or None if the atom has no guard.

Expand source code
@property
def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]:
    """
    The guard of the atom as a pair of operator name and right-hand side
    term, or None if the fact carries no guard arguments.
    """
    fact_args = self._args
    has_guard = len(fact_args) > 3
    if not has_guard:
        return None

    # The fourth argument is the id of the term that holds the operator
    # name, the fifth is the id of the term on the right-hand side.
    operator_fact = self._theory.terms[fact_args[3].number]
    operator = operator_fact.arguments[1].string
    rhs = ReifiedTheoryTerm(fact_args[4].number, self._theory)
    return (operator, rhs)
var index : int

The index of the corresponding reified fact.

Expand source code
@property
def index(self) -> int:
    """
    Index of the reified fact this object refers to.
    """
    position = self._idx
    return position
var literal : int

The reified literal associated with the atom.

Expand source code
@property
def literal(self) -> int:
    """
    The reified literal associated with the atom (first argument of the fact).
    """
    literal_symbol = self._args[0]
    return literal_symbol.number
var term : ReifiedTheoryTerm

The term of the atom.

Expand source code
@property
def term(self) -> ReifiedTheoryTerm:
    """
    The term of the atom (its id is the second argument of the fact).
    """
    term_id = self._args[1].number
    return ReifiedTheoryTerm(term_id, self._theory)
class ReifiedTheoryElement (idx: int, theory: ReifiedTheory)

Class to represent theory elements.

ReifiedTheory elements have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.

Expand source code
class ReifiedTheoryElement:
    """
    Class to represent theory elements.

    ReifiedTheory elements have a readable string representation, implement Python's
    rich comparison operators, and can be used as dictionary keys.

    Comparison and hashing only take the index of the underlying reified fact
    into account. Comparing an element with an object of another type yields
    `NotImplemented`, so `==` evaluates to False and ordering comparisons
    raise Python's regular `TypeError`.

    Parameters
    ----------
    idx
        The index of the reified fact in the `elements` of the theory.
    theory
        The reified theory the element belongs to.
    """

    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.elements)

    @property
    def index(self) -> int:
        """
        The index of the corresponding reified fact.
        """
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        # Arguments of the fact stored for this element in the theory.
        return self._theory.elements[self._idx].arguments

    @property
    def condition_id(self) -> int:
        """
        The id of the literal tuple of the condition.
        """
        return self._args[2].number

    @property
    def terms(self) -> List[ReifiedTheoryTerm]:
        """
        The tuple of the element.
        """
        # The second argument of the fact selects the term tuple.
        term_ids = self._theory.term_tuples[self._args[1].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        # Only elements are comparable; anything else (including terms or
        # atoms that happen to share the index) is left to Python's fallback.
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx == other._idx

    def __lt__(self, other):
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx < other._idx

    def __le__(self, other):
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx <= other._idx

    def __gt__(self, other):
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx > other._idx

    def __ge__(self, other):
        if not isinstance(other, ReifiedTheoryElement):
            return NotImplemented
        return self._idx >= other._idx

    def __str__(self):
        return f'{",".join(str(term) for term in self.terms)}: literal_tuple({self.condition_id})'

Instance variables

var condition_id : int

The id of the literal tuple of the condition.

Expand source code
@property
def condition_id(self) -> int:
    """
    The id of the literal tuple that holds the element's condition (third
    argument of the fact).
    """
    condition_symbol = self._args[2]
    return condition_symbol.number
var index : int

The index of the corresponding reified fact.

Expand source code
@property
def index(self) -> int:
    """
    Index of the reified fact this object refers to.
    """
    position = self._idx
    return position
var terms : List[ReifiedTheoryTerm]

The tuple of the element.

Expand source code
@property
def terms(self) -> List[ReifiedTheoryTerm]:
    """
    The terms forming the tuple of the element.

    The second argument of the fact selects the term tuple; every id in that
    tuple is wrapped in a `ReifiedTheoryTerm`.
    """
    term_tuples = self._theory.term_tuples
    tuple_id = self._args[1].number
    wrapped = []
    for term_id in term_tuples[tuple_id]:
        wrapped.append(ReifiedTheoryTerm(term_id, self._theory))
    return wrapped
class ReifiedTheoryTerm (idx: int, theory: ReifiedTheory)

Class to represent theory terms.

ReifiedTheory terms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.

Expand source code
class ReifiedTheoryTerm:
    """
    Class to represent theory terms.

    ReifiedTheory terms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.

    Comparison and hashing only take the index of the underlying reified fact
    into account. Comparing a term with an object of another type yields
    `NotImplemented`, so `==` evaluates to False and ordering comparisons
    raise Python's regular `TypeError`.

    Parameters
    ----------
    idx
        The index of the reified fact in the `terms` of the theory.
    theory
        The reified theory the term belongs to.
    """

    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.terms)

    @property
    def index(self) -> int:
        """
        The index of the corresponding reified fact.
        """
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        # Arguments of the fact stored for this term in the theory.
        return self._theory.terms[self._idx].arguments

    @property
    def arguments(self) -> List["ReifiedTheoryTerm"]:
        """
        The arguments of the term (for functions, tuples, list, and sets).
        """
        assert self.type in (
            TheoryTermType.List,
            TheoryTermType.Set,
            TheoryTermType.Tuple,
            TheoryTermType.Function,
        )
        # The third argument of the fact selects the tuple of argument ids.
        term_ids = self._theory.term_tuples[self._args[2].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    @property
    def name(self) -> str:
        """
        The name of the term (for symbols and functions).
        """
        type_ = self.type
        assert type_ in (TheoryTermType.Symbol, TheoryTermType.Function)
        if type_ == TheoryTermType.Function:
            # A function fact refers to its name through the id of another
            # term; the string is that term's second argument.
            return self._theory.terms[self._args[1].number].arguments[1].string
        return self._args[1].string

    @property
    def number(self) -> int:
        """
        The numeric representation of the term (for numbers).
        """
        assert self.type == TheoryTermType.Number
        return self._args[1].number

    @property
    def type(self) -> TheoryTermType:
        """
        The type of the theory term.

        The type is derived from the name of the reified fact; for
        `theory_sequence` facts the second argument tells tuples, sets, and
        lists apart.
        """
        name = self._theory.terms[self._idx].name
        if name == "theory_number":
            return TheoryTermType.Number
        if name == "theory_string":
            return TheoryTermType.Symbol
        if name == "theory_function":
            return TheoryTermType.Function
        assert name == "theory_sequence"
        type_ = self._args[1].name
        if type_ == "tuple":
            return TheoryTermType.Tuple
        if type_ == "set":
            return TheoryTermType.Set
        assert type_ == "list"
        return TheoryTermType.List

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        # Only terms are comparable; anything else (including elements or
        # atoms that happen to share the index) is left to Python's fallback.
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx == other._idx

    def __lt__(self, other):
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx < other._idx

    def __le__(self, other):
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx <= other._idx

    def __gt__(self, other):
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx > other._idx

    def __ge__(self, other):
        if not isinstance(other, ReifiedTheoryTerm):
            return NotImplemented
        return self._idx >= other._idx

    def __str__(self):
        type_ = self.type

        if type_ == TheoryTermType.Number:
            return f"{self.number}"

        if type_ == TheoryTermType.Symbol:
            return f"{self.name}"

        if type_ == TheoryTermType.Function:
            args = self.arguments
            name = self.name
            # Operators are printed in prefix (unary) or infix (binary) form
            # with parenthesized operands.
            if len(args) == 1 and is_operator(name):
                return f"{name}({args[0]})"
            if len(args) == 2 and is_operator(name):
                return f"({args[0]}){name}({args[1]})"
            return f'{name}({",".join(str(arg) for arg in args)})'

        if type_ == TheoryTermType.Tuple:
            lhs, rhs = "(", ")"
        elif type_ == TheoryTermType.List:
            lhs, rhs = "[", "]"
        else:
            lhs, rhs = "{", "}"
        return f'{lhs}{",".join(str(arg) for arg in self.arguments)}{rhs}'

Instance variables

var arguments : List[ReifiedTheoryTerm]

The arguments of the term (for functions, tuples, list, and sets).

Expand source code
@property
def arguments(self) -> List["ReifiedTheoryTerm"]:
    """
    The arguments of the term (for functions, tuples, list, and sets).

    The third argument of the fact selects the tuple of argument ids; every
    id in that tuple is wrapped in a `ReifiedTheoryTerm`.
    """
    compound_types = (
        TheoryTermType.List,
        TheoryTermType.Set,
        TheoryTermType.Tuple,
        TheoryTermType.Function,
    )
    assert self.type in compound_types
    term_tuples = self._theory.term_tuples
    tuple_id = self._args[2].number
    wrapped = []
    for term_id in term_tuples[tuple_id]:
        wrapped.append(ReifiedTheoryTerm(term_id, self._theory))
    return wrapped
var index : int

The index of the corresponding reified fact.

Expand source code
@property
def index(self) -> int:
    """
    Index of the reified fact this object refers to.
    """
    position = self._idx
    return position
var name : str

The name of the term (for symbols and functions).

Expand source code
@property
def name(self) -> str:
    """
    The name of the term (for symbols and functions).

    A symbol stores its name directly as the second argument of the fact. A
    function stores the id of another term there, whose second argument is
    the name.
    """
    term_type = self.type
    assert term_type in (TheoryTermType.Symbol, TheoryTermType.Function)
    if term_type != TheoryTermType.Function:
        return self._args[1].string
    name_fact = self._theory.terms[self._args[1].number]
    return name_fact.arguments[1].string
var number : int

The numeric representation of the term (for numbers).

Expand source code
@property
def number(self) -> int:
    """
    The numeric value of the term (only for terms of type number).
    """
    assert self.type == TheoryTermType.Number
    value_symbol = self._args[1]
    return value_symbol.number
var type : TheoryTermType

The type of the theory term.

Expand source code
@property
def type(self) -> TheoryTermType:
    """
    The type of the theory term.

    The type follows from the name of the reified fact; for
    `theory_sequence` facts the second argument tells tuples, sets, and
    lists apart.
    """
    fact_name = self._theory.terms[self._idx].name
    by_fact_name = {
        "theory_number": TheoryTermType.Number,
        "theory_string": TheoryTermType.Symbol,
        "theory_function": TheoryTermType.Function,
    }
    if fact_name in by_fact_name:
        return by_fact_name[fact_name]

    assert fact_name == "theory_sequence"
    sequence_kind = self._args[1].name
    by_sequence_kind = {
        "tuple": TheoryTermType.Tuple,
        "set": TheoryTermType.Set,
    }
    if sequence_kind in by_sequence_kind:
        return by_sequence_kind[sequence_kind]

    assert sequence_kind == "list"
    return TheoryTermType.List
class Reifier (cb: Callable[[Symbol], None], calculate_sccs: bool = False, reify_steps: bool = False)

An observer that will gather the symbols of the reification, in the same way as clingo --output=reify.

Parameters

cb
A callback function that will be called with each symbol of the reification
calculate_sccs
Flag to calculate the SCCs
reify_steps
Flag to add a number as the last argument of all reification symbols for the corresponding step
Expand source code
class Reifier(Observer):
    """
    Observer that reports the reification of a program symbol by symbol, in
    the same way as `clingo --output=reify`.

    Parameters
    ----------
    cb
        Callback that receives every symbol of the reification.
    calculate_sccs
        Flag to record the dependency edges needed to calculate the SCCs.
    reify_steps
        Flag to append the number of the current step as the last argument of
        the emitted symbols (the `tag(incremental)` fact is emitted without it).

    """

    # pylint:disable=too-many-public-methods
    _step: int
    # Bug in mypy???
    # _cb: Callable[[Symbol], None]
    _calculate_sccs: bool
    _reify_steps: bool
    _step_data: _StepData

    def __init__(
        self,
        cb: Callable[[Symbol], None],
        calculate_sccs: bool = False,
        reify_steps: bool = False,
    ):
        self._cb = cb
        self._calculate_sccs = calculate_sccs
        self._reify_steps = reify_steps
        # Steps are counted from zero.
        self._step = 0
        self._step_data = _StepData()

    def calculate_sccs(self) -> None:
        """
        Trigger computation of SCCs.

        Emits one `scc` fact per atom of every component returned by the
        `tarjan()` method of the current step's graph. Edges are only added to
        that graph if the Reifier has been initialized with
        `calculate_sccs=True`. `end_step` calls this function automatically
        if the Reifier has been initialized with `reify_steps=True`.
        """
        components = self._step_data.graph.tarjan()
        for scc_id, component in enumerate(components):
            for atom in component:
                self._output("scc", [Number(scc_id), Number(atom)])

    def _add_edges(self, head: Sequence[int], body: Sequence[int]):
        """
        Record an edge from every head atom to every positive body literal.

        Does nothing unless SCC calculation has been enabled.
        """
        if not self._calculate_sccs:
            return
        positive = [lit for lit in body if lit > 0]
        for atom in head:
            for lit in positive:
                self._step_data.graph.add_edge(atom, lit)

    def _output(self, name: str, args: Sequence[Symbol]):
        """
        Pass the function symbol `name(args)` to the callback, with the
        current step appended as last argument if steps are reified.
        """
        if self._reify_steps:
            final_args = list(args) + [Number(self._step)]
        else:
            final_args = args
        symbol = Function(name, final_args)
        self._cb(symbol)

    def _tuple(
        self,
        name: str,
        snmap: Dict[Sequence[U], int],
        elems: Sequence[U],
        afun: Callable[[Symbol, int, U], Sequence[Symbol]],
        ordered: bool = False,
    ) -> Symbol:
        """
        Return the identifier symbol of a tuple, emitting its facts on first use.

        `snmap` maps tuple contents to identifiers. A tuple that is not yet in
        the map receives the next free identifier (the current size of the
        map) and is emitted as one `name` fact holding only the identifier,
        followed by one `name` fact per member whose arguments are built by
        `afun(identifier, position, member)`.

        Unless `ordered` is set, duplicate members are dropped (first
        occurrences are kept) and the tuple is identified by its sorted
        members.
        """
        members: Sequence[U]
        if ordered:
            members = elems
            key = tuple(elems)
        else:
            # dict.fromkeys removes duplicates while keeping first occurrences.
            members = list(dict.fromkeys(elems))
            key = tuple(sorted(members))

        fresh_id = len(snmap)
        tuple_id = snmap.setdefault(key, fresh_id)
        tuple_sym = Number(tuple_id)
        if tuple_id == fresh_id:
            # First time this tuple is seen: emit its header and its members.
            self._output(name, [tuple_sym])
            for position, member in enumerate(members):
                self._output(name, afun(tuple_sym, position, member))
        return tuple_sym

    def _atom_tuple(self, atoms: Sequence[int]):
        """
        Return the identifier of the (unordered) atom tuple for `atoms`.
        """
        known = self._step_data.atom_tuples
        return self._tuple("atom_tuple", known, atoms, _lit)

    def _lit_tuple(self, lits: Sequence[int]):
        """
        Return the identifier of the (unordered) literal tuple for `lits`.
        """
        known = self._step_data.lit_tuples
        return self._tuple("literal_tuple", known, lits, _lit)

    def _wlit_tuple(self, wlits: Sequence[Tuple[int, int]]):
        """
        Return the identifier of the (unordered) weighted literal tuple for `wlits`.
        """
        known = self._step_data.wlit_tuples
        return self._tuple("weighted_literal_tuple", known, wlits, _wlit)

    def _theory_element_tuple(self, elements: Sequence[int]):
        """
        Return the identifier of the (unordered) theory element tuple for `elements`.
        """
        known = self._step_data.theory_element_tuples
        return self._tuple("theory_element_tuple", known, elements, _lit)

    def init_program(self, incremental: bool) -> None:
        """
        Emit `tag(incremental)` if the program is incremental.
        """
        if not incremental:
            return
        self._cb(Function("tag", [Function("incremental")]))

    def begin_step(self) -> None:
        """
        Nothing has to be done at the beginning of a step.
        """

    def rule(self, choice: bool, head: Sequence[int], body: Sequence[int]) -> None:
        """
        Emit a `rule` fact with a `normal` body and record its dependency edges.
        """
        if choice:
            head_kind = "choice"
        else:
            head_kind = "disjunction"
        # The head tuple has to be emitted before the body tuple.
        head_sym = Function(head_kind, [self._atom_tuple(head)])
        body_sym = Function("normal", [self._lit_tuple(body)])
        self._output("rule", [head_sym, body_sym])
        self._add_edges(head, body)

    def weight_rule(
        self,
        choice: bool,
        head: Sequence[int],
        lower_bound: int,
        body: Sequence[Tuple[int, int]],
    ) -> None:
        """
        Emit a `rule` fact with a `sum` body and record its dependency edges.
        """
        if choice:
            head_kind = "choice"
        else:
            head_kind = "disjunction"
        # The head tuple has to be emitted before the body tuple.
        head_sym = Function(head_kind, [self._atom_tuple(head)])
        body_sym = Function("sum", [self._wlit_tuple(body), Number(lower_bound)])
        self._output("rule", [head_sym, body_sym])
        # Only the literals (not their weights) matter for the dependencies.
        self._add_edges(head, [lit for lit, _ in body])

    def minimize(self, priority: int, literals: Sequence[Tuple[int, int]]) -> None:
        """
        Emit a `minimize` fact for the given priority and weighted literals.
        """
        wlits_sym = self._wlit_tuple(literals)
        self._output("minimize", [Number(priority), wlits_sym])

    def project(self, atoms: Sequence[int]) -> None:
        """
        Emit one `project` fact per atom.
        """
        for atom_sym in map(Number, atoms):
            self._output("project", [atom_sym])

    def output_atom(self, symbol: Symbol, atom: int) -> None:
        """
        Emit an `output` fact for a symbol; atom 0 yields an empty condition.
        """
        condition = [atom] if atom != 0 else []
        self._output("output", [symbol, self._lit_tuple(condition)])

    def output_term(self, symbol: Symbol, condition: Sequence[int]) -> None:
        """
        Emit an `output` fact for a symbol with the given condition.
        """
        condition_sym = self._lit_tuple(condition)
        self._output("output", [symbol, condition_sym])

    def external(self, atom: int, value: TruthValue) -> None:
        """
        Emit an `external` fact.

        The name of the truth value is its string form without the
        `TruthValue.` prefix, lower-cased and without trailing underscores.
        """
        member = str(value).replace("TruthValue.", "")
        value_name = member.lower().rstrip("_")
        self._output("external", [Number(atom), Function(value_name)])

    def assume(self, literals: Sequence[int]) -> None:
        """
        Emit one `assume` fact per literal.
        """
        for lit_sym in map(Number, literals):
            self._output("assume", [lit_sym])

    def heuristic(
        self,
        atom: int,
        type_: HeuristicType,
        bias: int,
        priority: int,
        condition: Sequence[int],
    ) -> None:
        """
        Emit a `heuristic` fact.

        The name of the heuristic type is its string form without the
        `HeuristicType.` prefix, lower-cased and without trailing underscores.
        """
        member = str(type_).replace("HeuristicType.", "")
        type_name = member.lower().rstrip("_")
        condition_sym = self._lit_tuple(condition)
        args = [
            Number(atom),
            Function(type_name),
            Number(bias),
            Number(priority),
            condition_sym,
        ]
        self._output("heuristic", args)

    def acyc_edge(self, node_u: int, node_v: int, condition: Sequence[int]) -> None:
        """
        Emit an `edge` fact between two nodes with the given condition.
        """
        condition_sym = self._lit_tuple(condition)
        self._output("edge", [Number(node_u), Number(node_v), condition_sym])

    def theory_term_number(self, term_id: int, number: int) -> None:
        """
        Emit a `theory_number` fact.
        """
        args = [Number(term_id), Number(number)]
        self._output("theory_number", args)

    def theory_term_string(self, term_id: int, name: str) -> None:
        """
        Emit a `theory_string` fact.
        """
        args = [Number(term_id), String(name)]
        self._output("theory_string", args)

    def theory_term_compound(
        self, term_id: int, name_id_or_type: int, arguments: Sequence[int]
    ) -> None:
        """
        Emit a `theory_sequence` or `theory_function` fact.

        The values -1, -2, and -3 of `name_id_or_type` stand for tuples, sets,
        and lists; any other value is used as the term id of a function name.
        """
        sequence_kind = {-1: "tuple", -2: "set", -3: "list"}.get(name_id_or_type)
        if sequence_kind is None:
            fact_name = "theory_function"
            value = Number(name_id_or_type)
        else:
            fact_name = "theory_sequence"
            value = Function(sequence_kind)
        # The order of arguments matters, hence the tuple is ordered.
        args_sym = self._tuple(
            "theory_tuple", self._step_data.theory_tuples, arguments, _theory, True
        )
        self._output(fact_name, [Number(term_id), value, args_sym])

    def theory_element(
        self, element_id: int, terms: Sequence[int], condition: Sequence[int]
    ) -> None:
        """
        Emit a `theory_element` fact with its term tuple and condition.
        """
        # The term tuple has to be emitted before the condition.
        terms_sym = self._tuple(
            "theory_tuple", self._step_data.theory_tuples, terms, _theory, True
        )
        condition_sym = self._lit_tuple(condition)
        self._output("theory_element", [Number(element_id), terms_sym, condition_sym])

    def theory_atom(
        self, atom_id_or_zero: int, term_id: int, elements: Sequence[int]
    ) -> None:
        """
        Emit a `theory_atom` fact without guard.
        """
        elements_sym = self._theory_element_tuple(elements)
        args = [Number(atom_id_or_zero), Number(term_id), elements_sym]
        self._output("theory_atom", args)

    def theory_atom_with_guard(
        self,
        atom_id_or_zero: int,
        term_id: int,
        elements: Sequence[int],
        operator_id: int,
        right_hand_side_id: int,
    ) -> None:
        """
        Emit a `theory_atom` fact with the operator and right-hand side of its guard.
        """
        elements_sym = self._theory_element_tuple(elements)
        args = [
            Number(atom_id_or_zero),
            Number(term_id),
            elements_sym,
            Number(operator_id),
            Number(right_hand_side_id),
        ]
        self._output("theory_atom", args)

    def end_step(self) -> None:
        """
        If steps are reified, emit the SCCs, advance the step counter, and
        start over with fresh step data.
        """
        if not self._reify_steps:
            return
        self.calculate_sccs()
        self._step += 1
        self._step_data = _StepData()

Ancestors

Methods

def calculate_sccs(self) ‑> None

Trigger computation of SCCs.

SCCs can only be computed if the Reifier has been initialized with calculate_sccs=True. This function is called automatically if reify_steps=True has been set when initializing the Reifier.

Expand source code
def calculate_sccs(self) -> None:
    """
    Trigger computation of SCCs.

    Emits one `scc` fact per atom of every component returned by the
    `tarjan()` method of the current step's graph. SCCs can only be computed
    if the Reifier has been initialized with `calculate_sccs=True`. This
    function is called automatically if `reify_steps=True` has been set when
    initializing the Reifier.
    """
    components = self._step_data.graph.tarjan()
    for scc_id, component in enumerate(components):
        for atom in component:
            self._output("scc", [Number(scc_id), Number(atom)])

Inherited members