# *********************************************************************************************************************
# load-gnaf.py
# *********************************************************************************************************************
#
# A script for loading raw GNAF & PSMA Admin boundaries and creating flattened, complete, easy to use versions of them
#
# Author: Hugh Saalmans
# GitHub: minus34
# Twitter: @minus34
#
# Copyright:
# - Code is licensed under an Apache License, version 2.0
# - Data is copyright PSMA - SOON TO BE licensed under a Creative Commons (By Attribution) license
#
# Process:
# 1. Loads raw GNAF into Postgres from PSV files, using COPY
# 2. Loads raw PSMA Admin Boundaries from Shapefiles into Postgres using shp2pgsql (part of PostGIS)
# 3. Creates flattened and simplified GNAF tables containing all relevant data
# 4. Creates a ready to use Locality Boundaries table containing a number of fixes to overcome known data issues
# 5. Splits the locality boundary for Melbourne into 2, one for each of its postcodes (3000 & 3004)
# 6. Creates final principal & alias address tables containing fixes based on the above locality customisations
# 7. Creates an almost correct Postcode Boundary table by aggregating locality boundaries with address-based postcodes
# 8. Adds primary and foreign keys to enforce data integrity across the reference tables
#
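# Example run (hypothetical paths - substitute your own data directories):
#   python load-gnaf.py --gnaf-tables-path /data/gnaf --admin-bdys-path /data/admin-bdys --states VIC TAS
#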
# *********************************************************************************************************************

import argparse
import math
import multiprocessing
import os
import platform
import subprocess
from datetime import datetime

import psycopg2


def main():
parser = argparse.ArgumentParser(
description='A quick way to load the complete GNAF and PSMA Admin Boundaries into Postgres, '
'simplified and ready to use as reference data for geocoding, analysis and visualisation.')
parser.add_argument(
'--prevacuum', action='store_true', default=False, help='Forces database to be vacuumed after dropping tables.')
parser.add_argument(
'--raw-fk', action='store_true', default=False,
help='Creates primary & foreign keys for the raw GNAF tables (adds time to data load)')
parser.add_argument(
'--raw-unlogged', action='store_true', default=False,
help='Creates unlogged raw GNAF tables, speeding up the import. Only specify this option if you don\'t care '
'about the raw data afterwards - they will be lost if the server crashes!')
parser.add_argument(
'--max-processes', type=int, default=6,
help='Maximum number of parallel processes to use for the data load. (Set it to the number of cores on the '
'Postgres server minus 2, limit to 12 if 16+ cores - there is minimal benefit beyond 12). Defaults to 6.')
parser.add_argument(
        '--no-boundary-tag', dest='boundary_tag', action='store_false', default=True,
        help='Disables tagging all addresses with admin boundary IDs (used for creating aggregates and '
             'choropleth maps). Tagging is on by default. '
             'IMPORTANT: tagging will add 15-60 minutes to the process if you have PostGIS 2.2. '
             'WARNING: if you have PostGIS 2.1 or lower - this process can take hours')
# PG Options
parser.add_argument(
'--pghost',
help='Host name for Postgres server. Defaults to PGHOST environment variable if set, otherwise localhost.')
parser.add_argument(
'--pgport', type=int,
help='Port number for Postgres server. Defaults to PGPORT environment variable if set, otherwise 5432.')
parser.add_argument(
'--pgdb',
help='Database name for Postgres server. Defaults to PGDATABASE environment variable if set, '
'otherwise psma_201602.')
parser.add_argument(
'--pguser',
help='Username for Postgres server. Defaults to PGUSER environment variable if set, otherwise postgres.')
parser.add_argument(
'--pgpassword',
help='Password for Postgres server. Defaults to PGPASSWORD environment variable if set, '
'otherwise \'password\'.')
# schema names for the raw gnaf, flattened reference and admin boundary tables
parser.add_argument(
'--raw-gnaf-schema', default='raw_gnaf',
help='Schema name to store raw GNAF tables in. Defaults to \'raw_gnaf\'.')
parser.add_argument(
'--raw-admin-schema', default='raw_admin_bdys',
help='Schema name to store raw admin boundary tables in. Defaults to \'raw_admin_bdys\'.')
parser.add_argument(
'--gnaf-schema', default='gnaf',
help='Destination schema name to store final GNAF tables in. Defaults to \'gnaf\'.')
parser.add_argument(
'--admin-schema', default='admin_bdys',
help='Destination schema name to store final admin boundary tables in. Defaults to \'admin_bdys\'.')
# directories
parser.add_argument(
'--gnaf-tables-path', required=True,
help='Path to source GNAF tables (*.psv files). This directory must be accessible by the Postgres server, '
'and the local path to the directory for the server must be set via the local-server-dir argument '
'if it differs from this path.')
parser.add_argument(
'--local-server-dir',
help='Local path on server corresponding to gnaf-tables-path, if different to gnaf-tables-path.')
parser.add_argument(
'--admin-bdys-path', required=True, help='Local path to source admin boundary files.')
# states to load
parser.add_argument('--states', nargs='+', choices=["ACT", "NSW", "NT", "OT", "QLD", "SA", "TAS", "VIC", "WA"],
default=["ACT", "NSW", "NT", "OT", "QLD", "SA", "TAS", "VIC", "WA"],
help='List of states to load data for. Defaults to all states.')
args = parser.parse_args()
settings = dict()
settings['vacuum_db'] = args.prevacuum
settings['primary_foreign_keys'] = args.raw_fk
settings['unlogged_tables'] = args.raw_unlogged
settings['max_concurrent_processes'] = args.max_processes
settings['states_to_load'] = args.states
settings['boundary_tag'] = args.boundary_tag
settings['raw_gnaf_schema'] = args.raw_gnaf_schema
settings['raw_admin_bdys_schema'] = args.raw_admin_schema
settings['gnaf_schema'] = args.gnaf_schema
settings['admin_bdys_schema'] = args.admin_schema
settings['gnaf_network_directory'] = args.gnaf_tables_path.replace("\\", "/")
if args.local_server_dir:
settings['gnaf_pg_server_local_directory'] = args.local_server_dir.replace("\\", "/")
else:
settings['gnaf_pg_server_local_directory'] = settings['gnaf_network_directory']
settings['admin_bdys_local_directory'] = args.admin_bdys_path.replace("\\", "/")
# create postgres connect string
settings['pg_host'] = args.pghost or os.getenv("PGHOST", "localhost")
settings['pg_port'] = args.pgport or os.getenv("PGPORT", 5432)
settings['pg_db'] = args.pgdb or os.getenv("PGDATABASE", "psma_201602")
settings['pg_user'] = args.pguser or os.getenv("PGUSER", "postgres")
settings['pg_password'] = args.pgpassword or os.getenv("PGPASSWORD", "password")
settings['pg_connect_string'] = "dbname='{0}' host='{1}' port='{2}' user='{3}' password='{4}'".format(
settings['pg_db'], settings['pg_host'], settings['pg_port'], settings['pg_user'], settings['pg_password'])
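    # e.g. with the defaults above this yields (illustrative values only):
    #   dbname='psma_201602' host='localhost' port='5432' user='postgres' password='password'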
# set postgres script directory
settings['sql_dir'] = os.path.join(os.path.dirname(os.path.realpath(__file__)), "postgres-scripts")
# set the list of admin bdys to create analysis tables for and to boundary tag with
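    # (each entry is a [table name, id column] pair used to template the SQL run against that table)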
admin_bdy_list = list()
admin_bdy_list.append(["locality_bdys", "locality_pid"])
admin_bdy_list.append(["commonwealth_electorates", "ce_pid"])
admin_bdy_list.append(["local_government_areas", "lga_pid"])
admin_bdy_list.append(["local_government_wards", "ward_pid"])
admin_bdy_list.append(["state_bdys", "state_pid"])
admin_bdy_list.append(["state_lower_house_electorates", "se_lower_pid"])
admin_bdy_list.append(["state_upper_house_electorates", "se_upper_pid"])
settings['admin_bdy_list'] = admin_bdy_list
full_start_time = datetime.now()
# connect to Postgres
try:
pg_conn = psycopg2.connect(settings['pg_connect_string'])
except psycopg2.Error:
print "Unable to connect to database\nACTION: Check your Postgres parameters and/or database security"
return False
pg_conn.autocommit = True
pg_cur = pg_conn.cursor()
# add postgis to database (in the public schema) - run this in a try first time to confirm db user has privileges
try:
pg_cur.execute("SET search_path = public, pg_catalog; CREATE EXTENSION IF NOT EXISTS postgis")
except psycopg2.Error:
print "Unable to add PostGIS extension\nACTION: Check your Postgres user privileges or PostGIS install"
return False
# get Postgres, PostGIS & GEOS versions
pg_cur.execute("SELECT version()")
pg_version = pg_cur.fetchone()[0].replace("PostgreSQL ", "").split(",")[0]
pg_cur.execute("SELECT PostGIS_full_version()")
lib_strings = pg_cur.fetchone()[0].replace("\"", "").split(" ")
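    # PostGIS_full_version() returns a space-delimited string along these lines (illustrative only):
    #   POSTGIS="2.2.2 r14797" GEOS="3.5.0-CAPI-1.9.0 r4084" PROJ="Rel. 4.9.1" LIBXML="2.9.2" ...
    # so the POSTGIS= and GEOS= tokens can be picked out below and their version numbers parsed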
postgis_version = "UNKNOWN"
postgis_version_num = 0.0
geos_version = "UNKNOWN"
geos_version_num = 0.0
settings['st_subdivide_supported'] = False
for lib_string in lib_strings:
if lib_string[:8] == "POSTGIS=":
postgis_version = lib_string.replace("POSTGIS=", "")
postgis_version_num = float(postgis_version[:3])
if lib_string[:5] == "GEOS=":
geos_version = lib_string.replace("GEOS=", "")
geos_version_num = float(geos_version[:3])
if postgis_version_num >= 2.2 and geos_version_num >= 3.5:
settings['st_subdivide_supported'] = True
print ""
print "Running on Postgres {0} and PostGIS {1} (with GEOS {2})".format(pg_version, postgis_version, geos_version)
# PART 1 - load gnaf from PSV files
print ""
start_time = datetime.now()
print "Part 1 of 4 : Start raw GNAF load : {0}".format(start_time)
drop_tables_and_vacuum_db(pg_cur, settings)
create_raw_gnaf_tables(pg_cur, settings)
populate_raw_gnaf(settings)
index_raw_gnaf(settings)
if settings['primary_foreign_keys']:
create_primary_foreign_keys(settings)
else:
print "\t- Step 6 of 6 : primary & foreign keys NOT created"
# set postgres search path back to the default
pg_cur.execute("SET search_path = public, pg_catalog")
print "Part 1 of 4 : Raw GNAF loaded! : {0}".format(datetime.now() - start_time)
# PART 2 - load raw admin boundaries from Shapefiles
print ""
start_time = datetime.now()
print "Part 2 of 4 : Start raw admin boundary load : {0}".format(start_time)
load_raw_admin_boundaries(pg_cur, settings)
prep_admin_bdys(pg_cur, settings)
create_admin_bdys_for_analysis(settings)
print "Part 2 of 4 : Raw admin boundaries loaded! : {0}".format(datetime.now() - start_time)
# PART 3 - create flattened and standardised GNAF and Administrative Boundary reference tables
print ""
start_time = datetime.now()
print "Part 3 of 4 : Start create reference tables : {0}".format(start_time)
create_reference_tables(pg_cur, settings)
print "Part 3 of 4 : Reference tables created! : {0}".format(datetime.now() - start_time)
# PART 4 - Boundary tag GNAF addresses
print ""
if settings['boundary_tag']:
start_time = datetime.now()
print "Part 4 of 4 : Start boundary tagging addresses : {0}".format(start_time)
boundary_tag_gnaf(pg_cur, settings)
print "Part 4 of 4 : Addresses boundary tagged: {0}".format(datetime.now() - start_time)
else:
print "Part 4 of 4 : Addresses NOT boundary tagged"
# # PART 5 - QA - CODE NOT STARTED!
# print ""
# start_time = datetime.now()
# print "Part 5 of 5 : QA results : {0}".format(start_time)
# qa_tables(pg_cur, settings)
# print "Part 5 of 5 : results QA'd : {0}".format(datetime.now() - start_time)
pg_cur.close()
pg_conn.close()
print "Total time : : {0}".format(datetime.now() - full_start_time)


def drop_tables_and_vacuum_db(pg_cur, settings):
# Step 1 of 6 : drop tables
start_time = datetime.now()
pg_cur.execute(open_sql_file("01-01-drop-tables.sql", settings))
print "\t- Step 1 of 6 : tables dropped : {0}".format(datetime.now() - start_time)
    # Step 2 of 6 : vacuum database (if requested via the --prevacuum argument)
    start_time = datetime.now()
    if settings['vacuum_db']:
        pg_cur.execute("VACUUM")
        print "\t- Step 2 of 6 : database vacuumed : {0}".format(datetime.now() - start_time)
    else:
        print "\t- Step 2 of 6 : database NOT vacuumed"


def create_raw_gnaf_tables(pg_cur, settings):
# Step 3 of 6 : create tables
start_time = datetime.now()
# prep create table sql scripts (note: file doesn't contain any schema prefixes on table names)
sql = open(os.path.join(settings['sql_dir'], "01-03-raw-gnaf-create-tables.sql"), "r").read()
# create schema and set as search path
if settings['raw_gnaf_schema'] != "public":
pg_cur.execute("CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}"
.format(settings['raw_gnaf_schema'], settings['pg_user']))
pg_cur.execute("SET search_path = {0}".format(settings['raw_gnaf_schema'],))
# alter create table script to run on chosen schema
sql = sql.replace("SET search_path = public", "SET search_path = {0}".format(settings['raw_gnaf_schema'],))
# set tables to unlogged to speed up the load? (if requested)
# -- they'll have to be rebuilt using this script again after a system crash --
if settings['unlogged_tables']:
sql = sql.replace("CREATE TABLE ", "CREATE UNLOGGED TABLE ")
unlogged_string = "UNLOGGED "
else:
unlogged_string = ""
if settings['pg_user'] != "postgres":
# alter create table script to run with correct Postgres user name
sql = sql.replace("postgres", settings['pg_user'])
# create raw gnaf tables
pg_cur.execute(sql)
print "\t- Step 3 of 6 : {1}tables created : {0}".format(datetime.now() - start_time, unlogged_string)


# load raw gnaf authority & state tables using multiprocessing
def populate_raw_gnaf(settings):
# Step 4 of 6 : load raw gnaf authority & state tables
start_time = datetime.now()
# authority code file list
sql_list = get_raw_gnaf_files("authority_code", settings)
# add state file lists
for state in settings['states_to_load']:
print "\t\t- Loading state {}".format(state)
sql_list.extend(get_raw_gnaf_files(state, settings))
# are there any files to load?
if len(sql_list) == 0:
print "No raw GNAF PSV files found\nACTION: Check your 'gnaf_network_directory' path"
print "\t- Step 4 of 6 : table populate FAILED!"
else:
# load all PSV files using multiprocessing
multiprocess_list("sql", sql_list, settings)
print "\t- Step 4 of 6 : tables populated : {0}".format(datetime.now() - start_time)


def get_raw_gnaf_files(prefix, settings):
sql_list = []
prefix = prefix.lower()
# get a dictionary of all files matching the filename prefix
for root, dirs, files in os.walk(settings['gnaf_network_directory']):
for file_name in files:
if file_name.lower().startswith(prefix + "_"):
if file_name.lower().endswith(".psv"):
file_path = os.path.join(root, file_name)\
.replace(settings['gnaf_network_directory'], settings['gnaf_pg_server_local_directory'])
table = file_name.lower().replace(prefix + "_", "", 1).replace("_psv.psv", "")
# if a non-Windows Postgres server OS - fix file path
if settings['gnaf_pg_server_local_directory'][0:1] == "/":
file_path = file_path.replace("\\", "/")
# print file_path
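                    # the generated statement looks like this (hypothetical file path):
                    #   COPY raw_gnaf.street_locality FROM '/data/gnaf/ACT_STREET_LOCALITY_psv.psv'
                    #       DELIMITER '|' CSV HEADER;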
sql = "COPY {0}.{1} FROM '{2}' DELIMITER '|' CSV HEADER;"\
.format(settings['raw_gnaf_schema'], table, file_path)
sql_list.append(sql)
return sql_list


# index raw gnaf using multiprocessing
def index_raw_gnaf(settings):
# Step 5 of 6 : create indexes
start_time = datetime.now()
raw_sql_list = open_sql_file("01-05-raw-gnaf-create-indexes.sql", settings).split("\n")
sql_list = []
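    # keep only the executable statements - skip SQL comment lines and empty lines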
for sql in raw_sql_list:
if sql[0:2] != "--" and sql[0:2] != "":
sql_list.append(sql)
multiprocess_list("sql", sql_list, settings)
print "\t- Step 5 of 6 : indexes created: {0}".format(datetime.now() - start_time)


# create raw gnaf primary & foreign keys (for data integrity) using multiprocessing
def create_primary_foreign_keys(settings):
start_time = datetime.now()
key_sql = open(os.path.join(settings['sql_dir'], "01-06-raw-gnaf-create-primary-foreign-keys.sql"), "r").read()
key_sql_list = key_sql.split("--")
sql_list = []
for sql in key_sql_list:
sql = sql.strip()
if sql[0:6] == "ALTER ":
# add schema to tables names, in case raw gnaf schema not the default
sql = sql.replace("ALTER TABLE ONLY ", "ALTER TABLE ONLY " + settings['raw_gnaf_schema'] + ".")
sql_list.append(sql)
# run queries in separate processes
multiprocess_list("sql", sql_list, settings)
print "\t- Step 6 of 6 : primary & foreign keys created : {0}".format(datetime.now() - start_time)


# loads the admin bdy shapefiles using the shp2pgsql command line tool (part of PostGIS), using multiprocessing
def load_raw_admin_boundaries(pg_cur, settings):
start_time = datetime.now()
# drop existing views
pg_cur.execute(open_sql_file("02-01-drop-admin-bdy-views.sql", settings))
    # add locality class authority code table to the file search
    # (use a copy so 'authority_code' doesn't pollute settings['states_to_load'] for the later steps)
    load_states = settings['states_to_load'] + ["authority_code"]
# create schema
if settings['raw_admin_bdys_schema'] != "public":
pg_cur.execute("CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}"
.format(settings['raw_admin_bdys_schema'], settings['pg_user']))
# set psql connect string and password
psql_str = "psql -U {0} -d {1} -h {2} -p {3}"\
.format(settings['pg_user'], settings['pg_db'], settings['pg_host'], settings['pg_port'])
password_str = ''
if not os.getenv("PGPASSWORD"):
if platform.system() == "Windows":
password_str = "SET"
else:
password_str = "export"
password_str += " PGPASSWORD={0}&&".format(settings['pg_password'])
# get file list
table_list = []
cmd_list1 = []
cmd_list2 = []
    for state in load_states:
state = state.lower()
# get a dictionary of Shapefiles and DBFs matching the state
for root, dirs, files in os.walk(settings['admin_bdys_local_directory']):
for file_name in files:
if file_name.lower().startswith(state + "_"):
if file_name.lower().endswith("_shp.dbf"):
# change file type for spatial files
if file_name.lower().endswith("_polygon_shp.dbf"):
spatial = True
bdy_file = os.path.join(root, file_name.replace(".dbf", ".shp"))
else:
spatial = False
bdy_file = os.path.join(root, file_name)
bdy_table = file_name.lower().replace(state + "_", "aus_", 1).replace("_shp.dbf", "")
# set command line parameters depending on whether this is the 1st state (for creating tables)
table_list_add = False
if bdy_table not in table_list:
table_list_add = True
if spatial:
params = "-d -D -s 4283 -i"
else:
params = "-d -D -G -n -i"
else:
if spatial:
params = "-a -D -s 4283 -i"
else:
params = "-a -D -G -n -i"
cmd = "{0}shp2pgsql {1} \"{2}\" {3}.{4} | {5}".format(
password_str, params, bdy_file, settings['raw_admin_bdys_schema'], bdy_table, psql_str)
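                        # shp2pgsql flags: -d creates the table (1st state), -a appends (later states),
                        # -D uses Postgres dump format for a faster load, -s 4283 sets the SRID to GDA94,
                        # -n loads the DBF attributes only (no geometry), -i casts integers to int4
                        # e.g. (hypothetical paths, abridged):
                        #   export PGPASSWORD=password&&shp2pgsql -d -D -s 4283 -i "/data/bdys/VIC_LOCALITY_POLYGON_shp.shp"
                        #       raw_admin_bdys.aus_locality_polygon | psql -U postgres -d psma_201602 -h localhost -p 5432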
# if locality file from Towns folder: don't add - it's a duplicate
if "town points" not in bdy_file.lower():
if table_list_add:
table_list.append(bdy_table)
cmd_list1.append(cmd)
else:
cmd_list2.append(cmd)
else:
if not bdy_file.lower().endswith("_locality_shp.dbf"):
if table_list_add:
table_list.append(bdy_table)
cmd_list1.append(cmd)
else:
cmd_list2.append(cmd)
# print '\n'.join(cmd_list1)
# print '\n'.join(cmd_list2)
# are there any files to load?
if len(cmd_list1) == 0:
print "No Admin Boundary files found\nACTION: Check your 'admin-bdys-path' argument"
else:
# load files in separate processes -
# do the commands that create the tables first before attempting the subsequent insert commands
multiprocess_list("cmd", cmd_list1, settings)
multiprocess_list("cmd", cmd_list2, settings)
print "\t- Step 1 of 3 : raw admin boundaries loaded : {0}".format(datetime.now() - start_time)


def prep_admin_bdys(pg_cur, settings):
    # Step 2 of 3 : create admin bdy tables ready to be used
start_time = datetime.now()
if settings['admin_bdys_schema'] != "public":
pg_cur.execute("CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}"
.format(settings['admin_bdys_schema'], settings['pg_user']))
# create table using multiprocessing - using flag in file to split file up into sets of statements
sql_list = open_sql_file("02-02-prep-admin-bdys-tables.sql", settings).split("-- # --")
multiprocess_list("sql", sql_list, settings)
# Special case - remove custom outback bdy if South Australia not requested
if 'SA' not in settings['states_to_load']:
pg_cur.execute(prep_sql("DELETE FROM admin_bdys.locality_bdys WHERE locality_pid = 'SA999999'", settings))
pg_cur.execute(prep_sql("VACUUM ANALYZE admin_bdys.locality_bdys", settings))
print "\t- Step 2 of 3 : admin boundaries prepped : {0}".format(datetime.now() - start_time)


def create_admin_bdys_for_analysis(settings):
# Step 3 of 3 : create admin bdy tables optimised for spatial analysis
start_time = datetime.now()
if settings['st_subdivide_supported']:
template_sql = open_sql_file("02-03-create-admin-bdy-analysis-tables_template.sql", settings)
sql_list = list()
for table in settings['admin_bdy_list']:
sql = template_sql.format(table[0], table[1])
if table[0] == 'locality_bdys': # special case, need to change schema name
# sql = sql.replace(settings['raw_admin_bdys_schema'], settings['admin_bdys_schema'])
sql = sql.replace("name", "locality_name")
sql_list.append(sql)
multiprocess_list("sql", sql_list, settings)
print "\t- Step 3 of 3 : admin boundaries for analysis created : {0}".format(datetime.now() - start_time)
else:
print "\t- Step 3 of 3 : admin boundaries for analysis NOT created - requires PostGIS 2.2+ with GEOS 3.5.0+"


# create gnaf reference tables by flattening raw gnaf address, streets & localities into a usable form
# also creates all supporting lookup tables and usable admin bdy tables
def create_reference_tables(pg_cur, settings):
# set postgres search path back to the default
pg_cur.execute("SET search_path = public, pg_catalog")
# create schemas
if settings['gnaf_schema'] != "public":
pg_cur.execute("CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}"
.format(settings['gnaf_schema'], settings['pg_user']))
# Step 1 of 14 : create reference tables
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-01-reference-create-tables.sql", settings))
print "\t- Step 1 of 14 : create reference tables : {0}".format(datetime.now() - start_time)
# Step 2 of 14 : populate localities
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-02-reference-populate-localities.sql", settings))
print "\t- Step 2 of 14 : localities populated : {0}".format(datetime.now() - start_time)
# Step 3 of 14 : populate locality aliases
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-03-reference-populate-locality-aliases.sql", settings))
print "\t- Step 3 of 14 : locality aliases populated : {0}".format(datetime.now() - start_time)
# Step 4 of 14 : populate locality neighbours
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-04-reference-populate-locality-neighbours.sql", settings))
print "\t- Step 4 of 14 : locality neighbours populated : {0}".format(datetime.now() - start_time)
# Step 5 of 14 : populate streets
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-05-reference-populate-streets.sql", settings))
print "\t- Step 5 of 14 : streets populated : {0}".format(datetime.now() - start_time)
# Step 6 of 14 : populate street aliases
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-06-reference-populate-street-aliases.sql", settings))
print "\t- Step 6 of 14 : street aliases populated : {0}".format(datetime.now() - start_time)
# Step 7 of 14 : populate addresses, using multiprocessing
start_time = datetime.now()
sql = open_sql_file("03-07-reference-populate-addresses-1.sql", settings)
sql_list = split_sql_into_list(pg_cur, sql, settings['gnaf_schema'], "streets", "str", "gid", settings)
multiprocess_list('sql', sql_list, settings)
pg_cur.execute(prep_sql("ANALYZE gnaf.temp_addresses;", settings))
print "\t- Step 7 of 14 : addresses populated : {0}".format(datetime.now() - start_time)
# Step 8 of 14 : populate principal alias lookup
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-08-reference-populate-address-alias-lookup.sql", settings))
print "\t- Step 8 of 14 : principal alias lookup populated : {0}".format(datetime.now() - start_time)
# Step 9 of 14 : populate primary secondary lookup
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-09-reference-populate-address-secondary-lookup.sql", settings))
pg_cur.execute(prep_sql("VACUUM ANALYSE gnaf.address_secondary_lookup", settings))
print "\t- Step 9 of 14 : primary secondary lookup populated : {0}".format(datetime.now() - start_time)
# Step 10 of 14 : split the Melbourne locality into its 2 postcodes (3000, 3004)
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-10-reference-split-melbourne.sql", settings))
print "\t- Step 10 of 14 : Melbourne split : {0}".format(datetime.now() - start_time)
# Step 11 of 14 : finalise localities assigned to streets and addresses
start_time = datetime.now()
pg_cur.execute(open_sql_file("03-11-reference-finalise-localities.sql", settings))
print "\t- Step 11 of 14 : localities finalised : {0}".format(datetime.now() - start_time)
# Step 12 of 14 : finalise addresses, using multiprocessing
start_time = datetime.now()
sql = open_sql_file("03-12-reference-populate-addresses-2.sql", settings)
sql_list = split_sql_into_list(pg_cur, sql, settings['gnaf_schema'], "localities", "loc", "gid", settings)
multiprocess_list('sql', sql_list, settings)
# turf the temp address table
pg_cur.execute(prep_sql("DROP TABLE IF EXISTS gnaf.temp_addresses", settings))
print "\t- Step 12 of 14 : addresses finalised : {0}".format(datetime.now() - start_time)
# Step 13 of 14 : create almost correct postcode boundaries by aggregating localities, using multiprocessing
start_time = datetime.now()
sql = open_sql_file("03-13-reference-derived-postcode-bdys.sql", settings)
sql_list = []
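    # inject a state filter ahead of the GROUP BY so each state's postcode bdys are built in a separate process,
    # e.g. "... WHERE state = 'VIC' GROUP BY ..."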
for state in settings['states_to_load']:
state_sql = sql.replace("GROUP BY ", "WHERE state = '{0}' GROUP BY ".format(state))
sql_list.append(state_sql)
multiprocess_list("sql", sql_list, settings)
# create analysis table?
if settings['st_subdivide_supported']:
pg_cur.execute(open_sql_file("03-13a-create-postcode-analysis-table.sql", settings))
print "\t- Step 13 of 14 : postcode boundaries created : {0}".format(datetime.now() - start_time)
# Step 14 of 14 : create indexes, primary and foreign keys, using multiprocessing
start_time = datetime.now()
raw_sql_list = open_sql_file("03-14-reference-create-indexes.sql", settings).split("\n")
sql_list = []
for sql in raw_sql_list:
if sql[0:2] != "--" and sql[0:2] != "":
sql_list.append(sql)
multiprocess_list("sql", sql_list, settings)
print "\t- Step 14 of 14 : create primary & foreign keys and indexes : {0}".format(datetime.now() - start_time)


def boundary_tag_gnaf(pg_cur, settings):
# Step 1 of 3 : tag gnaf addresses with admin boundary IDs, using multiprocessing
start_time = datetime.now()
# remove localities, postcodes and states as these IDs are already assigned to GNAF addresses
table_list = list()
for table in settings['admin_bdy_list']:
if table[0] not in ["locality_bdys", "state_bdys"]:
table_list.append([table[0], table[1]])
# create temp tables
template_sql = open_sql_file("04-01a-bdy-tag-create-table-template.sql", settings)
for table in table_list:
pg_cur.execute(template_sql.format(table[0],))
# create temp tables of bdy tagged gnaf_pids
template_sql = open_sql_file("04-01b-bdy-tag-template.sql", settings)
sql_list = list()
for table in table_list:
sql = template_sql.format(table[0], table[1])
# if no analysis tables created - use the full tables instead of the subdivided ones
# WARNING: this can add hours to the processing
if settings['st_subdivide_supported']:
table_name = "{0}_analysis".format(table[0],)
else:
table_name = table[0]
sql_list.extend(
split_sql_into_list(pg_cur, sql, settings['admin_bdys_schema'], table_name, "bdys", "gid", settings))
multiprocess_list("sql", sql_list, settings)
print "\t- Step 1 of 3 : gnaf addresses tagged with admin boundary IDs: {0}".format(datetime.now() - start_time)
start_time = datetime.now()
# Step 2 of 3 : delete invalid matches, create indexes and analyse tables
sql_list = list()
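    # an address can pick up a neighbouring state's boundary where boundaries overlap along state borders,
    # so delete tags whose boundary state doesn't match the address state (OT addresses are exempt)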
for table in table_list:
sql = "DELETE FROM {0}.temp_{1}_tags WHERE gnaf_state <> bdy_state AND gnaf_state <> 'OT';" \
"CREATE INDEX temp_{1}_tags_gnaf_pid_idx ON {0}.temp_{1}_tags USING btree(gnaf_pid);" \
"ANALYZE {0}.temp_{1}_tags".format(settings['gnaf_schema'], table[0])
sql_list.append(sql)
multiprocess_list("sql", sql_list, settings)
print "\t- Step 2 of 3 : invalid matches deleted & bdy tag indexes created : {0}"\
.format(datetime.now() - start_time)
start_time = datetime.now()
# Step 3 of 3 : create boundary tag table
# create final table
pg_cur.execute("DROP TABLE IF EXISTS {0}.address_admin_boundaries CASCADE".format(settings['gnaf_schema'],))
create_table_list = list()
create_table_list.append("CREATE TABLE {0}.address_admin_boundaries (gid serial NOT NULL,"
"gnaf_pid character varying(16) NOT NULL,"
"alias_principal character(1) NOT NULL,"
"locality_pid character varying(16) NOT NULL,"
"locality_name character varying(100) NOT NULL,"
"postcode character varying(4),"
"state character varying(3) NOT NULL"
.format(settings['gnaf_schema'],))
for table in table_list:
pid_field = table[1]
        name_field = pid_field.replace("_pid", "_name")
create_table_list.append(", {0} character varying(15), {1} character varying(100)"
.format(pid_field, name_field))
    create_table_list.append(") WITH (OIDS=FALSE);ALTER TABLE {0}.address_admin_boundaries OWNER TO {1}"
                             .format(settings['gnaf_schema'], settings['pg_user']))
pg_cur.execute("".join(create_table_list))
# create insert statement for multiprocessing
insert_field_list = list()
insert_field_list.append("(gnaf_pid, alias_principal, locality_pid, locality_name, postcode, state")
insert_join_list = list()
insert_join_list.append("FROM {0}.address_principals AS pnts ".format(settings['gnaf_schema']))
select_field_list = list()
select_field_list.append("SELECT pnts.gnaf_pid, pnts.alias_principal, pnts.locality_pid, "
"pnts.locality_name, pnts.postcode, pnts.state")
for table in table_list:
pid_field = table[1]
        name_field = pid_field.replace("_pid", "_name")
insert_field_list.append(", {0}, {1}".format(pid_field, name_field))
select_field_list.append(", temp_{0}_tags.bdy_pid, temp_{0}_tags.bdy_name ".format(table[0]))
insert_join_list.append("LEFT OUTER JOIN {0}.temp_{1}_tags ON pnts.gnaf_pid = temp_{1}_tags.gnaf_pid "
.format(settings['gnaf_schema'], table[0]))
insert_field_list.append(") ")
insert_statement_list = list()
insert_statement_list.append("INSERT INTO {0}.address_admin_boundaries ".format(settings['gnaf_schema'],))
insert_statement_list.append("".join(insert_field_list))
insert_statement_list.append("".join(select_field_list))
insert_statement_list.append("".join(insert_join_list))
sql = "".join(insert_statement_list) + ";"
sql_list = split_sql_into_list(pg_cur, sql, settings['gnaf_schema'], "address_principals", "pnts", "gid", settings)
# print "\n".join(sql_list)
multiprocess_list("sql", sql_list, settings)
print "\t- Step 3 of 3 : gnaf bdy tag table created : {0}".format(datetime.now() - start_time)


# takes a list of sql queries or command lines and runs them using multiprocessing
def multiprocess_list(mp_type, work_list, settings):
pool = multiprocessing.Pool(processes=settings['max_concurrent_processes'])
num_jobs = len(work_list)
if mp_type == "sql":
results = pool.imap_unordered(run_sql_multiprocessing, [[w, settings] for w in work_list])
else:
results = pool.imap_unordered(run_command_line, work_list)
pool.close()
pool.join()
result_list = list(results)
num_results = len(result_list)
if num_jobs > num_results:
print "\t- A MULTIPROCESSING PROCESS FAILED WITHOUT AN ERROR\nACTION: Check the record counts"
for result in result_list:
if result != "SUCCESS":
print result


def run_sql_multiprocessing(args):
the_sql = args[0]
settings = args[1]
pg_conn = psycopg2.connect(settings['pg_connect_string'])
pg_conn.autocommit = True
pg_cur = pg_conn.cursor()
# set raw gnaf database schema (it's needed for the primary and foreign key creation)
if settings['raw_gnaf_schema'] != "public":
pg_cur.execute("SET search_path = {0}, public, pg_catalog".format(settings['raw_gnaf_schema'],))
try:
pg_cur.execute(the_sql)
result = "SUCCESS"
except psycopg2.Error, e:
result = "SQL FAILED! : {0} : {1}".format(the_sql, e.message)
pg_cur.close()
pg_conn.close()
return result


def run_command_line(cmd):
    # run the command line without any output (it'll still tell you if it fails)
    try:
        fnull = open(os.devnull, "w")
        return_code = subprocess.call(cmd, shell=True, stdout=fnull, stderr=subprocess.STDOUT)
        fnull.close()
        # subprocess.call only raises if the command can't be launched - check the exit code
        # so failed shp2pgsql/psql runs are reported too
        if return_code == 0:
            result = "SUCCESS"
        else:
            result = "COMMAND FAILED! : {0} : exit code {1}".format(cmd, return_code)
    except Exception, e:
        result = "COMMAND FAILED! : {0} : {1}".format(cmd, e.message)
    return result


def open_sql_file(file_name, settings):
sql = open(os.path.join(settings['sql_dir'], file_name), "r").read()
return prep_sql(sql, settings)


# change schema names in a list of SQL scripts if the schemas are not the default
def prep_sql_list(sql_list, settings):
output_list = []
for sql in sql_list:
output_list.append(prep_sql(sql, settings))
return output_list


# change schema names in the SQL script if not the default
def prep_sql(sql, settings):
if settings['raw_gnaf_schema'] != "raw_gnaf":
sql = sql.replace(" raw_gnaf.", " {0}.".format(settings['raw_gnaf_schema'],))
if settings['gnaf_schema'] != "gnaf":
sql = sql.replace(" gnaf.", " {0}.".format(settings['gnaf_schema'],))
if settings['raw_admin_bdys_schema'] != "raw_admin_bdys":
sql = sql.replace(" raw_admin_bdys.", " {0}.".format(settings['raw_admin_bdys_schema'],))
if settings['admin_bdys_schema'] != "admin_bdys":
sql = sql.replace(" admin_bdys.", " {0}.".format(settings['admin_bdys_schema'],))
return sql


def split_sql_into_list(pg_cur, the_sql, table_schema, table_name, table_alias, table_gid, settings):
# get min max gid values from the table to split
min_max_sql = "SELECT MIN({2}) AS min, MAX({2}) AS max FROM {0}.{1}".format(table_schema, table_name, table_gid)
pg_cur.execute(min_max_sql)
result = pg_cur.fetchone()
min_pkey = int(result[0])
max_pkey = int(result[1])
diff = max_pkey - min_pkey
# Number of records in each query
rows_per_request = int(math.floor(float(diff) / float(settings['max_concurrent_processes']))) + 1
    # if the split yields fewer than 10 rows per process, fix rows per request at 10 and reduce the process count
if float(diff) / float(settings['max_concurrent_processes']) < 10.0:
rows_per_request = 10
processes = int(math.floor(float(diff) / 10.0)) + 1
print "\t\t- running {0} processes (adjusted due to low row count in table to split)".format(processes)
else:
processes = settings['max_concurrent_processes']
# create list of sql statements to run with multiprocessing
sql_list = []
start_pkey = min_pkey - 1
for i in range(0, processes):
end_pkey = start_pkey + rows_per_request
where_clause = " WHERE {0}.{3} > {1} AND {0}.{3} <= {2}".format(table_alias, start_pkey, end_pkey, table_gid)
if "WHERE " in the_sql:
mp_sql = the_sql.replace(" WHERE ", where_clause + " AND ")
elif "GROUP BY " in the_sql:
mp_sql = the_sql.replace("GROUP BY ", where_clause + " GROUP BY ")
elif "ORDER BY " in the_sql:
mp_sql = the_sql.replace("ORDER BY ", where_clause + " ORDER BY ")
else:
if ";" in the_sql:
mp_sql = the_sql.replace(";", where_clause + ";")
else:
mp_sql = the_sql + where_clause
print "\t\t- NOTICE: no ; found at the end of the SQL statement"
sql_list.append(mp_sql)
start_pkey = end_pkey
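    # e.g. a table with gids 1-100 split across 4 processes gets rows_per_request = 25 and clauses like
    #   "WHERE str.gid > 0 AND str.gid <= 25" ... "WHERE str.gid > 75 AND str.gid <= 100"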
# print '\n'.join(sql_list)
return sql_list


if __name__ == '__main__':
main()