Skip to content

Commit

Permalink
Merge pull request #8 from krr-up/nico/fclingo
Browse files Browse the repository at this point in the history
Nico/fclingo
  • Loading branch information
nrueh authored Apr 8, 2024
2 parents 4948c5f + 8db1eb5 commit 642f8c2
Show file tree
Hide file tree
Showing 10 changed files with 386 additions and 218 deletions.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ requires-python = ">=3.9"
dependencies = [
"antlr4-tools",
"antlr4-python3-runtime==4.9.3",
"clingo",
"clingo==5.7.*",
"importlib_resources",
"clintest>=0.2.0",
"clingcon >= 5.2.1, == 5.2.*", # requirement for fclingo
"fclingo@git+https://github.com/potassco/fclingo.git",
]
license = { file = "LICENSE" }
dynamic = ["version"]
Expand Down
2 changes: 1 addition & 1 deletion src/coomsolver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

log = get_logger("main")

SOLVERS = ["clingo"] # , "fclingo"]
SOLVERS = ["clingo", "fclingo"]
COOM_PROFILES = ["kids", "city", "travel"]


Expand Down
117 changes: 81 additions & 36 deletions src/coomsolver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,34 @@
"""

import textwrap
from typing import Callable, List, Optional, Sequence
from typing import Callable, Optional, Sequence

from clingcon import ClingconTheory
from clingo import Control, Model, Symbol
from clingo.application import Application, ApplicationOptions # , Flag
from clingo.application import Application, ApplicationOptions, Flag
from clingo.ast import Location, Position, ProgramBuilder, Rule, parse_files
from clingo.symbol import Function
from fclingo.__main__ import CSP, DEF, MAX_INT, MIN_INT
from fclingo.__main__ import AppConfig as FclingoConfig
from fclingo.parsing import THEORY, HeadBodyTransformer
from fclingo.translator import AUX, Translator

from .utils import format_sym_coom, get_encoding
from .utils.logging import get_logger

# from .utils.logging import get_logger


# log = get_logger("main")
# mypy: allow-untyped-calls
log = get_logger("main")


def _sym_to_prg(symbols: Sequence[Symbol], output: Optional[str] = "asp") -> str: # nocoverage
"""
Turns symbols into a program.
"""
sorted_symbols = sorted(symbols)
if output == "asp":
output_list = [f"{str(s)}" for s in sorted(symbols)]
output_list = [f"{str(s)}" for s in sorted_symbols]
elif output == "coom":
output_list = [f"{format_sym_coom(s)}" for s in sorted(symbols)][1:] # First element is root = empty string
output_list = [f"{format_sym_coom(s)}" for s in sorted_symbols][1:] # First element is root = empty string
return "\n".join(output_list)


Expand All @@ -32,23 +39,25 @@ class COOMApp(Application):
Application class extending clingo.
"""

_input_files: List[str]
_solver: str
_profile: str
_output: str
_log_level: str
config: FclingoConfig
_propagator: ClingconTheory
program_name: str = "COOM solver"
version: str = "0.1"

def __init__(self, log_level: str = "", solver: str = "", profile: str = "", output: str = ""):
"""
Create application.
"""
self._input_files = []
self._solver = "clingo" if solver == "" else solver
self._profile = "travel" if profile == "" else profile
self._output = "asp" if output == "" else output
self._log_level = "WARNING" if log_level == "" else log_level
self.config = FclingoConfig(MIN_INT, MAX_INT, Flag(False), Flag(False), DEF)
self._propagator = ClingconTheory()

def parse_log_level(self, log_level: str) -> bool: # nocoverage
"""
Expand Down Expand Up @@ -81,43 +90,79 @@ def register_options(self, options: ApplicationOptions) -> None: # nocoverage
# group, "view", "Visualize the first solution using clinguin", self._view
# )

# def on_model(self, model: Model) -> None:
# """
# Function called after finding each model.
# Args:
# model (Model): clingo Model
# """
# self._dl_theory.on_model(model)
# model.extend(
# [
# Function("start", [key.arguments[0], Number(val)])
# for key, val in self._dl_theory.assignment(model.thread_id)
# ]
# )
# log.debug("------- Full model -----")
# log.debug(
# "\n".join(
# [str(s) for s in model.symbols(atoms=True, shown=True, theory=True)]
# )
# )
def on_model(self, model: Model) -> None: # nocoverage
"""
Function called after finding each model.
Args:
model (Model): clingo Model
"""
if self._solver == "fclingo":
self._propagator.on_model(model)

valuation = [
Function("val", assignment.arguments)
for assignment in model.symbols(theory=True)
if assignment.name == CSP
and len(assignment.arguments) == 2
and model.contains(Function(DEF, [assignment.arguments[0]]))
and not assignment.arguments[0].name == AUX
]
model.extend(valuation)

log.debug("------- Full model -----")
log.debug("\n".join([str(s) for s in model.symbols(atoms=True, shown=True, theory=True)]))

def print_model(self, model: Model, printer: Callable[[], None]) -> None: # nocoverage
"""
Print a model on the console.
"""
print(_sym_to_prg(model.symbols(shown=True), self._output))

if self._solver == "clingo":
output_symbols = model.symbols(shown=True)
elif self._solver == "fclingo":
output_symbols = [
atom
for atom in model.symbols(shown=True)
if not (atom.name == self.config.defined and len(atom.arguments) == 1)
]
output_symbols.extend([a for a in model.symbols(theory=True) if a.name == "val"])

print(_sym_to_prg(output_symbols, self._output))

def main(self, control: Control, files: Sequence[str]) -> None:
"""
Main function ran on call.
"""

self._input_files = list(files)
input_files = list(files)
encoding = get_encoding(f"{self._solver}-{self._profile}.lp")
self._input_files.extend([encoding])
input_files.extend([encoding])

if self._solver == "clingo":
for f in input_files:
control.load(f)

control.ground()
control.solve()
elif self._solver == "fclingo": # nocoverage
self._propagator.register(control)
self._propagator.configure("max-int", str(self.config.max_int))
self._propagator.configure("min-int", str(self.config.min_int))

with ProgramBuilder(control) as bld:
hbt = HeadBodyTransformer()

parse_files(input_files, lambda ast: bld.add(hbt.visit(ast)))

pos = Position("<string>", 1, 1)
loc = Location(pos, pos)
for rule in hbt.rules_to_add:
bld.add(Rule(loc, rule[0], rule[1]))

for f in self._input_files:
control.load(f)
control.add("base", [], THEORY)
control.ground([("base", [])])
translator = Translator(control, self.config)
translator.translate(control.theory_atoms)

control.ground()
control.solve()
self._propagator.prepare(control)
control.solve(on_model=self.on_model, on_statistics=self._propagator.on_statistics)
2 changes: 1 addition & 1 deletion src/coomsolver/utils/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get(levels: list[tuple[str, int]], name: str) -> Optional[int]:
help="Path to the COOM file to solve",
# metavar='',
)
parser_solve.add_argument("--solver", type=str, help="Set solver", choices=SOLVERS, default="clingo")
parser_solve.add_argument("--solver", "-s", type=str, help="Set solver", choices=SOLVERS, default="clingo")

parser_solve.add_argument("--profile", type=str, help="Set COOM profile", choices=COOM_PROFILES, default="travel")
parser_solve.add_argument(
Expand Down
77 changes: 62 additions & 15 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
Basic functions to run tests.
"""

import os
import tempfile
from typing import Callable, List, Optional, Sequence
from os.path import join
from typing import Callable, List, Optional, Sequence, Set, Union

from antlr4 import InputStream
from clingo import Application, Control
from clingo import Application, Control, Symbol
from clingo.solving import Model
from clintest.assertion import Contains, SupersetOf
from clintest.solver import Solver
from clintest.test import Test

Expand Down Expand Up @@ -40,23 +42,68 @@ def f(*args): # type: ignore
return f


def get_solver(
program: Optional[str] = "", files: Optional[List[str]] = None, ctl_args: Optional[List[str]] = None, **kwargs: str
) -> Solver:
class SupersetOfTheory(SupersetOf):
"""
Gets the test solver for the tests
A clintest SupersetOf assertion that can also check theory atoms.
Args:
symbol (Symbol): A clingo symbol.
check_theory (bool): Whether to include theory atoms in the check
"""

def __init__(self, symbols: Set[Union[Symbol, str]], check_theory: bool = False) -> None:
super().__init__(symbols)
self.__symbols = self._SupersetOf__symbols # type: ignore # pylint: disable=no-member
self.__check_theory = check_theory

def holds_for(self, model: Model) -> bool:
if self.__check_theory:
return set(model.symbols(shown=True, theory=True)).issuperset(self.__symbols)
return super().holds_for(model)


class ContainsTheory(Contains):
"""
A clintest Contains assertion that can also check theory atoms.
Args:
program (Optional[str], optional): A clingo program. Defaults to "".
files (Optional[str], optional): List of files saved in examples/tests
ctl_args
Returns:
_type_: The solver wrapping the application class
symbol (Symbol): A clingo symbol.
check_theory (bool): Whether to include theory atoms in the check
"""

def __init__(self, symbol: Union[Symbol, str], check_theory: bool = False) -> None:
super().__init__(symbol)
self.__symbol = self._Contains__symbol # type: ignore # pylint: disable=no-member
self.__check_theory = check_theory

def holds_for(self, model: Model) -> bool:
if self.__check_theory:
return self.__symbol in model.symbols(shown=True, theory=True)
return super().holds_for(model)


def run_test(
test: Test,
files: Optional[List[str]] = None,
program: Optional[str] = None,
ctl_args: Optional[List[str]] = None,
**kwargs: str,
) -> None:
"""Creates a solver and runs a clintest test.
Args:
test (clintest.Test): The clintest test
files (Optional[List[str]], optional): List of files saved in examples/tests
program (Optional[str], optional): A clingo program. Defaults to ""
ctl_args (Optional[List[str]], optional): List of arguments for clingo.Control. Defaults to [].
"""
coom_app = COOMApp("coom", **kwargs)
files = [] if files is None else files
file_paths = [join("examples", "tests", f) for f in files] if files else None
ctl_args = [] if ctl_args is None else ctl_args
file_paths = [os.path.join("examples", "tests", f) for f in files]
return AppSolver(application=coom_app, files=file_paths, program=program, arguments=ctl_args)
solver = AppSolver(application=coom_app, files=file_paths, program=program, arguments=["0"])

solver.solve(test)
test.assert_()


class MockControl:
Expand Down
Loading

0 comments on commit 642f8c2

Please sign in to comment.