-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathfactory.py
More file actions
123 lines (102 loc) · 3.53 KB
/
factory.py
File metadata and controls
123 lines (102 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Factory to build authentication strategies based on server and credentials."""
from __future__ import annotations
from typing import TYPE_CHECKING
from pyoverkiz.auth.credentials import (
Credentials,
LocalTokenCredentials,
RexelOAuthCodeCredentials,
TokenCredentials,
UsernamePasswordCredentials,
)
from pyoverkiz.auth.strategies import (
AuthStrategy,
BearerTokenAuthStrategy,
CozytouchAuthStrategy,
LocalTokenAuthStrategy,
NexityAuthStrategy,
RexelAuthStrategy,
SessionLoginStrategy,
SomfyAuthStrategy,
)
from pyoverkiz.enums import APIType, Server
if TYPE_CHECKING:
import ssl
from aiohttp import ClientSession
from pyoverkiz.models import ServerConfig
def build_auth_strategy(
*,
server_config: ServerConfig,
credentials: Credentials,
session: ClientSession,
ssl_context: ssl.SSLContext | bool,
) -> AuthStrategy:
"""Build the correct auth strategy for the given server and credentials."""
server: Server | None = server_config.server
if server == Server.SOMFY_EUROPE:
return SomfyAuthStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
)
if server in {
Server.ATLANTIC_COZYTOUCH,
Server.THERMOR_COZYTOUCH,
Server.SAUTER_COZYTOUCH,
}:
return CozytouchAuthStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
)
if server == Server.NEXITY:
return NexityAuthStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
)
if server == Server.REXEL:
return RexelAuthStrategy(
_ensure_rexel(credentials),
session,
server_config,
ssl_context,
)
if server_config.api_type == APIType.LOCAL:
if isinstance(credentials, LocalTokenCredentials):
return LocalTokenAuthStrategy(
credentials, session, server_config, ssl_context
)
return BearerTokenAuthStrategy(
_ensure_token(credentials),
session,
server_config,
ssl_context,
)
if isinstance(credentials, TokenCredentials) and not isinstance(
credentials, LocalTokenCredentials
):
return BearerTokenAuthStrategy(credentials, session, server_config, ssl_context)
return SessionLoginStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
)
def _ensure_username_password(credentials: Credentials) -> UsernamePasswordCredentials:
"""Validate that credentials are username/password based."""
if not isinstance(credentials, UsernamePasswordCredentials):
raise TypeError("UsernamePasswordCredentials are required for this server.")
return credentials
def _ensure_token(credentials: Credentials) -> TokenCredentials:
"""Validate that credentials carry a bearer token."""
if not isinstance(credentials, TokenCredentials):
raise TypeError("TokenCredentials are required for this server.")
return credentials
def _ensure_rexel(credentials: Credentials) -> RexelOAuthCodeCredentials:
"""Validate that credentials are of Rexel OAuth code type."""
if not isinstance(credentials, RexelOAuthCodeCredentials):
raise TypeError("RexelOAuthCodeCredentials are required for this server.")
return credentials