# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This module defines the MultiAgentProblem class."""
import unified_planning as up
from unified_planning.model.abstract_problem import AbstractProblem
from unified_planning.model.expression import ConstantExpression
from unified_planning.model.fluent import get_all_fluent_exp
from unified_planning.model.operators import OperatorKind
from unified_planning.model.problem_kind_versioning import LATEST_PROBLEM_KIND_VERSION
from unified_planning.exceptions import (
UPProblemDefinitionError,
UPTypeError,
UPExpressionDefinitionError,
UPPlanDefinitionError,
)
from typing import Optional, List, Dict, Union, cast, Iterable
from unified_planning.model.mixins import (
ObjectsSetMixin,
UserTypesSetMixin,
AgentsSetMixin,
)
from fractions import Fraction
from itertools import chain
[docs]
class MultiAgentProblem(  # type: ignore[misc]
    AbstractProblem,
    UserTypesSetMixin,
    ObjectsSetMixin,
    AgentsSetMixin,
):
    """
    Represents the multi-agent planning problem, with :class:`Agent <unified_planning.model.multi_agent.agent>`,
    :class:`MAEnvironment <unified_planning.model.multi_agent.ma_environment>`, :class:`Fluents <unified_planning.model.Fluent>`,
    :class:`Objects <unified_planning.model.Object>` and :class:`UserTypes <unified_planning.model.Type>`.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        environment: Optional["up.environment.Environment"] = None,
        *,
        initial_defaults: Optional[
            Dict["up.model.types.Type", "ConstantExpression"]
        ] = None,
    ):
        """
        Creates an empty `MultiAgentProblem`.

        :param name: The optional name of the problem; forwarded to `AbstractProblem`.
        :param environment: The optional `Environment`; forwarded to `AbstractProblem`.
        :param initial_defaults: Optional mapping from a `Type` to a constant expression.
            This class only stores it (and copies it in :func:`clone`); presumably it
            holds per-type default initial values -- TODO confirm against the callers.
            When omitted, an empty mapping is used.
        """
        AbstractProblem.__init__(self, name, environment)
        UserTypesSetMixin.__init__(self, self.environment, self.has_name)
        ObjectsSetMixin.__init__(
            self, self.environment, self._add_user_type, self.has_name
        )
        AgentsSetMixin.__init__(self, self.environment, self.has_name)
        # Always keep a private dict: a literal `{}` default would be shared by
        # every problem created without the argument, and storing the caller's
        # dict by reference would let outside mutations leak into the problem.
        self._initial_defaults: Dict["up.model.types.Type", "ConstantExpression"] = (
            {} if initial_defaults is None else dict(initial_defaults)
        )
        self._env_ma = up.model.multi_agent.ma_environment.MAEnvironment(self)
        self._goals: List["up.model.fnode.FNode"] = list()
        self._initial_value: Dict["up.model.fnode.FNode", "up.model.fnode.FNode"] = {}
        self._operators_extractor = up.model.walkers.OperatorsExtractor()

    def __setstate__(self, state):
        """
        Restores the instance attributes from `state` and re-points the callbacks
        held by every agent to the methods of this (restored) problem.

        :param state: The attribute dictionary used to update `self.__dict__`.
        """
        self.__dict__.update(state)
        for a in self._agents:
            a._add_user_type_method = self._add_user_type
            a._ma_problem_has_name_not_in_agents = self.has_name_not_in_agents

    def __repr__(self) -> str:
        """Returns a multi-line textual description of the problem."""
        s = []
        if self.name is not None:
            s.append(f"problem name = {self.name!s}\n\n")
        if len(self.user_types) > 0:
            s.append(f"types = {list(self.user_types)!s}\n\n")
        s.append("environment fluents = [\n")
        for f in self.ma_environment.fluents:
            s.append(f" {f!s}\n")
        s.append("]\n\n")
        s.append("agents = [\n")
        for ag in self.agents:
            s.append(f" {ag!s}\n")
        s.append("]\n\n")
        if len(self.user_types) > 0:
            s.append("objects = [\n")
            for ty in self.user_types:
                s.append(f" {ty!s}: {list(self.objects(ty))!s}\n")
            s.append("]\n\n")
        s.append("initial values = [\n")
        # Only the explicitly stored initial values are printed here.
        for k, v in self._initial_value.items():
            s.append(f" {k!s} := {v!s}\n")
        s.append("]\n\n")
        s.append("goals = [\n")
        for g in self.goals:
            s.append(f" {g!s}\n")
        s.append("]\n\n")
        return "".join(s)

    def __eq__(self, oth: object) -> bool:
        """
        Two problems are equal when they live in the same environment and have the
        same kind, name, `MAEnvironment`, goals, user types, objects, agents and
        initial values (goals, types, objects and agents are compared as sets).
        """
        if not (isinstance(oth, MultiAgentProblem)) or self._env != oth._env:
            return False
        if self.kind != oth.kind or self._name != oth._name:
            return False
        if self.ma_environment != oth.ma_environment:
            return False
        if set(self._goals) != set(oth._goals):
            return False
        if set(self._user_types) != set(oth._user_types) or set(self._objects) != set(
            oth._objects
        ):
            return False
        if set(self._agents) != set(oth._agents):
            return False
        oth_initial_values = oth.initial_values
        for fluent, value in self.initial_values.items():
            oth_value = oth_initial_values.get(fluent, None)
            # A fluent missing on the other side, or one with a different
            # value, makes the two problems different.
            if oth_value is None or value != oth_value:
                return False
        return True

    def __hash__(self) -> int:
        """
        Returns a hash built from the same ingredients compared by `__eq__`.

        The kind is taken from the `kind` property (as `__eq__` does) rather than
        from the cached `_kind` attribute, which this class assigns only inside
        that property and which may therefore be missing or out of date.
        """
        res = hash(self.kind) + hash(self._name)
        res += hash(self.ma_environment)
        for a in self._agents:
            res += hash(a)
        for ut in self._user_types:
            res += hash(ut)
        for o in self._objects:
            res += hash(o)
        for iv in self.initial_values.items():
            res += hash(iv)
        for g in self._goals:
            res += hash(g)
        return res

    def clone(self):
        """
        Returns a new `MultiAgentProblem` with the same name and environment, whose
        containers (environment fluents and defaults, agents, user types, objects,
        initial values, goals and initial defaults) are copies of this problem's.
        Agents are cloned and attached to the new problem.
        """
        new_p = MultiAgentProblem(self._name, self._env)
        new_p.ma_environment._fluents = self.ma_environment._fluents.copy()
        new_p.ma_environment._fluents_defaults = (
            self.ma_environment._fluents_defaults.copy()
        )
        new_p._agents = [ag.clone(new_p) for ag in self._agents]
        new_p._user_types = self._user_types[:]
        new_p._user_types_hierarchy = self._user_types_hierarchy.copy()
        new_p._objects = self._objects[:]
        new_p._initial_value = self._initial_value.copy()
        new_p._goals = self._goals[:]
        new_p._initial_defaults = self._initial_defaults.copy()
        return new_p

    def has_name(self, name: str) -> bool:
        """
        Returns `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise.

        :param name: The target name to find in the `MultiAgentProblem`.
        :return: `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise.
        """
        return (
            self.has_object(name)
            or self.has_type(name)
            or self.has_agent(name)
            or self._env_ma.has_name(name)
            or any(a.has_name_in_agent(name) for a in self._agents)
        )

    def has_name_not_in_agents(self, name: str) -> bool:
        """
        Returns `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise;
        this method does not check in the problem's agents.

        :param name: The target name to find in the `MultiAgentProblem` without checking Agents.
        :return: `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise.
        """
        return (
            self.has_object(name)
            or self.has_type(name)
            or self.has_agent(name)
            or self._env_ma.has_name(name)
        )

    @property
    def ma_environment(self) -> "up.model.multi_agent.ma_environment.MAEnvironment":
        """Returns this `MultiAgentProblem` `MAEnvironment`."""
        return self._env_ma

    def set_initial_value(
        self,
        fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
        value: Union[
            "up.model.expression.NumericExpression",
            "up.model.fluent.Fluent",
            "up.model.object.Object",
            bool,
        ],
    ):
        """
        Sets the initial value for the given `Fluent`. The given `Fluent` must be grounded, therefore if
        its :func:`arity <unified_planning.model.Fluent.arity>` is `> 0`, the `fluent` parameter must be
        an `FNode` and the method :func:`~unified_planning.model.FNode.is_fluent_exp` must return `True`.

        :param fluent: The grounded `Fluent` of which the initial value must be set.
        :param value: The `value` assigned in the initial state to the given `fluent`.
        :raises UPTypeError: If the type of `value` is not compatible with the type of `fluent`.
        """
        fluent_exp, value_exp = self._env.expression_manager.auto_promote(fluent, value)
        if not fluent_exp.type.is_compatible(value_exp.type):
            raise UPTypeError("Initial value assignment has not compatible types!")
        self._initial_value[fluent_exp] = value_exp

    def initial_value(
        self, fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"]
    ) -> "up.model.fnode.FNode":
        """
        Retrieves the initial value assigned to the given `fluent`.

        An explicitly set value wins; otherwise the default of the owning agent
        (for dot expressions) or of the `MAEnvironment` is returned.

        :param fluent: The target `fluent` of which the `value` in the initial state must be retrieved.
        :return: The `value` expression assigned to the given `fluent` in the initial state.
        :raises UPExpressionDefinitionError: If an argument of the fluent expression is not constant.
        :raises UPProblemDefinitionError: If the fluent of a dot expression does not belong
            to the referenced agent, or if no explicit or default value is available.
        """
        (fluent_exp,) = self._env.expression_manager.auto_promote(fluent)
        # For a non-fluent expression the wrapped fluent expression is
        # argument 0, so its arguments are the ones that must be constant.
        fluent_args = (
            fluent_exp.args if fluent_exp.is_fluent_exp() else fluent_exp.arg(0).args
        )
        for a in fluent_args:
            if not a.is_constant():
                raise UPExpressionDefinitionError(
                    f"Impossible to return the initial value of a fluent expression with no constant arguments: {fluent_exp}."
                )
        if fluent_exp in self._initial_value:
            return self._initial_value[fluent_exp]
        elif fluent_exp.is_dot():
            agent = self.agent(fluent_exp.agent())
            f = fluent_exp.arg(0).fluent()
            if f not in agent.fluents:
                raise UPProblemDefinitionError(
                    f"Expression {fluent_exp} is invalid because {f} does not belong to agent {agent.name}"
                )
            v = agent.fluents_defaults.get(f, None)
            if v is None:
                raise UPProblemDefinitionError("Initial value not set!")
            return v
        elif fluent_exp.fluent() in self.ma_environment.fluents_defaults:
            return self.ma_environment.fluents_defaults[fluent_exp.fluent()]
        else:
            raise UPProblemDefinitionError("Initial value not set!")

    @property
    def initial_values(self) -> Dict["up.model.fnode.FNode", "up.model.fnode.FNode"]:
        """
        Gets the initial value of all the grounded fluents present in the `MultiAgentProblem`.

        The result is a new dictionary: the explicitly set values are kept separate,
        so reading this property never alters
        :func:`explicit_initial_values <unified_planning.model.multi_agent.MultiAgentProblem.explicit_initial_values>`.

        IMPORTANT NOTE: this property does a lot of computation, so it should be called as
        seldom as possible.
        """
        # Start from a copy: filling `self._initial_value` in place would mix
        # default-derived values into the explicitly set ones.
        res = dict(self._initial_value)
        for f in self.ma_environment.fluents:
            for f_exp in get_all_fluent_exp(self, f):
                res[f_exp] = self.initial_value(f_exp)
        for a in self.agents:
            for f in a.fluents:
                for f_exp in get_all_fluent_exp(self, f):
                    # Agent fluents are addressed through a Dot expression.
                    d = self.environment.expression_manager.Dot(a, f_exp)
                    res[d] = self.initial_value(d)
        return res

    @property
    def explicit_initial_values(
        self,
    ) -> Dict["up.model.fnode.FNode", "up.model.fnode.FNode"]:
        """
        Returns the problem's defined initial values; those are only the initial values set with the
        :func:`~unified_planning.model.multi_agent.MultiAgentProblem.set_initial_value` method.

        IMPORTANT NOTE: For all the initial values of the problem use :func:`initial_values <unified_planning.model.multi_agent.MultiAgentProblem.initial_values>`.
        """
        return self._initial_value

    def add_goal(
        self, goal: Union["up.model.fnode.FNode", "up.model.fluent.Fluent", bool]
    ):
        """
        Adds the given `goal` to the `MultiAgentProblem`; a goal is an expression that must be evaluated to `True` at the
        end of the execution of a :class:`~unified_planning.plans.Plan`. If a `Plan` does not satisfy all the given `goals`, it is not valid.

        A goal that is the constant `TRUE` is not stored.

        :param goal: The expression added to the `MultiAgentProblem` :func:`goals <unified_planning.model.multi_agent.MultiAgentProblem.goals>`.
        """
        assert (
            isinstance(goal, bool) or goal.environment == self._env
        ), "goal does not have the same environment of the problem"
        (goal_exp,) = self._env.expression_manager.auto_promote(goal)
        assert self._env.type_checker.get_type(
            goal_exp
        ).is_bool_type(), "A goal must be a boolean expression"
        if goal_exp != self._env.expression_manager.TRUE():
            self._goals.append(goal_exp)

    def add_goals(
        self,
        goals: Iterable[Union["up.model.fnode.FNode", "up.model.fluent.Fluent", bool]],
    ):
        """
        Adds the given `goals` to the `MultiAgentProblem`.

        :param goals: The `goals` that must be added to the `MultiAgentProblem`.
        """
        for goal in goals:
            self.add_goal(goal)

    @property
    def goals(self) -> List["up.model.fnode.FNode"]:
        """Returns all the `goals` in the `MultiAgentProblem`."""
        return self._goals

    def clear_goals(self):
        """Removes all the `goals` from the `MultiAgentProblem`."""
        self._goals = []

    def clear_agents(self):
        """Removes all the `agents` from the `MultiAgentProblem`."""
        self._agents = []

    @property
    def kind(self) -> "up.model.problem_kind.ProblemKind":
        """
        Calculates and returns the `problem kind` of this `planning problem`.
        If the `Problem` is modified, this method must be called again in order to be reliable.

        IMPORTANT NOTE: this property does a lot of computation, so it should be called as
        seldom as possible.
        """
        # The kind is rebuilt from scratch at every access.
        self._kind = up.model.problem_kind.ProblemKind(
            version=LATEST_PROBLEM_KIND_VERSION
        )
        self._kind.set_problem_class("ACTION_BASED_MULTI_AGENT")
        for ag in self.agents:
            for fluent in ag.fluents:
                self._update_problem_kind_fluent(fluent)
        for fluent in self.ma_environment.fluents:
            self._update_problem_kind_fluent(fluent)
        for ag in self.agents:
            self._update_agent_goal_kind(ag)
            for action in ag.actions:
                self._update_problem_kind_action(action)
        for goal in self._goals:
            self._update_problem_kind_condition(goal)
        return self._kind

    def _update_problem_kind_effect(self, e: "up.model.effect.Effect"):
        """Records in `self._kind` the effect-related features used by `e`."""
        if e.is_conditional():
            self._update_problem_kind_condition(e.condition)
            self._kind.set_effects_kind("CONDITIONAL_EFFECTS")
        if e.is_forall():
            self._kind.set_effects_kind("FORALL_EFFECTS")
        if e.is_increase():
            self._kind.set_effects_kind("INCREASE_EFFECTS")
        elif e.is_decrease():
            self._kind.set_effects_kind("DECREASE_EFFECTS")

    def _update_problem_kind_condition(self, exp: "up.model.fnode.FNode"):
        """Records in `self._kind` the condition features implied by the operators of `exp`."""
        ops = self._operators_extractor.get(exp)
        if OperatorKind.EQUALS in ops:
            self._kind.set_conditions_kind("EQUALITIES")
        if OperatorKind.NOT in ops:
            self._kind.set_conditions_kind("NEGATIVE_CONDITIONS")
        if OperatorKind.OR in ops or OperatorKind.IMPLIES in ops:
            self._kind.set_conditions_kind("DISJUNCTIVE_CONDITIONS")
        if OperatorKind.EXISTS in ops:
            self._kind.set_conditions_kind("EXISTENTIAL_CONDITIONS")
        if OperatorKind.FORALL in ops:
            self._kind.set_conditions_kind("UNIVERSAL_CONDITIONS")

    def _update_problem_kind_type(self, type: "up.model.types.Type"):
        """Records in `self._kind` the typing features required by `type`."""
        if type.is_user_type():
            self._kind.set_typing("FLAT_TYPING")
            # A user type with a father requires hierarchical typing.
            if cast(up.model.types._UserType, type).father is not None:
                self._kind.set_typing("HIERARCHICAL_TYPING")

    def _update_problem_kind_fluent(self, fluent: "up.model.fluent.Fluent"):
        """Records in `self._kind` the features required by the type and signature of `fluent`."""
        self._update_problem_kind_type(fluent.type)
        if fluent.type.is_int_type() or fluent.type.is_real_type():
            numeric_type = fluent.type
            assert isinstance(
                numeric_type, (up.model.types._RealType, up.model.types._IntType)
            )
            if (
                numeric_type.lower_bound is not None
                or numeric_type.upper_bound is not None
            ):
                self._kind.set_numbers("BOUNDED_TYPES")
            if fluent.type.is_int_type():
                self._kind.set_fluents_type("INT_FLUENTS")
            else:
                assert fluent.type.is_real_type()
                self._kind.set_fluents_type("REAL_FLUENTS")
        elif fluent.type.is_user_type():
            self._kind.set_fluents_type("OBJECT_FLUENTS")
        for p in fluent.signature:
            self._update_problem_kind_type(p.type)

    def _update_problem_kind_action(self, action: "up.model.action.Action"):
        """
        Records in `self._kind` the features required by `action`.

        :raises NotImplementedError: If `action` is neither an `InstantaneousAction`
            nor a `DurativeAction`.
        """
        for p in action.parameters:
            self._update_problem_kind_type(p.type)
        if isinstance(action, up.model.action.InstantaneousAction):
            for c in action.preconditions:
                self._update_problem_kind_condition(c)
            for e in action.effects:
                self._update_problem_kind_effect(e)
        elif isinstance(action, up.model.action.DurativeAction):
            # Conditions and effects of durative actions are not inspected here.
            self._kind.set_time("CONTINUOUS_TIME")
        else:
            raise NotImplementedError

    def _update_agent_goal_kind(self, agent: "up.model.multi_agent.Agent"):
        """Records in `self._kind` the features implied by the public/private goals of `agent`."""
        if isinstance(agent, up.model.multi_agent.Agent):
            if agent.public_goals:
                self._kind.set_multi_agent("AGENT_SPECIFIC_PUBLIC_GOAL")
            if agent.private_goals:
                self._kind.set_multi_agent("AGENT_SPECIFIC_PRIVATE_GOAL")
            for goal in chain(agent.private_goals, agent.public_goals):
                self._update_problem_kind_condition(goal)

    def normalize_plan(self, plan: "up.plans.Plan") -> "up.plans.Plan":
        """
        Normalizes the given `Plan`, that is potentially the result of another
        `MAProblem`, updating the `Object` references in the `Plan` with the ones of
        this `MAProblem` which are syntactically equal.

        :param plan: The `Plan` that must be normalized.
        :return: A `Plan` syntactically valid for this `Problem`.
        """
        return plan.replace_action_instances(self._replace_action_instance)

    def _replace_action_instance(
        self, action_instance: "up.plans.ActionInstance"
    ) -> "up.plans.ActionInstance":
        """
        Rebuilds `action_instance` with parameters created by this problem's
        expression manager; object parameters are looked up by name in this problem.

        :param action_instance: The `ActionInstance` to rebuild; it must have an agent.
        :return: The equivalent `ActionInstance`, bound to the same agent.
        :raises UPPlanDefinitionError: If `action_instance` has no agent.
        :raises NotImplementedError: If a parameter is neither an object expression
            nor a bool, int or real constant.
        """
        em = self.environment.expression_manager
        if action_instance.agent is None:
            raise UPPlanDefinitionError(
                f"An ActionInstance for a multi-agent problem must have an Agent; {action_instance} has no Agent."
            )
        # The action is resolved by name on the agent carried by the instance.
        new_a = action_instance.agent.action(action_instance.action.name)
        params = []
        for p in action_instance.actual_parameters:
            if p.is_object_exp():
                obj = self.object(p.object().name)
                params.append(em.ObjectExp(obj))
            elif p.is_bool_constant():
                params.append(em.Bool(p.is_true()))
            elif p.is_int_constant():
                params.append(em.Int(cast(int, p.constant_value())))
            elif p.is_real_constant():
                params.append(em.Real(cast(Fraction, p.constant_value())))
            else:
                raise NotImplementedError
        return up.plans.ActionInstance(new_a, tuple(params), action_instance.agent)