-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.py
333 lines (274 loc) · 12 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import random
import time
from pathlib import Path
import shapely
import skimage
import numpy as np
import seamless_seg
from PIL import Image
def test_overlap_trim():
    """Print the area shared by two overlapping boxes after overlap_weights.

    The two input boxes overlap on a 100 x 40 strip (area 4000); the original
    author's note gives 4000 as the value of the first printout. The second
    printout repeats the computation with an extra 4-tuple passed as the third
    argument of ``seamless_seg.overlap_weights`` for each box
    (presumably a per-side trim/margin -- confirm against seamless_seg).
    """
    first_box = shapely.box(0, 0, 100, 100)
    second_box = shapely.box(0, 60, 100, 160)

    def _print_shared_area(first_extra=(), second_extra=()):
        # overlap_weights returns five values here; only the output geometry is used.
        first_out, _, _, _, _ = seamless_seg.overlap_weights(
            first_box, [second_box], *first_extra
        )
        second_out, _, _, _, _ = seamless_seg.overlap_weights(
            second_box, [first_box], *second_extra
        )
        shared = shapely.intersection(first_out, second_out)
        print(shapely.area(shared))

    # Without the optional third argument.
    _print_shared_area()
    # With the optional third argument supplied for both boxes.
    _print_shared_area(((0, 0, None, -20),), ((0, 20, None, None),))
def show_overlap_weights_regular():
    """Blend a central tile with eight regularly placed neighbours and save a PNG.

    Every tile is a 300 x 500 x 3 uint8 array filled with one random colour
    (the RNG is seeded, so the colours are repeatable). The blended result is
    written to 'test_overlap_weights_regular.png' for visual inspection.
    """
    np.random.seed(123459)
    tile_shape = (300, 500, 3)

    def _flat_colour_tile():
        # A tile of ones scaled per channel by one random value in [20, 255).
        tile = np.ones(tile_shape, dtype=np.uint8)
        tile *= np.random.randint(20, 255, (3,), dtype=np.uint8)
        return tile

    # Test geometry: one central box surrounded by a regular ring of eight boxes.
    central_geom = shapely.box(200, 300, 500, 800)
    neighbour_geoms = [
        shapely.box(-50, -150, 250, 350),   # tl
        shapely.box(450, -150, 750, 350),   # bl
        shapely.box(-50, 750, 250, 1250),   # tr
        shapely.box(450, 750, 750, 1250),   # br
        shapely.box(200, -150, 500, 350),   # l
        shapely.box(-50, 300, 250, 800),    # t
        shapely.box(200, 750, 500, 1250),   # r
        shapely.box(450, 300, 750, 800),    # b
    ]

    # The central tile draws its colour first, then each neighbour in list order.
    central_tile = _flat_colour_tile()
    nearby_tiles = [_flat_colour_tile() for _ in neighbour_geoms]

    # Let seamless_seg compute the weights and the blended central tile.
    weights = seamless_seg.overlap_weights(central_geom, neighbour_geoms)
    blended = seamless_seg.apply_weights(central_tile, nearby_tiles, weights)

    # Save the result so it can be inspected by eye.
    Image.fromarray(blended.astype(np.uint8)).save('test_overlap_weights_regular.png')
def show_overlap_weights_irregular():
    """Blend a central tile with four irregularly placed neighbours and save a PNG.

    Every tile is a 300 x 500 x 3 uint8 array filled with one random colour
    (the RNG is seeded, so the colours are repeatable). The blended result is
    written to 'test_overlap_weights_irregular.png' for visual inspection.
    """
    np.random.seed(123459)
    tile_shape = (300, 500, 3)

    def _flat_colour_tile():
        # A tile of ones scaled per channel by one random value in [20, 255).
        tile = np.ones(tile_shape, dtype=np.uint8)
        tile *= np.random.randint(20, 255, (3,), dtype=np.uint8)
        return tile

    # Test geometry: neighbours that do not sit on a regular grid.
    central_geom = shapely.box(200, 300, 500, 800)
    neighbour_geoms = [
        shapely.box(150, 150, 450, 650),
        shapely.box(300, 200, 600, 700),
        shapely.box(100, 500, 400, 1000),
        shapely.box(450, 650, 750, 1150),
    ]

    # The central tile draws its colour first, then each neighbour in list order.
    central_tile = _flat_colour_tile()
    nearby_tiles = [_flat_colour_tile() for _ in neighbour_geoms]

    # Let seamless_seg compute the weights and the blended central tile.
    weights = seamless_seg.overlap_weights(central_geom, neighbour_geoms)
    blended = seamless_seg.apply_weights(central_tile, nearby_tiles, weights)

    # Save the result so it can be inspected by eye.
    Image.fromarray(blended.astype(np.uint8)).save('test_overlap_weights_irregular.png')
def _random_tile_gen(shape, length=None):
# Generates infinite fake tiles, where each tile is a single, randomly selected colour
count = 0
while True:
tile = np.ones(shape, dtype=np.uint8)
tile *= np.random.randint(20, 255, (shape[-1],), dtype=np.uint8)
yield tile
count += 1
if length is not None and count >= length:
break
def minimal_random_colour_grid(image_size, tile_size, overlap):
    """Minimal end-to-end example: plan a regular grid, feed fake tiles, collect output.

    Args:
        image_size: Two-element size of the output image.
        tile_size: Two-element size of each tile.
        overlap: Overlap forwarded unchanged to ``seamless_seg.plan_regular_grid``.

    The assembled image is built in memory and then discarded; nothing is
    returned or written to disk.
    """
    tile_shape = (*tile_size, 3)

    def _fake_inputs(plan):
        # Stand-in for real data: one flat random colour per requested input.
        # In real use, yield the image data that lies within `_geom`
        # (a shapely geometry) instead.
        for _index, _geom in seamless_seg.get_plan_input_geoms(plan):
            fake = np.ones(tile_shape, dtype=np.uint8)
            fake *= np.random.randint(20, 255, (3,), dtype=np.uint8)
            yield fake

    plan, _grid = seamless_seg.plan_regular_grid(image_size, tile_size, overlap)
    canvas = np.zeros((*image_size, 3))

    # Output tiles are pasted straight into a numpy array here; a real
    # pipeline could write each one to disk instead (e.g. rasterio/tifffile).
    for _index, out_geom, out_tile in seamless_seg.run_plan(plan, _fake_inputs(plan)):
        rows, cols = seamless_seg.shape_to_slices(out_geom)
        canvas[rows, cols] = out_tile
    # All done!
def test_coerce_grid_corrupt():
    """Render a clean grid and a deliberately corrupted copy of it for comparison.

    First a regular grid restricted to a triangular area is planned and run
    with random flat-colour tiles, and the result is saved to
    'vis/clean_grid.png'. Then the interior cells are nudged, the cells are
    flattened and shuffled, the grid is rebuilt with
    ``seamless_seg.coerce_to_grid`` and run again with the same tiles; that
    result is saved to 'vis/corrupt_grid.png'.

    Both numpy's RNG and the shuffle are seeded so the two images are
    reproducible from run to run.
    """
    seed = 2342352
    np.random.seed(seed)
    image_size = (1024, 1024)
    image_shape = (*image_size, 3)
    tile_size = (48, 52)
    tile_shape = (*tile_size, 3)

    vis_folder = Path('vis')
    vis_folder.mkdir(exist_ok=True)

    # Clean version, no corruption on grid, normal regular grid
    area = shapely.Polygon([[540, 125], [180, 690], [730, 565]])
    grid = seamless_seg.regular_grid(image_size, tile_size, (10, 20), area)
    plan = seamless_seg.plan_from_grid(grid)
    ingeoms = seamless_seg.get_plan_input_geoms(plan)
    # Materialise the tiles so the very same inputs can be replayed below.
    in_tiles = list(_random_tile_gen(tile_shape, len(ingeoms)))

    out_img_clean = np.zeros(image_shape)
    for _index, out_geom, out_tile in seamless_seg.run_plan(plan, iter(in_tiles)):
        y_slc, x_slc = seamless_seg.shape_to_slices(out_geom)
        out_img_clean[y_slc, x_slc] = out_tile
    Image.fromarray(out_img_clean.astype(np.uint8)).save(vis_folder / 'clean_grid.png')

    # Corrupt grid: offset cells in the middle slightly
    grid_central = grid[1:-1, 1:-1]
    grid_np_coords = shapely.get_coordinates(grid_central)
    # One offset per cell, applied to each of its coordinates; this assumes
    # every cell contributes exactly 5 coordinate rows (a closed box ring).
    # NOTE(review): randint's upper bound is exclusive, so offsets are only
    # -1 or 0 -- use randint(-1, 2, ...) if +1 was intended.
    offset = np.random.randint(-1, 1, (grid_np_coords.shape[0]//5, 2))
    for i in range(5):
        grid_np_coords[i::5] += offset
    shapely.set_coordinates(grid_central, grid_np_coords)
    grid[1:-1, 1:-1] = grid_central

    # Corrupt grid: flatten, randomise order
    grid_list = list(grid.flatten())
    # A private, seeded generator keeps the shuffle reproducible without
    # touching the global `random` state.
    random.Random(seed).shuffle(grid_list)
    grid_np_flat = np.array(grid_list)

    # Now make it work with seamless_seg
    boundss = np.array([cell.bounds for cell in grid_np_flat if cell is not None])
    # The flat-to-grid mapping returned alongside the grid is not needed here.
    coerced_grid, _flat_to_grid_map = seamless_seg.coerce_to_grid(boundss)
    plan = seamless_seg.plan_from_grid(coerced_grid, margin=(3, 8))

    out_img_corrupt = np.zeros(image_shape)
    for _index, out_geom, out_tile in seamless_seg.run_plan(plan, iter(in_tiles)):
        y_slc, x_slc = seamless_seg.shape_to_slices(out_geom)
        out_img_corrupt[y_slc, x_slc] = out_tile
    Image.fromarray(out_img_corrupt.astype(np.uint8)).save(vis_folder / 'corrupt_grid.png')
def random_colour_grid(
    image_size,
    tile_size,
    overlap,
    area=None,
    actually_run=False,
    fname='out_img.png'
):
    """Plan a regular grid, report plan statistics and optionally run it.

    Args:
        image_size: Two-element size of the output image.
        tile_size: Two-element size of each tile.
        overlap: Overlap forwarded unchanged to ``seamless_seg.plan_regular_grid``.
        area: Optional shapely geometry forwarded to the planner; when given
            and the plan is run, the polygon is also painted white (255) onto
            the saved image.
        actually_run: When False, only planning time and plan statistics are
            printed and the function returns early.
        fname: File name of the image saved inside the 'vis' folder.

    Returns:
        None. Timing and statistics are printed; the image is written to disk.
    """
    np.random.seed(123459)
    print(f'For an image sized {image_size}, with tile {tile_size} and overlap {overlap}')

    start = time.perf_counter()
    plan, grid = seamless_seg.plan_regular_grid(image_size, tile_size, overlap, area)
    end = time.perf_counter()
    print(f'Planning takes: {end-start:4.2f}s')

    max_loaded, load_actions, write_actions = seamless_seg.analyse_plan(plan)
    print(f'Plan holds a maximum of {max_loaded} tiles in memory at once.')
    print(f'Plan loads {load_actions} tiles and writes {write_actions} tiles.')
    # A plan that loads nothing would otherwise raise ZeroDivisionError here.
    loaded_fraction = max_loaded / load_actions if load_actions else 0.0
    print(f'That is, plan holds {loaded_fraction:4.1%} of tiles in memory')

    if not actually_run:
        print()
        return

    start = time.perf_counter()
    # Create fake random data
    in_tile_gen = _random_tile_gen((*tile_size, 3))
    # Run plan
    out_img = np.zeros((*image_size, 3))
    for _index, out_geom, out_tile in seamless_seg.run_plan(plan, in_tile_gen):
        y_slc, x_slc = seamless_seg.shape_to_slices(out_geom)
        out_img[y_slc, x_slc] = out_tile
    end = time.perf_counter()
    print(f'Running plan takes: {end-start:4.2f}s')
    print()

    # Save out_img to disk
    if area is not None:
        coords = shapely.get_coordinates(area)
        # Passing the image shape clips the polygon to the image, so an area
        # that reaches outside it cannot raise IndexError or wrap around
        # through negative indices.
        rr, cc = skimage.draw.polygon(
            coords[:, 0], coords[:, 1], shape=out_img.shape[:2]
        )
        out_img[rr, cc] = 255
    vis_folder = Path('vis')
    vis_folder.mkdir(exist_ok=True)
    Image.fromarray(out_img.astype(np.uint8)).save(vis_folder / fname)
def test_batched_colour_grid(
    image_size,
    tile_size,
    overlap,
    fname='out_img.png'
):
    """Run a regular-grid plan whose inputs are fetched in batches, then save it.

    Args:
        image_size: Two-element size of the output image.
        tile_size: Two-element size of each tile.
        overlap: Overlap forwarded unchanged to ``seamless_seg.plan_regular_grid``.
        fname: File name of the image saved inside the 'vis' folder.
    """
    batch_size = 16
    tile_source = _random_tile_gen((*tile_size, 3))

    def _get_tiles(indexs, geoms):
        # One fake tile per requested geometry, stacked into a single batch array.
        batch = []
        for _ in geoms:
            batch.append(next(tile_source))
        return np.stack(batch)

    plan, _grid = seamless_seg.plan_regular_grid(image_size, tile_size, overlap)

    # Input tiles are produced by seamless_seg's threaded, batched getter.
    input_geoms = seamless_seg.get_plan_input_geoms(plan)
    in_tiles = seamless_seg.threaded_batched_tile_get(
        input_geoms, batch_size, _get_tiles, batch_size*3
    )

    # Output tiles are pasted straight into a numpy array here; a real
    # pipeline could write each one to disk instead (e.g. rasterio/tifffile).
    canvas = np.zeros((*image_size, 3))
    for _index, out_geom, out_tile in seamless_seg.run_plan(plan, in_tiles):
        rows, cols = seamless_seg.shape_to_slices(out_geom)
        canvas[rows, cols] = out_tile

    # Save the assembled image to disk.
    out_dir = Path('vis')
    out_dir.mkdir(exist_ok=True)
    Image.fromarray(canvas.astype(np.uint8)).save(out_dir / fname)
def random_colour_grid_visualise_cache(image_size, tile_size, overlap, do_print=False):
    """Run a plan with a small cache and save one frame per cache event.

    A small RGB image with one pixel per grid cell is updated on every
    load/unload/evict/restore callback and on every written output tile, and
    saved as 'vis/cache-vis/cache_NNNN.png' each time. Channel 0 is set while
    a tile is loaded, channel 1 while it is evicted to disk, and channel 2
    once its output has been written.

    Args:
        image_size: Two-element size of the output image.
        tile_size: Two-element size of each tile.
        overlap: Overlap forwarded unchanged to ``seamless_seg.plan_regular_grid``.
        do_print: When True, also print a line describing every event.
    """
    plan, grid = seamless_seg.plan_regular_grid(image_size, tile_size, overlap)
    visualisation = np.zeros((*grid.shape[:2], 3), dtype=np.uint8)
    vis_cache_folder = Path('vis/cache-vis')
    # parents=True: the enclosing 'vis' folder may not exist yet.
    vis_cache_folder.mkdir(parents=True, exist_ok=True)
    frame = 0

    def _record(description, index, channel_values):
        # Shared event handler: log, recolour the cell, save a frame, advance.
        nonlocal frame
        if do_print:
            print(f'{frame:04d}: {description}')
        for channel, value in channel_values.items():
            visualisation[index[0], index[1], channel] = value
        Image.fromarray(visualisation).save(vis_cache_folder / f'cache_{frame:04d}.png')
        frame += 1

    def _on_load(index):
        _record(f'loading {index}', index, {0: 255})

    def _on_unload(index):
        _record(f'unloading {index}', index, {0: 0})

    def _on_disk_evict(index):
        _record(f'evicting {index} to disk', index, {1: 255, 0: 0})

    def _on_disk_restore(index):
        _record(f'restoring {index} from disk', index, {1: 0, 0: 255})

    def _on_step(n):
        # Intentionally a no-op: frames are saved per event rather than per step.
        pass

    in_tile_gen = _random_tile_gen((*tile_size, 3))
    out_tiles = seamless_seg.run_plan(
        plan,
        in_tile_gen,
        10,
        disk_cache_dir=Path('vis/data-cache'),
        on_load=_on_load,
        on_unload=_on_unload,
        on_disk_evict=_on_disk_evict,
        on_disk_restore=_on_disk_restore,
        on_step=_on_step,
    )
    # The output tiles themselves are discarded; only the write event is shown.
    for index, _out_geom, _out_tile in out_tiles:
        _record(f'writing {index}', index, {2: 255})
def main():
    """Run the demo grids; each run saves its image inside the 'vis' folder."""
    outline = shapely.Polygon([
        [26, 14], [5, 20], [19, 28], [5, 44], [15, 55], [22, 40],
        [38, 55], [44, 37], [26, 28], [44, 19], [40, 4], [17, 6]
    ])

    # (image_size, tile_size, overlap, area, output file name)
    demo_runs = (
        ((48, 64), (5, 5), (2, 2), None, 'small_grid.png'),
        ((128, 86), (7, 7), (2, 2), outline, 'small_grid_w_area.png'),
        ((256, 256), (58, 84), (6, 12), None, 'mid_grid.png'),
    )
    for image_size, tile_size, overlap, area, fname in demo_runs:
        random_colour_grid(
            image_size, tile_size, overlap,
            area=area, actually_run=True, fname=fname,
        )

    # Optional extras, left disabled:
    # random_colour_grid((40000, 40000), (256, 256), (64, 64), actually_run=False)
    # random_colour_grid_visualise_cache((48, 64), (11,11), (2, 2))

    test_batched_colour_grid((128, 86), (7, 7), (2, 2), fname='small_grid_batched.png')
# Run the demos only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()