Source code for unified_planning.model.fluent

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module defines the Fluent class.
A Fluent has a name, a type and a signature
that defines the types of its parameters.
"""

import unified_planning as up
from unified_planning.model.types import domain_size, domain_item, _IntType
from unified_planning.environment import get_environment, Environment
from unified_planning.exceptions import UPTypeError
from typing import List, OrderedDict, Optional, Union, Iterator, cast


[docs] class Fluent: """Represents a fluent.""" def __init__( self, name: str, typename: Optional["up.model.types.Type"] = None, _signature: Optional[ Union[ OrderedDict[str, "up.model.types.Type"], List["up.model.parameter.Parameter"], ] ] = None, environment: Optional[Environment] = None, **kwargs: "up.model.types.Type", ): self._env = get_environment(environment) self._name = name if typename is None: self._typename = self._env.type_manager.BoolType() else: assert self._env.type_manager.has_type( typename ), "type of parameter does not belong to the same environment of the fluent" self._typename = typename self._signature: List["up.model.parameter.Parameter"] = [] if _signature is not None: assert len(kwargs) == 0 if isinstance(_signature, OrderedDict): for param_name, param_type in _signature.items(): self._signature.append( up.model.parameter.Parameter(param_name, param_type, self._env) ) elif isinstance(_signature, List): assert all( p.environment == self._env for p in _signature ), "one of the parameters does not belong to the same environment of the fluent" self._signature = _signature[:] else: raise NotImplementedError else: for param_name, param_type in kwargs.items(): self._signature.append( up.model.parameter.Parameter(param_name, param_type, self._env) ) for param in self._signature: pt = param.type if pt.is_real_type() or ( pt.is_int_type() and ( cast(_IntType, pt).lower_bound is None or cast(_IntType, pt).upper_bound is None ) ): raise UPTypeError( f"Parameter {param} of fluent {name} has type {pt}; fluents parameters must have finite domains." ) def __repr__(self) -> str: sign = "" if self.arity > 0: sign_items = [f"{p.name}={str(p.type)}" for p in self.signature] sign = f'[{", ".join(sign_items)}]' return f"{str(self.type)} {self.name}{sign}" def __eq__(self, oth: object) -> bool: if isinstance(oth, Fluent): return ( self._name == oth._name and self._typename == oth._typename and self._signature == oth._signature and self._env == oth._env ) else: return False def __hash__(self) -> int: res = hash(self._typename) for p in self._signature: res += hash(p) return res ^ hash(self._name) @property def name(self) -> str: """Returns the `Fluent` `name`.""" return self._name @property def type(self) -> "up.model.types.Type": """Returns the `Fluent` `Type`.""" return self._typename @property def signature(self) -> List["up.model.parameter.Parameter"]: """ Returns the `Fluent` `signature`. The `signature` is the `List` of `Parameters` indicating the :class:`Types <unified_planning.model.Type>` compatible with this `Fluent`. """ return self._signature @property def arity(self) -> int: """ Returns the `Fluent` arity. IMPORTANT NOTE: this property does some computation, so it should be called as seldom as possible. """ return len(self._signature) @property def environment(self) -> "Environment": """Returns the `Fluent` `Environment`.""" return self._env def __call__( self, *args: "up.model.expression.Expression" ) -> "up.model.fnode.FNode": """ Returns a fluent expression with the given parameters. :param args: The expressions used as this fluent's parameters in the created expression. :return: The created FluentExp. """ return self._env.expression_manager.FluentExp(self, args) # # Infix operators # def __add__(self, right): return self._env.expression_manager.Plus(self, right) def __radd__(self, left): return self._env.expression_manager.Plus(left, self) def __sub__(self, right): return self._env.expression_manager.Minus(self, right) def __rsub__(self, left): return self._env.expression_manager.Minus(left, self) def __mul__(self, right): return self._env.expression_manager.Times(self, right) def __rmul__(self, left): return self._env.expression_manager.Times(left, self) def __truediv__(self, right): return self._env.expression_manager.Div(self, right) def __rtruediv__(self, left): return self._env.expression_manager.Div(left, self) def __floordiv__(self, right): return self._env.expression_manager.Div(self, right) def __rfloordiv__(self, left): return self._env.expression_manager.Div(left, self) def __gt__(self, right): return self._env.expression_manager.GT(self, right) def __ge__(self, right): return self._env.expression_manager.GE(self, right) def __lt__(self, right): return self._env.expression_manager.LT(self, right) def __le__(self, right): return self._env.expression_manager.LE(self, right) def __pos__(self): return self._env.expression_manager.Plus(0, self) def __neg__(self): return self._env.expression_manager.Minus(0, self)
[docs] def Equals(self, right): return self._env.expression_manager.Equals(self, right)
[docs] def And(self, *other): return self._env.expression_manager.And(self, *other)
def __and__(self, *other): return self._env.expression_manager.And(self, *other) def __rand__(self, *other): return self._env.expression_manager.And(*other, self)
[docs] def Or(self, *other): return self._env.expression_manager.Or(self, *other)
def __or__(self, *other): return self._env.expression_manager.Or(self, *other) def __ror__(self, *other): return self._env.expression_manager.Or(*other, self)
[docs] def Not(self): return self._env.expression_manager.Not(self)
def __invert__(self): return self._env.expression_manager.Not(self)
[docs] def Xor(self, *other): em = self._env.expression_manager return em.And(em.Or(self, *other), em.Not(em.And(self, *other)))
def __xor__(self, *other): em = self._env.expression_manager return em.And(em.Or(self, *other), em.Not(em.And(self, *other))) def __rxor__(self, other): em = self._env.expression_manager return em.And(em.Or(*other, self), em.Not(em.And(*other, self)))
[docs] def Implies(self, right): return self._env.expression_manager.Implies(self, right)
[docs] def Iff(self, right): return self._env.expression_manager.Iff(self, right)
def get_ith_fluent_exp( fluent: "up.model.fluent.Fluent", domains: List[List["up.model.FNode"]], idx: int, ) -> "up.model.fnode.FNode": """Returns the ith ground fluent expression.""" quot = idx rem = 0 actual_parameters = [] for i, p in enumerate(fluent.signature): ds = len(domains[i]) rem = quot % ds quot //= ds v = domains[i][rem] actual_parameters.append(v) return fluent(*actual_parameters) def get_all_fluent_exp( objects_set: "up.model.mixins.ObjectsSetMixin", fluent: "up.model.fluent.Fluent", ) -> Iterator["up.model.fnode.FNode"]: if fluent.arity == 0: yield fluent.environment.expression_manager.FluentExp(fluent) else: ground_size = 1 domains = [] for p in fluent.signature: ds = domain_size(objects_set, p.type) domain = [domain_item(objects_set, p.type, i) for i in range(ds)] domains.append(domain) ground_size *= ds for i in range(ground_size): yield get_ith_fluent_exp(fluent, domains, i)