# websocket_sender_client.py
# Example script: builds Data records with a random id and value, pausing one
# second between records, and registers a store, a WebSocket sender client
# (ws://127.0.0.1:5555) and print() as callbacks for them.
import asyncio
from uuid import uuid4
import random
from dataclasses import dataclass
from dataplace import ModelIO, Sender, Callback, SpaceStore, Controller
@dataclass(frozen=True, slots=True)
class Data(ModelIO):
    """Immutable, slotted record produced and dispatched by this script.

    Attributes:
        id: String identifier; ``produce`` in this file fills it with a
            UUID4 rendered as text.
        value: Integer payload; ``produce`` draws it from 0..9 inclusive and
            ``main`` passes ``lambda data: data.value`` to ``SpaceStore``
            (presumably as the store key -- confirm against dataplace docs).
    """

    id: str
    value: int
async def produce(controller: Controller) -> None:
    """Hand freshly built ``Data`` records to *controller* while it is running.

    Every pass awaits ``controller.async_hold()``, builds one record (UUID4
    string id, random integer value in 0..9 inclusive), awaits
    ``controller.async_callback`` with it and then sleeps for one second.

    Args:
        controller: Object exposing ``running``, ``async_hold()`` and
            ``async_callback()``; the loop ends once ``running`` is falsy.
    """
    while True:
        if not controller.running:
            break

        # NOTE(review): async_hold() presumably blocks while the controller
        # is paused -- confirm against the dataplace Controller docs.
        await controller.async_hold()

        # Build the id before the value, matching the original evaluation order.
        record_id = str(uuid4())
        record_value = random.randint(0, 9)
        record = Data(id=record_id, value=record_value)

        await controller.async_callback(record)
        await asyncio.sleep(1)
def main() -> None:
    """Wire up the store, WebSocket client and controller, then run forever.

    Three callbacks are registered for ``Data`` records: ``store.add``,
    ``client.call`` and ``print``. The producer coroutine and
    ``client.start()`` are scheduled on a dedicated event loop which runs
    until it is stopped or interrupted.

    Whatever ends ``run_forever()`` (for example ``KeyboardInterrupt``), the
    remaining tasks are cancelled and the loop is closed before the exception,
    if any, propagates to the caller unchanged.
    """
    # NOTE(review): the lambda is presumably the key function used by the
    # store to index records by ``value`` -- confirm against dataplace docs.
    store = SpaceStore[int, Data](lambda data: data.value, Data)
    client = Sender.WebSocket.Client(url="ws://127.0.0.1:5555")
    controller = Controller(
        callbacks=[
            Callback(store.add, types={Data}),
            Callback(client.call, types={Data}),
            Callback(print, types={Data}),
        ]
    )

    loop = asyncio.new_event_loop()
    try:
        loop.create_task(produce(controller))
        loop.create_task(client.start())
        loop.run_forever()
    finally:
        # Mirror the cleanup asyncio.run() performs: cancel whatever is still
        # pending, let the cancellations unwind, then release loop resources.
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            # return_exceptions=True keeps one failing task from aborting the
            # shutdown of the others.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
# Start the sender only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()