|
10 | 10 |
|
11 | 11 | from pyoverkiz._case import decamelize |
12 | 12 | from pyoverkiz.converter import converter |
13 | | -from pyoverkiz.enums import DataType, Protocol |
| 13 | +from pyoverkiz.enums import DataType, EventName, ExecutionState, FailureType, Protocol |
14 | 14 | from pyoverkiz.models import ( |
15 | 15 | CommandDefinitions, |
16 | 16 | Definition, |
17 | 17 | Device, |
| 18 | + Event, |
18 | 19 | EventState, |
19 | 20 | Setup, |
20 | 21 | State, |
@@ -723,3 +724,95 @@ def test_action_to_payload_and_parameters_conversion(): |
723 | 724 | assert payload["commands"][0]["name"] == "setLevel" |
724 | 725 | assert payload["commands"][0]["type"] == 1 |
725 | 726 | assert payload["commands"][0]["parameters"] == [10, "A"] |
| 727 | + |
| 728 | + |
| 729 | +class TestEvent: |
| 730 | + """Tests for Event structuring via the cattrs converter.""" |
| 731 | + |
| 732 | + def test_execution_state_changed_event(self): |
| 733 | + """Optional[Enum] fields (old_state, new_state) are structured into enums.""" |
| 734 | + raw = decamelize( |
| 735 | + { |
| 736 | + "timestamp": 1631130760744, |
| 737 | + "setupOID": "741bc89f-a47b-4ad6-894d-a785c06956c2", |
| 738 | + "execId": "c6f83624-ac10-3e01-653e-2b025fee956d", |
| 739 | + "newState": "IN_PROGRESS", |
| 740 | + "ownerKey": "741bc89f-a47b-4ad6-894d-a785c06956c2", |
| 741 | + "type": 1, |
| 742 | + "subType": 1, |
| 743 | + "oldState": "TRANSMITTED", |
| 744 | + "timeToNextState": 0, |
| 745 | + "name": "ExecutionStateChangedEvent", |
| 746 | + } |
| 747 | + ) |
| 748 | + event = converter.structure(raw, Event) |
| 749 | + |
| 750 | + assert event.name == EventName.EXECUTION_STATE_CHANGED |
| 751 | + assert event.old_state is ExecutionState.TRANSMITTED |
| 752 | + assert event.new_state is ExecutionState.IN_PROGRESS |
| 753 | + assert event.setup_oid == "741bc89f-a47b-4ad6-894d-a785c06956c2" |
| 754 | + |
| 755 | + def test_failure_type_code_structured_as_enum(self): |
| 756 | + """FailureType | None field is structured into an enum instance.""" |
| 757 | + raw = decamelize( |
| 758 | + { |
| 759 | + "name": "ExecutionStateChangedEvent", |
| 760 | + "timestamp": 123, |
| 761 | + "failureTypeCode": 0, |
| 762 | + } |
| 763 | + ) |
| 764 | + event = converter.structure(raw, Event) |
| 765 | + |
| 766 | + assert isinstance(event.failure_type_code, FailureType) |
| 767 | + assert event.failure_type_code is FailureType.NO_FAILURE |
| 768 | + |
| 769 | + def test_optional_enum_fields_none_when_absent(self): |
| 770 | + """Optional enum fields default to None when not present in the payload.""" |
| 771 | + raw = decamelize( |
| 772 | + { |
| 773 | + "name": "GatewaySynchronizationEndedEvent", |
| 774 | + "timestamp": 1631130645998, |
| 775 | + "gatewayId": "9876-1234-8767", |
| 776 | + } |
| 777 | + ) |
| 778 | + event = converter.structure(raw, Event) |
| 779 | + |
| 780 | + assert event.old_state is None |
| 781 | + assert event.new_state is None |
| 782 | + assert event.failure_type_code is None |
| 783 | + |
| 784 | + def test_device_state_changed_event_with_states(self): |
| 785 | + """DeviceStateChangedEvent payload structures device_states as EventState.""" |
| 786 | + raw = decamelize( |
| 787 | + { |
| 788 | + "timestamp": 1631130646544, |
| 789 | + "setupOID": "741bc89f-a47b-4ad6-894d-a785c06956c2", |
| 790 | + "deviceURL": "io://9876-1234-8767/4468654#1", |
| 791 | + "deviceStates": [ |
| 792 | + { |
| 793 | + "name": "core:ElectricEnergyConsumptionState", |
| 794 | + "type": 1, |
| 795 | + "value": "23247220", |
| 796 | + } |
| 797 | + ], |
| 798 | + "name": "DeviceStateChangedEvent", |
| 799 | + } |
| 800 | + ) |
| 801 | + event = converter.structure(raw, Event) |
| 802 | + |
| 803 | + assert event.name == EventName.DEVICE_STATE_CHANGED |
| 804 | + assert len(event.device_states) == 1 |
| 805 | + assert isinstance(event.device_states[0], EventState) |
| 806 | + |
| 807 | + def test_event_fixture_structures_all_events(self): |
| 808 | + """All events in the cloud fixture file structure without errors.""" |
| 809 | + raw_events = json.loads((Path("tests/fixtures/event/events.json")).read_text()) |
| 810 | + events = [converter.structure(decamelize(e), Event) for e in raw_events] |
| 811 | + |
| 812 | + assert len(events) == len(raw_events) |
| 813 | + state_changed = [ |
| 814 | + e for e in events if e.name == EventName.EXECUTION_STATE_CHANGED |
| 815 | + ] |
| 816 | + for e in state_changed: |
| 817 | + assert isinstance(e.old_state, ExecutionState) |
| 818 | + assert isinstance(e.new_state, ExecutionState) |
0 commit comments