diff --git a/jobs/python/http/README.md b/jobs/python/http/README.md index aae99f489..8922da699 100644 --- a/jobs/python/http/README.md +++ b/jobs/python/http/README.md @@ -29,6 +29,22 @@ This section shows how to run both applications at once using [multi-app run tem Open a new terminal window and run the multi app run template: + + ```bash dapr run -f . ``` @@ -46,7 +62,7 @@ The terminal console output should look similar to this, where: == APP - job-service == Starting droid: R2-D2 == APP - job-service == Executing maintenance job: Oil Change == APP - job-scheduler == Job Scheduled: C-3PO -== APP - job-scheduler == Job details: {"name":"C-3PO", "dueTime":"30s", "data":{"@type":"type.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}} +== APP - job-scheduler == Job details: {"name":"C-3PO", "dueTime":"30s", "data":{"@type":"ttype.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}} ``` After 30 seconds, the terminal output should present the `C-3PO` job being processed: @@ -57,12 +73,21 @@ After 30 seconds, the terminal output should present the `C-3PO` job being proce == APP - job-service == Executing maintenance job: Limb Calibration ``` -To stop and clean up application processes: + + +2. Stop and clean up application processes + + ```bash dapr stop -f . ``` + + ## Run apps individually ### Start the job service diff --git a/jobs/python/http/job-scheduler/app.py b/jobs/python/http/job-scheduler/app.py index b0b6a6c22..26c2e31b6 100644 --- a/jobs/python/http/job-scheduler/app.py +++ b/jobs/python/http/job-scheduler/app.py @@ -4,74 +4,69 @@ import json C3PO_JOB_BODY = { - "data": { - "Value": "C-3PO:Limb Calibration" - }, - "dueTime": "30s" + "data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "C-3PO:Limb Calibration"}, + "dueTime": "10s", } R2D2_JOB_BODY = { - "data": { - "Value": "R2-D2:Oil Change" - }, + "data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "R2-D2:Oil Change"}, "dueTime": "2s" } +def schedule_job(host: str, port: str, job_name: str, job_body: dict) -> None: + req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}" + + try: + response = requests.post( + req_url, + json=job_body, + headers={"Content-Type": "application/json"}, + timeout=15 + ) + + # Accept both 200 and 204 as success codes + if response.status_code not in [200, 204]: + raise Exception(f"Failed to schedule job. Status code: {response.status_code}, Response: {response.text}") + + print(f"Job Scheduled: {job_name}") + if response.text: + print(f"Response: {response.text}") + + except requests.exceptions.RequestException as e: + print(f"Error scheduling job {job_name}: {str(e)}") + raise + +def get_job_details(host: str, port: str, job_name: str) -> None: + req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}" + + try: + response = requests.get(req_url, timeout=15) + if response.status_code in [200, 204]: + print(f"Job details for {job_name}: {response.text}") + else: + print(f"Failed to get job details. Status code: {response.status_code}, Response: {response.text}") + + except requests.exceptions.RequestException as e: + print(f"Error getting job details for {job_name}: {str(e)}") + raise + def main(): - # Sleep for 5 seconds to wait for job-service to start + # Wait for services to be ready time.sleep(5) dapr_host = os.getenv('DAPR_HOST', 'http://localhost') - scheduler_dapr_http_port = "6280" + scheduler_dapr_http_port = os.getenv('SCHEDULER_DAPR_HTTP_PORT', '6280') # Schedule R2-D2 job - job_name = "R2-D2" - req_url = f"{dapr_host}:{scheduler_dapr_http_port}/v1.0-alpha1/jobs/{job_name}" - - response = requests.post( - req_url, - json=R2D2_JOB_BODY, - headers={"Content-Type": "application/json"}, - timeout=15 - ) - - if response.status_code != 204: - raise Exception(f"Failed to register job event handler. Status code: {response.status_code}") - - print(f"Job Scheduled: {job_name}") - + schedule_job(dapr_host, scheduler_dapr_http_port, "R2-D2", R2D2_JOB_BODY) time.sleep(5) # Schedule C-3PO job - job_name = "C-3PO" - req_url = f"{dapr_host}:{scheduler_dapr_http_port}/v1.0-alpha1/jobs/{job_name}" - - headers = {"Content-Type": "application/json; charset=utf-8"} - - response = requests.post( - req_url, - json=C3PO_JOB_BODY, - headers=headers - ) - - if response.status_code != 204: - raise Exception(f"Failed to register job event handler. Status code: {response.status_code}") - - print("Response json: ", response.text) - print(f"Job Scheduled: {job_name}") - + schedule_job(dapr_host, scheduler_dapr_http_port, "C-3PO", C3PO_JOB_BODY) time.sleep(5) # Get C-3PO job details - job_name = "C-3PO" - req_url = f"{dapr_host}:{scheduler_dapr_http_port}/v1.0-alpha1/jobs/{job_name}" - - response = requests.get(req_url) - if response.status_code == 200: - print(f"Get job details: {response.text}") - else: - print(f"Failed to get job details. Status code: {response.status_code}") - + get_job_details(dapr_host, scheduler_dapr_http_port, "C-3PO") time.sleep(5) if __name__ == "__main__": diff --git a/jobs/python/http/job-service/app.py b/jobs/python/http/job-service/app.py index 3e8b95b10..1e15721a4 100644 --- a/jobs/python/http/job-service/app.py +++ b/jobs/python/http/job-service/app.py @@ -1,9 +1,8 @@ import os -from flask import Flask, request, jsonify import json import traceback - -app = Flask(__name__) +from http.server import HTTPServer, BaseHTTPRequestHandler +from urllib.parse import urlparse, parse_qs class DroidJob: def __init__(self, droid: str, task: str): @@ -16,36 +15,58 @@ def set_droid_job(decoded_value: str) -> DroidJob: droid_array = droid_str.split(':') return DroidJob(droid_array[0], droid_array[1]) -@app.route('/job/', methods=['POST']) -def handle_job(job): - print(f"Received job request...") - - try: - print("Raw request data:", request.get_data().decode('utf-8')) - job_data = request.get_json() - print("Parsed job data:", json.dumps(job_data, indent=2)) - - if not job_data: - return "Error reading request body", 400 - - # In Dapr, the job data comes in a special "data" field within the request - value = job_data.get('data', {}).get('value', '') - print(f"Extracted value: {value}") - - # Create DroidJob from value - droid_job = set_droid_job(value) - - print(f"Starting droid: {droid_job.droid}") - print(f"Executing maintenance job: {droid_job.task}") - - return "", 200 +class JobHandler(BaseHTTPRequestHandler): + def _send_response(self, status_code: int, message: str = ""): + self.send_response(status_code) + self.send_header('Content-type', 'application/json') + self.end_headers() + if message: + self.wfile.write(json.dumps({"message": message}).encode('utf-8')) + + def do_POST(self): + print('Received job request...', flush=True) - except Exception as e: - print(f"Error processing job request:") - print(traceback.format_exc()) - return f"Error processing job: {str(e)}", 400 + try: + # Check if path starts with /job/ + if not self.path.startswith('/job/'): + self._send_response(404, "Not Found") + return + + # Read request body + content_length = int(self.headers.get('Content-Length', 0)) + raw_data = self.rfile.read(content_length).decode('utf-8') + + # Parse outer JSON data + outer_data = json.loads(raw_data) + + # The payload might be double-encoded, so try parsing again if it's a string + if isinstance(outer_data, str): + job_data = json.loads(outer_data) + else: + job_data = outer_data + + # Extract value directly from the job data + value = job_data.get('value', '') + + # Create DroidJob from value + droid_job = set_droid_job(value) + + print("Starting droid: " + droid_job.droid, flush=True) + print("Executing maintenance job: " + droid_job.task, flush=True) + + self._send_response(200) + + except Exception as e: + print("Error processing job request:", flush= True) + print(traceback.format_exc()) + self._send_response(400, f"Error processing job: {str(e)}") + +def run_server(port: int): + server_address = ('', port) + httpd = HTTPServer(server_address, JobHandler) + print("Server started on port " + str(port), flush=True) + httpd.serve_forever() if __name__ == '__main__': - app_port = os.getenv('APP_PORT', '6200') - print(f"Server started on port {app_port}") - app.run(host='0.0.0.0', port=int(app_port)) \ No newline at end of file + app_port = int(os.getenv('APP_PORT', '6200')) + run_server(app_port) \ No newline at end of file diff --git a/jobs/python/http/makefile b/jobs/python/http/makefile new file mode 100644 index 000000000..e7a8826bf --- /dev/null +++ b/jobs/python/http/makefile @@ -0,0 +1,2 @@ +include ../../../docker.mk +include ../../../validate.mk \ No newline at end of file diff --git a/jobs/python/http/requirements.txt b/jobs/python/http/requirements.txt index 84ade4817..077c95d8a 100644 --- a/jobs/python/http/requirements.txt +++ b/jobs/python/http/requirements.txt @@ -1,2 +1 @@ -flask==3.0.0 requests==2.31.0 \ No newline at end of file