forked from TomerFi/home-assistant-custom-components
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroadlink_s1c.py
312 lines (263 loc) · 12.4 KB
/
broadlink_s1c.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
"""////////////////////////////////////////////////////////////////////////////////////////////////
Home Assistant Custom Component for Broadlink S1C Alarm kit integration as a Sensor platform.
Build by TomerFi
Please visit https://github.com/TomerFi/home-assistant-custom-components for more custom components
if error occures, raise the log level to debug mode and analyze the logs:
custom_components.sensor.broadlink_s1c: debug
installation notes:
place this file in the following folder and restart home assistant:
/config/custom_components/sensor
yaml configuration example:
sensor:
- platform: broadlink_s1c
ip_address: "xxx.xxx.xxx.xxx" # set your s1c hub local ip address
mac: "XX:XX:XX:XX:XX:XX" # set your s1c hub mac address
////////////////////////////////////////////////////////////////////////////////////////////////"""
import binascii
import socket
import datetime
import logging
import asyncio
import traceback
import json
import threading
import voluptuous as vol
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (CONF_IP_ADDRESS, CONF_MAC, CONF_TIMEOUT, STATE_UNKNOWN, STATE_OPEN, STATE_CLOSED,
EVENT_HOMEASSISTANT_STOP, STATE_ALARM_DISARMED, STATE_ALARM_ARMED_HOME, STATE_ALARM_ARMED_AWAY)
from homeassistant.util.dt import now
"""current broadlink moudle in ha is of version 0.5 which doesn't supports s1c hubs, usuing version 0.6 from github"""
# REQUIREMENTS = ['https://github.com/mjg59/python-broadlink/archive/master.zip#broadlink==0.6']
"""one of the broadlink 0.6 requirements is the pycrypto library which is blocked ever since HA 0.64.0, my forked repository of python-broadlink is working with its replacement pycryptodome"""
# REQUIREMENTS = ['https://github.com/TomerFi/python-broadlink/archive/master.zip#broadlink==0.6']
"""home assistant 0.67.1 is using broadlink 0.8 so there is no need for special requirements"""
REQUIREMENTS = []
_LOGGER = logging.getLogger(__name__)
"""platform specifics"""
DOMAIN = 'sensor'
ENTITY_ID_FORMAT = DOMAIN + '.broadlink_s1c_{}'
DEFAULT_TIMEOUT = 10
"""additional states that doesn't exists in homeassistant.const"""
STATE_NO_MOTION = "no_motion"
STATE_MOTION_DETECTED = "motion_detected"
STATE_TAMPERED = "tampered"
STATE_ALARM_SOS = "sos"
"""sensor update event details"""
UPDATE_EVENT = "BROADLINK_S1C_SENSOR_UPDATE"
EVENT_PROPERTY_NAME = "name"
EVENT_PROPERTY_STATE = "state"
"""sensor types and icons"""
SENSOR_TYPE_DOOR_SENSOR = "Door Sensor"
SENSOR_TYPE_DOOR_SENSOR_ICON = "mdi:door"
SENSOR_TYPE_MOTION_SENSOR = "Motion Sensor"
SENSOR_TYPE_MOTION_SENSOR_ICON = "mdi:walk"
SENSOR_TYPE_KEY_FOB = "Key Fob"
SENSOR_TYPE_KEY_FOB_ICON = "mdi:remote"
SENSOR_DEFAULT_ICON = "mdi:security-home"
"""platform configuration schema"""
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_IP_ADDRESS): cv.string,
vol.Required(CONF_MAC): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int
})
"""set up broadlink s1c platform"""
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
_LOGGER.debug("starting platform setup")
"""get configuration params"""
ip_address = config.get(CONF_IP_ADDRESS)
mac = config.get(CONF_MAC).encode().replace(b':', b'')
mac_addr = binascii.unhexlify(mac)
timeout = config.get(CONF_TIMEOUT)
"""initiate connection to s1c hub"""
conn_obj = HubConnection(ip_address, mac_addr, timeout)
"""discovering the sensors and initiating entities"""
raw_data = conn_obj.get_initial_data()
sensors = []
for i, sensor in enumerate(raw_data["sensors"]):
sensors.append(S1C_SENSOR(hass, sensor["name"], sensor["type"], conn_obj.parse_status(sensor["type"], str(sensor["status"])), now()))
if sensors:
async_add_devices(sensors, True)
"""starting the sensors status change watcher"""
WatchSensors(hass, conn_obj).start()
return True
class S1C_SENSOR(Entity):
"""representation of the sensor entity"""
def __init__(self, hass, name, sensor_type, status, last_changed):
"""initialize the sensor entity"""
self.entity_id = ENTITY_ID_FORMAT.format(name.replace(' ', '_').replace('-', '_').lower())
self._hass = hass
self._name = name
self._sensor_type = sensor_type
self._state = status
self._last_changed = last_changed
"""registering entity for event listenting"""
hass.bus.async_listen(UPDATE_EVENT, self.async_event_listener)
_LOGGER.debug(self._name + " initiated")
@property
def name(self):
"""friendly name"""
return self._name
@property
def should_poll(self):
"""entity should be polled for updates"""
return False
@property
def state(self):
"""sensor state"""
return self._state
@property
def icon(self):
"""sensor icon"""
if (self._sensor_type == SENSOR_TYPE_DOOR_SENSOR):
return SENSOR_TYPE_DOOR_SENSOR_ICON
elif (self._sensor_type == SENSOR_TYPE_KEY_FOB):
return SENSOR_TYPE_KEY_FOB_ICON
elif (self._sensor_type == SENSOR_TYPE_MOTION_SENSOR):
return SENSOR_TYPE_MOTION_SENSOR_ICON
else:
return SENSOR_DEFAULT_ICON
@property
def device_state_attributes(self):
"""sensor state attributes"""
return {
"sensor_type": self._sensor_type,
"last_changed": self._last_changed
}
@asyncio.coroutine
def async_event_listener(self, event):
"""handling incoming events and update ha state"""
if (event.data.get(EVENT_PROPERTY_NAME) == self._name):
_LOGGER.debug(self._name + " received " + UPDATE_EVENT)
self._state = event.data.get(EVENT_PROPERTY_STATE)
self._last_changed = event.time_fired
yield from self.async_update_ha_state()
class HubConnection(object):
"""s1c hub connection and utility class"""
def __init__(self, ip_addr, mac_addr, timeout):
"""initialize the connection object"""
import broadlink
self._hub = broadlink.S1C((ip_addr, 80), mac_addr, None)
self._hub.timeout = timeout
self._authorized = self.authorize()
if (self._authorized):
_LOGGER.info("succesfully connected to s1c hub")
self._initial_data = self._hub.get_sensors_status()
else:
_LOGGER.error("failed to connect s1c hub, not authorized. please fix the problem and restart the system")
self._initial_data = None
def authorize(self, retry=3):
"""authorize connection to s1c hub"""
try:
auth = self._hub.auth()
except socket.timeout:
auth = False
if not auth and retry > 0:
return self.authorize(retry-1)
return auth
def get_initial_data(self):
"""return initial data for discovery"""
return self._initial_data
def get_hub_connection(self):
"""return the connection object"""
return self._hub
def parse_status(self, sensor_type, sensor_status):
"""parse sensors status"""
if sensor_type == SENSOR_TYPE_DOOR_SENSOR and sensor_status in ("0", "128"):
return STATE_CLOSED
elif sensor_type == SENSOR_TYPE_DOOR_SENSOR and sensor_status in ("16", "144"):
return STATE_OPEN
elif sensor_type == SENSOR_TYPE_DOOR_SENSOR and sensor_status == "48":
return STATE_TAMPERED
elif sensor_type == SENSOR_TYPE_MOTION_SENSOR and sensor_status in ("0", "128"):
return STATE_NO_MOTION
elif sensor_type == SENSOR_TYPE_MOTION_SENSOR and sensor_status == "16":
return STATE_MOTION_DETECTED
elif sensor_type == SENSOR_TYPE_MOTION_SENSOR and sensor_status == "32":
return STATE_TAMPERED
elif sensor_type == SENSOR_TYPE_KEY_FOB and sensor_status == "16":
return STATE_ALARM_DISARMED
elif sensor_type == SENSOR_TYPE_KEY_FOB and sensor_status == "32":
return STATE_ALARM_ARMED_AWAY
elif sensor_type == SENSOR_TYPE_KEY_FOB and sensor_status == "64":
return STATE_ALARM_ARMED_HOME
elif sensor_type == SENSOR_TYPE_KEY_FOB and sensor_status in ("0", "128"):
return STATE_ALARM_SOS
else:
_LOGGER.debug("unknow status " + sensor_status + "for type " + sensor_type)
return STATE_UNKNOWN
class WatchSensors(threading.Thread):
"""sensor status change watcher class"""
def __init__(self, hass, conn_obj):
threading.Thread.__init__(self)
"""initialize the watcher"""
self._hass = hass
self._ok_to_run = False
self._conn_obj = conn_obj
self._last_exception_dt = None
self._exception_count = 0
if (self._conn_obj._authorized):
self._ok_to_run = True
self._hub = self._conn_obj.get_hub_connection()
def run(self):
"""register stop function for event listening"""
self._hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
"""get initial sensors data"""
if not (self._conn_obj.get_initial_data() is None):
old_status = self._conn_obj.get_initial_data()
else:
old_status = self._hub.get_sensors_status()
"""start watcher loop"""
_LOGGER.info("starting sensors watch")
while self._ok_to_run:
try:
current_status = self._hub.get_sensors_status()
for i, sensor in enumerate(current_status["sensors"]):
current_fixed_status = self._conn_obj.parse_status(sensor["type"], str(sensor["status"]))
previous_fixed_status = self._conn_obj.parse_status(old_status["sensors"][i]["type"], str(old_status["sensors"][i]["status"]))
if not (current_fixed_status == previous_fixed_status):
_LOGGER.debug("status change tracked from: " + json.dumps(old_status["sensors"][i]))
_LOGGER.debug("status change tracked to: " + json.dumps(sensor))
self.launch_state_change_event(sensor["name"], current_fixed_status)
old_status = current_status
except:
_LOGGER.warning("exception while getting sensors status: " + traceback.format_exc())
self.check_loop_run()
continue
_LOGGER.info("sensors watch done")
def check_loop_run(self):
"""max exceptions allowed in loop before exiting"""
max_exceptions_before_stop = 50
"""max minutes to remmember the last excption"""
max_minutes_from_last_exception = 1
current_dt = now()
if not (self._last_exception_dt is None):
if (self._last_exception_dt.year == current_dt.year and self._last_exception_dt.month == current_dt.month and self._last_exception_dt.day == current_dt.day):
calc_dt = current_dt - self._last_exception_dt
diff = divmod(calc_dt.days * 86400 + calc_dt.seconds, 60)
if (diff[0] > max_minutes_from_last_exception):
self._exception_count = 0
else:
self._exception_count += 1
else:
self._exception_count = 0
else:
self._exception_count = 0
if not (max_exceptions_before_stop > self._exception_count):
_LOGGER.error("max exceptions allowed in watch loop exceeded, stoping watch loop")
self._ok_to_run = False
self._last_exception_dt = current_dt
def stop(self, event):
"""handle stop request for events"""
_LOGGER.debug("received :" + event.event_type)
self._ok_to_run = False
def launch_state_change_event(self, name, status):
"""launch events for state changes"""
_LOGGER.debug("launching event for " + name + "for state changed to " + status)
self._hass.bus.fire(UPDATE_EVENT,
{
EVENT_PROPERTY_NAME: name,
EVENT_PROPERTY_STATE: status
})