-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathfactory.py
More file actions
107 lines (92 loc) · 3.01 KB
/
factory.py
File metadata and controls
107 lines (92 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Factory to build authentication strategies based on server and credentials."""
from __future__ import annotations
import ssl
from aiohttp import ClientSession
from pyoverkiz.auth.credentials import (
Credentials,
LocalTokenCredentials,
RexelOAuthCodeCredentials,
TokenCredentials,
UsernamePasswordCredentials,
)
from pyoverkiz.auth.strategies import (
AuthStrategy,
BearerTokenAuthStrategy,
CozytouchAuthStrategy,
LocalTokenAuthStrategy,
NexityAuthStrategy,
RexelAuthStrategy,
SessionLoginStrategy,
SomfyAuthStrategy,
)
from pyoverkiz.enums import APIType, Server
from pyoverkiz.models import ServerConfig
def build_auth_strategy(
*,
server_config: ServerConfig,
credentials: Credentials,
session: ClientSession,
ssl_context: ssl.SSLContext | bool,
) -> AuthStrategy:
"""Build the correct auth strategy for the given server and credentials."""
server: Server | None = server_config.server
if server == Server.SOMFY_EUROPE:
return SomfyAuthStrategy(
_ensure_credentials(credentials, UsernamePasswordCredentials),
session,
server_config,
ssl_context,
)
if server in {
Server.ATLANTIC_COZYTOUCH,
Server.THERMOR_COZYTOUCH,
Server.SAUTER_COZYTOUCH,
}:
return CozytouchAuthStrategy(
_ensure_credentials(credentials, UsernamePasswordCredentials),
session,
server_config,
ssl_context,
)
if server == Server.NEXITY:
return NexityAuthStrategy(
_ensure_credentials(credentials, UsernamePasswordCredentials),
session,
server_config,
ssl_context,
)
if server == Server.REXEL:
return RexelAuthStrategy(
_ensure_credentials(credentials, RexelOAuthCodeCredentials),
session,
server_config,
ssl_context,
)
if server_config.api_type == APIType.LOCAL:
if isinstance(credentials, LocalTokenCredentials):
return LocalTokenAuthStrategy(
credentials, session, server_config, ssl_context
)
return BearerTokenAuthStrategy(
_ensure_credentials(credentials, TokenCredentials),
session,
server_config,
ssl_context,
)
if isinstance(credentials, TokenCredentials) and not isinstance(
credentials, LocalTokenCredentials
):
return BearerTokenAuthStrategy(credentials, session, server_config, ssl_context)
return SessionLoginStrategy(
_ensure_credentials(credentials, UsernamePasswordCredentials),
session,
server_config,
ssl_context,
)
def _ensure_credentials[C: Credentials](
credentials: Credentials, expected: type[C]
) -> C:
"""Validate that credentials match the expected type."""
if not isinstance(credentials, expected):
raise TypeError(f"{expected.__name__} are required for this server.")
return credentials