|
1 | | -"""Centralized cattrs converter for structuring Overkiz API responses into models.""" |
| 1 | +"""Centralized cattrs converter for structuring Overkiz API responses.""" |
2 | 2 |
|
3 | 3 | from __future__ import annotations |
4 | 4 |
|
|
8 | 8 |
|
9 | 9 | import attr |
10 | 10 | import cattrs |
11 | | -from cattrs.dispatch import StructureHook |
12 | 11 |
|
13 | 12 | from pyoverkiz.models import ( |
14 | 13 | CommandDefinition, |
|
18 | 17 | ) |
19 | 18 |
|
20 | 19 |
|
21 | | -def _is_union(t: Any) -> bool: |
22 | | - """Check if a type is a Union or PEP 604 union (X | Y).""" |
23 | | - return get_origin(t) is Union or isinstance(t, types.UnionType) |
24 | | - |
25 | | - |
26 | 20 | def _is_primitive_union(t: Any) -> bool: |
27 | | - """Check if a union type only contains primitive/JSON-native types and enums. |
28 | | -
|
29 | | - Returns False if any union member is an attrs class (needs structuring). |
30 | | - """ |
31 | | - if not _is_union(t): |
| 21 | + """True for union types containing only JSON-native types and enums (no attrs classes).""" |
| 22 | + origin = get_origin(t) |
| 23 | + if origin is not Union and not isinstance(t, types.UnionType): |
32 | 24 | return False |
33 | | - for arg in get_args(t): |
34 | | - if arg is type(None): |
35 | | - continue |
36 | | - if isinstance(arg, type) and attr.has(arg): |
37 | | - return False |
38 | | - return True |
| 25 | + return all( |
| 26 | + arg is type(None) or not (isinstance(arg, type) and attr.has(arg)) |
| 27 | + for arg in get_args(t) |
| 28 | + ) |
39 | 29 |
|
40 | 30 |
|
41 | 31 | def _make_converter() -> cattrs.Converter: |
42 | | - c = cattrs.Converter(forbid_extra_keys=False) |
| 32 | + c = cattrs.Converter() |
43 | 33 |
|
44 | | - # ------------------------------------------------------------------ |
45 | | - # Primitive union types (str | Enum, StateType, etc.): passthrough |
46 | | - # These only contain JSON-native types and enums, no attrs classes. |
47 | | - # ------------------------------------------------------------------ |
| 34 | + # Primitive unions like StateType (str | int | float | … | None): pass through as-is |
48 | 35 | c.register_structure_hook_func(_is_primitive_union, lambda v, _: v) |
49 | 36 |
|
50 | | - # ------------------------------------------------------------------ |
51 | | - # Optional[AttrsClass] unions: structure the non-None member |
52 | | - # ------------------------------------------------------------------ |
53 | | - def _is_optional_attrs(t: Any) -> bool: |
54 | | - if not _is_union(t): |
55 | | - return False |
56 | | - args = get_args(t) |
57 | | - non_none = [a for a in args if a is not type(None)] |
58 | | - return ( |
59 | | - len(non_none) == 1 |
60 | | - and isinstance(non_none[0], type) |
61 | | - and attr.has(non_none[0]) |
62 | | - ) |
63 | | - |
64 | | - def _structure_optional_attrs(v: Any, t: Any) -> Any: |
65 | | - if v is None: |
66 | | - return None |
67 | | - args = get_args(t) |
68 | | - cls = next(a for a in args if a is not type(None)) |
69 | | - return c.structure(v, cls) |
70 | | - |
71 | | - c.register_structure_hook_func(_is_optional_attrs, _structure_optional_attrs) |
72 | | - |
73 | | - # ------------------------------------------------------------------ |
74 | | - # Enums: use the enum constructor which handles UnknownEnumMixin |
75 | | - # ------------------------------------------------------------------ |
| 37 | + # Enums: call the constructor so UnknownEnumMixin._missing_ can handle unknown values |
76 | 38 | c.register_structure_hook_func( |
77 | 39 | lambda t: isinstance(t, type) and issubclass(t, Enum), |
78 | 40 | lambda v, t: v if isinstance(v, t) else t(v), |
79 | 41 | ) |
80 | 42 |
|
81 | | - # ------------------------------------------------------------------ |
82 | | - # attrs classes: silently discard unknown keys from API responses |
83 | | - # ------------------------------------------------------------------ |
84 | | - def _make_attrs_hook(cls: type) -> StructureHook: |
85 | | - inner: StructureHook = cattrs.gen.make_dict_structure_fn(cls, c) |
86 | | - |
87 | | - def hook(val: Any, _: type) -> Any: |
88 | | - if isinstance(val, dict): |
89 | | - fields = {a.name for a in attr.fields(cls)} |
90 | | - val = {k: v for k, v in val.items() if k in fields} |
91 | | - return inner(val, cls) |
92 | | - |
93 | | - return hook |
94 | | - |
95 | | - c.register_structure_hook_factory(attr.has, _make_attrs_hook) |
96 | | - |
97 | | - # ------------------------------------------------------------------ |
98 | | - # Container types with custom __init__ |
99 | | - # ------------------------------------------------------------------ |
| 43 | + # Custom container types that take a list in __init__ |
100 | 44 | def _structure_states(val: Any, _: type) -> States: |
101 | | - if isinstance(val, States): |
102 | | - return val |
103 | 45 | if val is None: |
104 | 46 | return States() |
105 | 47 | return States([c.structure(s, State) for s in val]) |
106 | 48 |
|
107 | 49 | def _structure_command_definitions(val: Any, _: type) -> CommandDefinitions: |
108 | | - if isinstance(val, CommandDefinitions): |
109 | | - return val |
110 | 50 | return CommandDefinitions([c.structure(cd, CommandDefinition) for cd in val]) |
111 | 51 |
|
112 | 52 | c.register_structure_hook(States, _structure_states) |
|
0 commit comments