Skip to content

Commit

Permalink
use futures for python 2.7 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
elmiomar committed Jun 25, 2024
1 parent 30c6a3e commit 00eeaf9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
42 changes: 33 additions & 9 deletions python/nistoar/pdr/preserv/archive/scripts/poll_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import subprocess
import sys
import argparse
from concurrent.futures import ThreadPoolExecutor

# To ensure Python 2.7 compatibility
try:
from concurrent.futures import ThreadPoolExecutor
except ImportError:
from futures import ThreadPoolExecutor

# This is to be able to import modules from the archive package
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
Expand All @@ -18,9 +23,13 @@ def init_logging():
"""
Initialize logging. Logs are written to `logs/sqs_messages.log`.
"""
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)

logging.basicConfig(
level=logging.INFO,
filename="logs/sqs_messages.log",
filename=os.path.join(log_dir, "sqs_messages.log"),
format="%(asctime)s - %(levelname)s - PID: %(process)d - %(message)s",
)

Expand Down Expand Up @@ -64,7 +73,7 @@ def load_config(source="variable", filename=None, url=None):
elif source == "file":
if filename is None:
raise ValueError("A filename must be provided.")
logging.info(f"Loading configuration from file: {filename}")
logging.info("Loading configuration from file: {}".format(filename))
try:
with open(filename, "r") as file:
return json.load(file)
Expand All @@ -77,15 +86,15 @@ def load_config(source="variable", filename=None, url=None):
elif source == "http":
if url is None:
raise ValueError("A URL must be provided.")
logging.info(f"Loading configuration from URL: {url}")
logging.info("Loading configuration from URL: {}".format(url))
try:
import requests

response = requests.get(url)
response.raise_for_status() # This will raise an HTTPError for bad response
return response.json()
except requests.RequestException as e:
logging.error(f"Failed to load configuration from URL: {e}")
logging.error("Failed to load configuration from URL: {}".format(e))
raise
else:
raise ValueError("Unknown configuration source specified.")
Expand All @@ -107,11 +116,22 @@ def handle_message(message_content):
script_path = os.path.join(script_dir, "process_message.py")
message_data = json.dumps(message_content)

command = ["python", script_path, "--aipid={}".format(message_content.get("aipid"))]
# Command including the aipid argument.
# The aipid argument here serves as a placeholder only for monitoring purposes.
command = ["python", script_path, "--aipid={}".format(message_content["aipid"])]

# Launch process as a system callx
try:
subprocess.run(command, input=message_data, text=True, check=True)
logging.info("Subprocess executed successfully.")
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = process.communicate(input=message_data.encode())
logging.info("Subprocess executed successfully. Output: {}".format(stdout))
if stderr:
logging.error("Subprocess error: {}".format(stderr))
except subprocess.CalledProcessError as e:
logging.error("Subprocess failed with {}, error: {}".format(e.returncode, e))

Expand All @@ -122,6 +142,10 @@ def poll_messages(client):
"""
logging.info("Polling for messages...")
response = client.receive_completion_message()
if response is None:
logging.info("No messages received.")
return

messages = response.get("Messages", [])
if messages:
logging.info("Received {} messages.".format(len(messages)))
Expand Down Expand Up @@ -164,7 +188,7 @@ def main():
schema_path = config["schema_file"]

if not os.path.isabs(schema_path):
schema_path = os.path.join(root_dir, schema_path)
schema_path = os.path.join(root_dir, "schema", schema_path)
logging.info("Using schema file at: {}".format(schema_path))
validator = JSONSchemaValidator(schema_path)
client = SQSArchiveClient(config, validator)
Expand Down
1 change: 1 addition & 0 deletions python/nistoar/pdr/preserv/archive/scripts/process_message.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/python
import sys
import json
import logging
Expand Down

0 comments on commit 00eeaf9

Please sign in to comment.