forked from Panizghi/HACK-THE-NORTH-2022
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
119 lines (88 loc) · 3.57 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import asyncio
import json
import time
from doctest import master

import uvicorn
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
class MasterControl:
    """Hold the latest command vectors and select which one is active.

    Two vectors are tracked: ``vector`` (set through ``update_vector``) and
    ``speech_vector`` (set through ``update_speech_vector``). The
    ``is_opencv`` flag decides which of the two ``get_vector`` returns.
    """

    @staticmethod
    def _default_vector():
        """Return a fresh neutral vector: hover, zero magnitude, no prediction."""
        return {"direction": "hover", "magnitude": 0.0, "prediction": None}

    def __init__(self):
        """Start with both vectors neutral and the OpenCV source deselected."""
        self.vector = self._default_vector()
        self.speech_vector = self._default_vector()
        self.is_opencv = False

    def update_vector(self, vector):
        """Replace the primary vector with ``vector`` (stored by reference)."""
        self.vector = vector

    def update_speech_vector(self, speech_vector):
        """Replace the speech vector with ``speech_vector`` (stored by reference)."""
        self.speech_vector = speech_vector

    def set_opencv_true(self):
        """Make ``get_vector`` return the primary vector."""
        self.is_opencv = True

    def set_opencv_false(self):
        """Make ``get_vector`` return the speech vector."""
        self.is_opencv = False

    def get_is_opencv(self):
        """Return the current value of the ``is_opencv`` flag."""
        return self.is_opencv

    def reset_vector(self):
        """Restore both vectors to the neutral default.

        The vectors are rebound to new dicts rather than edited in place.
        The update methods keep whatever object the caller handed in, so an
        in-place reset would overwrite the caller's payload, leave any extra
        keys behind, and fail outright if the stored value is not a dict.
        The ``is_opencv`` flag is left unchanged.
        """
        self.vector = self._default_vector()
        self.speech_vector = self._default_vector()

    def get_vector(self):
        """Return the primary vector if ``is_opencv`` is set, else the speech vector."""
        if self.is_opencv:
            return self.vector
        return self.speech_vector
# Module-wide MasterControl instance: the speech and OpenCV sockets write to
# it, and the SDK controller socket reads the active vector from it.
master_control_obj = MasterControl()
# FastAPI application on which the routes below are registered.
app = FastAPI()
@app.get("/")
async def home():
    """Return the fixed project tagline as a JSON response."""
    return JSONResponse(
        content={"data": "The world's first hands-free, multi-purpose drone."}
    )
@app.websocket("/ws/get_drone_motor_sdk_command")
async def ws_get_drone_motor_sdk_command(websocket: WebSocket):
    """Answer every client JSON message with the fixed ``go_straight`` command.

    Each reply is ``{"data": "go_straight", "clientMsg": <parsed message>}``.
    The loop ends when the client disconnects.

    Raises:
        json.JSONDecodeError: if a client message is not valid JSON.
    """
    await websocket.accept()
    try:
        while True:
            client_data = await websocket.receive_text()
            client_data_json = json.loads(client_data)
            # "data" is a plain string here, so the echoed message cannot be
            # nested inside it (item assignment on a str raises TypeError);
            # it is placed next to it instead, keeping "data" unchanged.
            data = {"data": "go_straight", "clientMsg": client_data_json}
            await websocket.send_text(json.dumps(data))
    except WebSocketDisconnect:
        # The client closing the socket is the normal way out of the loop.
        return
@app.websocket("/ws/speech_socket")
async def ws_speech_socket(websocket: WebSocket):
    """Store each client JSON message as the speech vector and acknowledge it.

    Every message is parsed as JSON, handed to
    ``master_control_obj.update_speech_vector``, and echoed back under
    ``data.clientMsg`` together with a status string. The loop ends when the
    client disconnects.

    Raises:
        json.JSONDecodeError: if a client message is not valid JSON.
    """
    await websocket.accept()
    try:
        while True:
            client_data = await websocket.receive_text()
            client_data_json = json.loads(client_data)
            data = {
                "data": {
                    "status": "speech is received",
                    "clientMsg": client_data_json,
                }
            }
            master_control_obj.update_speech_vector(client_data_json)
            await websocket.send_text(json.dumps(data))
    except WebSocketDisconnect:
        # The client closing the socket is the normal way out of the loop.
        return
@app.websocket("/ws/opencv_socket")
async def ws_opencv_socket(websocket: WebSocket):
    """Store each client JSON message as the primary vector and pick the source.

    Every message is parsed as JSON, stored through
    ``master_control_obj.update_vector``, and echoed back under
    ``data.clientMsg``. When the message's ``predictedClass`` is ``None``
    the speech vector becomes the active one; otherwise the primary vector
    does. The loop ends when the client disconnects.

    Raises:
        json.JSONDecodeError: if a client message is not valid JSON.
    """
    await websocket.accept()
    try:
        while True:
            client_data = await websocket.receive_text()
            client_data_json = json.loads(client_data)
            data = {
                "data": {
                    "status": "opencv is received",
                    "clientMsg": client_data_json,
                }
            }
            print(data)
            master_control_obj.update_vector(client_data_json)

            # A payload that is not an object, or that lacks the key, is
            # treated like an explicit null prediction instead of raising
            # and dropping the connection after state was already updated.
            if isinstance(client_data_json, dict):
                predicted_class = client_data_json.get("predictedClass")
            else:
                predicted_class = None
            print(predicted_class)

            if predicted_class is None:
                print("NOOOOOOOOOOOOONE!")
                master_control_obj.set_opencv_false()
            else:
                master_control_obj.set_opencv_true()
            await websocket.send_text(json.dumps(data))
    except WebSocketDisconnect:
        # The client closing the socket is the normal way out of the loop.
        return
@app.websocket("/ws/sdk_controller_socket")
async def ws_controller_socket(websocket: WebSocket):
    """Reply to each client JSON message with the currently active vector.

    Each reply carries a status string, the parsed client message under
    ``clientMsg``, and ``master_control_obj.get_vector()`` under
    ``commandVector``. Iterations are spaced by a 0.1 s pause. The loop ends
    when the client disconnects.

    Raises:
        json.JSONDecodeError: if a client message is not valid JSON.
    """
    await websocket.accept()
    try:
        while True:
            # Yield to the event loop while pausing; a blocking time.sleep
            # here would stall every other connection served by this process.
            await asyncio.sleep(0.1)
            client_data = await websocket.receive_text()
            client_data_json = json.loads(client_data)
            # Read the vector only after the message has arrived, so the
            # reply reflects the state at send time rather than whatever it
            # was before the wait for the client began.
            data = {
                "data": {
                    "status": "SDK controller is received",
                    "clientMsg": client_data_json,
                    "commandVector": master_control_obj.get_vector(),
                }
            }
            await websocket.send_text(json.dumps(data))
    except WebSocketDisconnect:
        # The client closing the socket is the normal way out of the loop.
        return
if __name__ == "__main__":
    # Printed once before the server starts.
    print("new")
    # Serve the app on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )