Skip to content

Commit 82eab33

Browse files
authored
Merge pull request #382 from dvd-dev/hilo_state
Hilo_state corruption logic
2 parents f10e537 + c97f9a2 commit 82eab33

File tree

5 files changed

+88
-37
lines changed

5 files changed

+88
-37
lines changed

pyhilo/api.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ async def _async_request(
291291
try:
292292
data = await resp.json(content_type=None)
293293
except json.decoder.JSONDecodeError:
294-
LOG.warning(f"JSON Decode error: {resp.__dict__}")
294+
LOG.warning("JSON Decode error: %s", resp.__dict__)
295295
message = await resp.text()
296296
data = {"error": message}
297297
else:
@@ -353,15 +353,15 @@ async def _async_handle_on_backoff(self, _: dict[str, Any]) -> None:
353353
err: ClientResponseError = err_info[1].with_traceback(err_info[2]) # type: ignore
354354

355355
if err.status in (401, 403):
356-
LOG.warning(f"Refreshing websocket token {err.request_info.url}")
356+
LOG.warning("Refreshing websocket token %s", err.request_info.url)
357357
if (
358358
"client/negotiate" in str(err.request_info.url)
359359
and err.request_info.method == "POST"
360360
):
361361
LOG.info(
362362
"401 detected on websocket, refreshing websocket token. Old url: {self.ws_url} Old Token: {self.ws_token}"
363363
)
364-
LOG.info(f"401 detected on {err.request_info.url}")
364+
LOG.info("401 detected on %s", err.request_info.url)
365365
async with self._backoff_refresh_lock_ws:
366366
await self.refresh_ws_token()
367367
await self.get_websocket_params()
@@ -480,7 +480,7 @@ async def fb_install(self, fb_id: str) -> None:
480480
json=body,
481481
)
482482
except ClientResponseError as err:
483-
LOG.error(f"ClientResponseError: {err}")
483+
LOG.error("ClientResponseError: %s", err)
484484
if err.status in (401, 403):
485485
raise InvalidCredentialsError("Invalid credentials") from err
486486
raise RequestError(err) from err
@@ -518,14 +518,14 @@ async def android_register(self) -> None:
518518
data=parsed_body,
519519
)
520520
except ClientResponseError as err:
521-
LOG.error(f"ClientResponseError: {err}")
521+
LOG.error("ClientResponseError: %s", err)
522522
if err.status in (401, 403):
523523
raise InvalidCredentialsError("Invalid credentials") from err
524524
raise RequestError(err) from err
525525
LOG.debug("Android client register: %s", resp)
526526
msg: str = resp.get("message", "")
527527
if msg.startswith("Error="):
528-
LOG.error(f"Android registration error: {msg}")
528+
LOG.error("Android registration error: %s", msg)
529529
raise RequestError
530530
token = msg.split("=")[-1]
531531
LOG.debug("Calling set_state android_register")

pyhilo/device/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def __init__(
5151
def update(self, **kwargs: Dict[str, Union[str, int, Dict]]) -> None:
5252
# TODO(dvd): This has to be re-written, this is not dynamic at all.
5353
if self._api.log_traces:
54-
LOG.debug(f"[TRACE] Adding device {kwargs}")
54+
LOG.debug("[TRACE] Adding device %s", kwargs)
5555
for orig_att, val in kwargs.items():
5656
att = camel_to_snake(orig_att)
5757
if reading_att := HILO_READING_TYPES.get(orig_att):
@@ -70,7 +70,7 @@ def update(self, **kwargs: Dict[str, Union[str, int, Dict]]) -> None:
7070
self.update_readings(DeviceReading(**reading)) # type: ignore
7171

7272
if att not in HILO_DEVICE_ATTRIBUTES:
73-
LOG.warning(f"Unknown device attribute {att}: {val}")
73+
LOG.warning("Unknown device attribute %s: %s", att, val)
7474
continue
7575
elif att in HILO_LIST_ATTRIBUTES:
7676
# This is where we generated the supported_attributes and settable_attributes
@@ -108,7 +108,7 @@ def update(self, **kwargs: Dict[str, Union[str, int, Dict]]) -> None:
108108

109109
async def set_attribute(self, attribute: str, value: Union[str, int, None]) -> None:
110110
if dev_attribute := cast(DeviceAttribute, self._api.dev_atts(attribute)):
111-
LOG.debug(f"{self._tag} Setting {dev_attribute} to {value}")
111+
LOG.debug("%s Setting %s to %s", self._tag, dev_attribute, value)
112112
await self._set_attribute(dev_attribute, value)
113113
return
114114
LOG.warning(
@@ -134,7 +134,7 @@ async def _set_attribute(
134134
)
135135
)
136136
else:
137-
LOG.warning(f"{self._tag} Invalid attribute {attribute} for device")
137+
LOG.warning("%s Invalid attribute %s for device", self._tag, attribute)
138138

139139
def get_attribute(self, attribute: str) -> Union[DeviceReading, None]:
140140
if dev_attribute := cast(DeviceAttribute, self._api.dev_atts(attribute)):
@@ -245,7 +245,7 @@ def __init__(self, **kwargs: Dict[str, Any]):
245245
else ""
246246
)
247247
if not self.device_attribute:
248-
LOG.warning(f"Received invalid reading for {self.device_id}: {kwargs}")
248+
LOG.warning("Received invalid reading for %s: %s", self.device_id, kwargs)
249249

250250
def __repr__(self) -> str:
251251
return f"<Reading {self.device_attribute.attr} {self.value}{self.unit_of_measurement}>"

pyhilo/devices.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def generate_device(self, device: dict) -> HiloDevice:
8787
try:
8888
device_type = HILO_DEVICE_TYPES[dev.type]
8989
except KeyError:
90-
LOG.warning(f"Unknown device type {dev.type}, adding as Sensor")
90+
LOG.warning("Unknown device type %s, adding as Sensor", dev.type)
9191
device_type = "Sensor"
9292
dev.__class__ = globals()[device_type]
9393
return dev

pyhilo/util/state.py

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
import asyncio
66
from datetime import datetime
7+
import os
78
from os.path import isfile
9+
import tempfile
810
from typing import Any, ForwardRef, TypedDict, TypeVar, get_type_hints
911

1012
import aiofiles
@@ -77,7 +79,7 @@ class StateDict(TypedDict, total=False):
7779
T = TypeVar("T", bound="StateDict")
7880

7981

80-
def _get_defaults(cls: type[T]) -> dict[str, Any]:
82+
def _get_defaults(cls: type[T]) -> T:
8183
"""Generate a default dict based on typed dict
8284
8385
This function recursively creates a nested dictionary structure that mirrors
@@ -117,22 +119,71 @@ def _get_defaults(cls: type[T]) -> dict[str, Any]:
117119
return new_dict # type: ignore[return-value]
118120

119121

120-
async def get_state(state_yaml: str) -> StateDict:
122+
def _write_state(state_yaml: str, state: dict[str, Any] | StateDict) -> None:
123+
"Write state atomically to a temp file, this prevents reading a file being written to"
124+
125+
dir_name = os.path.dirname(os.path.abspath(state_yaml))
126+
content = yaml.dump(state)
127+
with tempfile.NamedTemporaryFile(
128+
mode="w", dir=dir_name, delete=False, suffix=".tmp"
129+
) as tmp:
130+
tmp.write(content)
131+
tmp_path = tmp.name
132+
os.chmod(tmp_path, 0o644)
133+
os.replace(tmp_path, state_yaml)
134+
135+
136+
async def get_state(state_yaml: str, _already_locked: bool = False) -> StateDict:
121137
"""Read in state yaml.
122138
123139
:param state_yaml: filename where to read the state
124140
:type state_yaml: ``str``
141+
:param _already_locked: Whether the lock is already held by the caller (e.g. set_state).
142+
Prevents deadlock when corruption recovery needs to write defaults.
143+
:type _already_locked: ``bool``
125144
:rtype: ``StateDict``
126145
"""
127146
if not isfile(
128147
state_yaml
129148
): # noqa: PTH113 - isfile is fine and simpler in this case.
130-
return _get_defaults(StateDict) # type: ignore
131-
async with aiofiles.open(state_yaml, mode="r") as yaml_file:
132-
LOG.debug("Loading state from yaml")
133-
content = await yaml_file.read()
134-
state_yaml_payload: StateDict = yaml.safe_load(content)
135-
return state_yaml_payload
149+
return _get_defaults(StateDict)
150+
151+
try:
152+
async with aiofiles.open(state_yaml, mode="r") as yaml_file:
153+
LOG.debug("Loading state from yaml")
154+
content = await yaml_file.read()
155+
156+
state_yaml_payload: StateDict | None = yaml.safe_load(content)
157+
158+
# Handle corrupted/empty YAML files
159+
if state_yaml_payload is None or not isinstance(state_yaml_payload, dict):
160+
LOG.warning(
161+
"State file %s is corrupted or empty, reinitializing with defaults",
162+
state_yaml,
163+
)
164+
defaults = _get_defaults(StateDict)
165+
if _already_locked:
166+
_write_state(state_yaml, defaults)
167+
else:
168+
async with lock:
169+
_write_state(state_yaml, defaults)
170+
return defaults
171+
172+
return state_yaml_payload
173+
174+
except yaml.YAMLError as e:
175+
LOG.error(
176+
"Failed to parse state file %s: %s. Reinitializing with defaults.",
177+
state_yaml,
178+
e,
179+
)
180+
defaults = _get_defaults(StateDict)
181+
if _already_locked:
182+
_write_state(state_yaml, defaults)
183+
else:
184+
async with lock:
185+
_write_state(state_yaml, defaults)
186+
return defaults
136187

137188

138189
async def set_state(
@@ -143,6 +194,7 @@ async def set_state(
143194
),
144195
) -> None:
145196
"""Save state yaml.
197+
146198
:param state_yaml: filename where to read the state
147199
:type state_yaml: ``str``
148200
:param key: Key name
@@ -152,14 +204,11 @@ async def set_state(
152204
:rtype: ``StateDict``
153205
"""
154206
async with lock: # note ic-dev21: on lock le fichier pour être sûr de finir la job
155-
current_state = await get_state(state_yaml) or {}
207+
current_state = await get_state(state_yaml, _already_locked=True) or {}
156208
merged_state: dict[str, Any] = {key: {**current_state.get(key, {}), **state}} # type: ignore[dict-item]
157209
new_state: dict[str, Any] = {**current_state, **merged_state}
158-
async with aiofiles.open(state_yaml, mode="w") as yaml_file:
159-
LOG.debug("Saving state to yaml file")
160-
# TODO: Use asyncio.get_running_loop() and run_in_executor to write
161-
# to the file in a non blocking manner. Currently, the file writes
162-
# are properly async but the yaml dump is done synchronously on the
163-
# main event loop.
164-
content = yaml.dump(new_state)
165-
await yaml_file.write(content)
210+
LOG.debug("Saving state to yaml file")
211+
# TODO: Use asyncio.get_running_loop() and run_in_executor to write
212+
# to the file in a non blocking manner. Currently, yaml.dump is
213+
# synchronous on the main event loop.
214+
_write_state(state_yaml, new_state)

pyhilo/websocket.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ async def _async_receive_json(self) -> list[Dict[str, Any]]:
173173
response = await self._client.receive(300)
174174

175175
if response.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING):
176-
LOG.error(f"Websocket: Received event to close connection: {response.type}")
176+
LOG.error(
177+
"Websocket: Received event to close connection: %s", response.type
178+
)
177179
raise ConnectionClosedError("Connection was closed.")
178180

179181
if response.type == WSMsgType.ERROR:
@@ -183,7 +185,7 @@ async def _async_receive_json(self) -> list[Dict[str, Any]]:
183185
raise ConnectionFailedError
184186

185187
if response.type != WSMsgType.TEXT:
186-
LOG.error(f"Websocket: Received invalid message: {response}")
188+
LOG.error("Websocket: Received invalid message: %s", response)
187189
raise InvalidMessageError(f"Received non-text message: {response.type}")
188190

189191
messages: list[Dict[str, Any]] = []
@@ -196,7 +198,7 @@ async def _async_receive_json(self) -> list[Dict[str, Any]]:
196198
except ValueError as v_exc:
197199
raise InvalidMessageError("Received invalid JSON") from v_exc
198200
except json.decoder.JSONDecodeError as j_exc:
199-
LOG.error(f"Received invalid JSON: {msg}")
201+
LOG.error("Received invalid JSON: %s", msg)
200202
LOG.exception(j_exc)
201203
data = {}
202204

@@ -307,14 +309,14 @@ async def async_connect(self) -> None:
307309
**proxy_env,
308310
)
309311
except (ClientError, ServerDisconnectedError, WSServerHandshakeError) as err:
310-
LOG.error(f"Unable to connect to WS server {err}")
312+
LOG.error("Unable to connect to WS server %s", err)
311313
if hasattr(err, "status") and err.status in (401, 403, 404, 409):
312314
raise InvalidCredentialsError("Invalid credentials") from err
313315
except Exception as err:
314-
LOG.error(f"Unable to connect to WS server {err}")
316+
LOG.error("Unable to connect to WS server %s", err)
315317
raise CannotConnectError(err) from err
316318

317-
LOG.info(f"Connected to websocket server {self._api.endpoint}")
319+
LOG.info("Connected to websocket server %s", self._api.endpoint)
318320

319321
# Quick pause to prevent race condition
320322
await asyncio.sleep(0.05)
@@ -353,11 +355,11 @@ async def async_listen(self) -> None:
353355
LOG.info("Websocket: Listen cancelled.")
354356
raise
355357
except ConnectionClosedError as err:
356-
LOG.error(f"Websocket: Closed while listening: {err}")
358+
LOG.error("Websocket: Closed while listening: %s", err)
357359
LOG.exception(err)
358360
pass
359361
except InvalidMessageError as err:
360-
LOG.warning(f"Websocket: Received invalid json : {err}")
362+
LOG.warning("Websocket: Received invalid json : %s", err)
361363
pass
362364
finally:
363365
LOG.info("Websocket: Listen completed; cleaning up")

0 commit comments

Comments
 (0)