1+ """Authentication strategies for Overkiz API."""
2+
13from __future__ import annotations
24
35import asyncio
1113
1214import boto3
1315from aiohttp import ClientSession , FormData
16+ from botocore .client import BaseClient
1417from botocore .config import Config
1518from warrant_lite import WarrantLite
1619
5154
5255
5356class BaseAuthStrategy (AuthStrategy ):
57+ """Base class for authentication strategies."""
58+
5459 def __init__ (
5560 self ,
5661 session : ClientSession ,
@@ -82,6 +87,8 @@ async def close(self) -> None:
8287
8388
8489class SessionLoginStrategy (BaseAuthStrategy ):
90+ """Authentication strategy using session-based login."""
91+
8592 def __init__ (
8693 self ,
8794 credentials : UsernamePasswordCredentials ,
@@ -90,17 +97,20 @@ def __init__(
9097 ssl_context : ssl .SSLContext | bool ,
9198 api_type : APIType ,
9299 ) -> None :
100+ """Initialize SessionLoginStrategy with given parameters."""
93101 super ().__init__ (session , server , ssl_context , api_type )
94102 self .credentials = credentials
95103
96104 async def login (self ) -> None :
105+ """Perform login using username and password."""
97106 payload = {
98107 "userId" : self .credentials .username ,
99108 "userPassword" : self .credentials .password ,
100109 }
101110 await self ._post_login (payload )
102111
103112 async def _post_login (self , data : Mapping [str , Any ]) -> None :
113+ """Post login data to the server and handle response."""
104114 async with self .session .post (
105115 f"{ self .server .endpoint } login" ,
106116 data = data ,
@@ -117,6 +127,8 @@ async def _post_login(self, data: Mapping[str, Any]) -> None:
117127
118128
119129class SomfyAuthStrategy (BaseAuthStrategy ):
130+ """Authentication strategy using Somfy OAuth2."""
131+
120132 def __init__ (
121133 self ,
122134 credentials : UsernamePasswordCredentials ,
@@ -125,11 +137,13 @@ def __init__(
125137 ssl_context : ssl .SSLContext | bool ,
126138 api_type : APIType ,
127139 ) -> None :
140+ """Initialize SomfyAuthStrategy with given parameters."""
128141 super ().__init__ (session , server , ssl_context , api_type )
129142 self .credentials = credentials
130143 self .context = AuthContext ()
131144
132145 async def login (self ) -> None :
146+ """Perform login using Somfy OAuth2."""
133147 await self ._request_access_token (
134148 grant_type = "password" ,
135149 extra_fields = {
@@ -139,6 +153,7 @@ async def login(self) -> None:
139153 )
140154
141155 async def refresh_if_needed (self ) -> bool :
156+ """Refresh Somfy OAuth2 tokens if needed."""
142157 if not self .context .is_expired () or not self .context .refresh_token :
143158 return False
144159
@@ -149,6 +164,7 @@ async def refresh_if_needed(self) -> bool:
149164 return True
150165
151166 def auth_headers (self , path : str | None = None ) -> Mapping [str , str ]:
167+ """Return authentication headers for a request path."""
152168 if self .context .access_token :
153169 return {"Authorization" : f"Bearer { self .context .access_token } " }
154170
@@ -190,6 +206,8 @@ async def _request_access_token(
190206
191207
192208class CozytouchAuthStrategy (SessionLoginStrategy ):
209+ """Authentication strategy using Cozytouch session-based login."""
210+
193211 def __init__ (
194212 self ,
195213 credentials : UsernamePasswordCredentials ,
@@ -198,9 +216,11 @@ def __init__(
198216 ssl_context : ssl .SSLContext | bool ,
199217 api_type : APIType ,
200218 ) -> None :
219+ """Initialize CozytouchAuthStrategy with given parameters."""
201220 super ().__init__ (credentials , session , server , ssl_context , api_type )
202221
203222 async def login (self ) -> None :
223+ """Perform login using Cozytouch username and password."""
204224 form = FormData (
205225 {
206226 "grant_type" : "password" ,
@@ -239,6 +259,8 @@ async def login(self) -> None:
239259
240260
241261class NexityAuthStrategy (SessionLoginStrategy ):
262+ """Authentication strategy using Nexity session-based login."""
263+
242264 def __init__ (
243265 self ,
244266 credentials : UsernamePasswordCredentials ,
@@ -247,12 +269,14 @@ def __init__(
247269 ssl_context : ssl .SSLContext | bool ,
248270 api_type : APIType ,
249271 ) -> None :
272+ """Initialize NexityAuthStrategy with given parameters."""
250273 super ().__init__ (credentials , session , server , ssl_context , api_type )
251274
252275 async def login (self ) -> None :
276+ """Perform login using Nexity username and password."""
253277 loop = asyncio .get_event_loop ()
254278
255- def _client () -> boto3 . session . Session . client :
279+ def _client () -> BaseClient :
256280 return boto3 .client (
257281 "cognito-idp" , config = Config (region_name = NEXITY_COGNITO_REGION )
258282 )
@@ -287,6 +311,8 @@ def _client() -> boto3.session.Session.client:
287311
288312
289313class LocalTokenAuthStrategy (BaseAuthStrategy ):
314+ """Authentication strategy using a local API token."""
315+
290316 def __init__ (
291317 self ,
292318 credentials : LocalTokenCredentials ,
@@ -295,18 +321,23 @@ def __init__(
295321 ssl_context : ssl .SSLContext | bool ,
296322 api_type : APIType ,
297323 ) -> None :
324+ """Initialize LocalTokenAuthStrategy with given parameters."""
298325 super ().__init__ (session , server , ssl_context , api_type )
299326 self .credentials = credentials
300327
301328 async def login (self ) -> None :
329+ """Validate that a token is provided for local API access."""
302330 if not self .credentials .token :
303331 raise InvalidTokenException ("Local API requires a token." )
304332
305333 def auth_headers (self , path : str | None = None ) -> Mapping [str , str ]:
334+ """Return authentication headers for a request path."""
306335 return {"Authorization" : f"Bearer { self .credentials .token } " }
307336
308337
309338class RexelAuthStrategy (BaseAuthStrategy ):
339+ """Authentication strategy using Rexel OAuth2."""
340+
310341 def __init__ (
311342 self ,
312343 credentials : RexelOAuthCodeCredentials ,
@@ -315,11 +346,13 @@ def __init__(
315346 ssl_context : ssl .SSLContext | bool ,
316347 api_type : APIType ,
317348 ) -> None :
349+ """Initialize RexelAuthStrategy with given parameters."""
318350 super ().__init__ (session , server , ssl_context , api_type )
319351 self .credentials = credentials
320352 self .context = AuthContext ()
321353
322354 async def login (self ) -> None :
355+ """Perform login using Rexel OAuth2 authorization code."""
323356 await self ._exchange_token (
324357 {
325358 "grant_type" : "authorization_code" ,
@@ -331,6 +364,7 @@ async def login(self) -> None:
331364 )
332365
333366 async def refresh_if_needed (self ) -> bool :
367+ """ "Refresh Rexel OAuth2 tokens if needed."""
334368 if not self .context .is_expired () or not self .context .refresh_token :
335369 return False
336370
@@ -345,11 +379,13 @@ async def refresh_if_needed(self) -> bool:
345379 return True
346380
347381 def auth_headers (self , path : str | None = None ) -> Mapping [str , str ]:
382+ """Return authentication headers for a request path."""
348383 if self .context .access_token :
349384 return {"Authorization" : f"Bearer { self .context .access_token } " }
350385 return {}
351386
352387 async def _exchange_token (self , payload : Mapping [str , str ]) -> None :
388+ """Exchange authorization code or refresh token for access token."""
353389 form = FormData (payload )
354390 async with self .session .post (
355391 REXEL_OAUTH_TOKEN_URL ,
@@ -373,6 +409,7 @@ async def _exchange_token(self, payload: Mapping[str, str]) -> None:
373409
374410 @staticmethod
375411 def _ensure_consent (access_token : str ) -> None :
412+ """Ensure that the Rexel token has the required consent."""
376413 payload = _decode_jwt_payload (access_token )
377414 consent = payload .get ("consent" )
378415 if consent != REXEL_REQUIRED_CONSENT :
@@ -382,6 +419,8 @@ def _ensure_consent(access_token: str) -> None:
382419
383420
384421class BearerTokenAuthStrategy (BaseAuthStrategy ):
422+ """Authentication strategy using a static bearer token."""
423+
385424 def __init__ (
386425 self ,
387426 credentials : TokenCredentials ,
@@ -390,16 +429,19 @@ def __init__(
390429 ssl_context : ssl .SSLContext | bool ,
391430 api_type : APIType ,
392431 ) -> None :
432+ """Initialize BearerTokenAuthStrategy with given parameters."""
393433 super ().__init__ (session , server , ssl_context , api_type )
394434 self .credentials = credentials
395435
396436 def auth_headers (self , path : str | None = None ) -> Mapping [str , str ]:
437+ """Return authentication headers for a request path."""
397438 if self .credentials .token :
398439 return {"Authorization" : f"Bearer { self .credentials .token } " }
399440 return {}
400441
401442
402443def _decode_jwt_payload (token : str ) -> dict [str , Any ]:
444+ """Decode the payload of a JWT token."""
403445 parts = token .split ("." )
404446 if len (parts ) < 2 :
405447 raise InvalidTokenException ("Malformed JWT received." )
0 commit comments