Skip to content

Commit

Permalink
Merge pull request #399 from rustprooflabs/dev
Browse files Browse the repository at this point in the history
Draft: Merge dev into main
  • Loading branch information
rustprooflabs authored Aug 7, 2024
2 parents 0c009a5 + 733ebd7 commit ede13a4
Show file tree
Hide file tree
Showing 15 changed files with 360 additions and 322 deletions.
39 changes: 0 additions & 39 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ RAM=2
# to make unit test results visible at the end.
.PHONY: all
all: docker-exec-region docker-exec-input-file \
docker-exec-replication-w-input-file \
docker-exec-default unit-tests

.PHONY: docker-clean
Expand Down Expand Up @@ -115,44 +114,6 @@ docker-exec-input-file: build-run-docker



.PHONE: docker-exec-replication-w-input-file
docker-exec-replication-w-input-file: build-run-docker
# NOTE: This step tests --replication file for an initial load.
# It does **NOT** test the actual replication process for updating data
# using replication mode. Testing actual replication over time in this format
# will not be trivial. The historic file used (2021-01-13) cannot be used
# to seed a replication process, there is a time limit in upstream software
# that requires more recency to the source data. This also cannot simply
# download a file from Geofabrik, as the "latest" file will not have a diff
# to apply so also will not test the actual replication.
#
# Open a PR, Issue, discussion on https://github.com/rustprooflabs/pgosm-flex
# if you have an idea on how to implement this testing functionality.

# copy with arbitrary file name to test --input-file
docker cp tests/data/district-of-columbia-2021-01-13.osm.pbf \
pgosm:/app/output/$(INPUT_FILE_NAME)

# allow files created in later step to be created
docker exec -it pgosm \
chown $(CURRENT_UID):$(CURRENT_GID) /app/output/
# Needed for unit-tests
docker exec -it pgosm \
chown $(CURRENT_UID):$(CURRENT_GID) /app/docker/

# process it, this time without providing the region but directly the filename
docker exec -it \
-e POSTGRES_PASSWORD=mysecretpassword \
-e POSTGRES_USER=postgres \
-u $(CURRENT_UID):$(CURRENT_GID) \
pgosm python3 docker/pgosm_flex.py \
--layerset=minimal \
--ram=$(RAM) \
--replication \
--input-file=/app/output/$(INPUT_FILE_NAME) \
--skip-qgis-style --skip-nested # Make this test run faster


.PHONE: docker-exec-region
docker-exec-region: build-run-docker
# copy for simulating region
Expand Down
6 changes: 6 additions & 0 deletions db/deploy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# PgOSM Flex SQL deploy scripts

The scripts in this folder are executed during PgOSM Flex initialization via
the `prepare_osm_schema()` function in `docker/db.py`.
When files are added to or removed from this folder, update that function
to match.
5 changes: 5 additions & 0 deletions db/deploy/replication_functions.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
/*
Creates functions used for maintaining data when --replication is used.
These functions are also used when using `--update append` mode of
PgOSM Flex.
*/
BEGIN;


Expand Down
48 changes: 40 additions & 8 deletions docker/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ def log_pg_details():


def prepare_pgosm_db(skip_qgis_style, db_path, import_mode, schema_name):
"""Runs through series of steps to prepare database for PgOSM.
"""Runs through steps to prepare the target database for PgOSM Flex.
Includes additional preparation for using --replication and --update append
modes.
Parameters
--------------------------
Expand Down Expand Up @@ -245,6 +248,9 @@ def prepare_pgosm_db(skip_qgis_style, db_path, import_mode, schema_name):
schema_name=schema_name)
run_insert_pgosm_road(db_path=db_path, schema_name=schema_name)

if import_mode.replication_update or import_mode.update == 'append':
osm2pgsql_replication_start()


def start_import(pgosm_region, pgosm_date, srid, language, layerset, git_info,
osm2pgsql_version, import_mode, schema_name, input_file):
Expand Down Expand Up @@ -477,7 +483,7 @@ def get_db_conn(conn_string):
return conn


def pgosm_after_import(flex_path):
def pgosm_after_import(flex_path: str) -> bool:
"""Runs post-processing SQL via Lua script.
Layerset logic is established via environment variable, must happen
Expand Down Expand Up @@ -508,17 +514,38 @@ def pgosm_after_import(flex_path):


def pgosm_nested_admin_polygons(flex_path: str, schema_name: str):
"""Runs stored procedure to calculate nested admin polygons via psql.
"""Runs two stored procedures to calculate nested admin polygons via psql.
Parameters
----------------------
flex_path : str
schema_name : str
"""
sql_raw = f'CALL {schema_name}.build_nested_admin_polygons();'
# Populate the table
sql_raw_1 = f'CALL {schema_name}.populate_place_polygon_nested();'

conn_string = os.environ['PGOSM_CONN']
cmds = ['psql', '-d', conn_string, '-c', sql_raw]
cmds = ['psql', '-d', conn_string, '-c', sql_raw_1]
LOGGER.info('Populating place_polygon_nested table (osm.populate_place_polygon_nested() )')
output = subprocess.run(cmds,
text=True,
cwd=flex_path,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
LOGGER.info(f'Nested polygon output: \n {output.stdout}')

if output.returncode != 0:
err_msg = f'Failed to populate nested polygon data. Return code: {output.returncode}'
LOGGER.error(err_msg)
sys.exit(f'{err_msg} - Check the log output for details.')


# Build the data
sql_raw_2 = f' CALL {schema_name}.build_nested_admin_polygons();'

conn_string = os.environ['PGOSM_CONN']
cmds = ['psql', '-d', conn_string, '-c', sql_raw_2]
LOGGER.info('Building nested polygons... (this can take a while)')
output = subprocess.run(cmds,
text=True,
Expand All @@ -537,18 +564,23 @@ def pgosm_nested_admin_polygons(flex_path: str, schema_name: str):

def osm2pgsql_replication_start():
    """Prepare the database so incoming data updates can be applied.

    Cleans out FKs that would otherwise prevent updates. Required both for
    `--replication` (osm2pgsql-replication) and for `--update append` mode.
    """
    LOGGER.info('Prep database to allow data updates.')
    # "append" covers both osm2pgsql --append and osm2pgsql-replication;
    # the procedure name is intentionally kept as "append".
    prep_sql = 'CALL osm.append_data_start();'

    with get_db_conn(conn_string=connection_string()) as conn:
        conn.cursor().execute(prep_sql)


def osm2pgsql_replication_finish(skip_nested):
"""Runs post-replication step to put FKs back and refresh materialied views.
def osm2pgsql_replication_finish(skip_nested: bool):
"""Runs post-replication step to refresh materialized views and rebuild
nested data when appropriate.
Only needed for `--replication`, not used for `--update append` mode.
Parameters
---------------------
Expand Down
8 changes: 6 additions & 2 deletions docker/geofabrik.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ def set_date_from_metadata(pbf_file: str):
os.environ['PBF_TIMESTAMP'] = meta_timestamp


def pbf_download_needed(pbf_file_with_date: str, md5_file_with_date: str,
pgosm_date: str) -> bool:
def pbf_download_needed(pbf_file_with_date: str,
md5_file_with_date: str,
pgosm_date: str
) -> bool:
"""Decides if the PBF/MD5 files need to be downloaded.
Parameters
Expand All @@ -123,6 +125,8 @@ def pbf_download_needed(pbf_file_with_date: str, md5_file_with_date: str,
"""
logger = logging.getLogger('pgosm-flex')
# If the PBF file exists, check for the MD5 file too.
logger.debug(f'Checking for PBF File: {pbf_file_with_date}')

if os.path.exists(pbf_file_with_date):
logger.info(f'PBF File exists {pbf_file_with_date}')

Expand Down
22 changes: 14 additions & 8 deletions docker/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ def verify_checksum(md5_file: str, path: str):
logger.debug('md5sum validated')


def set_env_vars(region, subregion, srid, language, pgosm_date, layerset,
layerset_path, replication, schema_name):
def set_env_vars(region: str, subregion: str, srid: str, language: str,
pgosm_date: str, layerset: str,
layerset_path: str, schema_name: str, skip_nested: bool):
"""Sets environment variables needed by PgOSM Flex. Also creates DB
record in `osm.pgosm_flex` table.
Expand All @@ -122,11 +123,11 @@ def set_env_vars(region, subregion, srid, language, pgosm_date, layerset,
language : str
pgosm_date : str
layerset : str
Name of layerset matching the INI filename.
layerset_path : str
str when set, or None
replication : bool
Indicates when osm2pgsql-replication is used
schema_name : str
skip_nested : bool
"""
logger = logging.getLogger('pgosm-flex')
logger.debug('Ensuring env vars are not set from prior run')
Expand Down Expand Up @@ -159,6 +160,7 @@ def set_env_vars(region, subregion, srid, language, pgosm_date, layerset,
pgosm_region = get_region_combined(region, subregion)
logger.debug(f'PGOSM_REGION_COMBINED: {pgosm_region}')

os.environ['SKIP_NESTED'] = str(skip_nested)


def get_region_combined(region: str, subregion: str) -> str:
Expand Down Expand Up @@ -225,7 +227,7 @@ def get_git_info(tag_only: bool=False) -> str:


def unset_env_vars():
"""Unsets environment variables used by PgOSM Flex.
"""Unset environment variables used by PgOSM Flex.
Does not pop POSTGRES_DB on purpose to allow non-Docker operation.
"""
Expand All @@ -239,6 +241,7 @@ def unset_env_vars():
os.environ.pop('PGOSM_CONN', None)
os.environ.pop('PGOSM_CONN_PG', None)
os.environ.pop('SCHEMA_NAME', None)
os.environ.pop('SKIP_NESTED', None)


class ImportMode():
Expand Down Expand Up @@ -310,17 +313,17 @@ def okay_to_run(self, prior_import: dict) -> bool:
"""
self.logger.debug(f'Checking if it is okay to run...')
if self.force:
self.logger.warn(f'Using --force, kiss existing data goodbye')
self.logger.warning('Using --force, kiss existing data goodbye.')
return True

# If no prior imports, do not require force
if len(prior_import) == 0:
self.logger.debug(f'No prior import found, okay to proceed.')
self.logger.debug('No prior import found, okay to proceed.')
return True

prior_replication = prior_import['replication']

# Check git version against latest.
# Check PgOSM version using Git tags
# If current version is lower than prior version from latest import, stop.
prior_import_version = prior_import['pgosm_flex_version_no_hash']
git_tag = get_git_info(tag_only=True)
Expand All @@ -345,6 +348,9 @@ def okay_to_run(self, prior_import: dict) -> bool:
self.logger.debug('Okay to proceed with replication')
return True

if self.update == 'append':
return True

msg = 'Prior data exists in the osm schema and --force was not used.'
self.logger.error(msg)
return False
Expand Down
11 changes: 7 additions & 4 deletions docker/pgosm_flex.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def run_pgosm_flex(ram, region, subregion, debug, force,
region = input_file

helpers.set_env_vars(region, subregion, srid, language, pgosm_date,
layerset, layerset_path, replication, schema_name)
layerset, layerset_path, schema_name,
skip_nested)
db.wait_for_postgres()
if force and db.pg_conn_parts()['pg_host'] == 'localhost':
msg = 'Using --force with the built-in database is unnecessary.'
Expand Down Expand Up @@ -267,7 +268,6 @@ def run_replication_update(skip_nested, flex_path):
"""
logger = logging.getLogger('pgosm-flex')
conn_string = db.connection_string()
db.osm2pgsql_replication_start()

update_cmd = """
osm2pgsql-replication update -d $PGOSM_CONN \
Expand Down Expand Up @@ -531,10 +531,13 @@ def run_post_processing(flex_path, skip_nested, import_mode, schema_name):
logger = logging.getLogger('pgosm-flex')

if not import_mode.run_post_sql:
logger.info('Running with --update append: Skipping post-processing SQL')
msg = 'Running with --update append: Skipping post-processing SQL.'
msg += ' Running osm2pgsql_replication_finish() instead.'
logger.info(msg)
db.osm2pgsql_replication_finish(skip_nested=skip_nested)
return True

post_processing_sql = db.pgosm_after_import(flex_path)
post_processing_sql = db.pgosm_after_import(flex_path=flex_path)

if skip_nested:
logger.info('Skipping calculating nested polygons')
Expand Down
18 changes: 9 additions & 9 deletions docker/tests/test_geofabrik.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
REGION_US = 'north-america/us'
SUBREGION_DC = 'district-of-columbia'
LAYERSET = 'default'
PGOSM_DATE = '2021-12-02'
PGOSM_DATE = '2021-01-13'

PBF_FILE_WITH_DATE = f'/app/tests/data/district-of-columbia-{PGOSM_DATE}.osm.pbf'
MD5_FILE_WITH_DATE = f'/app/tests/data/district-of-columbia-{PGOSM_DATE}.osm.pbf.md5'


class GeofabrikTests(unittest.TestCase):
Expand All @@ -19,8 +22,8 @@ def setUp(self):
pgosm_date=PGOSM_DATE,
layerset=LAYERSET,
layerset_path=None,
replication=False,
schema_name='osm')
schema_name='osm',
skip_nested=True)


def tearDown(self):
Expand All @@ -34,15 +37,15 @@ def test_get_region_filename_returns_subregion_when_exists(self):
def test_get_region_filename_returns_region_when_subregion_None(self):
# Override Subregion to None
helpers.unset_env_vars()
helpers.set_env_vars(region='north-america/us',
helpers.set_env_vars(region=REGION_US,
subregion=None,
srid=3857,
language=None,
pgosm_date=PGOSM_DATE,
layerset=LAYERSET,
layerset_path=None,
replication=False,
schema_name='osm')
schema_name='osm',
skip_nested=True)

result = geofabrik.get_region_filename()
expected = f'{REGION_US}-latest.osm.pbf'
Expand All @@ -64,7 +67,6 @@ def test_get_pbf_url_returns_proper_with_region_and_subregion(self):

def test_pbf_download_needed_returns_boolean(self):
pgosm_date = geofabrik.helpers.get_today()
region_filename = geofabrik.get_region_filename()
expected = bool
result = geofabrik.pbf_download_needed(pbf_file_with_date='does-not-matter',
md5_file_with_date='not-a-file',
Expand All @@ -73,11 +75,9 @@ def test_pbf_download_needed_returns_boolean(self):

def test_pbf_download_needed_returns_true_when_file_not_exists(self):
    """A download is needed when the PBF file is absent from disk."""
    today = geofabrik.helpers.get_today()
    # Paths deliberately point at files that do not exist.
    result = geofabrik.pbf_download_needed(
        pbf_file_with_date='does-not-matter',
        md5_file_with_date='not-a-file',
        pgosm_date=today)
    self.assertEqual(True, result)


Loading

0 comments on commit ede13a4

Please sign in to comment.