forked from designer-living/pyblustream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.py
90 lines (68 loc) · 2.66 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
import logging
from pyblustream.listener import SourceChangeListener
from pyblustream.matrix import Matrix
class MyListener(SourceChangeListener):
    """Example listener that prints each event it is told about.

    An optional ``asyncio.Event`` may be supplied; it is set whenever
    ``connected`` is called so that other code can wait on the connection.
    """

    def __init__(self, connected_event: asyncio.Event = None):
        # May be None, in which case connected() only prints.
        self._connected_event = connected_event

    def source_changed(self, output_id, input_id):
        """Print the output/input pair; put your own source-change code here."""
        print(f"Source Changed Output {output_id}, input {input_id}")

    def power_changed(self, power: bool):
        """Print the new power value; put your own power-change code here."""
        print(f"Power changed: {power} ")

    def connected(self):
        """Set the connected event (if one was given) and print a message."""
        event = self._connected_event
        if event is not None:
            event.set()
        print("Connected")

    def disconnected(self):
        """Print a message on disconnect.

        Note: the library will try to reconnect, so you don't need to.
        """
        print("Disconnected")
async def stop_in(seconds, event: asyncio.Event):
    """Wait ``seconds`` seconds, then set ``event``.

    Args:
        seconds: Delay passed straight to ``asyncio.sleep``.
        event: Event that is set once the delay has elapsed.
    """
    delay = seconds
    await asyncio.sleep(delay)
    # Signal whoever is waiting on the event that the time is up.
    event.set()
async def main():
    """Connect to a matrix, print its details and exercise the example calls.

    Steps: create the ``Matrix``, register a ``MyListener``, connect, print
    the matrix metadata, wait while the listener prints incoming events,
    switch a source, print the output status, request a status refresh and
    finally close the connection.

    The connection is closed in a ``finally`` clause so that it is released
    even if one of the calls raises or the task is cancelled (for example by
    Ctrl-C during the long wait).
    """
    # Set to your details
    ip = "192.168.1.160"
    port = 23

    # Create a matrix
    matrix = Matrix(ip, port)

    # Register a listener so you can handle state changes
    connected_event = asyncio.Event()
    matrix.register_listener(MyListener(connected_event))

    # You always need to connect to the matrix - best to do this after
    # adding your listener to avoid missing the initial status that is
    # returned on start up.
    # Either
    #   matrix.connect()
    #   await connected_event.wait()
    # Or the async way
    await matrix.async_connect()

    # NOTE(review): 20000 s is roughly 5.5 hours; nothing below this wait
    # runs until it has elapsed. Confirm this is the intended listen time.
    listen_seconds = 20000

    try:
        print(matrix.output_names)
        print(matrix.input_names)
        print(matrix.mac)
        print(matrix.device_name)
        print(matrix.firmware_version)

        # Keep the loop alive so the listener can report events.
        await asyncio.sleep(listen_seconds)

        # Programmatically change the source for output 5 to input 6.
        matrix.change_source(input_id=6, output_id=5)

        print("--- All Outputs --- ")
        all_outputs = matrix.status_of_all_outputs()
        print(all_outputs)

        print("--- Input for Zone 1 --- ")
        input_for_zone_one = matrix.status_of_output(output_id=1)
        print(input_for_zone_one)

        # Force the matrix to refresh its status
        # This is done automatically on startup/reconnect, so you shouldn't
        # need to do this
        matrix.update_status()
        await asyncio.sleep(1)
    finally:
        # Always release the connection, including on error or cancellation.
        matrix.close()

    await asyncio.sleep(1)
    print("Done")
if __name__ == "__main__":
    # Configure the root logger at INFO level before anything runs.
    logging.basicConfig(level="INFO")
    # Run the example coroutine to completion.
    asyncio.run(main())