Module clingox.reify

This module provides functions to reify programs.

This includes a Reifier implementing clingo's clingo.Observer interface that can be registered with a clingo.Control object.

Additionally, the module provides a ReifiedTheory class that provides a similar interface as clingo's theory atoms but uses the reified symbols.

Examples

The following example uses the reify_program() function to reify a program:

>>> from clingox.reify import reify_program
>>> prg = 'b :- a. {a}.'
>>> symbols = reify_program(prg)
>>> print([str(sym) for sym in symbols])
['tag(incremental)', 'atom_tuple(0)', 'atom_tuple(0,1)', 'literal_tuple(0)',
'rule(choice(0),normal(0))', 'atom_tuple(1)', 'atom_tuple(1,2)',
'literal_tuple(1)', 'literal_tuple(1,1)', 'rule(disjunction(1),normal(1))',
'output(a,1)', 'literal_tuple(2)', 'literal_tuple(2,2)', 'output(b,2)']

The last example shows how to use the ReifiedTheory class.

>>> from clingox.reify import ReifiedTheory, reify_program
>>> prg = '#theory theory { t { }; &p/0 : t, any }. &p { t }.'
>>> thy = ReifiedTheory(reify_program(prg))
>>> print([str(atm) for atm in thy])
['&p { t: literal_tuple(0) }']
>>> from clingox.theory import evaluate
>>> evaluate(next(iter(thy)).term)
Function('p', [], True)
Expand source code
'''
This module provides functions to reify programs.

This includes a `Reifier` implementing clingo's `clingo.Observer` interface
that can be registered with a `clingo.Control` object.

Additionally, the module provides a `ReifiedTheory` class that provides a
similar interface as clingo's theory atoms but uses the reified symbols.

Examples
--------

The following example uses the `reify_program` function to reify a program:

```python-repl
>>> from clingox.reify import reify_program
>>> prg = 'b :- a. {a}.'
>>> symbols = reify_program(prg)
>>> print([str(sym) for sym in symbols])
['tag(incremental)', 'atom_tuple(0)', 'atom_tuple(0,1)', 'literal_tuple(0)',
'rule(choice(0),normal(0))', 'atom_tuple(1)', 'atom_tuple(1,2)',
'literal_tuple(1)', 'literal_tuple(1,1)', 'rule(disjunction(1),normal(1))',
'output(a,1)', 'literal_tuple(2)', 'literal_tuple(2,2)', 'output(b,2)']
```

The last example shows how to use the `ReifiedTheory` class.

```python-repl
>>> from clingox.reify import ReifiedTheory, reify_program
>>> prg = '#theory theory { t { }; &p/0 : t, any }. &p { t }.'
>>> thy = ReifiedTheory(reify_program(prg))
>>> print([str(atm) for atm in thy])
['&p { t: literal_tuple(0) }']
>>> from clingox.theory import evaluate
>>> evaluate(next(iter(thy)).term)
Function('p', [], True)
```
'''

from typing import Callable, Dict, Generic, Iterator, List, Optional, Sequence, Set, Tuple, TypeVar
from dataclasses import dataclass, field

from clingo.control import Control
from clingo.backend import HeuristicType, Observer, TruthValue
from clingo.symbol import Function, Number, String, Symbol
from clingo.theory_atoms import TheoryTermType

from .theory import is_operator

__all__ = ['Reifier', 'ReifiedTheory', 'ReifiedTheoryAtom', 'ReifiedTheoryElement', 'ReifiedTheoryTerm',
           'ReifiedTheory', 'reify_program']

T = TypeVar('T')  # pylint: disable=invalid-name
U = TypeVar('U', int, Tuple[int, int])  # pylint: disable=invalid-name


@dataclass
class _Vertex(Generic[T]):
    '''
    Vertex data to calculate SCCs of a graph.
    '''
    name: T
    visited: int
    index: int = 0
    edges: List[int] = field(default_factory=list)


class _Graph(Generic[T]):
    '''
    Simple class to compute strongly connected components using Tarjan's
    algorithm.
    '''
    _names: Dict[T, int]
    _vertices: List[_Vertex]
    _phase: bool

    def __init__(self):
        self._names = {}
        self._vertices = []
        self._phase = True

    def _visited(self, key_u: int) -> bool:
        return self._vertices[key_u].visited != int(not self._phase)

    def _active(self, key_u: int) -> bool:
        return self._vertices[key_u].visited != int(self._phase)

    def _add_vertex(self, val_u: T) -> int:
        n = len(self._vertices)
        key_u = self._names.setdefault(val_u, n)
        if n == key_u:
            self._vertices.append(_Vertex(val_u, int(not self._phase)))
        return key_u

    def add_edge(self, val_u: T, val_v: T) -> None:
        '''
        Add an edge to the graph.
        '''
        key_u = self._add_vertex(val_u)
        key_v = self._add_vertex(val_v)
        self._vertices[key_u].edges.append(key_v)

    def tarjan(self) -> List[List[T]]:
        '''
        Returns the strictly connected components of the graph.
        '''
        sccs: List[List[T]] = []
        stack = []
        trail = []
        index = 1

        def push(key_u: int):
            nonlocal index
            index += 1
            vtx_u = self._vertices[key_u]
            vtx_u.visited = index
            vtx_u.index = 0
            stack.append(key_u)
            trail.append(key_u)

        for key_u in range(len(self._vertices)):
            if self._visited(key_u):
                continue
            index = 1
            push(key_u)
            while stack:
                key_v = stack[-1]
                vtx_v = self._vertices[key_v]
                len_v = len(vtx_v.edges)
                while vtx_v.index < len_v:
                    key_w = vtx_v.edges[vtx_v.index]
                    vtx_v.index += 1
                    if not self._visited(key_w):
                        push(key_w)
                        break
                else:
                    stack.pop()
                    root = True
                    for key_w in vtx_v.edges:
                        vtx_w = self._vertices[key_w]
                        if self._active(key_w) and vtx_w.visited < vtx_v.visited:
                            root = False
                            vtx_v.visited = vtx_w.visited
                    if root:
                        key_last = None
                        sccs.append([])
                        while key_last != key_v:
                            key_last = trail[-1]
                            vtx_last = self._vertices[key_last]
                            sccs[-1].append(vtx_last.name)
                            vtx_last.visited = int(self._phase)
                            trail.pop()
                        if len(sccs[-1]) == 1:
                            sccs.pop()

        self._phase = not self._phase
        return sccs


@dataclass
class _StepData:
    atom_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    lit_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    wlit_tuples: Dict[Sequence[Tuple[int, int]], int] = field(default_factory=dict)
    theory_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    theory_element_tuples: Dict[Sequence[int], int] = field(default_factory=dict)
    graph: _Graph = field(default_factory=_Graph)


def _theory(i: Symbol, pos: int, lit: int) -> Sequence[Symbol]:
    return [i, Number(pos), Number(lit)]


def _lit(i: Symbol, pos: int, lit: int) -> Sequence[Symbol]:
    # pylint: disable=unused-argument
    return [i, Number(lit)]


def _wlit(i: Symbol, pos: int, wlit: Tuple[int, int]) -> Sequence[Symbol]:
    # pylint: disable=unused-argument
    return [i, Number(wlit[0]), Number(wlit[1])]


class Reifier(Observer):
    '''
    An observer that will gather the symbols of the reification, in the same way as `clingo --output=reify`.

    Parameters
    ----------
    cb
        A callback function that will be called with each symbol of the reification
    calculate_sccs
        Flag to calculate the SCCs
    reify_steps
        Flag to add a number as the last argument of all reification symbols for the corresponding step

    '''
    # pylint:disable=too-many-public-methods
    _step: int
    # Bug in mypy???
    # _cb: Callable[[Symbol], None]
    _calculate_sccs: bool
    _reify_steps: bool
    _step_data: _StepData

    def __init__(self, cb: Callable[[Symbol], None], calculate_sccs: bool = False, reify_steps: bool = False):
        self._step = 0
        self._cb = cb
        self._calculate_sccs = calculate_sccs
        self._reify_steps = reify_steps
        self._step_data = _StepData()

    def calculate_sccs(self) -> None:
        '''
        Trigger computation of SCCs.

        SCCs can only be computed if the Reifier has been initialized with
        `calculate_sccs=True`, This function is called automatically if
        `reify_steps=True` has been set when initializing the Reifier.
        '''
        for idx, scc in enumerate(self._step_data.graph.tarjan()):
            for atm in scc:
                self._output('scc', [Number(idx), Number(atm)])

    def _add_edges(self, head: Sequence[int], body: Sequence[int]):
        if self._calculate_sccs:
            for u in head:
                for v in body:
                    if v > 0:
                        self._step_data.graph.add_edge(u, v)

    def _output(self, name: str, args: Sequence[Symbol]):
        if self._reify_steps:
            args = list(args) + [Number(self._step)]
        self._cb(Function(name, args))

    def _tuple(self, name: str,
               snmap: Dict[Sequence[U], int],
               elems: Sequence[U],
               afun: Callable[[Symbol, int, U], Sequence[Symbol]],
               ordered: bool = False) -> Symbol:
        pruned: Sequence[U]
        if ordered:
            pruned = elems
            ident = tuple(elems)
        else:
            seen: Set[U] = set()
            pruned = []
            for elem in elems:
                if elem not in seen:
                    seen.add(elem)
                    pruned.append(elem)
            ident = tuple(sorted(pruned))

        n = len(snmap)
        i = Number(snmap.setdefault(ident, n))
        if n == i.number:
            self._output(name, [i])
            for idx, atm in enumerate(pruned):
                self._output(name, afun(i, idx, atm))
        return i

    def _atom_tuple(self, atoms: Sequence[int]):
        return self._tuple("atom_tuple", self._step_data.atom_tuples, atoms, _lit)

    def _lit_tuple(self, lits: Sequence[int]):
        return self._tuple("literal_tuple", self._step_data.lit_tuples, lits, _lit)

    def _wlit_tuple(self, wlits: Sequence[Tuple[int, int]]):
        return self._tuple("weighted_literal_tuple", self._step_data.wlit_tuples, wlits, _wlit)

    def init_program(self, incremental: bool) -> None:
        if incremental:
            self._cb(Function("tag", [Function("incremental")]))

    def begin_step(self) -> None:
        pass

    def rule(self, choice: bool, head: Sequence[int], body: Sequence[int]) -> None:
        hn = "choice" if choice else "disjunction"
        hd = Function(hn, [self._atom_tuple(head)])
        bd = Function("normal", [self._lit_tuple(body)])
        self._output("rule", [hd, bd])
        self._add_edges(head, body)

    def weight_rule(self, choice: bool, head: Sequence[int], lower_bound: int,
                    body: Sequence[Tuple[int, int]]) -> None:
        hn = "choice" if choice else "disjunction"
        hd = Function(hn, [self._atom_tuple(head)])
        bd = Function("sum", [self._wlit_tuple(body), Number(lower_bound)])
        self._output("rule", [hd, bd])
        self._add_edges(head, [lit for lit, w in body])

    def minimize(self, priority: int, literals: Sequence[Tuple[int, int]]) -> None:
        self._output("minimize", [Number(priority), self._wlit_tuple(literals)])

    def project(self, atoms: Sequence[int]) -> None:
        for atom in atoms:
            self._output("project", [Number(atom)])

    def output_atom(self, symbol: Symbol, atom: int) -> None:
        self._output("output", [symbol, self._lit_tuple([] if atom == 0 else [atom])])

    def output_term(self, symbol: Symbol, condition: Sequence[int]) -> None:
        self._output("output", [symbol, self._lit_tuple(condition)])

    def output_csp(self, symbol: Symbol, value: int,
                   condition: Sequence[int]) -> None:
        self._output("output_csp", [symbol, Number(value), self._lit_tuple(condition)])

    def external(self, atom: int, value: TruthValue) -> None:
        value_name = str(value).replace('TruthValue.', '').lower().rstrip('_')
        self._output("external", [Number(atom), Function(value_name)])

    def assume(self, literals: Sequence[int]) -> None:
        for lit in literals:
            self._output("assume", [Number(lit)])

    def heuristic(self, atom: int, type_: HeuristicType, bias: int,
                  priority: int, condition: Sequence[int]) -> None:
        type_name = str(type_).replace('HeuristicType.', '').lower().rstrip('_')
        condition_lit = self._lit_tuple(condition)
        self._output("heuristic", [Number(atom),
                                   Function(type_name),
                                   Number(bias),
                                   Number(priority),
                                   condition_lit])

    def acyc_edge(self, node_u: int, node_v: int,
                  condition: Sequence[int]) -> None:
        self._output("edge", [Number(node_u), Number(node_v), self._lit_tuple(condition)])

    def theory_term_number(self, term_id: int, number: int) -> None:
        self._output("theory_number", [Number(term_id), Number(number)])

    def theory_term_string(self, term_id: int, name: str) -> None:
        self._output("theory_string", [Number(term_id), String(name)])

    def theory_term_compound(self, term_id: int, name_id_or_type: int,
                             arguments: Sequence[int]) -> None:
        names = {-1: "tuple", -2: "set", -3: "list"}
        if name_id_or_type in names:
            name = "theory_sequence"
            value = Function(names[name_id_or_type])
        else:
            name = "theory_function"
            value = Number(name_id_or_type)
        tuple_id = self._tuple("theory_tuple", self._step_data.theory_tuples, arguments, _theory, True)
        self._output(name, [Number(term_id), value, tuple_id])

    def theory_element(self, element_id: int, terms: Sequence[int],
                       condition: Sequence[int]) -> None:
        tuple_id = self._tuple("theory_tuple", self._step_data.theory_tuples, terms, _theory, True)
        condition_id = self._tuple("literal_tuple", self._step_data.lit_tuples, condition, _lit)
        self._output("theory_element", [Number(element_id), tuple_id, condition_id])

    def theory_atom(self, atom_id_or_zero: int, term_id: int,
                    elements: Sequence[int]) -> None:
        tuple_e_id = self._tuple("theory_element_tuple", self._step_data.theory_element_tuples, elements, _lit)
        self._output("theory_atom", [Number(atom_id_or_zero), Number(term_id), tuple_e_id])

    def theory_atom_with_guard(self, atom_id_or_zero: int, term_id: int,
                               elements: Sequence[int], operator_id: int,
                               right_hand_side_id: int) -> None:
        tuple_id = self._tuple("theory_element_tuple", self._step_data.theory_element_tuples, elements, _lit)
        self._output("theory_atom", [Number(atom_id_or_zero),
                                     Number(term_id),
                                     tuple_id,
                                     Number(operator_id),
                                     Number(right_hand_side_id)])

    def end_step(self) -> None:
        if self._reify_steps:
            self.calculate_sccs()
            self._step += 1
            self._step_data = _StepData()


def _set(matches: Sequence[Tuple[str, int]], lst: List[Symbol], sym,
         append: bool = False, default: Symbol = Number(0)) -> bool:
    for match in matches:
        if not sym.match(*match):
            continue
        idx = len(lst) if append else sym.arguments[0].number
        while len(lst) <= idx:
            lst.append(default)
        lst[idx] = sym
        return True
    return False


def _ensure(name: str, lst: List[List[int]], sym: Symbol, ordered=False) -> bool:
    empty = sym.match(name, 1)
    if empty or sym.match(name, 3 if ordered else 2):
        idx = sym.arguments[0].number
        while len(lst) <= idx:
            lst.append([])
        if not empty:
            if ordered:
                tup = lst[idx]
                jdx = sym.arguments[1].number
                while len(tup) <= jdx:
                    tup.append(0)
                tup[jdx] = sym.arguments[2].number
            else:
                lst[idx].append(sym.arguments[1].number)
        return True
    return False


class ReifiedTheory:
    '''
    Class indexing the symbols related to a theory.

    The `ReifiedTheoryTerm`, `ReifiedTheoryElement`, and `ReifiedTheoryElement`
    classes provide views on this data that behave as the corresponding classes
    in clingo's `clingo.theory_atoms` module.
    '''
    terms: List[Symbol]
    elements: List[Symbol]
    atoms: List[Symbol]
    term_tuples: List[List[int]]
    element_tuples: List[List[int]]

    def __init__(self, symbols: Sequence[Symbol]):
        self.terms = []
        self.elements = []
        self.atoms = []
        self.term_tuples = []
        self.element_tuples = []

        for sym in symbols:
            _ = (_set((('theory_atom', 3), ('theory_atom', 5)), self.atoms, sym, True) or
                 _set((('theory_element', 3),), self.elements, sym) or
                 _set((('theory_sequence', 3), ('theory_string', 2),
                       ('theory_number', 2), ('theory_function', 3)), self.terms, sym) or
                 _ensure('theory_tuple', self.term_tuples, sym, True) or
                 _ensure('theory_element_tuple', self.element_tuples, sym))

    def __iter__(self) -> Iterator['ReifiedTheoryAtom']:
        for idx in range(len(self.atoms)):
            yield ReifiedTheoryAtom(idx, self)


class ReifiedTheoryTerm:
    '''
    Class to represent theory terms.

    ReifiedTheory terms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.
    '''
    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.terms)

    @property
    def index(self) -> int:
        '''
        The index of the corresponding reified fact.
        '''
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        return self._theory.terms[self._idx].arguments

    @property
    def arguments(self) -> List['ReifiedTheoryTerm']:
        '''
        The arguments of the term (for functions, tuples, list, and sets).
        '''
        assert self.type in (TheoryTermType.List, TheoryTermType.Set,
                             TheoryTermType.Tuple, TheoryTermType.Function)
        term_ids = self._theory.term_tuples[self._args[2].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    @property
    def name(self) -> str:
        '''
        The name of the term (for symbols and functions).
        '''
        assert self.type in (TheoryTermType.Symbol, TheoryTermType.Function)
        if self.type == TheoryTermType.Function:
            return self._theory.terms[self._args[1].number].arguments[1].string
        return self._args[1].string

    @property
    def number(self) -> int:
        '''
        The numeric representation of the term (for numbers).
        '''
        assert self.type == TheoryTermType.Number
        return self._args[1].number

    @property
    def type(self) -> TheoryTermType:
        '''
        The type of the theory term.
        '''
        name = self._theory.terms[self._idx].name
        if name == "theory_number":
            return TheoryTermType.Number
        if name == "theory_string":
            return TheoryTermType.Symbol
        if name == "theory_function":
            return TheoryTermType.Function
        assert name == "theory_sequence"
        type_ = self._args[1].name
        if type_ == "tuple":
            return TheoryTermType.Tuple
        if type_ == "set":
            return TheoryTermType.Set
        assert type_ == "list"
        return TheoryTermType.List

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        return self._idx == other._idx

    def __lt__(self, other):
        return self._idx < other._idx

    def __str__(self):
        type_ = self.type

        if type_ == TheoryTermType.Number:
            return f'{self.number}'

        if type_ == TheoryTermType.Symbol:
            return f'{self.name}'

        if type_ == TheoryTermType.Function:
            args = self.arguments
            name = self.name
            if len(args) == 1 and is_operator(name):
                return f'{name}({args[0]})'
            if len(args) == 2 and is_operator(name):
                return f'({args[0]}){name}({args[1]})'
            return f'{name}({",".join(str(arg) for arg in args)})'

        if type_ == TheoryTermType.Tuple:
            lhs, rhs = '(', ')'
        elif type_ == TheoryTermType.List:
            lhs, rhs = '[', ']'
        else:
            lhs, rhs = '{', '}'
        return f'{lhs}{",".join(str(arg) for arg in self.arguments)}{rhs}'


class ReifiedTheoryElement:
    '''
    Class to represent theory elements.

    ReifiedTheory elements have a readable string representation, implement Python's
    rich comparison operators, and can be used as dictionary keys.
    '''
    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.elements)

    @property
    def index(self) -> int:
        '''
        The index of the corresponding reified fact.
        '''
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        return self._theory.elements[self._idx].arguments

    @property
    def condition_id(self) -> int:
        '''
        The id of the literal tuple of the condition.
        '''
        return self._args[2].number

    @property
    def terms(self) -> List[ReifiedTheoryTerm]:
        '''
        The tuple of the element.
        '''
        term_ids = self._theory.term_tuples[self._args[1].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        return self._idx == other._idx

    def __lt__(self, other):
        return self._idx < other._idx

    def __str__(self):
        return f'{",".join(str(term) for term in self.terms)}: literal_tuple({self.condition_id})'


class ReifiedTheoryAtom:
    '''
    Class to represent theory atoms.

    Theory atoms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.
    '''
    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.atoms)

    @property
    def index(self) -> int:
        '''
        The index of the corresponding reified fact.
        '''
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        return self._theory.atoms[self._idx].arguments

    @property
    def elements(self) -> List[ReifiedTheoryElement]:
        '''
        The elements of the atom.
        '''
        tuple_id = self._args[2].number
        return [ReifiedTheoryElement(elem_id, self._theory)
                for elem_id in self._theory.element_tuples[tuple_id]]

    @property
    def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]:
        '''
        The guard of the atom or None if the atom has no guard.
        '''
        args = self._args
        if len(args) <= 3:
            return None

        op = self._theory.terms[args[3].number].arguments[1].string
        return (op, ReifiedTheoryTerm(args[4].number, self._theory))

    @property
    def literal(self) -> int:
        '''
        The reified literal associated with the atom.
        '''
        return self._args[0].number

    @property
    def term(self) -> ReifiedTheoryTerm:
        '''
        The term of the atom.
        '''
        return ReifiedTheoryTerm(self._args[1].number, self._theory)

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        return self._idx == other._idx

    def __lt__(self, other):
        return self._idx < other._idx

    def __str__(self):
        name = f'&{self.term}'

        elems = self.elements
        if elems:
            estr = f' {{ {"; ".join(str(elem) for elem in elems)} }}'
        else:
            estr = ''

        guard = self.guard
        if guard:
            gstr = f' {guard[0]} {guard[1]}'
        else:
            gstr = ''

        return f'{name}{estr}{gstr}'


def reify_program(prg: str, calculate_sccs: bool = False, reify_steps: bool = False) -> List[Symbol]:
    '''
    Reify the given program and return the reified symbols.

    Parameters
    ----------
    prg
        The program to reify in form of a string.
    calculate_sccs
        Whether to calculate SCCs of the reified program.
    reify_steps
        Whether to add a step number to the reified facts.

    Returns
    -------
    A list of symbols containing the reified facts.
    '''
    ret: List[Symbol] = []
    ctl = Control()
    reifier = Reifier(ret.append, calculate_sccs, reify_steps)
    ctl.register_observer(reifier)
    ctl.add("base", [], prg)
    ctl.ground([('base', [])])
    if calculate_sccs and not reify_steps:
        reifier.calculate_sccs()

    return ret

Functions

def reify_program(prg: str, calculate_sccs: bool = False, reify_steps: bool = False) ‑> List[Symbol]

Reify the given program and return the reified symbols.

Parameters

prg
The program to reify in form of a string.
calculate_sccs
Whether to calculate SCCs of the reified program.
reify_steps
Whether to add a step number to the reified facts.

Returns

A list of symbols containing the reified facts.

Expand source code
def reify_program(prg: str, calculate_sccs: bool = False, reify_steps: bool = False) -> List[Symbol]:
    '''
    Reify the given program and return the reified symbols.

    Parameters
    ----------
    prg
        The program to reify in form of a string.
    calculate_sccs
        Whether to calculate SCCs of the reified program.
    reify_steps
        Whether to add a step number to the reified facts.

    Returns
    -------
    A list of symbols containing the reified facts.
    '''
    ret: List[Symbol] = []
    ctl = Control()
    reifier = Reifier(ret.append, calculate_sccs, reify_steps)
    ctl.register_observer(reifier)
    ctl.add("base", [], prg)
    ctl.ground([('base', [])])
    if calculate_sccs and not reify_steps:
        reifier.calculate_sccs()

    return ret

Classes

class ReifiedTheory (symbols: Sequence[Symbol])

Class indexing the symbols related to a theory.

The ReifiedTheoryTerm, ReifiedTheoryElement, and ReifiedTheoryElement classes provide views on this data that behave as the corresponding classes in clingo's clingo.theory_atoms module.

Expand source code
class ReifiedTheory:
    '''
    Class indexing the symbols related to a theory.

    The `ReifiedTheoryTerm`, `ReifiedTheoryElement`, and `ReifiedTheoryElement`
    classes provide views on this data that behave as the corresponding classes
    in clingo's `clingo.theory_atoms` module.
    '''
    terms: List[Symbol]
    elements: List[Symbol]
    atoms: List[Symbol]
    term_tuples: List[List[int]]
    element_tuples: List[List[int]]

    def __init__(self, symbols: Sequence[Symbol]):
        self.terms = []
        self.elements = []
        self.atoms = []
        self.term_tuples = []
        self.element_tuples = []

        for sym in symbols:
            _ = (_set((('theory_atom', 3), ('theory_atom', 5)), self.atoms, sym, True) or
                 _set((('theory_element', 3),), self.elements, sym) or
                 _set((('theory_sequence', 3), ('theory_string', 2),
                       ('theory_number', 2), ('theory_function', 3)), self.terms, sym) or
                 _ensure('theory_tuple', self.term_tuples, sym, True) or
                 _ensure('theory_element_tuple', self.element_tuples, sym))

    def __iter__(self) -> Iterator['ReifiedTheoryAtom']:
        for idx in range(len(self.atoms)):
            yield ReifiedTheoryAtom(idx, self)

Class variables

var atoms : List[Symbol]
var element_tuples : List[List[int]]
var elements : List[Symbol]
var term_tuples : List[List[int]]
var terms : List[Symbol]
class ReifiedTheoryAtom (idx: int, theory: ReifiedTheory)

Class to represent theory atoms.

Theory atoms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.

Expand source code
class ReifiedTheoryAtom:
    '''
    Class to represent theory atoms.

    Theory atoms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.
    '''
    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.atoms)

    @property
    def index(self) -> int:
        '''
        The index of the corresponding reified fact.
        '''
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        return self._theory.atoms[self._idx].arguments

    @property
    def elements(self) -> List[ReifiedTheoryElement]:
        '''
        The elements of the atom.
        '''
        tuple_id = self._args[2].number
        return [ReifiedTheoryElement(elem_id, self._theory)
                for elem_id in self._theory.element_tuples[tuple_id]]

    @property
    def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]:
        '''
        The guard of the atom or None if the atom has no guard.
        '''
        args = self._args
        if len(args) <= 3:
            return None

        op = self._theory.terms[args[3].number].arguments[1].string
        return (op, ReifiedTheoryTerm(args[4].number, self._theory))

    @property
    def literal(self) -> int:
        '''
        The reified literal associated with the atom.
        '''
        return self._args[0].number

    @property
    def term(self) -> ReifiedTheoryTerm:
        '''
        The term of the atom.
        '''
        return ReifiedTheoryTerm(self._args[1].number, self._theory)

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        return self._idx == other._idx

    def __lt__(self, other):
        return self._idx < other._idx

    def __str__(self):
        name = f'&{self.term}'

        elems = self.elements
        if elems:
            estr = f' {{ {"; ".join(str(elem) for elem in elems)} }}'
        else:
            estr = ''

        guard = self.guard
        if guard:
            gstr = f' {guard[0]} {guard[1]}'
        else:
            gstr = ''

        return f'{name}{estr}{gstr}'

Instance variables

var elements : List[ReifiedTheoryElement]

The elements of the atom.

Expand source code
@property
def elements(self) -> List[ReifiedTheoryElement]:
    '''
    The elements of the atom.
    '''
    tuple_id = self._args[2].number
    return [ReifiedTheoryElement(elem_id, self._theory)
            for elem_id in self._theory.element_tuples[tuple_id]]
var guard : Optional[Tuple[str, ReifiedTheoryTerm]]

The guard of the atom or None if the atom has no guard.

Expand source code
@property
def guard(self) -> Optional[Tuple[str, ReifiedTheoryTerm]]:
    '''
    The guard of the atom or None if the atom has no guard.
    '''
    args = self._args
    if len(args) <= 3:
        return None

    op = self._theory.terms[args[3].number].arguments[1].string
    return (op, ReifiedTheoryTerm(args[4].number, self._theory))
var index : int

The index of the corresponding reified fact.

Expand source code
@property
def index(self) -> int:
    '''
    The index of the corresponding reified fact.
    '''
    return self._idx
var literal : int

The reified literal associated with the atom.

Expand source code
@property
def literal(self) -> int:
    '''
    The reified literal associated with the atom.
    '''
    return self._args[0].number
var termReifiedTheoryTerm

The term of the atom.

Expand source code
@property
def term(self) -> ReifiedTheoryTerm:
    '''
    The term of the atom.
    '''
    return ReifiedTheoryTerm(self._args[1].number, self._theory)
class ReifiedTheoryElement (idx: int, theory: ReifiedTheory)

Class to represent theory elements.

ReifiedTheory elements have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.

Expand source code
class ReifiedTheoryElement:
    '''
    Class to represent theory elements.

    ReifiedTheory elements have a readable string representation, implement Python's
    rich comparison operators, and can be used as dictionary keys.
    '''
    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.elements)

    @property
    def index(self) -> int:
        '''
        The index of the corresponding reified fact.
        '''
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        return self._theory.elements[self._idx].arguments

    @property
    def condition_id(self) -> int:
        '''
        The id of the literal tuple of the condition.
        '''
        return self._args[2].number

    @property
    def terms(self) -> List[ReifiedTheoryTerm]:
        '''
        The tuple of the element.
        '''
        term_ids = self._theory.term_tuples[self._args[1].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        return self._idx == other._idx

    def __lt__(self, other):
        return self._idx < other._idx

    def __str__(self):
        return f'{",".join(str(term) for term in self.terms)}: literal_tuple({self.condition_id})'

Instance variables

var condition_id : int

The id of the literal tuple of the condition.

Expand source code
@property
def condition_id(self) -> int:
    '''
    The id of the literal tuple of the condition.
    '''
    return self._args[2].number
var index : int

The index of the corresponding reified fact.

Expand source code
@property
def index(self) -> int:
    '''
    The index of the corresponding reified fact.
    '''
    return self._idx
var terms : List[ReifiedTheoryTerm]

The tuple of the element.

Expand source code
@property
def terms(self) -> List[ReifiedTheoryTerm]:
    '''
    The tuple of the element.
    '''
    term_ids = self._theory.term_tuples[self._args[1].number]
    return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]
class ReifiedTheoryTerm (idx: int, theory: ReifiedTheory)

Class to represent theory terms.

ReifiedTheory terms have a readable string representation, implement Python's rich comparison operators, and can be used as dictionary keys.

Expand source code
class ReifiedTheoryTerm:
    '''
    Class to represent theory terms.

    ReifiedTheory terms have a readable string representation, implement Python's rich
    comparison operators, and can be used as dictionary keys.
    '''
    _idx: int
    _theory: ReifiedTheory

    def __init__(self, idx: int, theory: ReifiedTheory):
        self._idx = idx
        self._theory = theory
        assert self.index < len(theory.terms)

    @property
    def index(self) -> int:
        '''
        The index of the corresponding reified fact.
        '''
        return self._idx

    @property
    def _args(self) -> Sequence[Symbol]:
        return self._theory.terms[self._idx].arguments

    @property
    def arguments(self) -> List['ReifiedTheoryTerm']:
        '''
        The arguments of the term (for functions, tuples, list, and sets).
        '''
        assert self.type in (TheoryTermType.List, TheoryTermType.Set,
                             TheoryTermType.Tuple, TheoryTermType.Function)
        term_ids = self._theory.term_tuples[self._args[2].number]
        return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]

    @property
    def name(self) -> str:
        '''
        The name of the term (for symbols and functions).
        '''
        assert self.type in (TheoryTermType.Symbol, TheoryTermType.Function)
        if self.type == TheoryTermType.Function:
            return self._theory.terms[self._args[1].number].arguments[1].string
        return self._args[1].string

    @property
    def number(self) -> int:
        '''
        The numeric representation of the term (for numbers).
        '''
        assert self.type == TheoryTermType.Number
        return self._args[1].number

    @property
    def type(self) -> TheoryTermType:
        '''
        The type of the theory term.
        '''
        name = self._theory.terms[self._idx].name
        if name == "theory_number":
            return TheoryTermType.Number
        if name == "theory_string":
            return TheoryTermType.Symbol
        if name == "theory_function":
            return TheoryTermType.Function
        assert name == "theory_sequence"
        type_ = self._args[1].name
        if type_ == "tuple":
            return TheoryTermType.Tuple
        if type_ == "set":
            return TheoryTermType.Set
        assert type_ == "list"
        return TheoryTermType.List

    def __hash__(self):
        return self._idx

    def __eq__(self, other):
        return self._idx == other._idx

    def __lt__(self, other):
        return self._idx < other._idx

    def __str__(self):
        type_ = self.type

        if type_ == TheoryTermType.Number:
            return f'{self.number}'

        if type_ == TheoryTermType.Symbol:
            return f'{self.name}'

        if type_ == TheoryTermType.Function:
            args = self.arguments
            name = self.name
            if len(args) == 1 and is_operator(name):
                return f'{name}({args[0]})'
            if len(args) == 2 and is_operator(name):
                return f'({args[0]}){name}({args[1]})'
            return f'{name}({",".join(str(arg) for arg in args)})'

        if type_ == TheoryTermType.Tuple:
            lhs, rhs = '(', ')'
        elif type_ == TheoryTermType.List:
            lhs, rhs = '[', ']'
        else:
            lhs, rhs = '{', '}'
        return f'{lhs}{",".join(str(arg) for arg in self.arguments)}{rhs}'

Instance variables

var arguments : List[ReifiedTheoryTerm]

The arguments of the term (for functions, tuples, list, and sets).

Expand source code
@property
def arguments(self) -> List['ReifiedTheoryTerm']:
    '''
    The arguments of the term (for functions, tuples, list, and sets).
    '''
    assert self.type in (TheoryTermType.List, TheoryTermType.Set,
                         TheoryTermType.Tuple, TheoryTermType.Function)
    term_ids = self._theory.term_tuples[self._args[2].number]
    return [ReifiedTheoryTerm(term_id, self._theory) for term_id in term_ids]
var index : int

The index of the corresponding reified fact.

Expand source code
@property
def index(self) -> int:
    '''
    The index of the corresponding reified fact.
    '''
    return self._idx
var name : str

The name of the term (for symbols and functions).

Expand source code
@property
def name(self) -> str:
    '''
    The name of the term (for symbols and functions).
    '''
    assert self.type in (TheoryTermType.Symbol, TheoryTermType.Function)
    if self.type == TheoryTermType.Function:
        return self._theory.terms[self._args[1].number].arguments[1].string
    return self._args[1].string
var number : int

The numeric representation of the term (for numbers).

Expand source code
@property
def number(self) -> int:
    '''
    The numeric representation of the term (for numbers).
    '''
    assert self.type == TheoryTermType.Number
    return self._args[1].number
var typeTheoryTermType

The type of the theory term.

Expand source code
@property
def type(self) -> TheoryTermType:
    '''
    The type of the theory term.
    '''
    name = self._theory.terms[self._idx].name
    if name == "theory_number":
        return TheoryTermType.Number
    if name == "theory_string":
        return TheoryTermType.Symbol
    if name == "theory_function":
        return TheoryTermType.Function
    assert name == "theory_sequence"
    type_ = self._args[1].name
    if type_ == "tuple":
        return TheoryTermType.Tuple
    if type_ == "set":
        return TheoryTermType.Set
    assert type_ == "list"
    return TheoryTermType.List
class Reifier (cb: Callable[[Symbol], None], calculate_sccs: bool = False, reify_steps: bool = False)

An observer that will gather the symbols of the reification, in the same way as clingo --output=reify.

Parameters

cb
A callback function that will be called with each symbol of the reification
calculate_sccs
Flag to calculate the SCCs
reify_steps
Flag to add a number as the last argument of all reification symbols for the corresponding step
Expand source code
class Reifier(Observer):
    '''
    An observer that will gather the symbols of the reification, in the same way as `clingo --output=reify`.

    Parameters
    ----------
    cb
        A callback function that will be called with each symbol of the reification
    calculate_sccs
        Flag to calculate the SCCs
    reify_steps
        Flag to add a number as the last argument of all reification symbols for the corresponding step

    '''
    # pylint:disable=too-many-public-methods
    _step: int
    # Bug in mypy???
    # _cb: Callable[[Symbol], None]
    _calculate_sccs: bool
    _reify_steps: bool
    _step_data: _StepData

    def __init__(self, cb: Callable[[Symbol], None], calculate_sccs: bool = False, reify_steps: bool = False):
        self._step = 0
        self._cb = cb
        self._calculate_sccs = calculate_sccs
        self._reify_steps = reify_steps
        self._step_data = _StepData()

    def calculate_sccs(self) -> None:
        '''
        Trigger computation of SCCs.

        SCCs can only be computed if the Reifier has been initialized with
        `calculate_sccs=True`, This function is called automatically if
        `reify_steps=True` has been set when initializing the Reifier.
        '''
        for idx, scc in enumerate(self._step_data.graph.tarjan()):
            for atm in scc:
                self._output('scc', [Number(idx), Number(atm)])

    def _add_edges(self, head: Sequence[int], body: Sequence[int]):
        if self._calculate_sccs:
            for u in head:
                for v in body:
                    if v > 0:
                        self._step_data.graph.add_edge(u, v)

    def _output(self, name: str, args: Sequence[Symbol]):
        if self._reify_steps:
            args = list(args) + [Number(self._step)]
        self._cb(Function(name, args))

    def _tuple(self, name: str,
               snmap: Dict[Sequence[U], int],
               elems: Sequence[U],
               afun: Callable[[Symbol, int, U], Sequence[Symbol]],
               ordered: bool = False) -> Symbol:
        pruned: Sequence[U]
        if ordered:
            pruned = elems
            ident = tuple(elems)
        else:
            seen: Set[U] = set()
            pruned = []
            for elem in elems:
                if elem not in seen:
                    seen.add(elem)
                    pruned.append(elem)
            ident = tuple(sorted(pruned))

        n = len(snmap)
        i = Number(snmap.setdefault(ident, n))
        if n == i.number:
            self._output(name, [i])
            for idx, atm in enumerate(pruned):
                self._output(name, afun(i, idx, atm))
        return i

    def _atom_tuple(self, atoms: Sequence[int]):
        return self._tuple("atom_tuple", self._step_data.atom_tuples, atoms, _lit)

    def _lit_tuple(self, lits: Sequence[int]):
        return self._tuple("literal_tuple", self._step_data.lit_tuples, lits, _lit)

    def _wlit_tuple(self, wlits: Sequence[Tuple[int, int]]):
        return self._tuple("weighted_literal_tuple", self._step_data.wlit_tuples, wlits, _wlit)

    def init_program(self, incremental: bool) -> None:
        if incremental:
            self._cb(Function("tag", [Function("incremental")]))

    def begin_step(self) -> None:
        pass

    def rule(self, choice: bool, head: Sequence[int], body: Sequence[int]) -> None:
        hn = "choice" if choice else "disjunction"
        hd = Function(hn, [self._atom_tuple(head)])
        bd = Function("normal", [self._lit_tuple(body)])
        self._output("rule", [hd, bd])
        self._add_edges(head, body)

    def weight_rule(self, choice: bool, head: Sequence[int], lower_bound: int,
                    body: Sequence[Tuple[int, int]]) -> None:
        hn = "choice" if choice else "disjunction"
        hd = Function(hn, [self._atom_tuple(head)])
        bd = Function("sum", [self._wlit_tuple(body), Number(lower_bound)])
        self._output("rule", [hd, bd])
        self._add_edges(head, [lit for lit, w in body])

    def minimize(self, priority: int, literals: Sequence[Tuple[int, int]]) -> None:
        self._output("minimize", [Number(priority), self._wlit_tuple(literals)])

    def project(self, atoms: Sequence[int]) -> None:
        for atom in atoms:
            self._output("project", [Number(atom)])

    def output_atom(self, symbol: Symbol, atom: int) -> None:
        self._output("output", [symbol, self._lit_tuple([] if atom == 0 else [atom])])

    def output_term(self, symbol: Symbol, condition: Sequence[int]) -> None:
        self._output("output", [symbol, self._lit_tuple(condition)])

    def output_csp(self, symbol: Symbol, value: int,
                   condition: Sequence[int]) -> None:
        self._output("output_csp", [symbol, Number(value), self._lit_tuple(condition)])

    def external(self, atom: int, value: TruthValue) -> None:
        value_name = str(value).replace('TruthValue.', '').lower().rstrip('_')
        self._output("external", [Number(atom), Function(value_name)])

    def assume(self, literals: Sequence[int]) -> None:
        for lit in literals:
            self._output("assume", [Number(lit)])

    def heuristic(self, atom: int, type_: HeuristicType, bias: int,
                  priority: int, condition: Sequence[int]) -> None:
        type_name = str(type_).replace('HeuristicType.', '').lower().rstrip('_')
        condition_lit = self._lit_tuple(condition)
        self._output("heuristic", [Number(atom),
                                   Function(type_name),
                                   Number(bias),
                                   Number(priority),
                                   condition_lit])

    def acyc_edge(self, node_u: int, node_v: int,
                  condition: Sequence[int]) -> None:
        self._output("edge", [Number(node_u), Number(node_v), self._lit_tuple(condition)])

    def theory_term_number(self, term_id: int, number: int) -> None:
        self._output("theory_number", [Number(term_id), Number(number)])

    def theory_term_string(self, term_id: int, name: str) -> None:
        self._output("theory_string", [Number(term_id), String(name)])

    def theory_term_compound(self, term_id: int, name_id_or_type: int,
                             arguments: Sequence[int]) -> None:
        names = {-1: "tuple", -2: "set", -3: "list"}
        if name_id_or_type in names:
            name = "theory_sequence"
            value = Function(names[name_id_or_type])
        else:
            name = "theory_function"
            value = Number(name_id_or_type)
        tuple_id = self._tuple("theory_tuple", self._step_data.theory_tuples, arguments, _theory, True)
        self._output(name, [Number(term_id), value, tuple_id])

    def theory_element(self, element_id: int, terms: Sequence[int],
                       condition: Sequence[int]) -> None:
        tuple_id = self._tuple("theory_tuple", self._step_data.theory_tuples, terms, _theory, True)
        condition_id = self._tuple("literal_tuple", self._step_data.lit_tuples, condition, _lit)
        self._output("theory_element", [Number(element_id), tuple_id, condition_id])

    def theory_atom(self, atom_id_or_zero: int, term_id: int,
                    elements: Sequence[int]) -> None:
        tuple_e_id = self._tuple("theory_element_tuple", self._step_data.theory_element_tuples, elements, _lit)
        self._output("theory_atom", [Number(atom_id_or_zero), Number(term_id), tuple_e_id])

    def theory_atom_with_guard(self, atom_id_or_zero: int, term_id: int,
                               elements: Sequence[int], operator_id: int,
                               right_hand_side_id: int) -> None:
        tuple_id = self._tuple("theory_element_tuple", self._step_data.theory_element_tuples, elements, _lit)
        self._output("theory_atom", [Number(atom_id_or_zero),
                                     Number(term_id),
                                     tuple_id,
                                     Number(operator_id),
                                     Number(right_hand_side_id)])

    def end_step(self) -> None:
        if self._reify_steps:
            self.calculate_sccs()
            self._step += 1
            self._step_data = _StepData()

Ancestors

Methods

def calculate_sccs(self) ‑> None

Trigger computation of SCCs.

SCCs can only be computed if the Reifier has been initialized with calculate_sccs=True, This function is called automatically if reify_steps=True has been set when initializing the Reifier.

Expand source code
def calculate_sccs(self) -> None:
    '''
    Trigger computation of SCCs.

    SCCs can only be computed if the Reifier has been initialized with
    `calculate_sccs=True`, This function is called automatically if
    `reify_steps=True` has been set when initializing the Reifier.
    '''
    for idx, scc in enumerate(self._step_data.graph.tarjan()):
        for atm in scc:
            self._output('scc', [Number(idx), Number(atm)])

Inherited members