Source code for unified_planning.io.pddl_writer

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from fractions import Fraction
import sys
import re

from decimal import Decimal, localcontext
from warnings import warn

import unified_planning as up
import unified_planning.environment
import unified_planning.model.walkers as walkers
from unified_planning.model import (
    InstantaneousAction,
    DurativeAction,
    Fluent,
    Parameter,
    Problem,
    Object,
    Effect,
    Timing,
)
from unified_planning.exceptions import (
    UPTypeError,
    UPProblemDefinitionError,
    UPException,
)
from unified_planning.model.htn import HierarchicalProblem
from unified_planning.model.types import _UserType
from unified_planning.plans import (
    SequentialPlan,
    TimeTriggeredPlan,
    Plan,
    ActionInstance,
)
from typing import Callable, Dict, IO, List, Optional, Set, Union, cast
from io import StringIO
from functools import reduce

# Words reserved by PDDL. _get_pddl_name appends underscores to any generated
# name that collides with one of them.
PDDL_KEYWORDS = {
    # domain / problem structure
    "define", "domain", "problem", "requirements", "types", "constants",
    "predicates", "objects", "init", "goal", "derived",
    # typing
    "either", "number", "atomic",
    # action sections
    "action", "parameters", "precondition", "effect",
    "durative-action", "duration", "condition",
    # logical connectives and quantifiers
    "and", "or", "not", "imply", "forall", "exists", "when", "preference",
    # numeric effects
    "scale-up", "scale-down", "increase", "decrease",
    # time specifiers
    "at", "over", "start", "end", "all",
    # trajectory constraints
    "constraints", "always", "sometime", "within", "at-most-once",
    "sometime-after", "sometime-before", "always-within",
    "hold-during", "hold-after",
    # metrics
    "metric", "minimize", "maximize", "total-time", "is-violated",
    # requirement flags
    "strips", "negative-preconditions", "typing",
    "disjunctive-preconditions", "equality", "existential-preconditions",
    "universal-preconditions", "quantified-preconditions",
    "conditional-effects", "fluents", "adl", "durative-actions",
    "derived-predicates", "timed-initial-literals", "timed-initial-effects",
    "preferences", "contingent",
}

# Prefix used to mangle invalid names, selected by the class of the item:
# when a name does not start with an alphabetic character, _get_pddl_name
# prepends the letter found here followed by "_". Classes that are not listed
# fall back to "x".
INITIAL_LETTER: Dict[type, str] = {
    InstantaneousAction: "a",
    DurativeAction: "a",
    Fluent: "f",
    Parameter: "p",
    Problem: "p",
    Object: "o",
}

# The kinds of named model entities accepted by the name-mangling functions.
WithName = Union[
    "up.model.Type",
    "up.model.Action",
    "up.model.Fluent",
    "up.model.Object",
    "up.model.Parameter",
    "up.model.Variable",
    "up.model.multi_agent.Agent",
    "up.model.htn.Method",
    "up.model.htn.Task",
]
# Signature of a renaming callback: takes a named entity, returns its PDDL name.
MangleFunction = Callable[[WithName], str]


class ObjectsExtractor(walkers.DagWalker):
    """Collects the objects referenced by an expression, grouped by user type."""

    def __init__(self):
        walkers.dag.DagWalker.__init__(self)

    def get(self, expression: "up.model.FNode") -> Dict[_UserType, Set[Object]]:
        """Returns the map from user type to the set of objects found in the expression."""
        return self.walk(expression)

    @staticmethod
    def _merge_children(
        children: List[Dict[_UserType, Set[Object]]]
    ) -> Dict[_UserType, Set[Object]]:
        """Builds a fresh map holding the union of the maps computed for the children."""
        merged: Dict[_UserType, Set[Object]] = {}
        for child in children:
            _update_domain_objects(merged, child)
        return merged

    def walk_object_exp(
        self, expression: "up.model.FNode", args: List[Dict[_UserType, Set[Object]]]
    ) -> Dict[_UserType, Set[Object]]:
        # An object expression contributes its own object, filed under its type.
        found = self._merge_children(args)
        obj = expression.object()
        assert obj.type.is_user_type()
        found.setdefault(cast(_UserType, obj.type), set()).add(obj)
        return found

    # Every node kind other than OBJECT_EXP only forwards what its children found.
    @walkers.handles(
        set(up.model.OperatorKind) - {up.model.OperatorKind.OBJECT_EXP}
    )
    def walk_all_types(
        self, expression: "up.model.FNode", args: List[Dict[_UserType, Set[Object]]]
    ) -> Dict[_UserType, Set[Object]]:
        return self._merge_children(args)


class ConverterToPDDLString(walkers.DagWalker):
    """Expression converter to a PDDL string.

    ``convert`` simplifies the expression and walks it; every ``walk_*``
    method receives in ``args`` the strings already produced for the children
    of the node and returns the PDDL text of the node itself. Names of types,
    fluents, parameters, variables and objects are obtained through the
    ``get_mangled_name`` callback given to the constructor.
    """

    # Precision of the decimal context used to approximate real constants.
    # This is the context ``prec``, so it counts significant digits, not only
    # the digits after the decimal point.
    DECIMAL_PRECISION = 10

    def __init__(
        self,
        environment: "up.environment.Environment",
        get_mangled_name: MangleFunction,
    ):
        """
        :param environment: The environment whose simplifier is applied before converting.
        :param get_mangled_name: Callback returning the PDDL name of a model entity.
        """
        walkers.DagWalker.__init__(self)
        self.get_mangled_name = get_mangled_name
        self.simplifier = environment.simplifier

    def convert(self, expression):
        """Converts the given expression to a PDDL string."""
        return self.walk(self.simplifier.simplify(expression))

    def convert_fraction(self, frac):
        """
        Approximates the given fraction with `DECIMAL_PRECISION` significant
        digits and returns it as a float.

        A warning is emitted when the approximation is not exact.

        :param frac: The `Fraction` to convert.
        :return: The float closest to the decimal approximation.
        """
        with localcontext() as ctx:
            ctx.prec = self.DECIMAL_PRECISION
            dec = frac.numerator / Decimal(frac.denominator, ctx)

            if Fraction(dec) != frac:
                warn(
                    "The PDDL printer cannot exactly represent the real constant '%s'"
                    % frac
                )
            return float(dec)

    def _quantified(self, keyword, expression, body):
        """Returns the PDDL text of a quantifier over the variables of `expression`."""
        vars_string_list = [
            f"{self.get_mangled_name(v)} - {self.get_mangled_name(v.type)}"
            for v in expression.variables()
        ]
        return f'({keyword} ({" ".join(vars_string_list)})\n {body})'

    def walk_exists(self, expression, args):
        assert len(args) == 1
        return self._quantified("exists", expression, args[0])

    def walk_forall(self, expression, args):
        assert len(args) == 1
        return self._quantified("forall", expression, args[0])

    def walk_always(self, expression, args):
        assert len(args) == 1
        return f"(always {args[0]})"

    def walk_at_most_once(self, expression, args):
        assert len(args) == 1
        return f"(at-most-once {args[0]})"

    def walk_sometime(self, expression, args):
        assert len(args) == 1
        return f"(sometime {args[0]})"

    def walk_sometime_before(self, expression, args):
        assert len(args) == 2
        return f"(sometime-before {args[0]} {args[1]})"

    def walk_sometime_after(self, expression, args):
        assert len(args) == 2
        return f"(sometime-after {args[0]} {args[1]})"

    def walk_variable_exp(self, expression, args):
        assert len(args) == 0
        return f"{self.get_mangled_name(expression.variable())}"

    def walk_and(self, expression, args):
        assert len(args) > 1
        return f'(and {" ".join(args)})'

    def walk_or(self, expression, args):
        assert len(args) > 1
        return f'(or {" ".join(args)})'

    def walk_not(self, expression, args):
        assert len(args) == 1
        return f"(not {args[0]})"

    def walk_implies(self, expression, args):
        assert len(args) == 2
        return f"(imply {args[0]} {args[1]})"

    def walk_iff(self, expression, args):
        # PDDL has no "iff": it is written as the two implications.
        assert len(args) == 2
        return f"(and (imply {args[0]} {args[1]}) (imply {args[1]} {args[0]}) )"

    def walk_fluent_exp(self, expression, args):
        fluent = expression.fluent()
        return f'({self.get_mangled_name(fluent)}{" " if len(args) > 0 else ""}{" ".join(args)})'

    def walk_param_exp(self, expression, args):
        assert len(args) == 0
        p = expression.parameter()
        return f"{self.get_mangled_name(p)}"

    def walk_object_exp(self, expression, args):
        assert len(args) == 0
        o = expression.object()
        return f"{self.get_mangled_name(o)}"

    def walk_bool_constant(self, expression, args):
        # Boolean constants have no PDDL spelling; callers are expected to
        # handle them before converting.
        raise up.exceptions.UPUnreachableCodeError(
            f"Found expression {expression} in PDDL"
        )

    def walk_real_constant(self, expression, args):
        assert len(args) == 0
        frac = expression.constant_value()
        text = str(self.convert_fraction(frac))
        if "e" in text:
            # str(float) switches to exponent notation for very small or very
            # large values (e.g. "1e-05"); a PDDL number is a plain decimal,
            # so the exponent is expanded.
            text = format(Decimal(text), "f")
        return text

    def walk_int_constant(self, expression, args):
        assert len(args) == 0
        return str(expression.constant_value())

    def walk_plus(self, expression, args):
        # n-ary sum folded into nested binary "+" applications.
        assert len(args) > 1
        return reduce(lambda x, y: f"(+ {y} {x})", args)

    def walk_minus(self, expression, args):
        assert len(args) == 2
        return f"(- {args[0]} {args[1]})"

    def walk_times(self, expression, args):
        # n-ary product folded into nested binary "*" applications.
        assert len(args) > 1
        return reduce(lambda x, y: f"(* {y} {x})", args)

    def walk_div(self, expression, args):
        assert len(args) == 2
        return f"(/ {args[0]} {args[1]})"

    def walk_le(self, expression, args):
        assert len(args) == 2
        return f"(<= {args[0]} {args[1]})"

    def walk_lt(self, expression, args):
        assert len(args) == 2
        return f"(< {args[0]} {args[1]})"

    def walk_equals(self, expression, args):
        assert len(args) == 2
        return f"(= {args[0]} {args[1]})"


class PDDLWriter:
    """
    This class can be used to write a :class:`~unified_planning.model.Problem` in `PDDL`.
    The constructor of this class takes the problem to write and 3 flags:
    needs_requirements determines if the printed problem must have the :requirements,
    rewrite_bool_assignments determines if this writer will write
    non constant boolean assignment as conditional effects.
    empty_preconditions determines if this writer will write ':precondition ()' in case of an
    instantaneous action without preconditions instead of writing nothing or similar with
    conditions in durative actions.
    """

    def __init__(
        self,
        problem: "up.model.Problem",
        needs_requirements: bool = True,
        rewrite_bool_assignments: bool = False,
        empty_preconditions: bool = False,
    ):
        self.problem = problem
        self.problem_kind = self.problem.kind
        self.needs_requirements = needs_requirements
        self.rewrite_bool_assignments = rewrite_bool_assignments
        self.empty_preconditions = empty_preconditions
        # otn represents the old to new renamings
        self.otn_renamings: Dict[
            WithName,
            str,
        ] = {}
        # nto represents the new to old renamings
        self.nto_renamings: Dict[
            str,
            WithName,
        ] = {}
        # those 2 maps are "symmetrical", meaning that "(otn[k] == v) implies (nto[v] == k)"
        # Objects that must be declared as domain constants; None until computed.
        self.domain_objects: Optional[Dict[_UserType, Set[Object]]] = None

    def _typed_params(self, params) -> str:
        """
        Returns the concatenation of " name - type" for every given parameter
        or variable.

        :raises UPTypeError: If one of them is not of a user type.
        """
        chunks: List[str] = []
        for param in params:
            if not param.type.is_user_type():
                raise UPTypeError("PDDL supports only user type parameters")
            chunks.append(
                f" {self._get_mangled_name(param)} - {self._get_mangled_name(param.type)}"
            )
        return "".join(chunks)

    @staticmethod
    def _conjuncts(converter: ConverterToPDDLString, conditions) -> List[str]:
        """
        Simplifies the given conditions and returns their PDDL strings,
        dropping the ones that are trivially true and splitting top-level
        conjunctions into their arguments.
        """
        parts: List[str] = []
        for cond in (c.simplify() for c in conditions):
            if cond.is_true():
                continue
            if cond.is_and():
                parts.extend(map(converter.convert, cond.args))
            else:
                parts.append(converter.convert(cond))
        return parts

    def _write_domain(self, out: IO[str]):
        """
        Writes the `PDDL` domain of the problem on the given stream.

        :raises UPProblemDefinitionError: If the problem uses intermediate
            conditions/effects or timed goals.
        :raises UPTypeError: If a fluent or a parameter has a type without a `PDDL` counterpart.
        """
        if self.problem_kind.has_intermediate_conditions_and_effects():
            raise UPProblemDefinitionError(
                "PDDL does not support ICE.\nICE are Intermediate Conditions and Effects therefore when an Effect (or Condition) are not at StartTIming(0) or EndTIming(0)."
            )
        if self.problem_kind.has_timed_goals():
            raise UPProblemDefinitionError("PDDL does not support timed goals.")
        obe = ObjectsExtractor()
        out.write("(define ")
        if self.problem.name is None:
            name = "pddl"
        else:
            name = _get_pddl_name(self.problem)
        out.write(f"(domain {name}-domain)\n")

        if self.needs_requirements:
            out.write(" (:requirements :strips")
            if self.problem_kind.has_flat_typing():
                out.write(" :typing")
            if self.problem_kind.has_negative_conditions():
                out.write(" :negative-preconditions")
            if self.problem_kind.has_disjunctive_conditions():
                out.write(" :disjunctive-preconditions")
            if self.problem_kind.has_equalities():
                out.write(" :equality")
            if (
                self.problem_kind.has_int_fluents()
                or self.problem_kind.has_real_fluents()
                or self.problem_kind.has_fluents_in_actions_cost()
            ):
                out.write(" :numeric-fluents")
            if self.problem_kind.has_conditional_effects():
                out.write(" :conditional-effects")
            if self.problem_kind.has_existential_conditions():
                out.write(" :existential-preconditions")
            if (
                self.problem_kind.has_trajectory_constraints()
                or self.problem_kind.has_state_invariants()
            ):
                out.write(" :constraints")
            if self.problem_kind.has_universal_conditions():
                out.write(" :universal-preconditions")
            if (
                self.problem_kind.has_continuous_time()
                or self.problem_kind.has_discrete_time()
            ):
                out.write(" :durative-actions")
            if self.problem_kind.has_duration_inequalities():
                out.write(" :duration-inequalities")
            if (
                self.problem_kind.has_actions_cost()
                or self.problem_kind.has_plan_length()
            ):
                out.write(" :action-costs")
            if self.problem_kind.has_timed_effects():
                # Literals are enough when every timed effect is on a boolean fluent.
                only_bool = all(
                    e.fluent.type.is_bool_type()
                    for le in self.problem.timed_effects.values()
                    for e in le
                )
                if only_bool:
                    out.write(" :timed-initial-literals")
                else:
                    out.write(" :timed-initial-effects")
            if self.problem_kind.has_hierarchical():
                out.write(" :hierarchy")  # HTN / HDDL
            if self.problem_kind.has_method_preconditions():
                out.write(" :method-preconditions")
            out.write(")\n")

        if self.problem_kind.has_hierarchical_typing():
            user_types_hierarchy = self.problem.user_types_hierarchy
            out.write(f" (:types\n")
            # Work on a copy: the stack is consumed below and the list stored
            # in the hierarchy map must not be emptied.
            stack: List["unified_planning.model.Type"] = (
                list(user_types_hierarchy[None])
                if None in user_types_hierarchy
                else []
            )
            out.write(
                f' {" ".join(self._get_mangled_name(t) for t in stack)} - object\n'
            )
            while stack:
                current_type = stack.pop()
                direct_sons: List["unified_planning.model.Type"] = user_types_hierarchy[
                    current_type
                ]
                if direct_sons:
                    stack.extend(direct_sons)
                    out.write(
                        f' {" ".join([self._get_mangled_name(t) for t in direct_sons])} - {self._get_mangled_name(current_type)}\n'
                    )
            out.write(" )\n")
        else:
            pddl_types = [
                self._get_mangled_name(t)
                for t in self.problem.user_types
                if cast(_UserType, t).name != "object"
            ]
            out.write(
                f' (:types {" ".join(pddl_types)})\n' if len(pddl_types) > 0 else ""
            )

        if self.domain_objects is None:
            # This method populates the self.domain_objects map
            self._populate_domain_objects(obe)
        assert self.domain_objects is not None
        if len(self.domain_objects) > 0:
            out.write(" (:constants")
            for ut, objs in self.domain_objects.items():
                if len(objs) > 0:
                    out.write(
                        f'\n {" ".join([self._get_mangled_name(o) for o in objs])} - {self._get_mangled_name(ut)}'
                    )
            out.write("\n )\n")

        predicates = []
        functions = []
        for f in self.problem.fluents:
            if f.type.is_bool_type():
                target = predicates
            elif f.type.is_int_type() or f.type.is_real_type():
                target = functions
            else:
                raise UPTypeError("PDDL supports only boolean and numerical fluents")
            signature = self._typed_params(f.signature)
            target.append(f"({self._get_mangled_name(f)}{signature})")
        if self.problem.kind.has_actions_cost() or self.problem.kind.has_plan_length():
            functions.append("(total-cost)")
        out.write(
            f' (:predicates {" ".join(predicates)})\n' if len(predicates) > 0 else ""
        )
        out.write(
            f' (:functions {" ".join(functions)})\n' if len(functions) > 0 else ""
        )

        converter = ConverterToPDDLString(
            self.problem.environment, self._get_mangled_name
        )
        # Map from action to the expression added to (total-cost) by the action.
        costs = {}
        metrics = self.problem.quality_metrics
        if len(metrics) == 1:
            metric = metrics[0]
            if isinstance(metric, up.model.metrics.MinimizeActionCosts):
                for a in self.problem.actions:
                    cost_exp = metric.get_action_cost(a)
                    # An action without a cost gets no "increase" effect.
                    if cost_exp is not None:
                        costs[a] = cost_exp
            elif metric.is_minimize_sequential_plan_length():
                for a in self.problem.actions:
                    costs[a] = self.problem.environment.expression_manager.Int(1)
        elif len(metrics) > 1:
            raise up.exceptions.UPUnsupportedProblemTypeError(
                "Only one metric is supported!"
            )

        em = self.problem.environment.expression_manager
        if isinstance(self.problem, HierarchicalProblem):
            for task in self.problem.tasks:
                out.write(f" (:task {self._get_mangled_name(task)}")
                out.write(f"\n :parameters (")
                out.write(self._typed_params(task.parameters))
                out.write("))\n")

            for m in self.problem.methods:
                out.write(f" (:method {self._get_mangled_name(m)}")
                out.write(f"\n :parameters (")
                out.write(self._typed_params(m.parameters))
                out.write(")")

                params_str = " ".join(
                    converter.convert(em.ParameterExp(p))
                    for p in m.achieved_task.parameters
                )
                out.write(
                    f"\n :task ({self._get_mangled_name(m.achieved_task.task)} {params_str})"
                )
                if len(m.preconditions) > 0:
                    precond_str = self._conjuncts(converter, m.preconditions)
                    out.write(f'\n :precondition (and {" ".join(precond_str)})')
                elif len(m.preconditions) == 0 and self.empty_preconditions:
                    out.write(f"\n :precondition ()")
                self._write_task_network(m, out, converter)
                out.write(")\n")

        for a in self.problem.actions:
            if isinstance(a, up.model.InstantaneousAction):
                # An action with an unsatisfiable precondition is not written.
                if any(p.simplify().is_false() for p in a.preconditions):
                    continue
                out.write(f" (:action {self._get_mangled_name(a)}")
                out.write(f"\n :parameters (")
                out.write(self._typed_params(a.parameters))
                out.write(")")
                if len(a.preconditions) > 0:
                    precond_str = self._conjuncts(converter, a.preconditions)
                    out.write(f'\n :precondition (and {" ".join(precond_str)})')
                elif len(a.preconditions) == 0 and self.empty_preconditions:
                    out.write(f"\n :precondition ()")
                if len(a.effects) > 0:
                    out.write("\n :effect (and")
                    for e in a.effects:
                        _write_effect(
                            e,
                            None,
                            out,
                            converter,
                            self.rewrite_bool_assignments,
                            self._get_mangled_name,
                        )
                    if a in costs:
                        out.write(
                            f" (increase (total-cost) {converter.convert(costs[a])})"
                        )
                    out.write(")")
                out.write(")\n")
            elif isinstance(a, DurativeAction):
                # An action with an unsatisfiable condition is not written.
                if any(
                    c.simplify().is_false() for cl in a.conditions.values() for c in cl
                ):
                    continue
                out.write(f" (:durative-action {self._get_mangled_name(a)}")
                out.write(f"\n :parameters (")
                out.write(self._typed_params(a.parameters))
                out.write(")")
                lower, upper = a.duration.lower, a.duration.upper
                if lower == upper:
                    out.write(f"\n :duration (= ?duration {converter.convert(lower)})")
                else:
                    out.write(f"\n :duration (and ")
                    if a.duration.is_left_open():
                        out.write(f"(> ?duration {converter.convert(lower)})")
                    else:
                        out.write(f"(>= ?duration {converter.convert(lower)})")
                    if a.duration.is_right_open():
                        out.write(f"(< ?duration {converter.convert(upper)})")
                    else:
                        out.write(f"(<= ?duration {converter.convert(upper)})")
                    out.write(")")
                if len(a.conditions) > 0:
                    out.write(f"\n :condition (and ")
                    for interval, cl in a.conditions.items():
                        for c in (cond.simplify() for cond in cl):
                            if c.is_true():
                                continue
                            if interval.lower == interval.upper:
                                # Punctual condition: either at start or at end.
                                if interval.lower.is_from_start():
                                    out.write(f"(at start {converter.convert(c)})")
                                else:
                                    out.write(f"(at end {converter.convert(c)})")
                            else:
                                # "over all" excludes the extremes, which are
                                # added explicitly when the interval is closed.
                                if not interval.is_left_open():
                                    out.write(f"(at start {converter.convert(c)})")
                                out.write(f"(over all {converter.convert(c)})")
                                if not interval.is_right_open():
                                    out.write(f"(at end {converter.convert(c)})")
                    out.write(")")
                elif len(a.conditions) == 0 and self.empty_preconditions:
                    out.write(f"\n :condition (and )")
                if len(a.effects) > 0:
                    out.write("\n :effect (and")
                    for t, el in a.effects.items():
                        for e in el:
                            _write_effect(
                                e,
                                t,
                                out,
                                converter,
                                self.rewrite_bool_assignments,
                                self._get_mangled_name,
                            )
                    if a in costs:
                        out.write(
                            f" (at end (increase (total-cost) {converter.convert(costs[a])}))"
                        )
                    out.write(")")
                out.write(")\n")
            else:
                raise NotImplementedError
        out.write(")\n")

    def _write_problem(self, out: IO[str]):
        """Writes the `PDDL` problem on the given stream."""
        if self.problem.name is None:
            name = "pddl"
        else:
            name = _get_pddl_name(self.problem)
        out.write(f"(define (problem {name}-problem)\n")
        out.write(f" (:domain {name}-domain)\n")
        if self.domain_objects is None:
            # This method populates the self.domain_objects map
            self._populate_domain_objects(ObjectsExtractor())
        assert self.domain_objects is not None
        if len(self.problem.user_types) > 0:
            out.write(" (:objects")
            for t in self.problem.user_types:
                # Objects already declared as domain constants are not repeated here.
                constants_of_this_type = self.domain_objects.get(
                    cast(_UserType, t), None
                )
                if constants_of_this_type is None:
                    objects = [o for o in self.problem.all_objects if o.type == t]
                else:
                    objects = [
                        o
                        for o in self.problem.all_objects
                        if o.type == t and o not in constants_of_this_type
                    ]
                if len(objects) > 0:
                    out.write(
                        f'\n {" ".join([self._get_mangled_name(o) for o in objects])} - {self._get_mangled_name(t)}'
                    )
            out.write("\n )\n")
        converter = ConverterToPDDLString(
            self.problem.environment, self._get_mangled_name
        )
        if isinstance(self.problem, up.model.htn.HierarchicalProblem):
            out.write(" (:htn")
            self._write_task_network(self.problem.task_network, out, converter)
            out.write(")\n")
        out.write(" (:init")
        for f, v in self.problem.initial_values.items():
            if v.is_true():
                out.write(f" {converter.convert(f)}")
            elif v.is_false():
                # False boolean fluents are simply not listed.
                pass
            else:
                out.write(f" (= {converter.convert(f)} {converter.convert(v)})")
        if self.problem.kind.has_actions_cost() or self.problem.kind.has_plan_length():
            out.write(" (= (total-cost) 0)")
        for tm, le in self.problem.timed_effects.items():
            for e in le:
                out.write(f" (at {str(converter.convert_fraction(tm.delay))}")
                _write_effect(
                    e,
                    None,
                    out,
                    converter,
                    self.rewrite_bool_assignments,
                    self._get_mangled_name,
                )
                out.write(")")
        out.write(")\n")
        goals_str: List[str] = []
        for g in (c.simplify() for c in self.problem.goals):
            if g.is_and():
                goals_str.extend(map(converter.convert, g.args))
            else:
                goals_str.append(converter.convert(g))
        out.write(f' (:goal (and {" ".join(goals_str)}))\n')
        if len(self.problem.trajectory_constraints) > 0:
            out.write(
                f' (:constraints {" ".join([converter.convert(c) for c in self.problem.trajectory_constraints])})\n'
            )
        metrics = self.problem.quality_metrics
        if len(metrics) == 1:
            metric = metrics[0]
            out.write(" (:metric ")
            if metric.is_minimize_expression_on_final_state():
                assert isinstance(
                    metric, up.model.metrics.MinimizeExpressionOnFinalState
                )
                out.write(f"minimize {converter.convert(metric.expression)}")
            elif metric.is_maximize_expression_on_final_state():
                assert isinstance(
                    metric, up.model.metrics.MaximizeExpressionOnFinalState
                )
                out.write(f"maximize {converter.convert(metric.expression)}")
            elif (
                metric.is_minimize_action_costs()
                or metric.is_minimize_sequential_plan_length()
            ):
                out.write(f"minimize (total-cost)")
            elif metric.is_minimize_makespan():
                out.write(f"minimize (total-time)")
            else:
                raise NotImplementedError
            out.write(")\n")
        elif len(metrics) > 1:
            raise up.exceptions.UPUnsupportedProblemTypeError(
                "Only one metric is supported!"
            )
        out.write(")\n")

    def _write_plan(self, plan: Plan, out: IO[str]):
        """
        Writes the given plan on the given stream, one action per line.

        :raises NotImplementedError: If the plan is neither a `SequentialPlan`
            nor a `TimeTriggeredPlan`.
        """

        def _format_action_instance(action_instance: ActionInstance) -> str:
            param_str = ""
            if action_instance.actual_parameters:
                param_str = f" {' '.join((self._get_mangled_name(p.object()) for p in action_instance.actual_parameters))}"
            return f"({self._get_mangled_name(action_instance.action)}{param_str})"

        if isinstance(plan, SequentialPlan):
            for ai in plan.actions:
                out.write(f"{_format_action_instance(ai)}\n")
        elif isinstance(plan, TimeTriggeredPlan):
            for s, ai, dur in plan.timed_actions:
                # Whole values are printed as integers, the others as floats.
                start = s.numerator if s.denominator == 1 else float(s)
                out.write(f"{start}: {_format_action_instance(ai)}")
                if dur is not None:
                    duration = dur.numerator if dur.denominator == 1 else float(dur)
                    out.write(f"[{duration}]")
                out.write("\n")
        else:
            raise NotImplementedError

    def print_domain(self):
        """Prints to std output the `PDDL` domain."""
        self._write_domain(sys.stdout)

    def print_problem(self):
        """Prints to std output the `PDDL` problem."""
        self._write_problem(sys.stdout)

    def print_plan(self, plan: Plan):
        """Prints to std output the `PDDL` plan."""
        self._write_plan(plan, sys.stdout)

    def get_domain(self) -> str:
        """Returns the `PDDL` domain."""
        out = StringIO()
        self._write_domain(out)
        return out.getvalue()

    def get_problem(self) -> str:
        """Returns the `PDDL` problem."""
        out = StringIO()
        self._write_problem(out)
        return out.getvalue()

    def get_plan(self, plan: Plan) -> str:
        """Returns the `PDDL` plan."""
        out = StringIO()
        self._write_plan(plan, out)
        return out.getvalue()

    def write_domain(self, filename: str):
        """Dumps to file the `PDDL` domain."""
        with open(filename, "w") as f:
            self._write_domain(f)

    def write_problem(self, filename: str):
        """Dumps to file the `PDDL` problem."""
        with open(filename, "w") as f:
            self._write_problem(f)

    def write_plan(self, plan: Plan, filename: str):
        """Dumps to file the `PDDL` plan."""
        with open(filename, "w") as f:
            self._write_plan(plan, f)

    def _get_mangled_name(
        self,
        item: WithName,
    ) -> str:
        """This function returns a valid and unique PDDL name."""

        # If we already encountered this item, return it
        if item in self.otn_renamings:
            return self.otn_renamings[item]

        if isinstance(item, up.model.Type):
            assert item.is_user_type()
            original_name = cast(_UserType, item).name
            tmp_name = _get_pddl_name(item)
            # If the problem is hierarchical and the name is object, we want to change it
            if self.problem_kind.has_hierarchical_typing() and tmp_name == "object":
                tmp_name = f"{tmp_name}_"
        else:
            original_name = item.name
            tmp_name = _get_pddl_name(item)
        # if the pddl valid name is the same of the original one and it does not create conflicts,
        # it can be returned
        if tmp_name == original_name and tmp_name not in self.nto_renamings:
            new_name = tmp_name
        else:
            # Append a counter until the name clashes neither with a name of
            # the problem nor with a name already handed out.
            count = 0
            new_name = tmp_name
            while self.problem.has_name(new_name) or new_name in self.nto_renamings:
                new_name = f"{tmp_name}_{count}"
                count += 1
        assert (
            new_name not in self.nto_renamings
            and new_name not in self.otn_renamings.values()
        )
        self.otn_renamings[item] = new_name
        self.nto_renamings[new_name] = item
        return new_name

    def get_item_named(self, name: str) -> WithName:
        """
        Since `PDDL` has a stricter set of possible naming compared to the `unified_planning`, when writing
        a :class:`~unified_planning.model.Problem` it is possible that some things must be renamed. This is why the `PDDLWriter`
        offers this method, that takes a `PDDL` name and returns the original `unified_planning` data structure that corresponds
        to the `PDDL` entity with the given name.

        This method takes a name used in the `PDDL` domain or `PDDL` problem generated by this `PDDLWriter` and returns the original
        item in the `unified_planning` `Problem`.

        :param name: The name used in the generated `PDDL`.
        :return: The `unified_planning` model entity corresponding to the given name.
        :raises UPException: If the name was not generated by this writer.
        """
        try:
            return self.nto_renamings[name]
        except KeyError:
            raise UPException(f"The name {name} does not correspond to any item.")

    def get_pddl_name(
        self,
        item: WithName,
    ) -> str:
        """
        This method takes an item in the :class:`~unified_planning.model.Problem` and returns the chosen name for the same item in the `PDDL` problem
        or `PDDL` domain generated by this `PDDLWriter`.

        :param item: The `unified_planning` entity renamed by this `PDDLWriter`.
        :return: The `PDDL` name of the given item.
        :raises UPException: If the item was never renamed by this writer.
        """
        try:
            return self.otn_renamings[item]
        except KeyError:
            raise UPException(
                f"The item {item} does not correspond to any item renamed."
            )

    def _populate_domain_objects(self, obe: ObjectsExtractor):
        """
        Computes `self.domain_objects`: the objects that appear in the
        actions, in the action costs and, for hierarchical problems, in the
        methods. These are written as constants of the domain.
        """
        self.domain_objects = {}
        # Iterate the actions to retrieve domain objects
        for a in self.problem.actions:
            if isinstance(a, up.model.InstantaneousAction):
                for p in a.preconditions:
                    _update_domain_objects(self.domain_objects, obe.get(p))
                for e in a.effects:
                    if e.is_conditional():
                        _update_domain_objects(
                            self.domain_objects, obe.get(e.condition)
                        )
                    _update_domain_objects(self.domain_objects, obe.get(e.fluent))
                    _update_domain_objects(self.domain_objects, obe.get(e.value))
            elif isinstance(a, DurativeAction):
                _update_domain_objects(self.domain_objects, obe.get(a.duration.lower))
                _update_domain_objects(self.domain_objects, obe.get(a.duration.upper))
                for interval, cl in a.conditions.items():
                    for c in cl:
                        _update_domain_objects(self.domain_objects, obe.get(c))
                for t, el in a.effects.items():
                    for e in el:
                        if e.is_conditional():
                            _update_domain_objects(
                                self.domain_objects, obe.get(e.condition)
                            )
                        _update_domain_objects(self.domain_objects, obe.get(e.fluent))
                        _update_domain_objects(self.domain_objects, obe.get(e.value))
        # The action costs are written inside the action effects of the
        # domain, so the objects they mention must be constants too. They are
        # collected here, before anything is written, so that the domain and
        # the problem agree on where every object is declared.
        for metric in self.problem.quality_metrics:
            if isinstance(metric, up.model.metrics.MinimizeActionCosts):
                for a in self.problem.actions:
                    cost_exp = metric.get_action_cost(a)
                    if cost_exp is not None:
                        _update_domain_objects(self.domain_objects, obe.get(cost_exp))
        if isinstance(self.problem, HierarchicalProblem):
            for m in self.problem.methods:
                for p in m.preconditions:
                    _update_domain_objects(self.domain_objects, obe.get(p))
                for subtask in m.subtasks:
                    for targ in subtask.parameters:
                        _update_domain_objects(self.domain_objects, obe.get(targ))
                for c in m.non_temporal_constraints():
                    _update_domain_objects(self.domain_objects, obe.get(c))

    def _write_task_network(
        self,
        tn: up.model.htn.task_network.AbstractTaskNetwork,
        out,
        converter: ConverterToPDDLString,
    ):
        """
        Writes the parameters, the subtasks and their ordering for the given
        task network (a method or the initial task network).

        :raises UPProblemDefinitionError: If the subtasks form neither a total
            nor a partial order, or if the network has non temporal constraints.
        """

        def format_subtask(t: up.model.htn.Subtask):
            return f"({t.identifier} ({self._get_mangled_name(t.task)} {' '.join(map(converter.convert, t.parameters))}))"

        if isinstance(tn, up.model.htn.TaskNetwork) and len(tn.variables) > 0:
            out.write(f"\n :parameters (")
            out.write(self._typed_params(tn.variables))
            out.write(")")
        to = tn.total_order()
        po = tn.partial_order()
        if len(tn.subtasks) == 0:
            pass  # nothing to do
        elif to is not None:  # subtasks form a total order
            ordered_tasks = "\n ".join(
                format_subtask(tn.get_subtask(ident)) for ident in to
            )
            out.write(f"\n :ordered-subtasks (and\n {ordered_tasks})")
        elif po is not None:  # subtasks form a partial order
            tasks = "\n ".join(format_subtask(t) for t in tn.subtasks)
            out.write(f"\n :subtasks (and\n {tasks})")
            orders = "\n ".join(f"(< {id1} {id2})" for id1, id2 in po)
            out.write(f"\n :ordering (and\n {orders})")
        else:
            raise UPProblemDefinitionError(
                "HDDL does not support general temporal constraints. From:\n"
                + str(tn)
            )
        if len(tn.non_temporal_constraints()) > 0:
            constraint_str = self._conjuncts(converter, tn.non_temporal_constraints())
            out.write(f'\n :constraints (and {" ".join(constraint_str)})')
            # NOTE(review): the constraints are written and then rejected;
            # presumably a placeholder until they are really supported.
            raise UPProblemDefinitionError(
                "Task network constraints not supported by HDDL Writer yet"
            )
def _get_pddl_name(item: Union[WithName, "up.model.AbstractProblem"]) -> str:
    """
    This function returns a pddl name for the chosen item.

    The name is lowercased, forced to start with a letter, stripped of the
    characters other than letters, digits and "_", moved away from the PDDL
    keywords and, for parameters and variables, prefixed with "?".
    """
    name = item.name  # type: ignore
    assert name is not None
    name = name.lower()
    if re.match(r"^[a-zA-Z]+.*", name) is None:
        # If the name does not start with an alphabetic char, we make it start with one.
        name = f'{INITIAL_LETTER.get(type(item), "x")}_{name}'
    name = re.sub("[^0-9a-zA-Z_]", "_", name)  # Substitute non-valid elements with "_"
    while name in PDDL_KEYWORDS:
        # If the name is in the keywords, apply an underscore at the end until
        # it is not a keyword anymore.
        name = f"{name}_"
    if isinstance(item, (up.model.Parameter, up.model.Variable)):
        name = f"?{name}"
    return name


def _update_domain_objects(
    dict_to_update: Dict[_UserType, Set[Object]], values: Dict[_UserType, Set[Object]]
) -> None:
    """Small utility method that updates a UserType -> Set[Object] dict with another dict of the same type."""
    for ut, objs in values.items():
        dict_to_update.setdefault(ut, set()).update(objs)


def _write_effect(
    effect: Effect,
    timing: Optional[Timing],
    out: IO[str],
    converter: ConverterToPDDLString,
    rewrite_bool_assignments: bool,
    get_mangled_name: MangleFunction,
):
    """
    Writes the given effect on the given stream.

    The written text has the shape
    ``[(forall (vars)] [(when <cond>] <effect> [)] [)]`` where, when `timing`
    is given, both `<cond>` and `<effect>` are wrapped in ``(at start ...)``
    or ``(at end ...)``. Nothing is written for an effect whose condition is
    trivially false.

    :param effect: The effect to write.
    :param timing: The timing of the effect inside a durative action, or
        None when no time specifier must be written.
    :param out: The stream to write on.
    :param converter: The converter used for the expressions of the effect.
    :param rewrite_bool_assignments: If True, the assignment of a non constant
        boolean value is written as a pair of conditional effects.
    :param get_mangled_name: Callback returning the PDDL name of an entity.
    :raises UPProblemDefinitionError: If the effect assigns a non constant
        boolean value and `rewrite_bool_assignments` is False.
    """
    simplified_cond = effect.condition.simplify()
    # check for non-constant-bool-assignment
    non_const_bool_ass = (
        effect.value.type.is_bool_type()
        and not effect.value.is_true()
        and not effect.value.is_false()
    )
    if non_const_bool_ass and not rewrite_bool_assignments:
        raise UPProblemDefinitionError(
            "The problem has non-constant boolean assignments. This can't be directly written "
            "in PDDL, but it can be translated into a conditional effect maintaining the "
            "semantic. To enable this feature, set the flag rewrite_bool_assignments"
            " to True in the PDDLWriter constructor."
        )

    forall_str = ""
    if effect.is_forall():
        mid_str = " ".join(
            f"{get_mangled_name(v)} - {get_mangled_name(v.type)}"
            for v in effect.forall
        )
        forall_str = f"(forall ({mid_str})"

    # Opening and closing text of the time specifier around conditions and effects.
    if timing is None:
        time_open, time_close = "", ""
    elif timing.is_from_start():
        time_open, time_close = " (at start", ")"
    else:
        time_open, time_close = " (at end", ")"

    def emit(cond, build_body):
        # Writes one (possibly quantified, conditional and timed) effect.
        # build_body is called only after the condition has been converted,
        # so that the expressions are converted in the order they are written.
        out.write(forall_str)
        conditional = not cond.is_true()
        if conditional:
            out.write(f" (when{time_open} {converter.convert(cond)}{time_close}")
        out.write(f"{time_open} {build_body()}{time_close}")
        if conditional:
            out.write(")")
        if effect.is_forall():
            out.write(")")

    if non_const_bool_ass:
        # "f := v" with a non constant v becomes
        # "when (cond and v) f" plus "when (cond and not v) (not f)".
        assert effect.is_assignment()
        positive_cond = (simplified_cond & effect.value).simplify()
        if not positive_cond.is_false():
            emit(positive_cond, lambda: converter.convert(effect.fluent))
        negative_cond = (simplified_cond & effect.value.Not()).simplify()
        if not negative_cond.is_false():
            emit(negative_cond, lambda: f"(not {converter.convert(effect.fluent)})")
        return

    if simplified_cond.is_false():
        return

    def plain_body():
        simplified_value = effect.value.simplify()
        fluent = converter.convert(effect.fluent)
        if simplified_value.is_true():
            return fluent
        if simplified_value.is_false():
            return f"(not {fluent})"
        if effect.is_increase():
            return f"(increase {fluent} {converter.convert(simplified_value)})"
        if effect.is_decrease():
            return f"(decrease {fluent} {converter.convert(simplified_value)})"
        return f"(assign {fluent} {converter.convert(simplified_value)})"

    emit(simplified_cond, plain_body)