# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unified_planning as up
from unified_planning.environment import Environment, get_environment
from unified_planning.exceptions import UPProblemDefinitionError, UPUsageError
from unified_planning.model.expression import NumericConstant, uniform_numeric_constant
from abc import ABC
from fractions import Fraction
from typing import Dict, Optional, Union, Tuple
[docs]
class PlanQualityMetric(ABC):
"""
This is the base class of any metric for :class:`~unified_planning.model.Plan` quality.
The addition of a `PlanQualityMetric` in a `Problem` restricts the set of valid `Plans` to only those who
satisfy the semantic of the given metric, so a `Plan`, to be valid, not only needs to satisfy all the
problem goals, but also the problem's quality metric.
"""
def __init__(self, environment: Optional[Environment] = None):
self._env = get_environment(environment)
@property
def environment(self) -> Environment:
return self._env
[docs]
@staticmethod
def is_minimize_action_costs():
return False
[docs]
@staticmethod
def is_minimize_sequential_plan_length():
return False
[docs]
@staticmethod
def is_minimize_makespan():
return False
[docs]
@staticmethod
def is_minimize_expression_on_final_state():
return False
[docs]
@staticmethod
def is_maximize_expression_on_final_state():
return False
[docs]
@staticmethod
def is_oversubscription():
return False
[docs]
@staticmethod
def is_temporal_oversubscription():
return False
[docs]
class MinimizeActionCosts(PlanQualityMetric):
"""
This metric means that only the :class:`~unified_planning.model.Plan` minimizing the total cost of the :class:`Actions <unified_planning.model.Action>` is valid.
The costs for each `Action` of the problem is stored in this quality metric.
"""
def __init__(
self,
costs: Dict["up.model.Action", "up.model.Expression"],
default: Optional["up.model.Expression"] = None,
environment: Optional[Environment] = None,
):
PlanQualityMetric.__init__(self, environment)
em = self._env.expression_manager
self._costs: Dict["up.model.Action", "up.model.FNode"] = {}
for action, cost in costs.items():
cost_exp: Optional["up.model.FNode"] = None
assert cost is not None, "Typing not respected"
(cost_exp,) = em.auto_promote(cost)
cost_type = cost_exp.type
if not cost_type.is_int_type() and not cost_type.is_real_type():
raise UPProblemDefinitionError(
"The costs of a MinimizeActionCosts must be numeric.",
f"{cost_type} is neither IntType or RealType.",
)
if cost_exp.environment != self._env:
raise UPProblemDefinitionError(
f"The cost expression {cost_exp} and the metric don't have the same environment"
)
if action.environment != self._env:
raise UPProblemDefinitionError(
f"The action {action.name} and the metric don't have the same environment"
)
self._costs[action] = cost_exp
self._default: Optional["up.model.FNode"] = None
if default is not None:
default_exp: "up.model.FNode" = em.auto_promote(default)[0]
default_type = default_exp.type
if not default_type.is_int_type() and not default_type.is_real_type():
raise UPProblemDefinitionError(
"The costs of a MinimizeActionCosts must be numeric.",
f"{default_type} is neither IntType or RealType.",
)
if default_exp.environment != self._env:
raise UPProblemDefinitionError(
f"The default cost expression {default_exp} and the metric don't have the same environment"
)
self._default = default_exp
def __repr__(self):
costs: Dict[str, Optional["up.model.fnode.FNode"]] = {
a.name: c for a, c in self._costs.items()
}
costs["default"] = self._default
return f"minimize actions-cost: {costs}"
def __eq__(self, other):
return (
isinstance(other, MinimizeActionCosts)
and self._default == other._default
and self._costs == other._costs
)
def __hash__(self):
return hash(self.__class__.__name__)
@property
def costs(self) -> Dict["up.model.Action", "up.model.FNode"]:
return self._costs
@property
def default(self) -> Optional["up.model.FNode"]:
return self._default
[docs]
def get_action_cost(self, action: "up.model.Action") -> Optional["up.model.FNode"]:
"""
Returns the cost of the given `Action`.
:param action: The action of which cost must be retrieved.
:return: The expression representing the cost of the given action.
If the retrieved cost is `None` it means it is not set and therefore
it's invalid; every action cost MUST be set, either with the cost mapping
or with the default.
"""
if not isinstance(action, up.model.Action):
raise UPUsageError(
f"An `Action` was expected for this method, got {action}!"
)
return self._costs.get(action, self._default)
[docs]
@staticmethod
def is_minimize_action_costs():
return True
[docs]
class MinimizeSequentialPlanLength(PlanQualityMetric):
"""This metric means that the number of :func:`actions <unified_planning.plans.SequentialPlan.actions>` in the resulting :class:`~unified_planning.plans.SequentialPlan` must be minimized."""
def __repr__(self):
return "minimize sequential-plan-length"
def __eq__(self, other):
return isinstance(other, MinimizeSequentialPlanLength)
def __hash__(self):
return hash(self.__class__.__name__)
[docs]
@staticmethod
def is_minimize_sequential_plan_length():
return True
[docs]
class MinimizeMakespan(PlanQualityMetric):
"""
This metric means that the makespan must be minimized.
The makespan is the time from the start of the plan to the end of the plan.
"""
def __repr__(self):
return "minimize makespan"
def __eq__(self, other):
return isinstance(other, MinimizeMakespan)
def __hash__(self):
return hash(self.__class__.__name__)
[docs]
@staticmethod
def is_minimize_makespan():
return True
[docs]
class MinimizeExpressionOnFinalState(PlanQualityMetric):
"""
This metric means that the given expression must be minimized on the final state reached
following the given :class:`~unified_planning.model.Plan`.
"""
def __init__(
self,
expression: "up.model.Expression",
environment: Optional[Environment] = None,
):
PlanQualityMetric.__init__(self, environment)
self.expression: "up.model.FNode" = self._env.expression_manager.auto_promote(
expression
)[0]
exp_type = self.expression.type
if not exp_type.is_int_type() and not exp_type.is_real_type():
raise UPProblemDefinitionError(
"The expression of a MinimizeExpressionOnFinalState must be numeric.",
f"{exp_type} is neither IntType or RealType.",
)
if self.expression.environment != self._env:
raise UPProblemDefinitionError(
"The expression and the metric don't have the same environment"
)
def __repr__(self):
return f"minimize {self.expression}"
def __eq__(self, other):
return (
isinstance(other, MinimizeExpressionOnFinalState)
and self.expression == other.expression
)
def __hash__(self):
return hash(self.__class__.__name__)
[docs]
@staticmethod
def is_minimize_expression_on_final_state():
return True
[docs]
class MaximizeExpressionOnFinalState(PlanQualityMetric):
"""
This metric means that the given expression must be maximized on the final state reached
following the given :class:`~unified_planning.model.Plan`.
"""
def __init__(
self,
expression: "up.model.Expression",
environment: Optional[Environment] = None,
):
PlanQualityMetric.__init__(self, environment)
self.expression: "up.model.FNode" = self._env.expression_manager.auto_promote(
expression
)[0]
exp_type = self.expression.type
if not exp_type.is_int_type() and not exp_type.is_real_type():
raise UPProblemDefinitionError(
"The expression of a MaximizeExpressionOnFinalState must be numeric.",
f"{exp_type} is neither IntType or RealType.",
)
if self.expression.environment != self._env:
raise UPProblemDefinitionError(
"The expression and the metric don't have the same environment"
)
def __repr__(self):
return f"maximize {self.expression}"
def __eq__(self, other):
return (
isinstance(other, MaximizeExpressionOnFinalState)
and self.expression == other.expression
)
def __hash__(self):
return hash(self.__class__.__name__)
[docs]
@staticmethod
def is_maximize_expression_on_final_state():
return True
[docs]
class Oversubscription(PlanQualityMetric):
"""
This metric means that only the plans maximizing the total gain of the achieved `goals` is valid.
The gained value for each fulfilled `goal` of the problem is stored in this quality metric.
"""
def __init__(
self,
goals: Dict["up.model.BoolExpression", NumericConstant],
environment: Optional[Environment] = None,
):
PlanQualityMetric.__init__(self, environment)
em = self._env.expression_manager
self._goals: Dict["up.model.fnode.FNode", Union[Fraction, int]] = {}
for goal, gain in goals.items():
(g_exp,) = em.auto_promote(goal)
if not g_exp.type.is_bool_type():
raise UPProblemDefinitionError(
f"goal {g_exp} type is {g_exp.type}. Expected BoolType."
)
if g_exp.environment != self._env:
raise UPProblemDefinitionError(
f"goal {g_exp} does not have the same environment given to the metric."
)
self._goals[g_exp] = uniform_numeric_constant(gain)
def __repr__(self):
return f"oversubscription planning goals: {self.goals}"
def __eq__(self, other):
return isinstance(other, Oversubscription) and self.goals == other.goals
def __hash__(self):
return hash(self.__class__.__name__)
@property
def goals(self) -> Dict["up.model.fnode.FNode", Union[Fraction, int]]:
return self._goals
[docs]
@staticmethod
def is_oversubscription():
return True
[docs]
class TemporalOversubscription(PlanQualityMetric):
"""
This metric means that only the plans maximizing the total gain of the achieved `goals` is valid.
The gained value for each fulfilled `goal` of the problem is stored in this quality metric.
"""
def __init__(
self,
goals: Dict[
Tuple["up.model.timing.TimeInterval", "up.model.BoolExpression"],
NumericConstant,
],
environment: Optional[Environment] = None,
):
PlanQualityMetric.__init__(self, environment)
em = self._env.expression_manager
self._goals: Dict[
Tuple["up.model.timing.TimeInterval", "up.model.FNode"],
Union[Fraction, int],
] = {}
for (interval, goal), gain in goals.items():
(g_exp,) = em.auto_promote(goal)
if not g_exp.type.is_bool_type():
raise UPProblemDefinitionError(
f"goal {g_exp} type is {g_exp.type}. Expected BoolType."
)
if g_exp.environment != self._env:
raise UPProblemDefinitionError(
f"goal {g_exp} does not have the same environment given to the metric."
)
self._goals[(interval, g_exp)] = uniform_numeric_constant(gain)
def __repr__(self):
return f"oversubscription planning timed goals: {self.goals}"
def __eq__(self, other):
return isinstance(other, TemporalOversubscription) and self.goals == other.goals
def __hash__(self):
return hash(self.__class__.__name__)
@property
def goals(
self,
) -> Dict[
Tuple["up.model.timing.TimeInterval", "up.model.FNode"], Union[Fraction, int]
]:
return self._goals
[docs]
@staticmethod
def is_temporal_oversubscription():
return True