Skip to content

Commit

Permalink
removing flask, fixing print statements
Browse files Browse the repository at this point in the history
Signed-off-by: Fernando Rocha <[email protected]>
  • Loading branch information
rochabr committed Feb 5, 2025
1 parent 68165ac commit 519713b
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 86 deletions.
29 changes: 27 additions & 2 deletions jobs/python/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ This section shows how to run both applications at once using [multi-app run tem

Open a new terminal window and run the multi app run template:

<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- '== APP - job-service == Received job request...'
- '== APP - job-service == Executing maintenance job: Oil Change'
- '== APP - job-scheduler == Job Scheduled: C-3PO'
- '== APP - job-service == Received job request...'
- '== APP - job-service == Executing maintenance job: Limb Calibration'
expected_stderr_lines:
output_match_mode: substring
match_order: none
background: false
sleep: 60
timeout_seconds: 120
-->

```bash
dapr run -f .
```
Expand All @@ -46,7 +62,7 @@ The terminal console output should look similar to this, where:
== APP - job-service == Starting droid: R2-D2
== APP - job-service == Executing maintenance job: Oil Change
== APP - job-scheduler == Job Scheduled: C-3PO
== APP - job-scheduler == Job details: {"name":"C-3PO", "dueTime":"30s", "data":{"@type":"type.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}}
== APP - job-scheduler == Job details: {"name":"C-3PO", "dueTime":"30s", "data":{"@type":"ttype.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}}
```

After 30 seconds, the terminal output should present the `C-3PO` job being processed:
Expand All @@ -57,12 +73,21 @@ After 30 seconds, the terminal output should present the `C-3PO` job being proce
== APP - job-service == Executing maintenance job: Limb Calibration
```

To stop and clean up application processes:
<!-- END_STEP -->

2. Stop and clean up application processes

<!-- STEP
name: Stop multi-app run
sleep: 5
-->

```bash
dapr stop -f .
```

<!-- END_STEP -->

## Run apps individually

### Start the job service
Expand Down
95 changes: 45 additions & 50 deletions jobs/python/http/job-scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,74 +4,69 @@
import json

C3PO_JOB_BODY = {
"data": {
"Value": "C-3PO:Limb Calibration"
},
"dueTime": "30s"
"data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "C-3PO:Limb Calibration"},
"dueTime": "10s",
}

R2D2_JOB_BODY = {
"data": {
"Value": "R2-D2:Oil Change"
},
"data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "R2-D2:Oil Change"},
"dueTime": "2s"
}

def schedule_job(host: str, port: str, job_name: str, job_body: dict) -> None:
req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}"

try:
response = requests.post(
req_url,
json=job_body,
headers={"Content-Type": "application/json"},
timeout=15
)

# Accept both 200 and 204 as success codes
if response.status_code not in [200, 204]:
raise Exception(f"Failed to schedule job. Status code: {response.status_code}, Response: {response.text}")

print(f"Job Scheduled: {job_name}")
if response.text:
print(f"Response: {response.text}")

except requests.exceptions.RequestException as e:
print(f"Error scheduling job {job_name}: {str(e)}")
raise

def get_job_details(host: str, port: str, job_name: str) -> None:
req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}"

try:
response = requests.get(req_url, timeout=15)
if response.status_code in [200, 204]:
print(f"Job details for {job_name}: {response.text}")
else:
print(f"Failed to get job details. Status code: {response.status_code}, Response: {response.text}")

except requests.exceptions.RequestException as e:
print(f"Error getting job details for {job_name}: {str(e)}")
raise

def main():
# Sleep for 5 seconds to wait for job-service to start
# Wait for services to be ready
time.sleep(5)

dapr_host = os.getenv('DAPR_HOST', 'http://localhost')
scheduler_dapr_http_port = "6280"
scheduler_dapr_http_port = os.getenv('SCHEDULER_DAPR_HTTP_PORT', '6280')

# Schedule R2-D2 job
job_name = "R2-D2"
req_url = f"{dapr_host}:{scheduler_dapr_http_port}/v1.0-alpha1/jobs/{job_name}"

response = requests.post(
req_url,
json=R2D2_JOB_BODY,
headers={"Content-Type": "application/json"},
timeout=15
)

if response.status_code != 204:
raise Exception(f"Failed to register job event handler. Status code: {response.status_code}")

print(f"Job Scheduled: {job_name}")

schedule_job(dapr_host, scheduler_dapr_http_port, "R2-D2", R2D2_JOB_BODY)
time.sleep(5)

# Schedule C-3PO job
job_name = "C-3PO"
req_url = f"{dapr_host}:{scheduler_dapr_http_port}/v1.0-alpha1/jobs/{job_name}"

headers = {"Content-Type": "application/json; charset=utf-8"}

response = requests.post(
req_url,
json=C3PO_JOB_BODY,
headers=headers
)

if response.status_code != 204:
raise Exception(f"Failed to register job event handler. Status code: {response.status_code}")

print("Response json: ", response.text)
print(f"Job Scheduled: {job_name}")

schedule_job(dapr_host, scheduler_dapr_http_port, "C-3PO", C3PO_JOB_BODY)
time.sleep(5)

# Get C-3PO job details
job_name = "C-3PO"
req_url = f"{dapr_host}:{scheduler_dapr_http_port}/v1.0-alpha1/jobs/{job_name}"

response = requests.get(req_url)
if response.status_code == 200:
print(f"Get job details: {response.text}")
else:
print(f"Failed to get job details. Status code: {response.status_code}")

get_job_details(dapr_host, scheduler_dapr_http_port, "C-3PO")
time.sleep(5)

if __name__ == "__main__":
Expand Down
87 changes: 54 additions & 33 deletions jobs/python/http/job-service/app.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import os
from flask import Flask, request, jsonify
import json
import traceback

app = Flask(__name__)
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs

class DroidJob:
def __init__(self, droid: str, task: str):
Expand All @@ -16,36 +15,58 @@ def set_droid_job(decoded_value: str) -> DroidJob:
droid_array = droid_str.split(':')
return DroidJob(droid_array[0], droid_array[1])

@app.route('/job/<job>', methods=['POST'])
def handle_job(job):
print(f"Received job request...")

try:
print("Raw request data:", request.get_data().decode('utf-8'))
job_data = request.get_json()
print("Parsed job data:", json.dumps(job_data, indent=2))

if not job_data:
return "Error reading request body", 400

# In Dapr, the job data comes in a special "data" field within the request
value = job_data.get('data', {}).get('value', '')
print(f"Extracted value: {value}")

# Create DroidJob from value
droid_job = set_droid_job(value)

print(f"Starting droid: {droid_job.droid}")
print(f"Executing maintenance job: {droid_job.task}")

return "", 200
class JobHandler(BaseHTTPRequestHandler):
def _send_response(self, status_code: int, message: str = ""):
self.send_response(status_code)
self.send_header('Content-type', 'application/json')
self.end_headers()
if message:
self.wfile.write(json.dumps({"message": message}).encode('utf-8'))

def do_POST(self):
print('Received job request...', flush=True)

except Exception as e:
print(f"Error processing job request:")
print(traceback.format_exc())
return f"Error processing job: {str(e)}", 400
try:
# Check if path starts with /job/
if not self.path.startswith('/job/'):
self._send_response(404, "Not Found")
return

# Read request body
content_length = int(self.headers.get('Content-Length', 0))
raw_data = self.rfile.read(content_length).decode('utf-8')

# Parse outer JSON data
outer_data = json.loads(raw_data)

# The payload might be double-encoded, so try parsing again if it's a string
if isinstance(outer_data, str):
job_data = json.loads(outer_data)
else:
job_data = outer_data

# Extract value directly from the job data
value = job_data.get('value', '')

# Create DroidJob from value
droid_job = set_droid_job(value)

print("Starting droid: " + droid_job.droid, flush=True)
print("Executing maintenance job: " + droid_job.task, flush=True)

self._send_response(200)

except Exception as e:
print("Error processing job request:", flush= True)
print(traceback.format_exc())
self._send_response(400, f"Error processing job: {str(e)}")

def run_server(port: int):
server_address = ('', port)
httpd = HTTPServer(server_address, JobHandler)
print("Server started on port " + str(port), flush=True)
httpd.serve_forever()

if __name__ == '__main__':
app_port = os.getenv('APP_PORT', '6200')
print(f"Server started on port {app_port}")
app.run(host='0.0.0.0', port=int(app_port))
app_port = int(os.getenv('APP_PORT', '6200'))
run_server(app_port)
2 changes: 2 additions & 0 deletions jobs/python/http/makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include ../../../docker.mk
include ../../../validate.mk
1 change: 0 additions & 1 deletion jobs/python/http/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
flask==3.0.0
requests==2.31.0

0 comments on commit 519713b

Please sign in to comment.