forked from henrique-efonseca/Astro-Pi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
223 lines (171 loc) · 6.52 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# Astro Pi Mission Space Lab
# Theme: Life on Earth
# Team: cos(π) - Coding of Space
# Libraries
from sense_hat import SenseHat
from ephem import readtle
from picamera import PiCamera
from gpiozero import CPUTemperature
from time import sleep
from pisense import SenseHAT, array
from colorzero import Color
import logging
from logzero import setup_logger, logger
import os
import datetime
import threading
# Raspberry Pi tools
sh = SenseHat()
hat = SenseHAT()
cam = PiCamera()
# Latest TLE data for ISS location
name = "ISS (ZARYA)"
l1 = "1 25544U 98067A 18327.76881777 .00002477 00000-0 44843-4 0 9999"
l2 = "2 25544 51.6406 303.4674 0005305 77.2314 344.6784 15.54011739143334"
iss = readtle(name, l1, l2)
# Datetime variable to store the start time
start_time = datetime.datetime.now()
# Datetime variable to store the current time
now_time = datetime.datetime.now()
# Sets working directory and names the log files
dir_path = os.path.dirname(os.path.realpath(__file__))
setup_logger('log1', dir_path + '/data01.csv')
setup_logger('log2', dir_path + '/data02.csv')
logger_1 = logging.getLogger('log1')
logger_2 = logging.getLogger('log2')
def security():
"""
Function to get data from the environment on the ISS and compare it to
the referenced values. This is used to check if anyone is near
the AstroPi and to set colors and messages on the Sense Hat
depending on Human Presence.
"""
global start_time, now_time
temperature = round(sh.get_temperature(), 2)
humidity = round(sh.get_humidity(), 2)
pressure = round(sh.get_pressure() * 100, 2)
black = (0, 0, 0)
white = (255, 255, 255)
r = Color('red')
g = Color('green')
red_line = [r, r, r, r, r, r, r, r]
green_line = [g, g, g, g, g, g, g, g]
red_screen = array(red_line * 8)
green_screen = array(green_line * 8)
message = "Temperature: " + \
str(temperature) + " Humidity: " + \
str(humidity) + " Pressure : " + str(pressure)
while now_time < start_time + datetime.timedelta(minutes=174):
if temperature < 18.3 or temperature > 26.7:
hat.screen.fade_to(red_screen)
sleep(3)
sh.show_message("Please step aside from the AstroPi!",
text_colour=white, back_colour=black, scroll_speed=0.04)
# Update the current time
now_time = datetime.datetime.now()
elif humidity < 55 or humidity > 65:
hat.screen.fade_to(red_screen)
sleep(3)
sh.show_message("Please step aside from the AstroPi!",
text_colour=white, back_colour=black, scroll_speed=0.04)
# Update the current time
now_time = datetime.datetime.now()
elif pressure < 97900 or pressure > 102700:
hat.screen.fade_to(red_screen)
sleep(3)
sh.show_message("Please step aside from the AstroPi!",
text_colour=white, back_colour=black, scroll_speed=0.04)
# Update the current time
now_time = datetime.datetime.now()
else:
hat.screen.fade_to(green_screen)
sleep(5)
sh.show_message(message, scroll_speed=0.07)
# Update the current time
now_time = datetime.datetime.now()
def get_Lat_Lon():
"""
Function to get the latitude and longitude values
from the 'ephem' library and write them to EXIF data for the
photographys.
"""
global start_time, now_time
while now_time < start_time + datetime.timedelta(minutes=174):
iss.compute()
long_value = [float(i) for i in str(iss.sublong).split(":")]
if long_value[0] < 0:
long_value[0] = abs(long_value[0])
cam.exif_tags['GPS.GPSLongitudeRef'] = "W"
# Update the current time
now_time = datetime.datetime.now()
else:
cam.exif_tags['GPS.GPSLongitudeRef'] = "E"
# Update the current time
now_time = datetime.datetime.now()
cam.exif_tags['GPS.GPSLongitude'] = '%d/1,%d/1,%d/10' % (
long_value[0], long_value[1], long_value[2] * 10)
lat_value = [float(i) for i in str(iss.sublat).split(":")]
if lat_value[0] < 0:
lat_value[0] = abs(lat_value[0])
cam.exif_tags['GPS.GPSLatitudeRef'] = "S"
# Update the current time
now_time = datetime.datetime.now()
else:
cam.exif_tags['GPS.GPSLatitudeRef'] = "N"
# Update the current time
now_time = datetime.datetime.now()
cam.exif_tags['GPS.GPSLatitude'] = '%d/1,%d/1,%d/10' % (
lat_value[0], lat_value[1], lat_value[2] * 10)
return(str(lat_value), str(long_value))
def photography():
"""
Function to take photographs every 5 seconds
"""
global start_time, now_time
# Variable to name the photographys
photo_counter = 1
while now_time < start_time + datetime.timedelta(minutes=174):
cam.resolution = (1296, 972)
cam.capture(dir_path + '/photo_' +
str(photo_counter).zfill(4) + '.jpg')
photo_counter += 1
sleep(5)
# Update the current time
now_time = datetime.datetime.now()
def data():
"""
Function to read the data gathered by the Raspberry Pi and store it in designated log file
"""
global start_time, now_time
while now_time < start_time + datetime.timedelta(minutes=174):
lat, lon = get_Lat_Lon()
cpu = CPUTemperature()
temperature = round(sh.get_temperature(), 2)
humidity = round(sh.get_humidity(), 2)
pressure = round(sh.get_pressure() * 100, 2)
logger_1.info("Temperature: {}ºC; Humidity: {}%; Pressure: {}Pa; CPU_Temperature: {:.2f}ºC".format(
temperature, humidity, pressure, cpu.temperature))
logger_2.info(" Latitude: {}; Longitude: {}".format(lat, lon))
sleep(5)
# Update the current time
now_time = datetime.datetime.now()
def MainFunction():
"""
Function to run all the other functions in a multi-threading procedure
"""
try:
t1 = threading.Thread(target=security)
t2 = threading.Thread(target=get_Lat_Lon)
t3 = threading.Thread(target=photography)
t4 = threading.Thread(target=data)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
except Exception as e:
logger.error("An error occurred: " + str(e))
MainFunction()