diff --git a/pyoverkiz/auth/base.py b/pyoverkiz/auth/base.py index e10b99ca..6131c3bc 100644 --- a/pyoverkiz/auth/base.py +++ b/pyoverkiz/auth/base.py @@ -4,7 +4,7 @@ import datetime from collections.abc import Mapping -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, Protocol @@ -12,8 +12,8 @@ class AuthContext: """Authentication context holding tokens and expiration.""" - access_token: str | None = None - refresh_token: str | None = None + access_token: str | None = field(default=None, repr=False) + refresh_token: str | None = field(default=None, repr=False) expires_at: datetime.datetime | None = None def is_expired(self, *, skew_seconds: int = 5) -> bool: diff --git a/pyoverkiz/auth/credentials.py b/pyoverkiz/auth/credentials.py index 777f950b..b70b3f6e 100644 --- a/pyoverkiz/auth/credentials.py +++ b/pyoverkiz/auth/credentials.py @@ -2,7 +2,7 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field class Credentials: @@ -14,14 +14,14 @@ class UsernamePasswordCredentials(Credentials): """Credentials using username and password.""" username: str - password: str + password: str = field(repr=False) @dataclass(slots=True) class TokenCredentials(Credentials): """Credentials using an (API) token.""" - token: str + token: str = field(repr=False) @dataclass(slots=True) @@ -33,5 +33,5 @@ class LocalTokenCredentials(TokenCredentials): class RexelOAuthCodeCredentials(Credentials): """Credentials using Rexel OAuth2 authorization code.""" - code: str + code: str = field(repr=False) redirect_uri: str diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 38d8dbba..6ad4ce38 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -23,6 +23,7 @@ from pyoverkiz.action_queue import ActionQueue, ActionQueueSettings from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy from pyoverkiz.const import SUPPORTED_SERVERS +from pyoverkiz.converter import converter from pyoverkiz.enums import APIType, ExecutionMode, Server from pyoverkiz.exceptions import ( ExecutionQueueFullError, @@ -306,7 +307,7 @@ async def get_setup(self, refresh: bool = False) -> Setup: response = await self._get("setup") - setup = Setup(**decamelize(response)) + setup = converter.structure(decamelize(response), Setup) # Cache response self.setup = setup @@ -344,7 +345,7 @@ async def get_devices(self, refresh: bool = False) -> list[Device]: return self.devices response = await self._get("setup/devices") - devices = [Device(**d) for d in decamelize(response)] + devices = converter.structure(decamelize(response), list[Device]) # Cache response self.devices = devices @@ -363,7 +364,7 @@ async def get_gateways(self, refresh: bool = False) -> list[Gateway]: return self.gateways response = await self._get("setup/gateways") - gateways = [Gateway(**g) for g in decamelize(response)] + gateways = converter.structure(decamelize(response), list[Gateway]) # Cache response self.gateways = gateways @@ -376,7 +377,7 @@ async def get_gateways(self, refresh: bool = False) -> list[Gateway]: async def get_execution_history(self) -> list[HistoryExecution]: """List past executions and their outcomes.""" response = await self._get("history/executions") - return [HistoryExecution(**h) for h in decamelize(response)] + return converter.structure(decamelize(response), list[HistoryExecution]) @retry_on_auth_error async def get_device_definition(self, deviceurl: str) -> JSON | None: @@ -393,7 +394,7 @@ async def get_state(self, deviceurl: str) -> list[State]: response = await self._get( f"setup/devices/{urllib.parse.quote_plus(deviceurl)}/states" ) - return [State(**s) for s in decamelize(response)] + return converter.structure(decamelize(response), list[State]) @retry_on_auth_error async def refresh_states(self) -> None: @@ -437,7 +438,7 @@ async def fetch_events(self) -> list[Event]: """ await self._refresh_token_if_expired() response = await self._post(f"events/{self.event_listener_id}/fetch") - return [Event(**e) for e in decamelize(response)] + return converter.structure(decamelize(response), list[Event]) async def unregister_event_listener(self) -> None: """Unregister an event listener. @@ -455,17 +456,16 @@ async def get_current_execution(self, exec_id: str) -> Execution | None: Returns None if the execution does not exist. """ response = await self._get(f"exec/current/{exec_id}") - if not response or not isinstance(response, dict): return None - return Execution(**decamelize(response)) + return converter.structure(decamelize(response), Execution) @retry_on_auth_error async def get_current_executions(self) -> list[Execution]: """Get all currently running executions.""" response = await self._get("exec/current") - return [Execution(**e) for e in decamelize(response)] + return converter.structure(decamelize(response), list[Execution]) @retry_on_auth_error async def get_api_version(self) -> str: @@ -557,7 +557,7 @@ async def cancel_execution(self, exec_id: str) -> None: async def get_action_groups(self) -> list[ActionGroup]: """List action groups persisted on the server.""" response = await self._get("actionGroups") - return [ActionGroup(**action_group) for action_group in decamelize(response)] + return converter.structure(decamelize(response), list[ActionGroup]) @retry_on_auth_error async def get_places(self) -> Place: @@ -572,7 +572,7 @@ async def get_places(self) -> Place: - `sub_places`: List of nested places within this location """ response = await self._get("setup/places") - return Place(**decamelize(response)) + return converter.structure(decamelize(response), Place) @retry_on_auth_error async def execute_persisted_action_group(self, oid: str) -> str: @@ -594,7 +594,7 @@ async def get_setup_options(self) -> list[Option]: Access scope : Full enduser API access (enduser/*). """ response = await self._get("setup/options") - return [Option(**o) for o in decamelize(response)] + return converter.structure(decamelize(response), list[Option]) @retry_on_auth_error async def get_setup_option(self, option: str) -> Option | None: @@ -605,7 +605,7 @@ async def get_setup_option(self, option: str) -> Option | None: response = await self._get(f"setup/options/{option}") if response: - return Option(**decamelize(response)) + return converter.structure(decamelize(response), Option) return None @@ -623,7 +623,7 @@ async def get_setup_option_parameter( response = await self._get(f"setup/options/{option}/{parameter}") if response: - return OptionParameter(**decamelize(response)) + return converter.structure(decamelize(response), OptionParameter) return None @@ -655,7 +655,7 @@ async def get_reference_protocol_types(self) -> list[ProtocolType]: - label: Human-readable protocol label """ response = await self._get("reference/protocolTypes") - return [ProtocolType(**protocol) for protocol in response] + return converter.structure(response, list[ProtocolType]) @retry_on_auth_error async def get_reference_timezones(self) -> JSON: @@ -685,7 +685,7 @@ async def get_reference_ui_profile(self, profile_name: str) -> UIProfileDefiniti response = await self._get( f"reference/ui/profile/{urllib.parse.quote_plus(profile_name)}" ) - return UIProfileDefinition(**decamelize(response)) + return converter.structure(decamelize(response), UIProfileDefinition) @retry_on_auth_error async def get_reference_ui_profile_names(self) -> list[str]: @@ -701,7 +701,7 @@ async def get_reference_ui_widgets(self) -> list[str]: async def get_devices_not_up_to_date(self) -> list[Device]: """Get all devices whose firmware is not up to date.""" response = await self._get("setup/devices/notUpToDate") - return [Device(**d) for d in decamelize(response)] + return converter.structure(decamelize(response), list[Device]) @retry_on_auth_error async def get_device_firmware_status(self, deviceurl: str) -> FirmwareStatus | None: @@ -715,7 +715,7 @@ async def get_device_firmware_status(self, deviceurl: str) -> FirmwareStatus | N ) except UnsupportedOperationError: return None - return FirmwareStatus(**decamelize(response)) + return converter.structure(decamelize(response), FirmwareStatus) @retry_on_auth_error async def get_device_firmware_update_capability(self, deviceurl: str) -> bool: diff --git a/pyoverkiz/converter.py b/pyoverkiz/converter.py new file mode 100644 index 00000000..901b01de --- /dev/null +++ b/pyoverkiz/converter.py @@ -0,0 +1,64 @@ +"""Centralized cattrs converter for structuring Overkiz API responses.""" + +from __future__ import annotations + +import types +from enum import Enum +from typing import Any, Union, get_args, get_origin + +import attr +import cattrs + +from pyoverkiz.models import ( + CommandDefinition, + CommandDefinitions, + State, + States, +) + + +def _is_primitive_union(t: Any) -> bool: + """True for unions of JSON-native types (e.g. StateType). + + Excludes unions containing attrs classes (e.g. Definition | None) since those + need actual structuring by cattrs. + """ + origin = get_origin(t) + if origin is not Union and not isinstance(t, types.UnionType): + return False + non_none = [arg for arg in get_args(t) if arg is not type(None)] + if any(isinstance(arg, type) and attr.has(arg) for arg in non_none): + return False + # Exclude pure Optional[Enum] unions — those need the Enum structure hook. + return not all(isinstance(arg, type) and issubclass(arg, Enum) for arg in non_none) + + +def _make_converter() -> cattrs.Converter: + c = cattrs.Converter() + + # JSON-native unions like StateType (str | int | float | … | None) are already the + # correct Python type after JSON parsing — tell cattrs to pass them through as-is. + c.register_structure_hook_func(_is_primitive_union, lambda v, _: v) + + # Enums: call the constructor so UnknownEnumMixin._missing_ can handle unknown values + c.register_structure_hook_func( + lambda t: isinstance(t, type) and issubclass(t, Enum), + lambda v, t: v if isinstance(v, t) else t(v), + ) + + # Custom container types that take a list in __init__ + def _structure_states(val: Any, _: type) -> States: + if val is None: + return States() + return States([c.structure(s, State) for s in val]) + + def _structure_command_definitions(val: Any, _: type) -> CommandDefinitions: + return CommandDefinitions([c.structure(cd, CommandDefinition) for cd in val]) + + c.register_structure_hook(States, _structure_states) + c.register_structure_hook(CommandDefinitions, _structure_command_definitions) + + return c + + +converter = _make_converter() diff --git a/pyoverkiz/enums/__init__.py b/pyoverkiz/enums/__init__.py index af98f61a..33cc80c9 100644 --- a/pyoverkiz/enums/__init__.py +++ b/pyoverkiz/enums/__init__.py @@ -7,7 +7,12 @@ ExecutionSubType, ExecutionType, ) -from pyoverkiz.enums.gateway import GatewaySubType, GatewayType, UpdateBoxStatus +from pyoverkiz.enums.gateway import ( + GatewaySubType, + GatewayType, + UpdateBoxStatus, + UpdateCriticityLevel, +) from pyoverkiz.enums.general import DataType, EventName, FailureType, ProductType from pyoverkiz.enums.measured_value_type import MeasuredValueType from pyoverkiz.enums.protocol import Protocol @@ -40,4 +45,5 @@ "UIProfile", "UIWidget", "UpdateBoxStatus", + "UpdateCriticityLevel", ] diff --git a/pyoverkiz/enums/gateway.py b/pyoverkiz/enums/gateway.py index 15f4517c..f817f055 100644 --- a/pyoverkiz/enums/gateway.py +++ b/pyoverkiz/enums/gateway.py @@ -107,6 +107,15 @@ def beautify_name(self) -> str: ) +@unique +class UpdateCriticityLevel(UnknownEnumMixin, StrEnum): + """Criticity level of an available gateway update.""" + + BUG_FIX = "BUG_FIX" + DEVICES_CONTROL_ONLY = "DEVICES_CONTROL_ONLY" + UNKNOWN = "UNKNOWN" + + @unique class UpdateBoxStatus(StrEnum): """Status of the gateway update box indicating its updateability.""" diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index 411ec73d..0fad3556 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -3,7 +3,6 @@ from __future__ import annotations import json -import logging import re from collections.abc import Iterator from typing import Any, cast @@ -23,6 +22,7 @@ UIClass, UIWidget, UpdateBoxStatus, + UpdateCriticityLevel, ) from pyoverkiz.enums.command import OverkizCommand, OverkizCommandParam from pyoverkiz.enums.protocol import Protocol @@ -30,496 +30,18 @@ from pyoverkiz.obfuscate import obfuscate_email, obfuscate_id, obfuscate_string from pyoverkiz.types import DATA_TYPE_TO_PYTHON, StateType -# pylint: disable=unused-argument, too-many-instance-attributes, too-many-locals +# --------------------------------------------------------------------------- +# State & command primitives +# --------------------------------------------------------------------------- -# :///[#] -DEVICE_URL_RE = re.compile( - r"(?P[^:]+)://(?P[^/]+)/(?P[^#]+)(#(?P\d+))?" -) - -_LOGGER = logging.getLogger(__name__) - - -@define(init=False, kw_only=True) -class Setup: - """Representation of a complete setup returned by the Overkiz API.""" - - creation_time: int | None = None - last_update_time: int | None = None - id: str | None = field(repr=obfuscate_id, default=None) - location: Location | None = None - gateways: list[Gateway] - devices: list[Device] - zones: list[Zone] | None = None - reseller_delegation_type: str | None = None - oid: str | None = None - root_place: Place | None = None - features: list[Feature] | None = None - - def __init__( - self, - *, - creation_time: int | None = None, - last_update_time: int | None = None, - id: str | None = None, - location: dict[str, Any] | None = None, - gateways: list[dict[str, Any]], - devices: list[dict[str, Any]], - zones: list[dict[str, Any]] | None = None, - reseller_delegation_type: str | None = None, - oid: str | None = None, - root_place: dict[str, Any] | None = None, - features: list[dict[str, Any]] | None = None, - **_: Any, - ) -> None: - """Initialize a Setup and construct nested model instances.""" - self.id = id - self.creation_time = creation_time - self.last_update_time = last_update_time - self.location = Location(**location) if location else None - self.gateways = [Gateway(**g) for g in gateways] - self.devices = [Device(**d) for d in devices] - self.zones = [Zone(**z) for z in zones] if zones else None - self.reseller_delegation_type = reseller_delegation_type - self.oid = oid - self.root_place = Place(**root_place) if root_place else None - self.features = [Feature(**f) for f in features] if features else None - - -@define(init=False, kw_only=True) -class Location: - """Geographical and address metadata for a Setup.""" - - creation_time: int - last_update_time: int | None = None - city: str = field(repr=obfuscate_string, default=None) - country: str = field(repr=obfuscate_string, default=None) - postal_code: str = field(repr=obfuscate_string, default=None) - address_line1: str = field(repr=obfuscate_string, default=None) - address_line2: str = field(repr=obfuscate_string, default=None) - timezone: str - longitude: str = field(repr=obfuscate_string, default=None) - latitude: str = field(repr=obfuscate_string, default=None) - twilight_mode: int - twilight_angle: str - twilight_city: str | None = None - summer_solstice_dusk_minutes: str - winter_solstice_dusk_minutes: str - twilight_offset_enabled: bool - dawn_offset: int - dusk_offset: int - - def __init__( - self, - *, - creation_time: int, - last_update_time: int | None = None, - city: str = field(repr=obfuscate_string, default=None), - country: str = field(repr=obfuscate_string, default=None), - postal_code: str = field(repr=obfuscate_string, default=None), - address_line1: str = field(repr=obfuscate_string, default=None), - address_line2: str = field(repr=obfuscate_string, default=None), - timezone: str, - longitude: str = field(repr=obfuscate_string, default=None), - latitude: str = field(repr=obfuscate_string, default=None), - twilight_mode: int, - twilight_angle: str, - twilight_city: str | None = None, - summer_solstice_dusk_minutes: str, - winter_solstice_dusk_minutes: str, - twilight_offset_enabled: bool, - dawn_offset: int, - dusk_offset: int, - **_: Any, - ) -> None: - """Initialize Location with address and timezone information.""" - self.creation_time = creation_time - self.last_update_time = last_update_time - self.city = city - self.country = country - self.postal_code = postal_code - self.address_line1 = address_line1 - self.address_line2 = address_line2 - self.timezone = timezone - self.longitude = longitude - self.latitude = latitude - self.twilight_mode = twilight_mode - self.twilight_angle = twilight_angle - self.twilight_city = twilight_city - self.summer_solstice_dusk_minutes = summer_solstice_dusk_minutes - self.winter_solstice_dusk_minutes = winter_solstice_dusk_minutes - self.twilight_offset_enabled = twilight_offset_enabled - self.dawn_offset = dawn_offset - self.dusk_offset = dusk_offset - - -@define(init=False, kw_only=True) -class DeviceIdentifier: - """Parsed components from a device URL.""" - - protocol: Protocol - gateway_id: str = field(repr=obfuscate_id) - device_address: str = field(repr=obfuscate_id) - subsystem_id: int | None = None - base_device_url: str = field(repr=obfuscate_id, init=False) - - def __init__( - self, - *, - protocol: Protocol, - gateway_id: str, - device_address: str, - subsystem_id: int | None = None, - ) -> None: - """Initialize DeviceIdentifier with required URL components.""" - self.protocol = protocol - self.gateway_id = gateway_id - self.device_address = device_address - self.subsystem_id = subsystem_id - self.base_device_url = f"{protocol}://{gateway_id}/{device_address}" - - @property - def is_sub_device(self) -> bool: - """Return True if this identifier represents a sub-device (subsystem_id > 1).""" - return self.subsystem_id is not None and self.subsystem_id > 1 - - @classmethod - def from_device_url(cls, device_url: str) -> DeviceIdentifier: - """Parse a device URL into its structured identifier components.""" - match = DEVICE_URL_RE.fullmatch(device_url) - if not match: - raise ValueError(f"Invalid device URL: {device_url}") - - subsystem_id = ( - int(match.group("subsystemId")) if match.group("subsystemId") else None - ) - - return cls( - protocol=Protocol(match.group("protocol")), - gateway_id=match.group("gatewayId"), - device_address=match.group("deviceAddress"), - subsystem_id=subsystem_id, - ) - - -@define(init=False, kw_only=True) -class Device: - """Representation of a device in the setup including parsed fields and states.""" - - attributes: States - available: bool - enabled: bool - label: str = field(repr=obfuscate_string) - device_url: str = field(repr=obfuscate_id) - controllable_name: str - definition: Definition - states: States - type: ProductType - oid: str | None = field(repr=obfuscate_id, default=None) - place_oid: str | None = None - creation_time: int | None = None - last_update_time: int | None = None - shortcut: bool | None = None - metadata: str | None = None - synced: bool | None = None # Local API only - subsystem_id: int | None = None # Local API only - identifier: DeviceIdentifier = field(init=False, repr=False) - _ui_class: UIClass | None = field(init=False, repr=False) - _widget: UIWidget | None = field(init=False, repr=False) - - def __init__( - self, - *, - attributes: list[dict[str, Any]] | None = None, - available: bool, - enabled: bool, - label: str, - device_url: str, - controllable_name: str, - definition: dict[str, Any], - widget: str | None = None, - ui_class: str | None = None, - states: list[dict[str, Any]] | None = None, - type: int, - oid: str | None = None, - place_oid: str | None = None, - creation_time: int | None = None, - last_update_time: int | None = None, - shortcut: bool | None = None, - metadata: str | None = None, - synced: bool | None = None, - subsystem_id: int | None = None, - **_: Any, - ) -> None: - """Initialize Device and parse URL, protocol and nested definitions.""" - self.attributes = States(attributes) - self.available = available - self.definition = Definition(**definition) - self.device_url = device_url - self.enabled = enabled - self.label = label - self.controllable_name = controllable_name - self.states = States(states) - self.type = ProductType(type) - self.oid = oid - self.place_oid = place_oid - self.creation_time = creation_time - self.last_update_time = last_update_time - self.shortcut = shortcut - self.metadata = metadata - self.synced = synced - self.subsystem_id = subsystem_id - - self.identifier = DeviceIdentifier.from_device_url(device_url) - - self._ui_class = UIClass(ui_class) if ui_class else None - self._widget = UIWidget(widget) if widget else None - - @property - def ui_class(self) -> UIClass: - """Return the UI class, falling back to the definition if available.""" - if self._ui_class is not None: - return self._ui_class - if self.definition.ui_class: - return UIClass(self.definition.ui_class) - raise ValueError(f"Device {self.device_url} has no UI class defined") - - @property - def widget(self) -> UIWidget: - """Return the widget, falling back to the definition if available.""" - if self._widget is not None: - return self._widget - if self.definition.widget_name: - return UIWidget(self.definition.widget_name) - raise ValueError(f"Device {self.device_url} has no widget defined") - - def supports_command(self, command: str | OverkizCommand) -> bool: - """Check if device supports a command.""" - return str(command) in self.definition.commands - - def supports_any_command(self, commands: list[str | OverkizCommand]) -> bool: - """Check if device supports any of the commands.""" - return self.definition.commands.has_any(commands) - - def select_first_command(self, commands: list[str | OverkizCommand]) -> str | None: - """Return first supported command name from list, or None.""" - return self.definition.commands.select(commands) - - def get_state_value(self, state: str) -> StateType | None: - """Get value of a single state, or None if not found or None.""" - return self.states.select_value([state]) - - def select_first_state_value(self, states: list[str]) -> StateType | None: - """Return value of first state with non-None value from list, or None.""" - return self.states.select_value(states) - - def has_state_value(self, state: str) -> bool: - """Check if a state exists with a non-None value.""" - return self.states.has_any([state]) - - def has_any_state_value(self, states: list[str]) -> bool: - """Check if any of the states exist with non-None values.""" - return self.states.has_any(states) - - def get_state_definition(self, state: str) -> StateDefinition | None: - """Get StateDefinition for a single state name, or None.""" - return self.definition.get_state_definition([state]) - - def select_first_state_definition( - self, states: list[str] - ) -> StateDefinition | None: - """Return first matching StateDefinition from list, or None.""" - return self.definition.get_state_definition(states) - - def get_attribute_value(self, attribute: str) -> StateType | None: - """Get value of a single attribute, or None if not found or None.""" - return self.attributes.select_value([attribute]) - - def select_first_attribute_value(self, attributes: list[str]) -> StateType | None: - """Return value of first attribute with non-None value from list, or None.""" - return self.attributes.select_value(attributes) - - -@define(init=False, kw_only=True) -class DataProperty: - """Data property with qualified name and value.""" - - qualified_name: str - value: str - - def __init__( - self, - *, - qualified_name: str, - value: str, - **_: Any, - ) -> None: - """Initialize DataProperty.""" - self.qualified_name = qualified_name - self.value = value - - -@define(init=False, kw_only=True) -class StateDefinition: - """Definition metadata for a state (qualified name, type and possible values).""" - - qualified_name: str - type: str | None = None - values: list[str] | None = None - event_based: bool | None = None - - def __init__( - self, - name: str | None = None, - qualified_name: str | None = None, - type: str | None = None, - values: list[str] | None = None, - event_based: bool | None = None, - **_: Any, - ) -> None: - """Initialize StateDefinition and set qualified name from either `name` or `qualified_name`.""" - self.type = type - self.values = values - self.event_based = event_based - - if qualified_name: - self.qualified_name = qualified_name - elif name: - self.qualified_name = name - else: - raise ValueError( - "StateDefinition requires either `name` or `qualified_name`." - ) - - -@define(init=False, kw_only=True) -class Definition: - """Definition of device capabilities: command definitions, state definitions and UI hints.""" - - commands: CommandDefinitions - states: list[StateDefinition] - data_properties: list[DataProperty] - widget_name: str | None = None - ui_class: str | None = None - qualified_name: str | None = None - ui_profiles: list[str] | None = None - ui_classifiers: list[str] | None = None - type: str | None = None - attributes: list[dict[str, Any]] | None = None # Local API only - - def __init__( - self, - *, - commands: list[dict[str, Any]], - states: list[dict[str, Any]] | None = None, - data_properties: list[dict[str, Any]] | None = None, - widget_name: str | None = None, - ui_class: str | None = None, - qualified_name: str | None = None, - ui_profiles: list[str] | None = None, - ui_classifiers: list[str] | None = None, - type: str | None = None, - attributes: list[dict[str, Any]] | None = None, - **_: Any, - ) -> None: - """Initialize Definition and construct nested command/state definitions.""" - self.commands = CommandDefinitions(commands) - self.states = [StateDefinition(**sd) for sd in states] if states else [] - self.data_properties = ( - [DataProperty(**dp) for dp in data_properties] if data_properties else [] - ) - self.widget_name = widget_name - self.ui_class = ui_class - self.qualified_name = qualified_name - self.ui_profiles = ui_profiles - self.ui_classifiers = ui_classifiers - self.type = type - self.attributes = attributes - - def get_state_definition(self, states: list[str]) -> StateDefinition | None: - """Return the first StateDefinition whose `qualified_name` matches, or None.""" - states_set = set(states) - for state_def in self.states: - if state_def.qualified_name in states_set: - return state_def - return None - def has_state_definition(self, states: list[str]) -> bool: - """Return True if any of the given state definitions exist.""" - return self.get_state_definition(states) is not None - - -@define(init=False, kw_only=True) -class CommandDefinition: - """Metadata for a single command definition (name and parameter count).""" - - command_name: str - nparams: int - - def __init__(self, command_name: str, nparams: int, **_: Any) -> None: - """Initialize CommandDefinition.""" - self.command_name = command_name - self.nparams = nparams - - -@define(init=False) -class CommandDefinitions: - """Container for command definitions providing convenient lookup by name.""" - - _commands: list[CommandDefinition] - - def __init__(self, commands: list[dict[str, Any]]): - """Build the inner list of CommandDefinition objects from raw data.""" - self._commands = [CommandDefinition(**command) for command in commands] - - def __iter__(self) -> Iterator[CommandDefinition]: - """Iterate over defined commands.""" - return self._commands.__iter__() - - def __contains__(self, name: str) -> bool: - """Return True if a command with `name` exists.""" - return self.__getitem__(name) is not None - - def __getitem__(self, command: str) -> CommandDefinition | None: - """Return the command definition or None if missing.""" - return next((cd for cd in self._commands if cd.command_name == command), None) - - def __len__(self) -> int: - """Return number of command definitions.""" - return len(self._commands) - - get = __getitem__ - - def select(self, commands: list[str | OverkizCommand]) -> str | None: - """Return the first command name that exists in this definition, or None.""" - return next( - (str(command) for command in commands if str(command) in self), None - ) - - def has_any(self, commands: list[str | OverkizCommand]) -> bool: - """Return True if any of the given commands exist in this definition.""" - return self.select(commands) is not None - - -@define(init=False, kw_only=True) +@define(kw_only=True) class State: """A single device state with typed accessors for its value.""" name: str type: DataType - value: StateType - - def __init__( - self, - name: str, - type: int, - value: StateType = None, - **_: Any, - ) -> None: - """Initialize State and set its declared data type.""" - self.name = name - self.value = value - self.type = DataType(type) + value: StateType = None @property def value_as_int(self) -> int | None: @@ -578,31 +100,22 @@ def value_as_list(self) -> list[Any] | None: raise TypeError(f"{self.name} is not an array") -@define(init=False, kw_only=True) +@define(kw_only=True) class EventState(State): """State variant used when parsing event payloads (casts string values).""" - def __init__( - self, - name: str, - type: int, - value: str | None = None, - **_: Any, - ): - """Initialize EventState and cast string values based on declared data type.""" - super().__init__(name, type, value, **_) - + def __attrs_post_init__(self) -> None: + """Cast string values based on declared data type.""" # Overkiz (cloud) returns all state values as a string - # We cast them here based on the data type provided by Overkiz # Overkiz (local) returns all state values in the right format if not isinstance(self.value, str) or self.type not in DATA_TYPE_TO_PYTHON: return - caster = DATA_TYPE_TO_PYTHON[self.type] if self.type in (DataType.JSON_ARRAY, DataType.JSON_OBJECT): self.value = self._cast_json_value(self.value) return + caster = DATA_TYPE_TO_PYTHON[self.type] self.value = caster(self.value) def _cast_json_value(self, raw_value: str) -> StateType: @@ -620,105 +133,403 @@ class States: """Container of State objects providing lookup and mapping helpers.""" _states: list[State] + _index: dict[str, State] - def __init__(self, states: list[dict[str, Any]] | None = None) -> None: - """Create a container of State objects from raw state dicts or an empty list.""" - if states: - self._states = [State(**state) for state in states] - else: - self._states = [] + def __init__(self, states: list[State] | None = None) -> None: + """Create a States container from a list of State objects or empty.""" + self._states = list(states) if states else [] + self._index = {state.name: state for state in self._states} def __iter__(self) -> Iterator[State]: """Return an iterator over contained State objects.""" return self._states.__iter__() - def __contains__(self, name: str) -> bool: + def __contains__(self, name: object) -> bool: """Return True if a state with the given name exists in the container.""" - return self.__getitem__(name) is not None + return name in self._index - def __getitem__(self, name: str) -> State | None: - """Return the State with the given name or None if missing.""" - return next((state for state in self._states if state.name == name), None) + def __getitem__(self, name: str) -> State: + """Return the State with the given name or raise KeyError if missing.""" + return self._index[name] def __setitem__(self, name: str, state: State) -> None: """Set or append a State identified by name.""" - found = self.__getitem__(name) - if found is None: - self._states.append(state) + if name in self._index: + idx = self._states.index(self._index[name]) + self._states[idx] = state else: - self._states[self._states.index(found)] = state + self._states.append(state) + self._index[name] = state def __len__(self) -> int: """Return number of states in the container.""" return len(self._states) - get = __getitem__ + def get(self, name: str) -> State | None: + """Return the State with the given name or None if missing.""" + return self._index.get(name) + + def select(self, names: list[str]) -> State | None: + """Return the first State that exists and has a non-None value, or None.""" + for name in names: + state = self._index.get(name) + if state is not None and state.value is not None: + return state + return None + + def select_value(self, names: list[str]) -> StateType: + """Return the value of the first State that exists with a non-None value.""" + if state := self.select(names): + return state.value + return None + + def has_any(self, names: list[str]) -> bool: + """Return True if any of the given state names exist with a non-None value.""" + return self.select(names) is not None + + +@define(kw_only=True) +class CommandDefinition: + """Metadata for a single command definition (name and parameter count).""" + + command_name: str + nparams: int + + +@define(init=False) +class CommandDefinitions: + """Container for command definitions providing convenient lookup by name.""" + + _commands: list[CommandDefinition] + _index: dict[str, CommandDefinition] + + def __init__(self, commands: list[CommandDefinition] | None = None) -> None: + """Build the inner list and index from CommandDefinition objects.""" + self._commands = list(commands) if commands else [] + self._index = {cd.command_name: cd for cd in self._commands} + + def __iter__(self) -> Iterator[CommandDefinition]: + """Iterate over defined commands.""" + return self._commands.__iter__() + + def __contains__(self, name: object) -> bool: + """Return True if a command with `name` exists.""" + return name in self._index + + def __getitem__(self, command: str) -> CommandDefinition: + """Return the command definition or raise KeyError if missing.""" + return self._index[command] + + def __len__(self) -> int: + """Return number of command definitions.""" + return len(self._commands) + + def get(self, command: str) -> CommandDefinition | None: + """Return the command definition or None if missing.""" + return self._index.get(command) + + def select(self, commands: list[str | OverkizCommand]) -> str | None: + """Return the first command name that exists in this definition, or None.""" + return next( + (str(command) for command in commands if str(command) in self._index), None + ) + + def has_any(self, commands: list[str | OverkizCommand]) -> bool: + """Return True if any of the given commands exist in this definition.""" + return self.select(commands) is not None + + +@define(kw_only=True) +class StateDefinition: + """Definition metadata for a state (qualified name, type and possible values).""" + + qualified_name: str | None = None + name: str | None = field(default=None, init=True, repr=False, eq=False) + type: str | None = None + values: list[str] | None = None + event_based: bool | None = None + + def __attrs_post_init__(self) -> None: + """Resolve qualified_name from either `name` or `qualified_name`.""" + if self.qualified_name is None: + if self.name is not None: + self.qualified_name = self.name + else: + raise ValueError( + "StateDefinition requires either `name` or `qualified_name`." + ) + + +@define(kw_only=True) +class DataProperty: + """Data property with qualified name and value.""" + + qualified_name: str + value: str + + +@define(kw_only=True) +class Command: + """Represents an OverKiz Command.""" + + name: str | OverkizCommand + parameters: list[str | int | float | OverkizCommandParam] | None = None + type: int | None = None + + def to_payload(self) -> dict[str, object]: + """Return a JSON-serializable payload for this command. + + The payload uses snake_case keys; the client will convert to camelCase + and apply small key fixes (like `deviceURL`) before sending. + """ + payload: dict[str, object] = {"name": str(self.name)} + + if self.type is not None: + payload["type"] = self.type + + if self.parameters is not None: + payload["parameters"] = [ + p if isinstance(p, (str, int, float, bool)) else str(p) + for p in self.parameters + ] + + return payload + + +# --------------------------------------------------------------------------- +# Device & definition +# --------------------------------------------------------------------------- + + +@define(kw_only=True) +class Definition: + """Definition of device capabilities: command definitions, state definitions and UI hints.""" + + commands: CommandDefinitions = field(factory=CommandDefinitions) + states: list[StateDefinition] = field(factory=list) + data_properties: list[DataProperty] = field(factory=list) + widget_name: str | None = None + ui_class: str | None = None + qualified_name: str | None = None + ui_profiles: list[str] | None = None + ui_classifiers: list[str] | None = None + type: str | None = None + attributes: list[dict[str, Any]] | None = None + + def get_state_definition(self, states: list[str]) -> StateDefinition | None: + """Return the first StateDefinition whose `qualified_name` matches, or None.""" + states_set = set(states) + for state_def in self.states: + if state_def.qualified_name in states_set: + return state_def + return None + + def has_state_definition(self, states: list[str]) -> bool: + """Return True if any of the given state definitions exist.""" + return self.get_state_definition(states) is not None + + +# :///[#] +DEVICE_URL_RE = re.compile( + r"(?P[^:]+)://(?P[^/]+)/(?P[^#]+)(#(?P\d+))?" +) + + +@define(kw_only=True) +class DeviceIdentifier: + """Parsed components from a device URL.""" + + protocol: Protocol + gateway_id: str = field(repr=obfuscate_id) + device_address: str = field(repr=obfuscate_id) + subsystem_id: int | None = None + base_device_url: str = field(repr=obfuscate_id, init=False) + + def __attrs_post_init__(self) -> None: + """Compute base_device_url from protocol, gateway_id and device_address.""" + self.base_device_url = ( + f"{self.protocol}://{self.gateway_id}/{self.device_address}" + ) + + @property + def is_sub_device(self) -> bool: + """Return True if this identifier represents a sub-device (subsystem_id > 1).""" + return self.subsystem_id is not None and self.subsystem_id > 1 + + @classmethod + def from_device_url(cls, device_url: str) -> DeviceIdentifier: + """Parse a device URL into its structured identifier components.""" + match = DEVICE_URL_RE.fullmatch(device_url) + if not match: + raise ValueError(f"Invalid device URL: {device_url}") + + subsystem_id = ( + int(match.group("subsystemId")) if match.group("subsystemId") else None + ) + + return cls( + protocol=Protocol(match.group("protocol")), + gateway_id=match.group("gatewayId"), + device_address=match.group("deviceAddress"), + subsystem_id=subsystem_id, + ) + + +@define(kw_only=True) +class Device: + """Representation of a device in the setup including parsed fields and states.""" + + attributes: States = field(factory=States) + available: bool + enabled: bool + label: str = field(repr=obfuscate_string) + device_url: str = field(repr=obfuscate_id) + controllable_name: str + definition: Definition | None = None + states: States = field(factory=States) + type: ProductType + ui_class: UIClass = field(default=None) # type: ignore[assignment] + widget: UIWidget = field(default=None) # type: ignore[assignment] + identifier: DeviceIdentifier = field(init=False, repr=False) + oid: str | None = field(repr=obfuscate_id, default=None) + place_oid: str | None = None + creation_time: int | None = None + last_update_time: int | None = None + shortcut: bool | None = None + metadata: str | None = None + synced: bool | None = None + subsystem_id: int | None = None + + def __attrs_post_init__(self) -> None: + """Resolve computed fields from device URL and definition fallbacks.""" + self.identifier = DeviceIdentifier.from_device_url(self.device_url) + + if self.definition: + if self.ui_class is None and self.definition.ui_class: + self.ui_class = UIClass(self.definition.ui_class) + + if self.widget is None and self.definition.widget_name: + self.widget = UIWidget(self.definition.widget_name) + + if self.ui_class is None or self.widget is None: + raise ValueError(f"Device {self.device_url} is missing ui_class or widget") + + def supports_command(self, command: str | OverkizCommand) -> bool: + """Check if device supports a command.""" + return self.definition is not None and str(command) in self.definition.commands + + def supports_any_command(self, commands: list[str | OverkizCommand]) -> bool: + """Check if device supports any of the commands.""" + return self.definition is not None and self.definition.commands.has_any( + commands + ) + + def select_first_command(self, commands: list[str | OverkizCommand]) -> str | None: + """Return first supported command name from list, or None.""" + if self.definition is None: + return None + return self.definition.commands.select(commands) + + def get_state_value(self, state: str) -> StateType | None: + """Get value of a single state, or None if not found or None.""" + return self.states.select_value([state]) + + def select_first_state_value(self, states: list[str]) -> StateType | None: + """Return value of first state with non-None value from list, or None.""" + return self.states.select_value(states) + + def has_state_value(self, state: str) -> bool: + """Check if a state exists with a non-None value.""" + return self.states.has_any([state]) + + def has_any_state_value(self, states: list[str]) -> bool: + """Check if any of the states exist with non-None values.""" + return self.states.has_any(states) + + def get_state_definition(self, state: str) -> StateDefinition | None: + """Get StateDefinition for a single state name, or None.""" + if self.definition is None: + return None + return self.definition.get_state_definition([state]) + + def select_first_state_definition( + self, states: list[str] + ) -> StateDefinition | None: + """Return first matching StateDefinition from list, or None.""" + if self.definition is None: + return None + return self.definition.get_state_definition(states) - def select(self, names: list[str]) -> State | None: - """Return the first State that exists and has a non-None value, or None.""" - for name in names: - if (state := self[name]) and state.value is not None: - return state - return None + def get_attribute_value(self, attribute: str) -> StateType | None: + """Get value of a single attribute, or None if not found or None.""" + return self.attributes.select_value([attribute]) - def select_value(self, names: list[str]) -> StateType: - """Return the value of the first State that exists with a non-None value.""" - if state := self.select(names): - return state.value - return None + def select_first_attribute_value(self, attributes: list[str]) -> StateType | None: + """Return value of first attribute with non-None value from list, or None.""" + return self.attributes.select_value(attributes) - def has_any(self, names: list[str]) -> bool: - """Return True if any of the given state names exist with a non-None value.""" - return self.select(names) is not None +# --------------------------------------------------------------------------- +# Execution & action groups +# --------------------------------------------------------------------------- -@define(init=False, kw_only=True) -class Command: - """Represents an OverKiz Command.""" - type: int | None = None - name: str | OverkizCommand - parameters: list[str | int | float | OverkizCommandParam] | None - - def __init__( - self, - name: str | OverkizCommand, - parameters: list[str | int | float | OverkizCommandParam] | None = None, - type: int | None = None, - **_: Any, - ): - """Initialize a command instance and mirror fields into dict base class.""" - self.name = name - self.parameters = parameters - self.type = type +@define(kw_only=True) +class Action: + """An action consists of multiple commands related to a single device, identified by its device URL.""" + + device_url: str = field(repr=obfuscate_id) + commands: list[Command] = field(factory=list) def to_payload(self) -> dict[str, object]: - """Return a JSON-serializable payload for this command. + """Return a JSON-serializable payload for this action (snake_case). - The payload uses snake_case keys; the client will convert to camelCase - and apply small key fixes (like `deviceURL`) before sending. + The final camelCase conversion is handled by the client. """ - payload: dict[str, object] = {"name": str(self.name)} + return { + "device_url": self.device_url, + "commands": [c.to_payload() for c in self.commands], + } - if self.type is not None: - payload["type"] = self.type - if self.parameters is not None: - payload["parameters"] = [ - p if isinstance(p, (str, int, float, bool)) else str(p) - for p in self.parameters - ] +@define(kw_only=True) +class ActionGroup: + """An action group is composed of one or more actions. - return payload + Each action is related to a single setup device (designated by its device URL) and + is composed of one or more commands to be executed on that device. + """ + + actions: list[Action] = field(factory=list) + creation_time: int | None = None + last_update_time: int | None = None + label: str = field(repr=obfuscate_string, default="") + metadata: str | None = None + shortcut: bool | None = None + notification_type_mask: int | None = None + notification_condition: str | None = None + notification_text: str | None = None + notification_title: str | None = None + oid: str | None = field(repr=obfuscate_id, default=None) + def __attrs_post_init__(self) -> None: + """Default label to empty string when None.""" + if self.label is None: + self.label = "" -@define(init=False, kw_only=True) + @property + def id(self) -> str | None: + """Alias for oid.""" + return self.oid + + +@define(kw_only=True) class Event: """Represents an Overkiz event containing metadata and device states.""" name: EventName - timestamp: int | None - setupoid: str | None = field(repr=obfuscate_id, default=None) + timestamp: int | None = None + setup_oid: str | None = field(repr=obfuscate_id, default=None) owner_key: str | None = field(repr=obfuscate_id, default=None) type: int | None = None sub_type: int | None = None @@ -729,193 +540,74 @@ class Event: condition_groupoid: str | None = None place_oid: str | None = None label: str | None = None - metadata: Any | None = None + metadata: str | None = None camera_id: str | None = None - deleted_raw_devices_count: Any | None = None - protocol_type: Any | None = None + deleted_raw_devices_count: int | None = None + protocol_type: int | None = None gateway_id: str | None = field(repr=obfuscate_id, default=None) exec_id: str | None = None device_url: str | None = field(repr=obfuscate_id, default=None) - device_states: list[EventState] + device_states: list[EventState] = field(factory=list) old_state: ExecutionState | None = None new_state: ExecutionState | None = None - - def __init__( - self, - name: EventName, - timestamp: int | None = None, - setupoid: str | None = field(repr=obfuscate_id, default=None), - owner_key: str | None = None, - type: int | None = None, - sub_type: int | None = None, - time_to_next_state: int | None = None, - failed_commands: list[dict[str, Any]] | None = None, - failure_type_code: FailureType | None = None, - failure_type: str | None = None, - condition_groupoid: str | None = None, - place_oid: str | None = None, - label: str | None = None, - metadata: Any | None = None, - camera_id: str | None = None, - deleted_raw_devices_count: Any | None = None, - protocol_type: Any | None = None, - gateway_id: str | None = None, - exec_id: str | None = None, - device_url: str | None = None, - device_states: list[dict[str, Any]] | None = None, - old_state: ExecutionState | None = None, - new_state: ExecutionState | None = None, - **_: Any, - ): - """Initialize Event from raw Overkiz payload fields.""" - self.timestamp = timestamp - self.gateway_id = gateway_id - self.exec_id = exec_id - self.device_url = device_url - self.device_states = ( - [EventState(**s) for s in device_states] if device_states else [] - ) - self.old_state = ExecutionState(old_state) if old_state else None - self.new_state = ExecutionState(new_state) if new_state else None - self.setupoid = setupoid - self.owner_key = owner_key - self.type = type - self.sub_type = sub_type - self.time_to_next_state = time_to_next_state - self.failed_commands = failed_commands - - self.failure_type = failure_type - self.condition_groupoid = condition_groupoid - self.place_oid = place_oid - self.label = label - self.metadata = metadata - self.camera_id = camera_id - self.deleted_raw_devices_count = deleted_raw_devices_count - self.protocol_type = protocol_type - self.name = EventName(name) - self.failure_type_code = ( - None if failure_type_code is None else FailureType(failure_type_code) - ) + actions: list[Action] | None = None + owner: str | None = field(repr=obfuscate_email, default=None) + source: str | None = None -@define(init=False, kw_only=True) +@define(kw_only=True) class Execution: """Execution occurrence with owner, state and action group metadata.""" id: str description: str owner: str = field(repr=obfuscate_email) - state: ExecutionState - action_group: ActionGroup + state: str + action_group: ActionGroup | None = None start_time: int | None = None execution_type: ExecutionType | None = None execution_sub_type: ExecutionSubType | None = None - def __init__( - self, - id: str, - description: str, - owner: str, - state: str, - action_group: dict[str, Any], - start_time: int | None = None, - execution_type: str | None = None, - execution_sub_type: str | None = None, - **_: Any, - ): - """Initialize Execution object from API fields.""" - self.id = id - self.description = description - self.owner = owner - self.state = ExecutionState(state) - self.action_group = ActionGroup(**action_group) - self.start_time = start_time - self.execution_type = ExecutionType(execution_type) if execution_type else None - self.execution_sub_type = ( - ExecutionSubType(execution_sub_type) if execution_sub_type else None - ) - -@define(init=False, kw_only=True) -class Action: - """An action consists of multiple commands related to a single device, identified by its device URL.""" +@define(kw_only=True) +class HistoryExecutionCommand: + """A command within a recorded historical execution, including its status and parameters.""" - device_url: str - commands: list[Command] + device_url: str = field(repr=obfuscate_id) + command: str + rank: int + dynamic: bool + state: ExecutionState + failure_type: str + parameters: list[Any] | None = None - def __init__(self, device_url: str, commands: list[dict[str, Any] | Command]): - """Initialize Action from API data and convert nested commands.""" - self.device_url = device_url - self.commands = [ - c if isinstance(c, Command) else Command(**c) for c in commands - ] - def to_payload(self) -> dict[str, object]: - """Return a JSON-serializable payload for this action (snake_case). +@define(kw_only=True) +class HistoryExecution: + """A recorded execution entry containing details and its list of commands.""" - The final camelCase conversion is handled by the client. - """ - return { - "device_url": self.device_url, - "commands": [c.to_payload() for c in self.commands], - } + id: str + event_time: int + owner: str = field(repr=obfuscate_email) + source: str + end_time: int | None = None + effective_start_time: int | None = None + duration: int + label: str | None = None + type: str + state: ExecutionState + failure_type: str + commands: list[HistoryExecutionCommand] = field(factory=list) + execution_type: ExecutionType + execution_sub_type: ExecutionSubType -@define(init=False, kw_only=True) -class ActionGroup: - """An action group is composed of one or more actions. +# --------------------------------------------------------------------------- +# Infrastructure: gateways, places, zones +# --------------------------------------------------------------------------- - Each action is related to a single setup device (designated by its device URL) and - is composed of one or more commands to be executed on that device. - """ - id: str | None = field(default=None, repr=obfuscate_id) - creation_time: int | None = None - last_update_time: int | None = None - label: str = field(repr=obfuscate_string) - metadata: str | None = None - shortcut: bool | None = None - notification_type_mask: int | None = None - notification_condition: str | None = None - notification_text: str | None = None - notification_title: str | None = None - actions: list[Action] - oid: str | None = field(default=None, repr=obfuscate_id) - - def __init__( - self, - actions: list[dict[str, Any]], - creation_time: int | None = None, - metadata: str | None = None, - oid: str | None = None, - id: str | None = None, - last_update_time: int | None = None, - label: str | None = None, - shortcut: bool | None = None, - notification_type_mask: int | None = None, - notification_condition: str | None = None, - notification_text: str | None = None, - notification_title: str | None = None, - **_: Any, - ) -> None: - """Initialize ActionGroup from API data and convert nested actions.""" - self.id = oid or id - self.creation_time = creation_time - self.last_update_time = last_update_time - self.label = ( - label or "" - ) # for backwards compatibility we set label to empty string if None - self.metadata = metadata - self.shortcut = shortcut - self.notification_type_mask = notification_type_mask - self.notification_condition = notification_condition - self.notification_text = notification_text - self.notification_title = notification_title - self.actions = [Action(**action) for action in actions] - self.oid = oid or id - - -@define(init=False, kw_only=True) +@define(kw_only=True) class Partner: """Partner details for a gateway or service provider.""" @@ -924,206 +616,40 @@ class Partner: id: str = field(repr=obfuscate_id) status: str - def __init__(self, activated: bool, name: str, id: str, status: str, **_: Any): - """Initialize Partner information.""" - self.activated = activated - self.name = name - self.id = id - self.status = status - -@define(init=False, kw_only=True) +@define(kw_only=True) class Connectivity: """Connectivity metadata for a gateway update box.""" status: str protocol_version: str - def __init__(self, status: str, protocol_version: str, **_: Any): - """Initialize Connectivity information.""" - self.status = status - self.protocol_version = protocol_version - -@define(init=False, kw_only=True) +@define(kw_only=True) class Gateway: """Representation of a gateway, including connectivity and partner info.""" - partners: list[Partner] + gateway_id: str = field(repr=obfuscate_id) + connectivity: Connectivity | None = None + partners: list[Partner] = field(factory=list) functions: str | None = None sub_type: GatewaySubType | None = None - id: str = field(repr=obfuscate_id) - gateway_id: str = field(repr=obfuscate_id) alive: bool | None = None mode: str | None = None place_oid: str | None = None time_reliable: bool | None = None - connectivity: Connectivity up_to_date: bool | None = None update_status: UpdateBoxStatus | None = None sync_in_progress: bool | None = None type: GatewayType | None = None + auto_update_enabled: bool | None = None + update_criticity_level: UpdateCriticityLevel | None = None + automatic_update: bool | None = None - def __init__( - self, - *, - partners: list[dict[str, Any]] | None = None, - functions: str | None = None, - sub_type: GatewaySubType | None = None, - gateway_id: str, - alive: bool | None = None, - mode: str | None = None, - place_oid: str | None = None, - time_reliable: bool | None = None, - connectivity: dict[str, Any], - up_to_date: bool | None = None, - update_status: UpdateBoxStatus | None = None, - sync_in_progress: bool | None = None, - type: GatewayType | None = None, - **_: Any, - ) -> None: - """Initialize Gateway from API data and child objects.""" - self.id = gateway_id - self.gateway_id = gateway_id - self.functions = functions - self.alive = alive - self.mode = mode - self.place_oid = place_oid - self.time_reliable = time_reliable - self.connectivity = Connectivity(**connectivity) - self.up_to_date = up_to_date - self.update_status = UpdateBoxStatus(update_status) if update_status else None - self.sync_in_progress = sync_in_progress - self.partners = [Partner(**p) for p in partners] if partners else [] - self.type = GatewayType(type) if type else None - self.sub_type = GatewaySubType(sub_type) if sub_type else None - - -@define(init=False, kw_only=True) -class HistoryExecutionCommand: - """A command within a recorded historical execution, including its status and parameters.""" - - device_url: str = field(repr=obfuscate_id) - command: str - rank: int - dynamic: bool - state: ExecutionState - failure_type: str - parameters: list[Any] | None = None - - def __init__( - self, - device_url: str, - command: str, - rank: int, - dynamic: bool, - state: ExecutionState, - failure_type: str, - parameters: list[Any] | None = None, - **_: Any, - ) -> None: - """Initialize HistoryExecutionCommand from API fields.""" - self.device_url = device_url - self.command = command - self.parameters = parameters - self.rank = rank - self.dynamic = dynamic - self.state = ExecutionState(state) - self.failure_type = failure_type - - -@define(init=False, kw_only=True) -class HistoryExecution: - """A recorded execution entry containing details and its list of commands.""" - - id: str - event_time: int - owner: str = field(repr=obfuscate_email) - source: str - end_time: int | None = None - effective_start_time: int | None = None - duration: int - label: str | None = None - type: str - state: ExecutionState - failure_type: str - commands: list[HistoryExecutionCommand] - execution_type: ExecutionType - execution_sub_type: ExecutionSubType - - def __init__( - self, - *, - id: str, - event_time: int, - owner: str, - source: str, - end_time: int | None = None, - effective_start_time: int | None = None, - duration: int, - label: str | None = None, - type: str, - state: ExecutionState, - failure_type: str, - commands: list[dict[str, Any]], - execution_type: ExecutionType, - execution_sub_type: ExecutionSubType, - **_: Any, - ) -> None: - """Initialize HistoryExecution and convert nested command structures.""" - self.id = id - self.event_time = event_time - self.owner = owner - self.source = source - self.end_time = end_time - self.effective_start_time = effective_start_time - self.duration = duration - self.label = label - self.type = type - self.state = ExecutionState(state) - self.failure_type = failure_type - self.commands = [HistoryExecutionCommand(**hec) for hec in commands] - self.execution_type = ExecutionType(execution_type) - self.execution_sub_type = ExecutionSubType(execution_sub_type) - - -@define(init=False, kw_only=True) -class Place: - """Hierarchical representation of a location (house, room, area) in a setup. - - Places form a tree structure where the root place is typically the entire house - or property, and `sub_places` contains nested child locations. This recursive - structure allows navigation from house -> floors/rooms -> individual areas. - Each place has associated metadata like creation time, label, and type identifier. - """ - - creation_time: int - last_update_time: int | None = None - label: str - type: int - id: str - oid: str - sub_places: list[Place] - - def __init__( - self, - *, - creation_time: int, - last_update_time: int | None = None, - label: str, - type: int, - oid: str, - sub_places: list[Any] | None, - **_: Any, - ) -> None: - """Initialize Place from API data and convert nested sub-places.""" - self.id = oid - self.creation_time = creation_time - self.last_update_time = last_update_time - self.label = label - self.type = type - self.oid = oid - self.sub_places = [Place(**p) for p in sub_places] if sub_places else [] + @property + def id(self) -> str: + """Alias for gateway_id.""" + return self.gateway_id @define(kw_only=True) @@ -1143,7 +669,7 @@ class ZoneItem: device_url: str -@define(init=False, kw_only=True) +@define(kw_only=True) class Zone: """A Zone groups related devices inside a place.""" @@ -1151,66 +677,58 @@ class Zone: last_update_time: int label: str type: int - items: list[ZoneItem] | None - external_oid: str | None - metadata: str | None + items: list[ZoneItem] = field(factory=list) + external_oid: str | None = None + metadata: str | None = None + oid: str = "" + + +@define(kw_only=True) +class Place: + """Hierarchical representation of a location (house, room, area) in a setup.""" + + creation_time: int + last_update_time: int | None = None + label: str + type: int oid: str + sub_places: list[Place] = field(factory=list) - def __init__( - self, - *, - creation_time: int, - last_update_time: int, - label: str, - type: int, - items: list[dict[str, Any]] | None, - external_oid: str | None = None, - metadata: str | None = None, - oid: str, - **_: Any, - ) -> None: - """Initialize Zone from API data and convert nested items.""" - self.creation_time = creation_time - self.last_update_time = last_update_time - self.label = label - self.type = type - self.items = [ZoneItem(**z) for z in items] if items else [] - self.external_oid = external_oid - self.metadata = metadata - self.oid = oid - - -@define(init=False, kw_only=True) -class ServerConfig: - """Connection target details for an Overkiz-compatible server.""" + @property + def id(self) -> str: + """Alias for oid.""" + return self.oid - server: Server | None - name: str - endpoint: str - manufacturer: str - api_type: APIType - configuration_url: str | None = None - def __init__( - self, - *, - server: Server | str | None = None, - name: str, - endpoint: str, - manufacturer: str, - api_type: str | APIType, - configuration_url: str | None = None, - **_: Any, - ) -> None: - """Initialize ServerConfig and convert enum fields.""" - self.server = ( - server if isinstance(server, Server) or server is None else Server(server) - ) - self.name = name - self.endpoint = endpoint - self.manufacturer = manufacturer - self.api_type = api_type if isinstance(api_type, APIType) else APIType(api_type) - self.configuration_url = configuration_url +@define(kw_only=True) +class Location: + """Geographical and address metadata for a Setup.""" + + creation_time: int + last_update_time: int | None = None + city: str | None = field(repr=obfuscate_string, default=None) + country: str | None = field(repr=obfuscate_string, default=None) + postal_code: str | None = field(repr=obfuscate_string, default=None) + address_line1: str | None = field(repr=obfuscate_string, default=None) + address_line2: str | None = field(repr=obfuscate_string, default=None) + timezone: str = "" + longitude: str | None = field(repr=obfuscate_string, default=None) + latitude: str | None = field(repr=obfuscate_string, default=None) + twilight_mode: int = 0 + twilight_angle: str = "" + twilight_city: str | None = None + summer_solstice_dusk_minutes: str = "" + winter_solstice_dusk_minutes: str = "" + twilight_offset_enabled: bool = False + dawn_offset: int = 0 + dusk_offset: int = 0 + country_code: str | None = field(repr=obfuscate_string, default=None) + tariff_settings: dict[str, Any] | None = None + + +# --------------------------------------------------------------------------- +# Configuration & options +# --------------------------------------------------------------------------- @define(kw_only=True) @@ -1221,7 +739,7 @@ class OptionParameter: value: str -@define(init=False, kw_only=True) +@define(kw_only=True) class Option: """A subscribed option for a setup including parameters.""" @@ -1229,29 +747,22 @@ class Option: last_update_time: int option_id: str start_date: int - parameters: list[OptionParameter] | None - - def __init__( - self, - *, - creation_time: int, - last_update_time: int, - option_id: str, - start_date: int, - parameters: list[dict[str, Any]] | None, - **_: Any, - ) -> None: - """Initialize Option from API data and convert nested parameters.""" - self.creation_time = creation_time - self.last_update_time = last_update_time - self.option_id = option_id - self.start_date = start_date - self.parameters = ( - [OptionParameter(**p) for p in parameters] if parameters else [] - ) + parameters: list[OptionParameter] = field(factory=list) + + +@define(kw_only=True) +class ServerConfig: + """Connection target details for an Overkiz-compatible server.""" + + server: Server | None = None + name: str + endpoint: str + manufacturer: str + api_type: APIType + configuration_url: str | None = None -@define(init=False, kw_only=True) +@define(kw_only=True) class ProtocolType: """Protocol type definition from the reference API.""" @@ -1260,80 +771,40 @@ class ProtocolType: name: str label: str - def __init__(self, id: int, prefix: str, name: str, label: str, **_: Any): - """Initialize ProtocolType from API data.""" - self.id = id - self.prefix = prefix - self.name = name - self.label = label + +# --------------------------------------------------------------------------- +# UI profile definitions (reference API) +# --------------------------------------------------------------------------- -@define(init=False, kw_only=True) +@define(kw_only=True) class ValuePrototype: """Value prototype defining parameter/state value constraints.""" type: str - min_value: float | None = None - max_value: float | None = None + min_value: int | float | None = None + max_value: int | float | None = None enum_values: list[str] | None = None description: str | None = None - def __init__( - self, - type: str, - min_value: float | None = None, - max_value: float | None = None, - enum_values: list[str] | None = None, - description: str | None = None, - **_: Any, - ): - """Initialize ValuePrototype from API data.""" - self.type = type - self.min_value = min_value - self.max_value = max_value - self.enum_values = enum_values - self.description = description - - -@define(init=False, kw_only=True) + +@define(kw_only=True) class CommandParameter: """Command parameter definition.""" optional: bool sensitive: bool - value_prototypes: list[ValuePrototype] - - def __init__( - self, - optional: bool, - sensitive: bool, - value_prototypes: list[dict] | None = None, - **_: Any, - ): - """Initialize CommandParameter from API data.""" - self.optional = optional - self.sensitive = sensitive - self.value_prototypes = ( - [ValuePrototype(**vp) for vp in value_prototypes] - if value_prototypes - else [] - ) + value_prototypes: list[ValuePrototype] = field(factory=list) -@define(init=False, kw_only=True) +@define(kw_only=True) class CommandPrototype: """Command prototype defining parameters.""" - parameters: list[CommandParameter] - - def __init__(self, parameters: list[dict] | None = None, **_: Any): - """Initialize CommandPrototype from API data.""" - self.parameters = ( - [CommandParameter(**p) for p in parameters] if parameters else [] - ) + parameters: list[CommandParameter] = field(factory=list) -@define(init=False, kw_only=True) +@define(kw_only=True) class UIProfileCommand: """UI profile command definition.""" @@ -1341,35 +812,15 @@ class UIProfileCommand: prototype: CommandPrototype | None = None description: str | None = None - def __init__( - self, - name: str, - prototype: dict | None = None, - description: str | None = None, - **_: Any, - ): - """Initialize UIProfileCommand from API data.""" - self.name = name - self.prototype = CommandPrototype(**prototype) if prototype else None - self.description = description - -@define(init=False, kw_only=True) +@define(kw_only=True) class StatePrototype: """State prototype defining value constraints.""" - value_prototypes: list[ValuePrototype] - - def __init__(self, value_prototypes: list[dict] | None = None, **_: Any): - """Initialize StatePrototype from API data.""" - self.value_prototypes = ( - [ValuePrototype(**vp) for vp in value_prototypes] - if value_prototypes - else [] - ) + value_prototypes: list[ValuePrototype] = field(factory=list) -@define(init=False, kw_only=True) +@define(kw_only=True) class UIProfileState: """UI profile state definition.""" @@ -1377,20 +828,8 @@ class UIProfileState: prototype: StatePrototype | None = None description: str | None = None - def __init__( - self, - name: str, - prototype: dict | None = None, - description: str | None = None, - **_: Any, - ): - """Initialize UIProfileState from API data.""" - self.name = name - self.prototype = StatePrototype(**prototype) if prototype else None - self.description = description - -@define(init=False, kw_only=True) +@define(kw_only=True) class UIProfileDefinition: """UI profile definition from the reference API. @@ -1398,25 +837,9 @@ class UIProfileDefinition: """ name: str - commands: list[UIProfileCommand] - states: list[UIProfileState] - form_factor: bool - - def __init__( - self, - name: str, - commands: list[dict] | None = None, - states: list[dict] | None = None, - form_factor: bool = False, - **_: Any, - ): - """Initialize UIProfileDefinition from API data.""" - self.name = name - self.commands = ( - [UIProfileCommand(**cmd) for cmd in commands] if commands else [] - ) - self.states = [UIProfileState(**s) for s in states] if states else [] - self.form_factor = form_factor + commands: list[UIProfileCommand] = field(factory=list) + states: list[UIProfileState] = field(factory=list) + form_factor: bool = False @define(kw_only=True) @@ -1425,3 +848,27 @@ class FirmwareStatus: up_to_date: bool notes: list[dict[str, str]] + + +# --------------------------------------------------------------------------- +# Setup (root model — references most other models) +# --------------------------------------------------------------------------- + + +@define(kw_only=True) +class Setup: + """Representation of a complete setup returned by the Overkiz API.""" + + creation_time: int | None = None + last_update_time: int | None = None + id: str | None = field(repr=obfuscate_id, default=None) + location: Location | None = None + gateways: list[Gateway] = field(factory=list) + devices: list[Device] = field(factory=list) + zones: list[Zone] | None = None + reseller_delegation_type: str | None = None + oid: str | None = None + root_place: Place | None = None + features: list[Feature] | None = None + disconnection_configuration: dict[str, Any] | None = None + metadata: str | None = None diff --git a/pyproject.toml b/pyproject.toml index f97104c5..7b009465 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "attrs>=21.2", "boto3<2.0.0,>=1.18.59", "warrant-lite<2.0.0,>=1.0.4", + "cattrs>=26.1.0", ] [project.optional-dependencies] diff --git a/tests/test_client.py b/tests/test_client.py index 17bbbb9b..e6cd35a4 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -651,8 +651,8 @@ async def test_execute_action_group_omits_none_fields(self, client: OverkizClien from pyoverkiz.models import Action, Command action = Action( - "rts://2025-8464-6867/16756006", - [Command(name=OverkizCommand.CLOSE, parameters=None, type=None)], + device_url="rts://2025-8464-6867/16756006", + commands=[Command(name=OverkizCommand.CLOSE, parameters=None, type=None)], ) resp = MockResponse('{"execId": "exec-123"}') @@ -881,8 +881,8 @@ async def test_get_places(self, client: OverkizClient): async def test_execute_action_group_rts_close(self, client: OverkizClient): """Verify executing a close command on an RTS cover.""" action = Action( - "rts://2025-8464-6867/16756006", - [Command(name="close", parameters=None, type=1)], + device_url="rts://2025-8464-6867/16756006", + commands=[Command(name="close", parameters=None, type=1)], ) resp = MockResponse('{"execId": "ee7a5676-c68f-43a3-956d-6f5efc745954"}') @@ -905,12 +905,12 @@ async def test_execute_action_group_multiple_rts_devices( """Verify executing commands on multiple RTS devices in a single action group.""" actions = [ Action( - "rts://2025-8464-6867/16756006", - [Command(name="close", parameters=None, type=1)], + device_url="rts://2025-8464-6867/16756006", + commands=[Command(name="close", parameters=None, type=1)], ), Action( - "rts://2025-8464-6867/16756007", - [Command(name="open", parameters=None, type=1)], + device_url="rts://2025-8464-6867/16756007", + commands=[Command(name="open", parameters=None, type=1)], ), ] resp = MockResponse('{"execId": "aaa-bbb-ccc"}') @@ -1100,8 +1100,8 @@ async def test_local_execute_action_group_rts_close( ): """Verify executing an RTS command via the local API.""" action = Action( - "rts://2025-8464-6867/16756006", - [Command(name="close")], + device_url="rts://2025-8464-6867/16756006", + commands=[Command(name="close")], ) resp = MockResponse('{"execId": "45e52d27-3c08-4fd5-87f2-03d650b67f4b"}') diff --git a/tests/test_models.py b/tests/test_models.py index 91f3ac0a..59b3d745 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -5,14 +5,17 @@ import json from pathlib import Path +import cattrs.errors import pytest from pyoverkiz._case import decamelize -from pyoverkiz.enums import DataType, Protocol +from pyoverkiz.converter import converter +from pyoverkiz.enums import DataType, EventName, ExecutionState, FailureType, Protocol from pyoverkiz.models import ( CommandDefinitions, Definition, Device, + Event, EventState, Setup, State, @@ -79,6 +82,11 @@ FIXTURES_DIR = Path(__file__).resolve().parent / "fixtures" / "setup" +def _make_device(raw: dict | None = None) -> Device: + """Create a Device from raw camelCase dict via the converter.""" + return converter.structure(decamelize(raw or RAW_DEVICES), Device) + + class TestSetup: """Tests for setup-level ID parsing and redaction behavior.""" @@ -87,7 +95,7 @@ def test_id_is_raw_but_repr_is_redacted_when_present(self): raw_setup = json.loads( (FIXTURES_DIR / "setup_tahoma_1.json").read_text(encoding="utf-8") ) - setup = Setup(**decamelize(raw_setup)) + setup = converter.structure(decamelize(raw_setup), Setup) raw_id = "SETUP-1234-1234-8044" redacted_id = obfuscate_id(raw_id) @@ -100,7 +108,7 @@ def test_id_is_none_when_missing(self): raw_setup = json.loads( (FIXTURES_DIR / "setup_local.json").read_text(encoding="utf-8") ) - setup = Setup(**decamelize(raw_setup)) + setup = converter.structure(decamelize(raw_setup), Setup) assert setup.id is None @@ -216,12 +224,7 @@ def test_base_url_parsing( is_sub_device: bool, ): """Ensure device URL parsing extracts protocol, gateway and address correctly.""" - test_device = { - **RAW_DEVICES, - "deviceURL": device_url, - } - device_data = decamelize(test_device) - device = Device(**device_data) + device = _make_device({**RAW_DEVICES, "deviceURL": device_url}) assert device.identifier.protocol == protocol assert device.identifier.gateway_id == gateway_id @@ -238,76 +241,62 @@ def test_base_url_parsing( ) def test_invalid_device_url_raises(self, device_url: str): """Invalid device URLs should raise during identifier parsing.""" - test_device = { - **RAW_DEVICES, - "deviceURL": device_url, - } - device_data = decamelize(test_device) - - with pytest.raises(ValueError, match="Invalid device URL"): - Device(**device_data) + with pytest.raises(cattrs.errors.ClassValidationError): + _make_device({**RAW_DEVICES, "deviceURL": device_url}) def test_none_states(self): """Devices without a `states` field should provide an empty States object.""" - device_data = decamelize(RAW_DEVICES) - del device_data["states"] - device = Device(**device_data) + raw = dict(RAW_DEVICES) + del raw["states"] + device = _make_device(raw) assert not device.states.get(STATE) def test_select_first_command(self): """Device.select_first_command() returns first supported command from list.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() assert device.select_first_command(["nonexistent", "open", "close"]) == "open" assert device.select_first_command(["nonexistent"]) is None def test_supports_command(self): """Device.supports_command() checks if device supports a single command.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() assert device.supports_command("open") assert not device.supports_command("nonexistent") def test_supports_any_command(self): """Device.supports_any_command() checks if device supports any command.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() assert device.supports_any_command(["nonexistent", "open"]) assert not device.supports_any_command(["nonexistent"]) def test_get_state_value(self): """Device.get_state_value() returns value of a single state.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() value = device.get_state_value("core:ClosureState") assert value == 100 assert device.get_state_value("nonexistent") is None def test_select_first_state_value(self): """Device.select_first_state_value() returns value of first matching state from list.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() value = device.select_first_state_value(["nonexistent", "core:ClosureState"]) assert value == 100 def test_has_state_value(self): """Device.has_state_value() checks if a single state exists with non-None value.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() assert device.has_state_value("core:ClosureState") assert not device.has_state_value("nonexistent") def test_has_any_state_value(self): """Device.has_any_state_value() checks if any state exists with non-None value.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() assert device.has_any_state_value(["nonexistent", "core:ClosureState"]) assert not device.has_any_state_value(["nonexistent"]) def test_get_state_definition(self): """Device.get_state_definition() returns StateDefinition for a single state.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() state_def = device.get_state_definition("core:ClosureState") assert state_def is not None assert state_def.qualified_name == "core:ClosureState" @@ -315,8 +304,7 @@ def test_get_state_definition(self): def test_select_first_state_definition(self): """Device.select_first_state_definition() returns first matching StateDefinition from list.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() state_def = device.select_first_state_definition( ["nonexistent", "core:ClosureState"] ) @@ -325,30 +313,30 @@ def test_select_first_state_definition(self): def test_get_attribute_value(self): """Device.get_attribute_value() returns value of a single attribute.""" - test_device = { - **RAW_DEVICES, - "attributes": [ - {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, - {"name": "core:Model", "type": 3, "value": "WINDOW 100"}, - ], - } - device_data = decamelize(test_device) - device = Device(**device_data) + device = _make_device( + { + **RAW_DEVICES, + "attributes": [ + {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, + {"name": "core:Model", "type": 3, "value": "WINDOW 100"}, + ], + } + ) value = device.get_attribute_value("core:Model") assert value == "WINDOW 100" assert device.get_attribute_value("nonexistent") is None def test_select_first_attribute_value_returns_first_match(self): """Device.select_first_attribute_value() returns value of first matching attribute from list.""" - test_device = { - **RAW_DEVICES, - "attributes": [ - {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, - {"name": "core:Model", "type": 3, "value": "WINDOW 100"}, - ], - } - device_data = decamelize(test_device) - device = Device(**device_data) + device = _make_device( + { + **RAW_DEVICES, + "attributes": [ + {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, + {"name": "core:Model", "type": 3, "value": "WINDOW 100"}, + ], + } + ) value = device.select_first_attribute_value( ["nonexistent", "core:Model", "core:Manufacturer"] ) @@ -356,30 +344,27 @@ def test_select_first_attribute_value_returns_first_match(self): def test_select_first_attribute_value_returns_none_when_no_match(self): """Device.select_first_attribute_value() returns None when no attribute matches.""" - device_data = decamelize(RAW_DEVICES) - device = Device(**device_data) + device = _make_device() value = device.select_first_attribute_value(["nonexistent", "also_nonexistent"]) assert value is None def test_select_first_attribute_value_empty_attributes(self): """Device.select_first_attribute_value() returns None for devices with no attributes.""" - test_device = {**RAW_DEVICES, "attributes": []} - device_data = decamelize(test_device) - device = Device(**device_data) + device = _make_device({**RAW_DEVICES, "attributes": []}) value = device.select_first_attribute_value(["core:Manufacturer"]) assert value is None def test_select_first_attribute_value_with_none_values(self): """Device.select_first_attribute_value() skips attributes with None values.""" - test_device = { - **RAW_DEVICES, - "attributes": [ - {"name": "core:Model", "type": 3, "value": None}, - {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, - ], - } - device_data = decamelize(test_device) - device = Device(**device_data) + device = _make_device( + { + **RAW_DEVICES, + "attributes": [ + {"name": "core:Model", "type": 3, "value": None}, + {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, + ], + } + ) value = device.select_first_attribute_value(["core:Model", "core:Manufacturer"]) assert value == "VELUX" @@ -387,21 +372,24 @@ def test_select_first_attribute_value_with_none_values(self): class TestStates: """Tests for the States container behaviour and getter semantics.""" + def _make_states(self, raw: list[dict] | None = None) -> States: + return converter.structure(raw, States) + def test_empty_states(self): """An empty list yields an empty States object with no state found.""" - states = States([]) + states = self._make_states([]) assert not states assert not states.get(STATE) def test_none_states(self): """A None value for states should behave as empty.""" - states = States(None) + states = self._make_states(None) assert not states assert not states.get(STATE) def test_getter(self): """Retrieve a known state and validate its properties.""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) state = states.get(STATE) assert state assert state.name == STATE @@ -410,13 +398,13 @@ def test_getter(self): def test_getter_missing(self): """Requesting a missing state returns falsy (None).""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) state = states.get("FooState") assert not state def test_select_returns_first_match(self): """select() returns the first state with a non-None value.""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) state = states.select( ["nonexistent", "core:NameState", "internal:AlarmDelayState"] ) @@ -425,37 +413,78 @@ def test_select_returns_first_match(self): def test_select_returns_none_when_no_match(self): """select() returns None when no state matches.""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) assert states.select(["nonexistent", "also_nonexistent"]) is None def test_select_value_returns_first_value(self): """select_value() returns the value of the first matching state.""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) value = states.select_value(["nonexistent", "core:NameState"]) assert value == "alarm name" def test_select_value_returns_none_when_no_match(self): """select_value() returns None when no state matches.""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) assert states.select_value(["nonexistent"]) is None def test_has_any_true(self): """has_any() returns True when at least one state exists.""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) assert states.has_any(["nonexistent", "core:NameState"]) def test_has_any_false(self): """has_any() returns False when no state exists.""" - states = States(RAW_STATES) + states = self._make_states(RAW_STATES) assert not states.has_any(["nonexistent", "also_nonexistent"]) + def test_getitem_raises_keyerror_on_missing(self): + """Subscript access raises KeyError for missing states.""" + states = self._make_states(RAW_STATES) + with pytest.raises(KeyError, match="nonexistent"): + states["nonexistent"] + + def test_getitem_returns_state_on_hit(self): + """Subscript access returns the State for a known name.""" + states = self._make_states(RAW_STATES) + state = states[STATE] + assert state.name == STATE + + def test_contains_existing(self): + """'in' operator returns True for existing state names.""" + states = self._make_states(RAW_STATES) + assert STATE in states + + def test_contains_missing(self): + """'in' operator returns False for missing state names.""" + states = self._make_states(RAW_STATES) + assert "nonexistent" not in states + + def test_setitem_replaces_existing(self): + """Setting an existing state replaces it.""" + states = self._make_states(RAW_STATES) + new_state = State(name=STATE, type=DataType.INTEGER, value=42) + states[STATE] = new_state + assert states.get(STATE).value == 42 + + def test_setitem_appends_new(self): + """Setting a new state appends it.""" + states = self._make_states(RAW_STATES) + initial_len = len(states) + new_state = State(name="new:State", type=DataType.INTEGER, value=1) + states["new:State"] = new_state + assert len(states) == initial_len + 1 + assert states.get("new:State").value == 1 + class TestCommandDefinitions: """Tests for CommandDefinitions container and helper methods.""" + def _make_cmds(self, raw: list[dict]) -> CommandDefinitions: + return converter.structure(raw, CommandDefinitions) + def test_select_returns_first_match(self): """select() returns the first command name that exists.""" - cmds = CommandDefinitions( + cmds = self._make_cmds( [ {"command_name": "close", "nparams": 0}, {"command_name": "open", "nparams": 0}, @@ -466,66 +495,89 @@ def test_select_returns_first_match(self): def test_select_returns_none_when_no_match(self): """select() returns None when no command matches.""" - cmds = CommandDefinitions([{"command_name": "close", "nparams": 0}]) + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) assert cmds.select(["nonexistent", "also_nonexistent"]) is None def test_has_any_true(self): """has_any() returns True when at least one command exists.""" - cmds = CommandDefinitions([{"command_name": "close", "nparams": 0}]) + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) assert cmds.has_any(["nonexistent", "close"]) def test_has_any_false(self): """has_any() returns False when no command matches.""" - cmds = CommandDefinitions([{"command_name": "close", "nparams": 0}]) + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) assert not cmds.has_any(["nonexistent", "also_nonexistent"]) + def test_getitem_raises_keyerror_on_missing(self): + """Subscript access raises KeyError for missing commands.""" + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) + with pytest.raises(KeyError, match="nonexistent"): + cmds["nonexistent"] + + def test_getitem_returns_command_on_hit(self): + """Subscript access returns the CommandDefinition for a known command.""" + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) + cmd = cmds["close"] + assert cmd.command_name == "close" + + def test_get_returns_none_on_missing(self): + """get() returns None for missing commands.""" + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) + assert cmds.get("nonexistent") is None + + def test_contains_existing(self): + """'in' operator returns True for existing command names.""" + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) + assert "close" in cmds + + def test_contains_missing(self): + """'in' operator returns False for missing command names.""" + cmds = self._make_cmds([{"command_name": "close", "nparams": 0}]) + assert "nonexistent" not in cmds + class TestDefinition: """Tests for Definition model and its helper methods.""" def test_get_state_definition_returns_first_match(self): - """get_state_definition() returns the first StateDefinition in definition.states. - - The definition is matched by `qualified_name` against the input list. - """ + """get_state_definition() returns the first StateDefinition in definition.states.""" definition = Definition( - commands=[], + commands=CommandDefinitions(), states=[ - {"qualified_name": "core:ClosureState", "type": "ContinuousState"}, - { - "qualified_name": "core:TargetClosureState", - "type": "ContinuousState", - }, + StateDefinition( + qualified_name="core:ClosureState", type="ContinuousState" + ), + StateDefinition( + qualified_name="core:TargetClosureState", type="ContinuousState" + ), ], ) - # Iterates definition.states in order, returns first match found state_def = definition.get_state_definition( ["core:TargetClosureState", "core:ClosureState"] ) assert state_def is not None - # core:ClosureState appears first in definition.states, so it's returned assert state_def.qualified_name == "core:ClosureState" - # Only asking for TargetClosureState works state_def2 = definition.get_state_definition(["core:TargetClosureState"]) assert state_def2 is not None assert state_def2.qualified_name == "core:TargetClosureState" def test_get_state_definition_returns_none_when_no_match(self): """get_state_definition() returns None when no state definition matches.""" - definition = Definition(commands=[], states=[]) + definition = Definition(commands=CommandDefinitions(), states=[]) assert definition.get_state_definition(["nonexistent"]) is None def test_has_state_definition_returns_true(self): """has_state_definition() returns True when a state definition matches.""" definition = Definition( - commands=[], + commands=CommandDefinitions(), states=[ - {"qualified_name": "core:ClosureState", "type": "ContinuousState"}, - { - "qualified_name": "core:TargetClosureState", - "type": "ContinuousState", - }, + StateDefinition( + qualified_name="core:ClosureState", type="ContinuousState" + ), + StateDefinition( + qualified_name="core:TargetClosureState", type="ContinuousState" + ), ], ) assert definition.has_state_definition(["core:ClosureState"]) @@ -536,16 +588,18 @@ def test_has_state_definition_returns_true(self): def test_has_state_definition_returns_false(self): """has_state_definition() returns False when no state definition matches.""" definition = Definition( - commands=[], + commands=CommandDefinitions(), states=[ - {"qualified_name": "core:ClosureState", "type": "ContinuousState"}, + StateDefinition( + qualified_name="core:ClosureState", type="ContinuousState" + ), ], ) assert not definition.has_state_definition(["nonexistent", "also_nonexistent"]) def test_has_state_definition_empty_states(self): """has_state_definition() returns False for definitions with no states.""" - definition = Definition(commands=[], states=[]) + definition = Definition(commands=CommandDefinitions(), states=[]) assert not definition.has_state_definition(["core:ClosureState"]) @@ -669,12 +723,103 @@ def test_action_to_payload_and_parameters_conversion(): cmd = Command( name=OverkizCommand.SET_LEVEL, parameters=[10, OverkizCommandParam.A], type=1 ) - action = Action("rts://2025-8464-6867/16756006", [cmd]) + action = Action(device_url="rts://2025-8464-6867/16756006", commands=[cmd]) payload = action.to_payload() assert payload["device_url"] == "rts://2025-8464-6867/16756006" assert payload["commands"][0]["name"] == "setLevel" assert payload["commands"][0]["type"] == 1 - # parameters should be converted to primitives (enum -> str) assert payload["commands"][0]["parameters"] == [10, "A"] + + +class TestEvent: + """Tests for Event structuring via the cattrs converter.""" + + def test_execution_state_changed_event(self): + """Optional[Enum] fields (old_state, new_state) are structured into enums.""" + raw = decamelize( + { + "timestamp": 1631130760744, + "setupOID": "741bc89f-a47b-4ad6-894d-a785c06956c2", + "execId": "c6f83624-ac10-3e01-653e-2b025fee956d", + "newState": "IN_PROGRESS", + "ownerKey": "741bc89f-a47b-4ad6-894d-a785c06956c2", + "type": 1, + "subType": 1, + "oldState": "TRANSMITTED", + "timeToNextState": 0, + "name": "ExecutionStateChangedEvent", + } + ) + event = converter.structure(raw, Event) + + assert event.name == EventName.EXECUTION_STATE_CHANGED + assert event.old_state is ExecutionState.TRANSMITTED + assert event.new_state is ExecutionState.IN_PROGRESS + assert event.setup_oid == "741bc89f-a47b-4ad6-894d-a785c06956c2" + + def test_failure_type_code_structured_as_enum(self): + """FailureType | None field is structured into an enum instance.""" + raw = decamelize( + { + "name": "ExecutionStateChangedEvent", + "timestamp": 123, + "failureTypeCode": 0, + } + ) + event = converter.structure(raw, Event) + + assert isinstance(event.failure_type_code, FailureType) + assert event.failure_type_code is FailureType.NO_FAILURE + + def test_optional_enum_fields_none_when_absent(self): + """Optional enum fields default to None when not present in the payload.""" + raw = decamelize( + { + "name": "GatewaySynchronizationEndedEvent", + "timestamp": 1631130645998, + "gatewayId": "9876-1234-8767", + } + ) + event = converter.structure(raw, Event) + + assert event.old_state is None + assert event.new_state is None + assert event.failure_type_code is None + + def test_device_state_changed_event_with_states(self): + """DeviceStateChangedEvent payload structures device_states as EventState.""" + raw = decamelize( + { + "timestamp": 1631130646544, + "setupOID": "741bc89f-a47b-4ad6-894d-a785c06956c2", + "deviceURL": "io://9876-1234-8767/4468654#1", + "deviceStates": [ + { + "name": "core:ElectricEnergyConsumptionState", + "type": 1, + "value": "23247220", + } + ], + "name": "DeviceStateChangedEvent", + } + ) + event = converter.structure(raw, Event) + + assert event.name == EventName.DEVICE_STATE_CHANGED + assert len(event.device_states) == 1 + assert isinstance(event.device_states[0], EventState) + + def test_event_fixture_structures_all_events(self): + """All events in the cloud fixture file structure without errors.""" + raw_events = json.loads((Path("tests/fixtures/event/events.json")).read_text()) + events = [converter.structure(decamelize(e), Event) for e in raw_events] + + assert len(events) == len(raw_events) + state_changed = [ + e for e in events if e.name == EventName.EXECUTION_STATE_CHANGED + ] + for e in state_changed: + assert isinstance(e.old_state, ExecutionState) + assert isinstance(e.new_state, ExecutionState) diff --git a/tests/test_ui_profile.py b/tests/test_ui_profile.py index df88bc72..ae85e55f 100644 --- a/tests/test_ui_profile.py +++ b/tests/test_ui_profile.py @@ -1,7 +1,10 @@ """Tests for UIProfileDefinition models.""" +from pyoverkiz.converter import converter from pyoverkiz.models import ( CommandParameter, + CommandPrototype, + StatePrototype, UIProfileCommand, UIProfileDefinition, UIProfileState, @@ -33,7 +36,7 @@ def test_command_parameter(): param = CommandParameter( optional=False, sensitive=False, - value_prototypes=[{"type": "INT", "min_value": 0, "max_value": 100}], + value_prototypes=[ValuePrototype(type="INT", min_value=0, max_value=100)], ) assert param.optional is False assert param.sensitive is False @@ -45,17 +48,17 @@ def test_ui_profile_command(): """Test UIProfileCommand with prototype.""" cmd = UIProfileCommand( name="setFanSpeedLevel", - prototype={ - "parameters": [ - { - "optional": False, - "sensitive": False, - "value_prototypes": [ - {"type": "INT", "min_value": 0, "max_value": 100} + prototype=CommandPrototype( + parameters=[ + CommandParameter( + optional=False, + sensitive=False, + value_prototypes=[ + ValuePrototype(type="INT", min_value=0, max_value=100) ], - } + ) ] - }, + ), description="Set the device fan speed level", ) assert cmd.name == "setFanSpeedLevel" @@ -68,11 +71,11 @@ def test_ui_profile_state(): """Test UIProfileState with prototype.""" state = UIProfileState( name="core:TemperatureState", - prototype={ - "value_prototypes": [ - {"type": "FLOAT", "min_value": -100.0, "max_value": 100.0} + prototype=StatePrototype( + value_prototypes=[ + ValuePrototype(type="FLOAT", min_value=-100.0, max_value=100.0) ] - }, + ), description="Current room temperature", ) assert state.name == "core:TemperatureState" @@ -81,11 +84,11 @@ def test_ui_profile_state(): assert len(state.prototype.value_prototypes) == 1 -def test_ui_profile_definition(): - """Test complete UIProfileDefinition.""" - profile = UIProfileDefinition( - name="AirFan", - commands=[ +def test_ui_profile_definition_from_api(): + """Test complete UIProfileDefinition structured from API-like dict.""" + raw = { + "name": "AirFan", + "commands": [ { "name": "setFanSpeedLevel", "prototype": { @@ -102,7 +105,7 @@ def test_ui_profile_definition(): "description": "Set fan speed", } ], - states=[ + "states": [ { "name": "core:FanSpeedState", "prototype": { @@ -113,22 +116,21 @@ def test_ui_profile_definition(): "description": "Current fan speed", } ], - form_factor=False, - ) + "form_factor": False, + } + profile = converter.structure(raw, UIProfileDefinition) assert profile.name == "AirFan" assert len(profile.commands) == 1 assert len(profile.states) == 1 assert profile.form_factor is False - # Verify command structure cmd = profile.commands[0] assert cmd.name == "setFanSpeedLevel" assert cmd.description == "Set fan speed" assert cmd.prototype is not None assert len(cmd.prototype.parameters) == 1 - # Verify state structure state = profile.states[0] assert state.name == "core:FanSpeedState" assert state.description == "Current fan speed" diff --git a/uv.lock b/uv.lock index 840bda4a..df3ce267 100644 --- a/uv.lock +++ b/uv.lock @@ -178,6 +178,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d6/cd/7e7ceeff26889d1fd923f069381e3b2b85ff6d46c6fd1409ed8f486cc06f/botocore-1.42.49-py3-none-any.whl", hash = "sha256:1c33544f72101eed4ccf903ebb667a803e14e25b2af4e0836e4b871da1c0af37", size = 14630510, upload-time = "2026-02-13T20:29:43.086Z" }, ] +[[package]] +name = "cattrs" +version = "26.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "attrs" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a0/ec/ba18945e7d6e55a58364d9fb2e46049c1c2998b3d805f19b703f14e81057/cattrs-26.1.0.tar.gz", hash = "sha256:fa239e0f0ec0715ba34852ce813986dfed1e12117e209b816ab87401271cdd40", size = 495672, upload-time = "2026-02-18T22:15:19.406Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/80/56/60547f7801b97c67e97491dc3d9ade9fbccbd0325058fd3dfcb2f5d98d90/cattrs-26.1.0-py3-none-any.whl", hash = "sha256:d1e0804c42639494d469d08d4f26d6b9de9b8ab26b446db7b5f8c2e97f7c3096", size = 73054, upload-time = "2026-02-18T22:15:17.958Z" }, +] + [[package]] name = "certifi" version = "2026.1.4" @@ -1139,6 +1152,7 @@ dependencies = [ { name = "attrs" }, { name = "backoff" }, { name = "boto3" }, + { name = "cattrs" }, { name = "warrant-lite" }, ] @@ -1168,6 +1182,7 @@ requires-dist = [ { name = "attrs", specifier = ">=21.2" }, { name = "backoff", specifier = ">=1.10.0,<3.0" }, { name = "boto3", specifier = ">=1.18.59,<2.0.0" }, + { name = "cattrs", specifier = ">=26.1.0" }, { name = "mkdocs", marker = "extra == 'docs'", specifier = ">=1.5.0" }, { name = "mkdocs-autorefs", marker = "extra == 'docs'", specifier = ">=1.0.0" }, { name = "mkdocs-material", marker = "extra == 'docs'", specifier = ">=9.5.0" },