# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This module defines an agent class."""
import unified_planning as up
from unified_planning.model.mixins import (
ActionsSetMixin,
FluentsSetMixin,
)
from typing import Optional, List, Union, Iterable
from unified_planning.model.expression import ConstantExpression
from unified_planning.exceptions import UPUsageError
[docs]
class Agent(
FluentsSetMixin,
ActionsSetMixin,
):
"""
This is an agent class that represents a generic `agent`.
"""
def __init__(
self,
name: str,
ma_problem: "up.model.multi_agent.ma_problem.MultiAgentProblem",
):
FluentsSetMixin.__init__(
self,
ma_problem.environment,
ma_problem._add_user_type,
self.has_name,
ma_problem._initial_defaults,
)
ActionsSetMixin.__init__(
self, ma_problem.environment, ma_problem._add_user_type, self.has_name
)
self._env = ma_problem.environment
self._name: str = name
self._public_fluents: List["up.model.fluent.Fluent"] = []
self._private_goals: List["up.model.fnode.FNode"] = list()
self._public_goals: List["up.model.fnode.FNode"] = list()
self._ma_problem_has_name_not_in_agents = ma_problem.has_name_not_in_agents
def __getstate__(self):
state = self.__dict__.copy()
# Don't pickle MultiAgentProblem methods
state["_add_user_type_method"] = None
state["_ma_problem_has_name_not_in_agents"] = None
return state
[docs]
def has_name(self, name: str) -> bool:
"""
Returns `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise.
:param name: The target name to find in the `MultiAgentProblem`.
:return: `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise.
"""
return (
self.has_action(name)
or self.has_fluent(name)
or self._ma_problem_has_name_not_in_agents(name)
)
[docs]
def has_name_in_agent(self, name: str) -> bool:
"""
Returns `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise.
:param name: The target name to find in the `MultiAgentProblem`.
:return: `True` if the given `name` is already in the `MultiAgentProblem`, `False` otherwise.
"""
return self.has_action(name) or self.has_fluent(name)
@property
def name(self) -> str:
"""Returns the `Agent` `name`."""
return self._name
@name.setter
def name(self, new_value: str):
raise UPUsageError("The name of an Agent is immutable.")
@property
def environment(self) -> "up.Environment":
"""Returns this `Agent` `Environment`."""
return self._env
[docs]
def add_public_fluent(
self,
fluent_or_name: Union["up.model.fluent.Fluent", str],
typename: Optional["up.model.types.Type"] = None,
*,
default_initial_value: Optional["ConstantExpression"] = None,
**kwargs: "up.model.types.Type",
) -> "up.model.fluent.Fluent":
"""Adds the given `public fluent` to the `problem`.
If the first parameter is not a `Fluent`, the parameters will be passed to the `Fluent` constructor to create it.
:param fluent_or_name: `Fluent` instance or `name` of the `fluent` to be constructed.
:param typename: If only the `name` of the `fluent` is given, this is the `fluent's type` (passed to the `Fluent` constructor).
:param default_initial_value: If provided, defines the default value taken in initial state by
a state variable of this `fluent` that has no explicit value.
:param kwargs: If only the `name` of the `fluent` is given, these are the `fluent's parameters` (passed to the `Fluent` constructor).
:return: The `fluent` passed or constructed.
"""
fluent = self.add_fluent(
fluent_or_name,
typename,
default_initial_value=default_initial_value,
**kwargs,
)
self._public_fluents.append(fluent)
return fluent
[docs]
def add_private_fluent(
self,
fluent_or_name: Union["up.model.fluent.Fluent", str],
typename: Optional["up.model.types.Type"] = None,
*,
default_initial_value: Optional["ConstantExpression"] = None,
**kwargs: "up.model.types.Type",
) -> "up.model.fluent.Fluent":
"""Adds the given `private fluent` to the `problem`.
If the first parameter is not a `Fluent`, the parameters will be passed to the `Fluent` constructor to create it.
:param fluent_or_name: `Fluent` instance or `name` of the `fluent` to be constructed.
:param typename: If only the `name` of the `fluent` is given, this is the `fluent's type` (passed to the `Fluent` constructor).
:param default_initial_value: If provided, defines the default value taken in initial state by
a state variable of this `fluent` that has no explicit value.
:param kwargs: If only the `name` of the `fluent` is given, these are the `fluent's parameters` (passed to the `Fluent` constructor).
:return: The `fluent` passed or constructed.
"""
return self.add_fluent(
fluent_or_name,
typename,
default_initial_value=default_initial_value,
**kwargs,
)
[docs]
def add_public_fluents(self, fluents: Iterable["up.model.fluent.Fluent"]):
"""
Adds the given `public fluents` to the `problem`.
:param fluents: The `public fluents` that must be added to the `problem`.
"""
for fluent in fluents:
self.add_public_fluent(fluent)
[docs]
def add_private_fluents(self, fluents: Iterable["up.model.fluent.Fluent"]):
"""
Adds the given `private fluents` to the `problem`.
:param fluents: The `private fluents` that must be added to the `problem`.
"""
for fluent in fluents:
self.add_private_fluent(fluent)
@property
def public_fluents(self) -> List["up.model.fluent.Fluent"]:
"""Returns the `fluents` currently in the `problem`."""
return self._public_fluents
@property
def private_fluents(self) -> List["up.model.fluent.Fluent"]:
"""Returns the `fluents` currently in the `problem`."""
return [f for f in self._fluents if f not in self._public_fluents]
def _add_goal(
self,
goal: Union["up.model.fnode.FNode", "up.model.fluent.Fluent", bool],
is_private_goal: bool,
) -> "up.model.fnode.FNode":
"""
Adds the given `goal` to the specified `goal_list` of the `Agent`.
:param goal: The expression added to the `Agent` goals.
:param is_private_goal: A boolean flag indicating whether the goal should be added as a private goal.
:return: The expression of the goal added.
"""
assert (
isinstance(goal, bool) or goal.environment == self._env
), "goal does not have the same environment of the problem"
(goal_exp,) = self._env.expression_manager.auto_promote(goal)
assert self._env.type_checker.get_type(
goal_exp
).is_bool_type(), "A goal must be a boolean expression"
goal_list = self._private_goals if is_private_goal else self._public_goals
if goal_exp != self._env.expression_manager.TRUE():
if goal_exp not in goal_list:
goal_list.append(goal_exp)
return goal_exp
[docs]
def add_private_goal(
self, goal: Union["up.model.fnode.FNode", "up.model.fluent.Fluent", bool]
) -> "up.model.fnode.FNode":
"""
Adds the given `goal` to the `Agent` as a private goal.
:param goal: The expression added to the `Agent` private goals.
:return: The expression of the private goal added.
Note:
- Private-specific goals are; individual agent goals (not coalition goals) unknown to other agents.
"""
return self._add_goal(goal, is_private_goal=True)
[docs]
def add_public_goal(
self, goal: Union["up.model.fnode.FNode", "up.model.fluent.Fluent", bool]
) -> "up.model.fnode.FNode":
"""
Adds the given `goal` to the `Agent` as a public goal.
:param goal: The expression added to the `Agent` public goals.
:return: The expression of the public goal added.
Note:
- Public-specific goals are; individual agent goals (not coalition goals) known to other agents.
"""
return self._add_goal(goal, is_private_goal=False)
@property
def public_goals(self) -> List["up.model.fnode.FNode"]:
"""Returns the `public goals` currently in the `agent`."""
return self._public_goals
@property
def private_goals(self) -> List["up.model.fnode.FNode"]:
"""Returns the `private goals` currently in the `agent`."""
return self._private_goals
[docs]
def clear_goals(self):
"""Removes all the `goals` from the `Agent`."""
self._private_goals = []
self._public_goals = []
def __repr__(self) -> str:
s = []
s.append(f"Agent name = {str(self._name)}\n\n")
s.append("private fluents = [\n")
for f in self.private_fluents:
s.append(f" {str(f)}\n")
s.append("]\n\n")
s.append("public fluents = [\n")
for f in self._public_fluents:
s.append(f" {str(f)}\n")
s.append("]\n\n")
s.append("actions = [\n")
for a in self._actions:
s.append(f" {str(a)}\n")
s.append("]\n\n")
s.append("private goals = [\n")
for g in self.private_goals:
s.append(f" {str(g)}\n")
s.append("]\n\n")
s.append("public goals = [\n")
for g in self._public_goals:
s.append(f" {str(g)}\n")
s.append("]\n\n")
return "".join(s)
def __eq__(self, oth: object) -> bool:
if not (isinstance(oth, Agent)) or self._env != oth._env:
return False
if self._name != oth._name:
return False
if set(self._fluents) != set(oth._fluents):
return False
if set(self._public_fluents) != set(oth._public_fluents):
return False
if set(self._actions) != set(oth._actions):
return False
if set(self._private_goals) != set(oth._private_goals):
return False
if set(self._public_goals) != set(oth._public_goals):
return False
return True
def __hash__(self) -> int:
res = hash(self._name)
for f in self._fluents:
res += hash(f)
for f in self._public_fluents:
res += hash(f)
for a in self._actions:
res += hash(a)
for g in self._private_goals:
res += hash(g)
for g in self._public_goals:
res += hash(g)
return res
[docs]
def clone(
self,
ma_problem: "up.model.multi_agent.ma_problem.MultiAgentProblem",
name: Optional[str] = None,
):
if name is None:
name = self.name
new_ag = Agent(name, ma_problem)
new_ag._public_fluents = self._public_fluents.copy()
new_ag._fluents = self._fluents.copy()
new_ag._fluents_defaults = self._fluents_defaults.copy()
new_ag._private_goals = self._private_goals.copy()
new_ag._public_goals = self._public_goals.copy()
for a in self.actions:
new_ag.add_action(a.clone())
return new_ag