Source code for unified_planning.io.ma_pddl_writer

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os as osy
from fractions import Fraction
import sys
import re

from decimal import Decimal, localcontext
from warnings import warn
import unified_planning as up
import unified_planning.environment
import unified_planning.model.walkers as walkers
from unified_planning.model import (
    InstantaneousAction,
    DurativeAction,
    Fluent,
    Parameter,
    Problem,
    Object,
)
from unified_planning.model.multi_agent.agent import Agent
from unified_planning.model.multi_agent.ma_problem import MultiAgentProblem
from unified_planning.exceptions import (
    UPTypeError,
    UPProblemDefinitionError,
    UPException,
)
from unified_planning.model.types import _UserType
from typing import Callable, Dict, List, Optional, Set, Union, cast
from io import StringIO
from unified_planning.io.pddl_writer import (
    ObjectsExtractor,
    ConverterToPDDLString,
    MangleFunction,
    WithName,
    _write_effect,
    _get_pddl_name,
)


class ConverterToMAPDDLString(ConverterToPDDLString):
    """Expression converter that renders expressions as MA-PDDL strings.

    On top of the plain PDDL converter it handles fluents owned by an agent
    (rendered with an ``a_`` prefix and an agent argument) and ``Dot``
    expressions that refer to a fluent of a specific agent.
    """

    def __init__(
        self,
        problem: MultiAgentProblem,
        get_mangled_name: MangleFunction,
        agent: Optional["up.model.multi_agent.Agent"],
        unfactored: Optional[bool] = False,
    ):
        super().__init__(problem.environment, get_mangled_name)
        self._problem = problem
        self._agent = agent
        self._unfactored = unfactored

    def _needs_agent_suffix(self, fluent) -> bool:
        """Tell whether ``fluent`` gets an ``_<agent>`` suffix appended to its name.

        This is the case only when writing unfactored output for a known agent
        and the fluent is not among that agent's public fluents.
        """
        if not self._unfactored:
            return False
        if self._agent is None:
            return False
        return fluent not in self._agent.public_fluents

    def walk_dot(self, expression, args):
        """Render a ``Dot`` expression as ``(a_<fluent>[_<agent>] <agent> <objects>)``."""
        owner = self._problem.agent(expression.agent())
        inner = expression.args[0]
        fluent = inner.fluent()
        # The suffix is computed before the other names are requested so that
        # the mangling function is invoked in the same order as before.
        suffix = (
            f"_{self.get_mangled_name(owner)}"
            if self._needs_agent_suffix(fluent)
            else ""
        )
        fluent_name = self.get_mangled_name(fluent)
        owner_name = self.get_mangled_name(owner)
        rendered_objects = " ".join([self.convert(obj) for obj in inner.args])
        return f"(a_{fluent_name}{suffix} {owner_name} {rendered_objects})"

    def walk_fluent_exp(self, expression, args):
        """Render a fluent expression, using the agent form when the current agent owns the fluent."""
        fluent = expression.fluent()
        suffix = f"_{self._agent.name}" if self._needs_agent_suffix(fluent) else ""
        separator = " " if len(args) > 0 else ""
        joined_args = " ".join(args)

        if self._agent is None or fluent not in self._agent.fluents:
            return f"({self.get_mangled_name(fluent)}{separator}{joined_args})"
        return f"(a_{self.get_mangled_name(fluent)}{suffix}  ?{self._agent.name}{separator}{joined_args})"


class MAPDDLWriter:
    """
    This class can be used to write a :class:`~unified_planning.model.MultiAgentProblem` in `MA-PDDL`.

    The constructor of this class takes the problem to write and 4 flags:

    * ``explicit_false_initial_states`` determines if this writer includes initially false predicates in the problem ``:init``,
    * ``unfactored`` when ``False`` this writer prints a domain and a problem for every agent, if ``True`` generates a single domain and a single problem,
    * ``needs_requirements`` determines if the printed problem must have the :requirements,
    * ``rewrite_bool_assignments`` determines if this writer will write non constant boolean assignment as conditional effects.
    """

    def __init__(
        self,
        problem: "up.model.multi_agent.MultiAgentProblem",
        explicit_false_initial_states: Optional[bool] = False,
        unfactored: Optional[bool] = False,
        needs_requirements: bool = True,
        rewrite_bool_assignments: bool = False,
    ):
        self._env = problem.environment
        self.problem = problem
        self.problem_kind = self.problem.kind
        self.explicit_false_initial_states = explicit_false_initial_states
        self.unfactored = unfactored
        self.needs_requirements = needs_requirements
        self.rewrite_bool_assignments = rewrite_bool_assignments
        # otn represents the old to new renamings
        self.otn_renamings: Dict[
            WithName,
            str,
        ] = {}
        # nto represents the new to old renamings
        self.nto_renamings: Dict[
            str,
            WithName,
        ] = {}
        # those 2 maps are "symmetrical", meaning that "(otn[k] == v) implies (nto[v] == k)"
        self.domain_objects: Optional[Dict[_UserType, Set[Object]]] = None
        # Initialised eagerly (it used to be annotation-only) so the attribute
        # always exists, even before _populate_domain_objects has run.
        self.domain_objects_agents: Dict[up.model.multi_agent.Agent, str] = {}
        self.all_public_fluents: Set[Fluent] = set()

    # ------------------------------------------------------------------
    # Small formatting helpers
    # ------------------------------------------------------------------

    def _format_typed_parameters(self, fluent) -> str:
        """Return the `` name - type`` fragments for every parameter of ``fluent``, concatenated.

        :raises UPTypeError: if a parameter is not of a user type.
        """
        params = []
        for param in fluent.signature:
            if not param.type.is_user_type():
                raise UPTypeError("MA-PDDL supports only user type parameters")
            params.append(
                f" {self._get_mangled_name(param)} - {self._get_mangled_name(param.type)}"
            )
        return "".join(params)

    def _agent_fluent_declaration(self, fluent) -> str:
        """Return the ``(a_<fluent> ?agent - ag <params>)`` declaration of ``fluent``."""
        # Parameters are formatted before the fluent name is requested, so the
        # mangling function sees the items in the same order as it always did.
        params = self._format_typed_parameters(fluent)
        return f"(a_{self._get_mangled_name(fluent)} ?agent - ag{params})"

    def _write_action_parameters(self, out, action) -> None:
        """Write the typed parameters of ``action`` to ``out``.

        :raises UPTypeError: if a parameter is not of a user type.
        """
        for ap in action.parameters:
            if not ap.type.is_user_type():
                raise UPTypeError("MA-PDDL supports only user type parameters")
            out.write(
                f" {self._get_mangled_name(ap)} - {self._get_mangled_name(ap.type)}"
            )

    # ------------------------------------------------------------------
    # Domain sections
    # ------------------------------------------------------------------

    def _write_requirements(self, out) -> None:
        """Write the ``(:requirements ...)`` section derived from the problem kind."""
        kind = self.problem_kind
        out.write(" (:requirements :multi-agent")
        if not self.unfactored:
            out.write(" :factored-privacy")
        else:
            out.write(" :unfactored-privacy")
        if kind.has_flat_typing():
            out.write(" :typing")
        if kind.has_negative_conditions():
            out.write(" :negative-preconditions")
        if kind.has_disjunctive_conditions():
            out.write(" :disjunctive-preconditions")
        if kind.has_equalities():
            out.write(" :equality")
        if kind.has_int_fluents() or kind.has_real_fluents():
            out.write(" :numeric-fluents")
        if kind.has_conditional_effects():
            out.write(" :conditional-effects")
        if kind.has_existential_conditions():
            out.write(" :existential-preconditions")
        if kind.has_universal_conditions():
            out.write(" :universal-preconditions")
        if kind.has_continuous_time() or kind.has_discrete_time():
            out.write(" :durative-actions")
        if kind.has_duration_inequalities():
            out.write(" :duration-inequalities")
        if kind.has_actions_cost() or kind.has_plan_length():
            out.write(" :action-costs")
        out.write(")\n")

    def _write_types(self, out) -> None:
        """Write the ``(:types ...)`` section, including one ``<agent>_type`` per agent."""
        if self.problem_kind.has_hierarchical_typing():
            user_types_hierarchy = self.problem.user_types_hierarchy
            out.write(f" (:types\n")
            stack: List["unified_planning.model.Type"] = (
                user_types_hierarchy[None] if None in user_types_hierarchy else []
            )
            out.write(
                f' {" ".join(self._get_mangled_name(t) for t in stack)}{" " if len(self.problem.agents) > 0 else ""}ag - object\n'
            )
            out.write(
                f' {" ".join(self._get_mangled_name(a) + "_type" for a in self.problem.agents)} - ag\n'
            )
            while stack:
                current_type = stack.pop()
                direct_sons: List[
                    "unified_planning.model.Type"
                ] = user_types_hierarchy[current_type]
                if direct_sons:
                    stack.extend(direct_sons)
                    out.write(
                        f' {" ".join([self._get_mangled_name(t) for t in direct_sons])} - {self._get_mangled_name(current_type)}\n'
                    )
            out.write(" )\n")
        else:
            if len(self.problem.user_types) > 0:
                out.write(
                    f' (:types {" ".join([self._get_mangled_name(t) for t in self.problem.user_types])}{" " if len(self.problem.agents) > 0 else ""}ag - object\n'
                )
            else:
                # The section used to be opened only when user types exist,
                # while the agent types and the closing parenthesis below were
                # always written, which unbalanced the output.
                out.write(" (:types ag - object\n")
            out.write(
                f' {" ".join(self._get_mangled_name(a) + "_type" for a in self.problem.agents)} - ag\n'
            )
            out.write(" )\n")

    def _write_constants(self, out) -> None:
        """Write the ``(:constants ...)`` section for object and agent constants."""
        assert self.domain_objects is not None
        has_objects = len(self.domain_objects) > 0
        has_agents = len(self.domain_objects_agents) > 0
        if not (has_objects or has_agents):
            return
        # The section is opened whenever something will be written in it; it
        # used to be opened only for object constants, while agent constants
        # and the closing parenthesis were written in both cases.
        out.write(" (:constants")
        for ut, objs in self.domain_objects.items():
            if len(objs) > 0:
                out.write(
                    f'\n {" ".join([self._get_mangled_name(o) for o in objs])} - {self._get_mangled_name(ut)}'
                )
        for k, v in self.domain_objects_agents.items():
            if len(v) > 0:
                out.write(f"\n {self._get_mangled_name(k)} - {v}")
        out.write("\n )\n")

    def _write_instantaneous_action(self, out, ag, a, converter, costs) -> None:
        """Write the ``(:action ...)`` block of instantaneous action ``a`` of agent ``ag``."""
        if self.unfactored:
            out.write(f" (:action {self._get_mangled_name(a)}_{ag.name}")
            out.write(f'\n :agent ?{ag.name} - {ag.name + "_type"}')
        else:
            out.write(f" (:action {self._get_mangled_name(a)}")
        out.write(f"\n :parameters (")
        if not self.unfactored:
            # In factored output the agent is passed as the first parameter.
            out.write(
                f' ?{self._get_mangled_name(ag)} - {self._get_mangled_name(ag) +"_type"}'
            )
        self._write_action_parameters(out, a)
        out.write(")")
        if len(a.preconditions) > 0:
            out.write(f"\n :precondition (and \n")
            for p in a.preconditions:
                out.write(f" {converter.convert(p)}\n")
            out.write(f" )")
        if len(a.effects) > 0:
            out.write("\n :effect (and\n")
            for e in a.effects:
                _write_effect(
                    e,
                    None,
                    out,
                    converter,
                    self.rewrite_bool_assignments,
                    self._get_mangled_name,
                )
            if a in costs:
                out.write(f" (increase (total-cost) {converter.convert(costs[a])})")
            out.write(")")
        out.write(")\n")

    def _write_durative_action(self, out, a, converter, costs) -> None:
        """Write the ``(:durative-action ...)`` block of durative action ``a``."""
        out.write(f" (:durative-action {self._get_mangled_name(a)}")
        out.write(f"\n :parameters (")
        self._write_action_parameters(out, a)
        out.write(")")
        l, r = a.duration.lower, a.duration.upper
        if l == r:
            out.write(f"\n :duration (= ?duration {converter.convert(l)})")
        else:
            out.write(f"\n :duration (and ")
            if a.duration.is_left_open():
                out.write(f"(> ?duration {converter.convert(l)})")
            else:
                out.write(f"(>= ?duration {converter.convert(l)})")
            if a.duration.is_right_open():
                out.write(f"(< ?duration {converter.convert(r)})")
            else:
                out.write(f"(<= ?duration {converter.convert(r)})")
            out.write(")")
        if len(a.conditions) > 0:
            out.write(f"\n :condition (and ")
            for interval, cl in a.conditions.items():
                for c in cl:
                    if interval.lower == interval.upper:
                        # Point interval: a single "at start" / "at end" condition.
                        if interval.lower.is_from_start():
                            out.write(f"(at start {converter.convert(c)})")
                        else:
                            out.write(f"(at end {converter.convert(c)})")
                    else:
                        # Proper interval: "over all", plus the closed endpoints.
                        if not interval.is_left_open():
                            out.write(f"(at start {converter.convert(c)})")
                        out.write(f"(over all {converter.convert(c)})")
                        if not interval.is_right_open():
                            out.write(f"(at end {converter.convert(c)})")
            out.write(")")
        if len(a.effects) > 0:
            out.write("\n :effect (and")
            for t, el in a.effects.items():
                for e in el:
                    _write_effect(
                        e,
                        t,
                        out,
                        converter,
                        self.rewrite_bool_assignments,
                        self._get_mangled_name,
                    )
            if a in costs:
                out.write(
                    f" (at end (increase (total-cost) {converter.convert(costs[a])}))"
                )
            out.write(")")
        out.write(")\n")

    def _write_agent_actions(self, out, ag, costs) -> None:
        """Write every action of agent ``ag`` to ``out``.

        :raises NotImplementedError: for actions that are neither instantaneous nor durative.
        """
        converter = ConverterToMAPDDLString(
            self.problem, self._get_mangled_name, ag, unfactored=self.unfactored
        )
        for a in ag.actions:
            if isinstance(a, up.model.InstantaneousAction):
                self._write_instantaneous_action(out, ag, a, converter, costs)
            elif isinstance(a, DurativeAction):
                self._write_durative_action(out, a, converter, costs)
            else:
                raise NotImplementedError

    def _write_domain(self):
        """Build the `MA-PDDL` domain text.

        :return: A dict mapping a mangled agent name to its domain text; one
            entry per agent, or a single entry when ``unfactored`` is set.
        :raises UPProblemDefinitionError: if the problem kind has intermediate
            conditions and effects, timed effects or timed goals.
        :raises UPTypeError: if a fluent or action parameter is not of a user
            type, or a fluent is neither boolean nor numeric.
        """
        ag_domains = {}
        for ag in self.problem.agents:
            out = StringIO()
            if self.problem_kind.has_intermediate_conditions_and_effects():
                raise UPProblemDefinitionError(
                    "PDDL2.1 does not support ICE.\nICE are Intermediate Conditions and Effects therefore when an Effect (or Condition) are not at StartTIming(0) or EndTIming(0)."
                )
            if (
                self.problem_kind.has_timed_effects()
                or self.problem_kind.has_timed_goals()
            ):
                raise UPProblemDefinitionError(
                    "PDDL2.1 does not support timed effects or timed goals."
                )
            obe = ObjectsExtractor()
            out.write("(define ")
            if self.problem.name is None:
                name = "ma-pddl"
            else:
                name = _get_pddl_name(self.problem)
            out.write(f"(domain {name}-domain)\n")

            if self.needs_requirements:
                self._write_requirements(out)
            self._write_types(out)

            if self.domain_objects is None:
                # This method populates the self.domain_objects map
                self._populate_domain_objects(obe, ag)
            assert self.domain_objects is not None

            if len(self.all_public_fluents) == 0:
                self._all_public_fluents(self.all_public_fluents, self.problem.agents)

            self._write_constants(out)

            (
                predicates_environment,
                functions_environment,
            ) = self.get_predicates_functions(self.problem.ma_environment)

            predicates_agents = []
            functions_agent = []
            nl = "\n "
            if self.unfactored:
                # NOTE(review): this loop rebinds ``ag``; from here on, in
                # unfactored mode, ``ag`` is the LAST agent (it is used by the
                # goal scan below and as the key of the returned dict, whereas
                # _write_problem keys its single entry by the first agent).
                # Kept as-is because callers may depend on the current keys.
                for ag in self.problem.agents:
                    predicates_agent, private_functions = self.get_predicates_functions(
                        ag, is_private=True
                    )
                    # Accumulate the private functions of every agent: they
                    # used to be overwritten at each iteration, so only the
                    # last agent's ones were declared.
                    functions_agent.extend(private_functions)
                    if len(predicates_agent) > 0:
                        predicates_agents.append(
                            f" (:private agent - {ag.name + '_type'}\n {nl.join(predicates_agent)})\n"
                        )
            else:
                predicates_agents, functions_agent = self.get_predicates_functions(
                    ag, is_private=True
                )

            predicates_public_agent = []
            functions_public_agent = []
            for f in self.all_public_fluents:
                expression = self._agent_fluent_declaration(f)
                if f.type.is_bool_type():
                    predicates_public_agent.append(expression)
                elif f.type.is_int_type() or f.type.is_real_type():
                    functions_public_agent.append(expression)
                else:
                    raise UPTypeError(
                        "MA-PDDL supports only boolean and numerical fluents"
                    )

            # Fluents mentioned by top-level ``Dot`` goals that are neither
            # owned by ``ag`` nor public still need a declaration.
            predicates_agent_goal = []
            functions_agent_goal = []
            for g in self.problem.goals:
                if not g.is_dot():
                    continue
                f = g.args[0].fluent()
                if f in ag.fluents or f in self.all_public_fluents:
                    continue
                if f.type.is_bool_type():
                    predicates_agent_goal.append(self._agent_fluent_declaration(f))
                elif f.type.is_int_type() or f.type.is_real_type():
                    functions_agent_goal.append(self._agent_fluent_declaration(f))
                else:
                    raise UPTypeError(
                        "MA-PDDL supports only boolean and numerical fluents"
                    )

            has_predicates = (
                len(predicates_environment) > 0
                or len(predicates_agents) > 0
                or len(predicates_agent_goal) > 0
                or len(predicates_public_agent) > 0
            )
            if has_predicates:
                out.write(f" (:predicates\n ")
            if len(predicates_agent_goal) > 0:
                out.write(f" {nl.join(predicates_agent_goal)}\n")
            if len(predicates_environment) > 0:
                out.write(f" {nl.join(predicates_environment)}\n")
            if len(predicates_public_agent) > 0:
                out.write(f" {nl.join(predicates_public_agent)}\n")
            if len(predicates_agents) > 0:
                if self.unfactored:
                    # Each entry is already a complete "(:private ...)" block.
                    out.write("".join(predicates_agents))
                else:
                    out.write(f" (:private\n {nl.join(predicates_agents)})")
            if has_predicates:
                out.write(f")\n")

            has_functions = (
                len(functions_environment) > 0
                or len(functions_agent) > 0
                or len(functions_agent_goal) > 0
                or len(functions_public_agent) > 0
            )
            if has_functions:
                out.write(f" (:functions\n")
            if len(functions_environment) > 0:
                out.write(f' {" ".join(functions_environment)}\n')
            if len(functions_agent_goal) > 0:
                out.write(f' {" ".join(functions_agent_goal)}\n')
            if len(functions_public_agent) > 0:
                out.write(f' {" ".join(functions_public_agent)}\n')
            if len(functions_agent) > 0:
                out.write(f' (:private{" ".join(functions_agent)})\n')
            if has_functions:
                out.write(f" )\n")

            # No action cost is ever registered here, so the cost-related
            # output of the action writers is currently never produced.
            costs: dict = {}

            if self.unfactored:
                for ag in self.problem.agents:
                    self._write_agent_actions(out, ag, costs)
            else:
                self._write_agent_actions(out, ag, costs)
            out.write(")\n")

            ag_domains[self._get_mangled_name(ag)] = out.getvalue()
            out.close()
            self.domain_objects = None
            self.domain_objects_agents = {}
            if self.unfactored:
                # A single domain covers every agent.
                break

        return ag_domains

    # ------------------------------------------------------------------
    # Problem
    # ------------------------------------------------------------------

    def _is_initial_value_written(self, f, ag) -> bool:
        """Tell whether the boolean initial value of ``f`` belongs in the problem of ``ag``.

        Everything is written in unfactored mode and for non-``Dot``
        expressions; a ``Dot`` expression is written in factored mode only if
        its fluent is public or it is a fluent of ``ag`` referred to ``ag``.
        """
        if self.unfactored or not f.is_dot():
            return True
        fluent = f.args[0].fluent()
        return fluent in self.all_public_fluents or (
            fluent in ag.fluents and f.agent() == ag.name
        )

    def _write_problem(self):
        """Build the `MA-PDDL` problem text.

        :return: A dict mapping a mangled agent name to its problem text; one
            entry per agent, or a single entry when ``unfactored`` is set.
        """
        ag_problems = {}
        for ag in self.problem.agents:
            out = StringIO()
            if self.problem.name is None:
                name = "ma-pddl"
            else:
                name = _get_pddl_name(self.problem)
            out.write(f"(define (problem {name}-problem)\n")
            out.write(f" (:domain {name}-domain)\n")
            if self.domain_objects is None:
                # This method populates the self.domain_objects map
                self._populate_domain_objects(ObjectsExtractor(), ag)
            assert self.domain_objects is not None

            # The ``:init`` filtering below relies on the set of public
            # fluents; compute it here too, so the result does not depend on
            # the domain having been written first.
            if len(self.all_public_fluents) == 0:
                self._all_public_fluents(self.all_public_fluents, self.problem.agents)

            object_lines = []
            for t in self.problem.user_types:
                constants_of_this_type = self.domain_objects.get(
                    cast(_UserType, t), None
                )
                if constants_of_this_type is None:
                    objects = [o for o in self.problem.all_objects if o.type == t]
                else:
                    # Objects already declared as domain constants are skipped.
                    objects = [
                        o
                        for o in self.problem.all_objects
                        if o.type == t and o not in constants_of_this_type
                    ]
                if len(objects) > 0:
                    object_lines.append(
                        f'\n {" ".join([self._get_mangled_name(o) for o in objects])} - {self._get_mangled_name(t)}'
                    )
            # If agents are not defined as "constant" in the domain, they are defined in the problem
            for agent in self.problem.agents:
                if agent not in self.domain_objects_agents:
                    object_lines.append(
                        f'\n {self._get_mangled_name(agent)} - {self._get_mangled_name(agent) + "_type"}'
                    )
            # The section used to be skipped entirely when the problem has no
            # user types, which left the agents undeclared.
            if len(self.problem.user_types) > 0 or len(object_lines) > 0:
                out.write(" (:objects")
                out.write("".join(object_lines))
                out.write("\n )\n")

            converter = ConverterToMAPDDLString(
                self.problem, self._get_mangled_name, ag, unfactored=self.unfactored
            )
            out.write(" (:init")
            for f, v in self.problem.initial_values.items():
                if v.is_true():
                    if self._is_initial_value_written(f, ag):
                        out.write(f"\n {converter.convert(f)}")
                elif v.is_false():
                    if (
                        self.explicit_false_initial_states
                        and self._is_initial_value_written(f, ag)
                    ):
                        out.write(f"\n (not {converter.convert(f)})")
                else:
                    # Non-boolean values are written without any filtering.
                    out.write(f"\n (= {converter.convert(f)} {converter.convert(v)})")
            if self.problem.kind.has_actions_cost():
                out.write(f" (= (total-cost) 0)")
            out.write(")\n")
            out.write(f" (:goal (and")
            for p in self.problem.goals:
                out.write(f" {converter.convert(p)}")
            out.write(f"))")
            out.write("\n)")

            ag_problems[self._get_mangled_name(ag)] = out.getvalue()
            out.close()
            self.domain_objects = None
            self.domain_objects_agents = {}
            if self.unfactored:
                # A single problem covers every agent.
                break

        return ag_problems

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def print_ma_domain_agent(self, agent_name):
        """Prints to std output the `MA-PDDL` domain."""
        domains = self._write_domain()
        domain_agent = domains[agent_name]
        sys.stdout.write(domain_agent)

    def print_ma_problem_agent(self, agent_name):
        """Prints to std output the `MA-PDDL` problem."""
        problems = self._write_problem()
        problem_agent = problems[agent_name]
        sys.stdout.write(problem_agent)

    def get_ma_domains(self) -> Dict:
        """Returns the `MA-PDDL` domains."""
        domains = self._write_domain()
        return domains

    def get_ma_domain_agent(self, agent_name) -> str:
        """Returns the `MA-PDDL` agent domain."""
        domains = self._write_domain()
        domain_agent = domains[agent_name]
        return domain_agent

    def get_ma_problems(self) -> Dict:
        """Returns the `MA-PDDL` problems."""
        problems = self._write_problem()
        return problems

    def get_ma_problem_agent(self, agent_name) -> str:
        """Returns the `MA-PDDL` agent problem."""
        problems = self._write_problem()
        problem_agent = problems[agent_name]
        return problem_agent

    def write_ma_domain(self, directory_name):
        """Dumps to file the `MA-PDDL` domains."""
        ag_domains = self._write_domain()
        osy.makedirs(directory_name, exist_ok=True)
        for ag, domain in ag_domains.items():
            # Factored output gets one "<agent>_domain.pddl" file per agent.
            if not self.unfactored:
                name_domain = ag + "_"
            else:
                name_domain = ""
            path_ma_pddl = osy.path.join(directory_name, name_domain + "domain.pddl")
            with open(path_ma_pddl, "w") as f:
                f.write(domain)

    def write_ma_problem(self, directory_name):
        """Dumps to file the `MA-PDDL` problems."""
        ag_problems = self._write_problem()
        osy.makedirs(directory_name, exist_ok=True)
        for ag, problem in ag_problems.items():
            # Factored output gets one "<agent>_problem.pddl" file per agent.
            if not self.unfactored:
                name_problem = ag + "_"
            else:
                name_problem = ""
            path_ma_pddl = osy.path.join(directory_name, name_problem + "problem.pddl")
            with open(path_ma_pddl, "w") as f:
                f.write(problem)

    def get_predicates_functions(
        self,
        obj: Union[
            up.model.multi_agent.Agent,
            up.model.multi_agent.ma_environment.MAEnvironment,
        ],
        is_private: bool = False,
    ):
        """Return the predicate and function declarations of an agent or of the environment.

        :param obj: The agent or the environment whose fluents are declared.
        :param is_private: For an agent, selects its private fluents instead
            of its public ones; ignored for the environment.
        :return: A ``(predicates, functions)`` pair of lists of strings:
            boolean fluents are predicates, int and real fluents are functions.
        :raises UPTypeError: if a parameter is not of a user type or a fluent
            is neither boolean nor numeric.
        """
        is_agent = isinstance(obj, up.model.multi_agent.Agent)
        if is_agent:
            fluents_list = obj.private_fluents if is_private else obj.public_fluents
            prefix = "a_"
        else:
            fluents_list = obj.fluents
            prefix = ""
        predicates = []
        functions = []
        for f in fluents_list:
            params = self._format_typed_parameters(f)
            if is_agent:
                if self.unfactored:
                    expression = f'({prefix}{self._get_mangled_name(f)}_{obj.name} ?agent - {obj.name + "_type"}{params})'
                else:
                    expression = (
                        f"({prefix}{self._get_mangled_name(f)} ?agent - ag{params})"
                    )
            else:
                expression = f"({prefix}{self._get_mangled_name(f)}{params})"
            if f.type.is_bool_type():
                predicates.append(expression)
            elif f.type.is_int_type() or f.type.is_real_type():
                functions.append(expression)
            else:
                raise UPTypeError("MA-PDDL supports only boolean and numerical fluents")
        return predicates, functions

    def _get_mangled_name(
        self,
        item: WithName,
    ) -> str:
        """This function returns a valid and unique MA-PDDL name."""

        # If we already encountered this item, return it
        if item in self.otn_renamings:
            return self.otn_renamings[item]

        if isinstance(item, up.model.Type):
            assert item.is_user_type()
            original_name = cast(_UserType, item).name
            tmp_name = _get_pddl_name(item)
            # If the problem is hierarchical and the name is object, we want to change it
            if self.problem_kind.has_hierarchical_typing() and tmp_name == "object":
                tmp_name = f"{tmp_name}_"
        else:
            original_name = item.name
            tmp_name = _get_pddl_name(item)
        # if the ma-pddl valid name is the same of the original one and it does not create conflicts,
        # it can be returned
        if not isinstance(item, up.model.multi_agent.Agent):
            if tmp_name == original_name and tmp_name not in self.nto_renamings:
                new_name = tmp_name
            else:
                count = 0
                new_name = tmp_name
                while self.problem.has_name(new_name) or new_name in self.nto_renamings:
                    new_name = f"{tmp_name}_{count}"
                    count += 1
            assert (
                new_name not in self.nto_renamings
                and new_name not in self.otn_renamings.values()
            )
        else:
            # Agents keep their PDDL name without any conflict resolution.
            new_name = tmp_name
        self.otn_renamings[item] = new_name
        self.nto_renamings[new_name] = item
        return new_name

    def get_item_named(self, name: str) -> WithName:
        """
        Since `MA-PDDL` has a stricter set of possible naming compared to the `unified_planning`, when writing
        a :class:`~unified_planning.model.Problem` it is possible that some things must be renamed. This is why the `MAPDDLWriter`
        offers this method, that takes a `MA-PDDL` name and returns the original `unified_planning` data structure that corresponds
        to the `MA-PDDL` entity with the given name.

        This method takes a name used in the `MA-PDDL` domain or `MA-PDDL` problem generated by this `MAPDDLWriter` and returns the original
        item in the `unified_planning` `Problem`.

        :param name: The name used in the generated `MA-PDDL`.
        :return: The `unified_planning` model entity corresponding to the given name.
        :raises UPException: if the name was not produced by this writer.
        """
        try:
            return self.nto_renamings[name]
        except KeyError:
            raise UPException(f"The name {name} does not correspond to any item.")

    def get_ma_pddl_name(
        self,
        item: Union[
            "up.model.Type",
            "up.model.Action",
            "up.model.Fluent",
            "up.model.Object",
            "up.model.Parameter",
            "up.model.Variable",
        ],
    ) -> str:
        """
        This method takes an item in the :class:`~unified_planning.model.MultiAgentProblem` and returns the chosen name for the same item in the `MA-PDDL` problem
        or `MA-PDDL` domain generated by this `MAPDDLWriter`.

        :param item: The `unified_planning` entity renamed by this `MAPDDLWriter`.
        :return: The `MA-PDDL` name of the given item.
        :raises UPException: if the item was never renamed by this writer.
        """
        try:
            return self.otn_renamings[item]
        except KeyError:
            raise UPException(
                f"The item {item} does not correspond to any item renamed."
            )

    def _all_public_fluents(
        self,
        list_to_update: Set[Fluent],
        agents: List[up.model.multi_agent.Agent],
    ) -> None:
        """This function adds the public fluents of all the given agents to ``list_to_update``."""
        for agent in agents:
            for fluent in agent.public_fluents:
                list_to_update.add(fluent)

    def _populate_domain_objects(
        self, obe: ObjectsExtractor, agent: "up.model.multi_agent.Agent"
    ):
        """Reset and fill ``domain_objects`` and ``domain_objects_agents`` from the actions of ``agent``.

        Objects are collected from the expressions of the actions; agents are
        collected from the ``Dot`` expressions found in the preconditions of
        instantaneous actions.
        """
        self.domain_objects = {}
        self.domain_objects_agents = {}
        # Iterate the actions to retrieve domain objects
        get_dots = walkers.AnyGetter(lambda x: x.is_dot())
        for a in agent.actions:
            if isinstance(a, up.model.InstantaneousAction):
                for p in a.preconditions:
                    for d in get_dots.get(p):
                        _update_domain_objects_ag(
                            self.domain_objects_agents, self.problem.agent(d.agent())
                        )
                    _update_domain_objects(self.domain_objects, obe.get(p))
                for e in a.effects:
                    if e.is_conditional():
                        _update_domain_objects(
                            self.domain_objects, obe.get(e.condition)
                        )
                    _update_domain_objects(self.domain_objects, obe.get(e.fluent))
                    _update_domain_objects(self.domain_objects, obe.get(e.value))
            elif isinstance(a, DurativeAction):
                _update_domain_objects(self.domain_objects, obe.get(a.duration.lower))
                _update_domain_objects(self.domain_objects, obe.get(a.duration.upper))
                for cl in a.conditions.values():
                    for c in cl:
                        _update_domain_objects(self.domain_objects, obe.get(c))
                for el in a.effects.values():
                    for e in el:
                        if e.is_conditional():
                            _update_domain_objects(
                                self.domain_objects, obe.get(e.condition)
                            )
                        _update_domain_objects(self.domain_objects, obe.get(e.fluent))
                        _update_domain_objects(self.domain_objects, obe.get(e.value))
def _update_domain_objects(
    dict_to_update: Dict[_UserType, Set[Object]], values: Dict[_UserType, Set[Object]]
) -> None:
    """Merge ``values`` into ``dict_to_update`` in place.

    For every user type in ``values`` the corresponding set of objects is
    united with the set already stored in ``dict_to_update``; a new empty set
    is created for types that were not present yet.
    """
    for user_type, new_objects in values.items():
        if user_type not in dict_to_update:
            dict_to_update[user_type] = set()
        known_objects = dict_to_update[user_type]
        known_objects |= new_objects


def _update_domain_objects_ag(
    dict_to_update: Dict["up.model.multi_agent.Agent", str],
    agent: up.model.multi_agent.Agent,
) -> None:
    """Register ``agent`` in ``dict_to_update`` with the type name ``<agent name>_type``.

    An agent that is already present keeps its current entry.
    """
    if agent not in dict_to_update:
        dict_to_update[agent] = agent.name + "_type"