Skip to content

Commit

Permalink
Adding support for water_sensor (#68)
Browse files Browse the repository at this point in the history
* water_sensor capability

* Create water_sensor.py

* Create test_water_sensor.py

* Update hub.py

* Update water_sensor.py

* Update test_water_sensor.py

* Update test_water_sensor.py

* Update test_water_sensor.py

* Update test_water_sensor.py

* Update test_water_sensor.py

* Update test_water_sensor.py

* Update test_water_sensor.py
  • Loading branch information
sanjoyg authored Apr 18, 2024
1 parent 0716bd3 commit 895523e
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 0 deletions.
29 changes: 29 additions & 0 deletions src/dirigera/devices/water_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import annotations
from typing import Any, Dict
from .device import Attributes, Device
from ..hub.abstract_smart_home_hub import AbstractSmartHomeHub

class WaterSensorAttributes(Attributes):
battery_percentage: int
water_leak_detected: bool

class WaterSensor(Device):
dirigera_client: AbstractSmartHomeHub
attributes: WaterSensorAttributes

def reload(self) -> WaterSensor:
data = self.dirigera_client.get(route=f"/devices/{self.id}")
return WaterSensor(dirigeraClient=self.dirigera_client, **data)

def set_name(self, name: str) -> None:
if "customName" not in self.capabilities.can_receive:
raise AssertionError("This sensor does not support the set_name function")

data = [{"attributes": {"customName": name}}]
self.dirigera_client.patch(route=f"/devices/{self.id}", data=data)
self.attributes.custom_name = name

def dict_to_water_sensor(
data: Dict[str, Any], dirigera_client: AbstractSmartHomeHub
) -> WaterSensor:
return WaterSensor(dirigeraClient=dirigera_client, **data)
19 changes: 19 additions & 0 deletions src/dirigera/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..devices.motion_sensor import MotionSensor, dict_to_motion_sensor
from ..devices.open_close_sensor import OpenCloseSensor, dict_to_open_close_sensor
from ..devices.scene import Action, Info, Scene, SceneType, Trigger, dict_to_scene
from ..devices.water_sensor import WaterSensor, dict_to_water_sensor

urllib3.disable_warnings(category=InsecureRequestWarning)

Expand Down Expand Up @@ -342,6 +343,24 @@ def get_scene_by_id(self, scene_id: str) -> Scene:
data = self.get(f"/scenes/{scene_id}")
return dict_to_scene(data, self)

def get_water_sensors(self) -> List[WaterSensor]:
"""
Fetches all water sensors registered in the Hub
"""
devices = self.get("/devices")
water_sensors = list(filter(lambda x: x["type"] == "waterSensor", devices))
return [dict_to_water_sensor(water_sensor, self) for water_sensor in water_sensors]

def get_water_sensor_by_id(self, id_: str) -> WaterSensor:
"""
Fetches a water sensor by its id
if that water sensors does not exist or is a device of another type raises ValueError
"""
water_sensor = self._get_device_data_by_id(id_)
if water_sensor["type"] != "waterSensor":
raise ValueError("Device is not a WaterSensor")
return dict_to_water_sensor(water_sensor, self)

def get_all_devices(self) -> List[Device]:
"""
Fetches all devices registered in the Hub
Expand Down
116 changes: 116 additions & 0 deletions tests/test_water_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from typing import Dict
import pytest
from src.dirigera.hub.abstract_smart_home_hub import FakeDirigeraHub
from src.dirigera.devices.water_sensor import dict_to_water_sensor
from src.dirigera.devices.water_sensor import WaterSensor

@pytest.fixture(name="fake_client")
def fixture_fake_client() -> FakeDirigeraHub:
return FakeDirigeraHub()


@pytest.fixture(name="fake_water_sensor_dict")
def fixture_water_sensor_dict() -> Dict:
return {
"id": "2b107b0b-73f0-4809-a900-4783273d7104_1",
"type": "sensor",
"deviceType": "waterSensor",
"createdAt": "2024-04-17T12:19:50.000Z",
"isReachable": True,
"lastSeen": "2024-04-17T12:34:42.000Z",
"attributes": {
"customName": "Watermelder",
"firmwareVersion": "1.0.7",
"hardwareVersion": "1",
"manufacturer": "IKEA of Sweden",
"model": "BADRING Water Leakage Sensor",
"productCode": "E2202",
"serialNumber": "3410F4FFFE8F815D",
"batteryPercentage": 100,
"waterLeakDetected": True,
"permittingJoin": False,
"otaPolicy": "autoUpdate",
"otaProgress": 0,
"otaScheduleEnd": "00:00",
"otaScheduleStart": "00:00",
"otaState": "readyToCheck",
"otaStatus": "upToDate"
},
"capabilities": {
"canSend": [],
"canReceive": [
"customName"
]
},
"room": {
"id": "f1743e4c-3a87-4f6b-90a4-3e915b8ed753",
"name": "Zolder",
"color": "ikea_pink_no_8",
"icon": "rooms_washing_machine"
},
"deviceSet": [],
"remoteLinks": [],
"isHidden": False
}


@pytest.fixture(name="fake_water_sensor")
def fixture_water_sensor(fake_water_sensor_dict: Dict, fake_client: FakeDirigeraHub) -> WaterSensor:
return WaterSensor(dirigeraClient=fake_client, **fake_water_sensor_dict)

def test_set_name(fake_water_sensor: WaterSensor, fake_client: FakeDirigeraHub) -> None:
new_name = "teapot"
fake_water_sensor.set_name(new_name)
action = fake_client.patch_actions.pop()
assert action["route"] == f"/devices/{fake_water_sensor.id}"
assert action["data"] == [{"attributes": {"customName": new_name}}]
assert fake_water_sensor.attributes.custom_name == new_name

def test_dict_to_water_sensor(fake_client: FakeDirigeraHub) -> None:
data = {
"id": "2b107b0b-73f0-4809-a900-4783273d7104_1",
"type": "sensor",
"deviceType": "waterSensor",
"createdAt": "2024-04-17T12:19:50.000Z",
"isReachable": True,
"lastSeen": "2024-04-17T12:34:42.000Z",
"attributes": {
"customName": "Watermelder",
"firmwareVersion": "1.0.7",
"hardwareVersion": "1",
"manufacturer": "IKEA of Sweden",
"model": "BADRING Water Leakage Sensor",
"productCode": "E2202",
"serialNumber": "3410F4FFFE8F815D",
"batteryPercentage": 100,
"waterLeakDetected": True,
"permittingJoin": False,
"otaPolicy": "autoUpdate",
"otaProgress": 0,
"otaScheduleEnd": "00:00",
"otaScheduleStart": "00:00",
"otaState": "readyToCheck",
"otaStatus": "upToDate"
},
"capabilities": {
"canSend": [],
"canReceive": [
"customName"
]
},
"room": {
"id": "f1743e4c-3a87-4f6b-90a4-3e915b8ed753",
"name": "Zolder",
"color": "ikea_pink_no_8",
"icon": "rooms_washing_machine"
},
"deviceSet": [],
"remoteLinks": [],
"isHidden": False
}
water_sensor = dict_to_water_sensor(data, fake_client)
assert water_sensor.dirigera_client == fake_client
assert water_sensor.id == data["id"]
assert water_sensor.is_reachable == data["isReachable"]
assert water_sensor.attributes.battery_percentage == 100
assert water_sensor.attributes.water_leak_detected

0 comments on commit 895523e

Please sign in to comment.