-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsocket_sender_server.py
46 lines (33 loc) · 1.05 KB
/
socket_sender_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# socket_sender_server.py
import asyncio
import random
from uuid import uuid4
from dataclasses import dataclass
from dataplace import ModelIO, Sender, Controller, Callback, SpaceStore
@dataclass(frozen=True, slots=True)
class Data(ModelIO):
    """Immutable, slotted record passed through the controller callbacks.

    Instances are frozen (attributes cannot be reassigned after creation)
    and derive from ``ModelIO`` so they can be handled by the dataplace
    components used in this script.
    """

    # Identifier of the record, stored as a string.
    id: str
    # Integer payload of the record.
    value: int
async def produce(controller: Controller) -> None:
    """Emit one freshly generated ``Data`` record per second.

    Runs for as long as ``controller.running`` is truthy. Each iteration
    awaits ``controller.async_hold()``, builds a record with a random UUID
    string and a random integer in ``[0, 9]``, hands it to
    ``controller.async_callback`` and then sleeps for one second.

    Args:
        controller: The controller that receives the generated records.
    """
    while True:
        if not controller.running:
            break

        # NOTE(review): presumably blocks while the controller is paused --
        # confirm against the dataplace documentation.
        await controller.async_hold()

        record = Data(id=str(uuid4()), value=random.randint(0, 9))
        await controller.async_callback(record)

        await asyncio.sleep(1)
def main() -> None:
    """Build the store, socket server and controller, then run the event loop.

    Every ``Data`` record dispatched by the controller is passed to three
    callbacks: ``store.add``, ``server.call`` and ``print``. The producer
    coroutine and the server's ``start()`` coroutine are scheduled on a new
    event loop, which then runs until it is stopped or interrupted.

    On exit (including ``KeyboardInterrupt``, which still propagates to the
    caller) the still-pending tasks are cancelled and the loop is closed.
    """
    store = SpaceStore[int, Data](lambda data: data.value, Data)
    server = Sender.Socket.Server(host="127.0.0.1", port=5555)
    controller = Controller(
        callbacks=[
            Callback(store.add, types={Data}),
            Callback(server.call, types={Data}),
            Callback(print, types={Data}),
        ]
    )

    loop = asyncio.new_event_loop()
    # Keep strong references to the tasks: the event loop only holds weak
    # references, so an unreferenced task may be garbage-collected while it
    # is still running.
    tasks = [
        loop.create_task(produce(controller)),
        loop.create_task(server.start()),
    ]
    try:
        loop.run_forever()
    finally:
        # Only cancel and await tasks that are still pending; tasks that
        # already finished (possibly with an error) are left untouched so
        # their outcome is reported exactly as before.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancellations be delivered before closing the loop.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
        loop.close()
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()