Commit

Fixes
aorwall committed Jan 17, 2025
1 parent 1650462 commit b43e481
Showing 12 changed files with 326 additions and 287 deletions.
363 changes: 175 additions & 188 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "moatless-testbeds"
version = "0.0.9"
version = "0.0.11"
description = "Run testbeds as isolated pods in a Kubernetes cluster"
authors = ["Albert Örwall <[email protected]>"]
repository = "https://github.com/aorwall/moatless-testbeds"
1 change: 0 additions & 1 deletion scripts/run_tests.py
@@ -6,7 +6,6 @@
import sys
import argparse

from dotenv import load_dotenv
from testbeds.sdk import TestbedSDK

logging.basicConfig(
28 changes: 25 additions & 3 deletions testbeds/api/client.py
@@ -9,7 +9,9 @@

import requests
from kubernetes import client
from werkzeug.exceptions import HTTPException, BadRequest, BadGateway

from testbeds.exceptions import TestbedBadRequestError
from testbeds.schema import (
RunCommandsRequest,
CommandExecutionResponse,
@@ -214,15 +216,35 @@ def execute_async(self, commands: list[str] | str) -> CommandExecutionResponse:

def get_execution_status(self) -> CommandExecutionResponse:
try:
if not self.base_url:
raise BadGateway(
description=f"No base URL configured for testbed {self.testbed_id}"
)

response = requests.get(f"{self.base_url}/exec")
response.raise_for_status()
response = CommandExecutionResponse.model_validate(response.json())
if response.status == "completed":
logger.info(f"Command execution completed in testbed {self.testbed_id}")
return response
except requests.RequestException as e:

except requests.exceptions.ConnectionError as e:
logger.warning(f"Connection error to testbed {self.testbed_id}. Error: {str(e)}")
status = self._read_testbed_status_detailed(self.testbed_id)
if status:
raise BadGateway(
description=f"Connection refused to testbed {self.testbed_id}. Status: {status.model_dump_json(indent=2)}"
)
else:
raise BadGateway(
description=f"Connection refused to testbed {self.testbed_id}. Unable to get current status."
)

except requests.exceptions.RequestException as e:
logger.error(f"Error during get_execution_status: {str(e)}")
raise e
raise BadGateway(
description=f"Request failed for testbed {self.testbed_id}: {str(e)}"
)

def get_diff(self) -> str:
"""Get the current git diff output."""
@@ -261,7 +283,7 @@ def apply_patch(self, patch: str) -> str:

if APPLY_PATCH_FAIL in response.output:
logger.error(f"Failed to apply patch: {patch}.\n\nOutput\n:{response.output}")
raise Exception(f"Failed to apply patch: {patch}.\n\nOutput\n:{response.output}")
raise BadRequest(description=f"Failed to apply patch: {patch}.\n\nOutput\n:{response.output}")

diff = self.get_diff()
logger.debug(f"Diff after patch: \n{diff}")
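
For readers skimming the diff: get_execution_status now raises werkzeug's BadGateway (HTTP 502) instead of re-raising the underlying requests exception, and apply_patch raises BadRequest (HTTP 400) on APPLY_PATCH_FAIL, so the Flask layer can map both to meaningful responses. A minimal sketch of how a direct caller of this client might handle the new behaviour; the poll_once helper is illustrative and not part of this commit:

from werkzeug.exceptions import BadGateway

def poll_once(client):
    # `client` is assumed to be the testbeds.api.client client shown above.
    try:
        return client.get_execution_status()
    except BadGateway as exc:
        # exc.description now carries the pod status, or a note that the
        # status could not be read.
        print(f"Testbed unreachable: {exc.description}")
        return None
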
6 changes: 2 additions & 4 deletions testbeds/api/main.py
@@ -4,6 +4,7 @@
import uuid
import signal
from functools import wraps
import sys

from flask import Flask, request, jsonify
from opentelemetry.sdk.trace import TracerProvider
@@ -83,14 +84,11 @@ def create_app():

@app.before_request
def setup_timeout():
# Setup the timeout handler
signal.signal(signal.SIGALRM, timeout_handler)
# Set the alarm
signal.alarm(30) # 30 seconds timeout
signal.alarm(30)

@app.after_request
def clear_timeout(response):
# Clear the alarm
signal.alarm(0)
return response

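
The before/after request hooks above arm and clear a per-request SIGALRM watchdog. A self-contained sketch of the same pattern, assuming a Unix host and a single-threaded worker (signal.alarm only fires in the main thread); the timeout_handler here is an illustrative stand-in for the one defined elsewhere in main.py:

import signal

from flask import Flask

app = Flask(__name__)

def timeout_handler(signum, frame):
    # Illustrative stand-in for the handler defined in testbeds/api/main.py.
    raise TimeoutError("Request exceeded the 30 second budget")

@app.before_request
def setup_timeout():
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(30)  # abort any request that runs longer than 30 seconds

@app.after_request
def clear_timeout(response):
    signal.alarm(0)  # cancel the pending alarm once the response is ready
    return response
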
4 changes: 4 additions & 0 deletions testbeds/exceptions.py
@@ -1,3 +1,7 @@

class TestbedNotFoundError(Exception):
pass

class TestbedBadRequestError(Exception):
pass
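
The new module only declares the two domain exceptions; how they surface to clients is decided elsewhere. A hedged sketch, purely as an assumption about intended use, of mapping them to HTTP errors at the Flask boundary (these handlers are not part of this commit):

from flask import Flask, jsonify

from testbeds.exceptions import TestbedBadRequestError, TestbedNotFoundError

def register_error_handlers(app: Flask) -> None:
    # Assumed mapping; chosen for illustration only.
    @app.errorhandler(TestbedNotFoundError)
    def handle_not_found(error):
        return jsonify({"error": str(error)}), 404

    @app.errorhandler(TestbedBadRequestError)
    def handle_bad_request(error):
        return jsonify({"error": str(error)}), 400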

4 changes: 2 additions & 2 deletions testbeds/schema.py
@@ -149,8 +149,8 @@ class EvaluationResult(BaseModel):
patch_applied: bool = Field(
default=False, description="Whether the patch was successfully applied"
)
resolved: bool = Field(
default=False, description="Whether the problem was resolved"
resolved: Optional[bool] = Field(
default=None, description="Whether the problem was resolved"
)
tests_status: TestsStatus = Field(
default_factory=TestsStatus, description="Status of all tests"
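
Switching resolved to Optional[bool] with a None default gives the field three states: True (problem resolved), False (evaluated but not resolved), and None (no verdict yet, for example when the patch never applied). A small sketch of a consumer distinguishing them; the describe helper is illustrative:

from testbeds.schema import EvaluationResult

def describe(result: EvaluationResult) -> str:
    # None now means "no verdict", instead of silently defaulting to False.
    if result.resolved is None:
        return "evaluation incomplete"
    return "resolved" if result.resolved else "not resolved"
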
101 changes: 63 additions & 38 deletions testbeds/sdk/client.py
@@ -14,7 +14,7 @@
from testbeds.schema import (
EvaluationResult,
CommandExecutionResponse,
TestRunResponse, SWEbenchInstance,
TestRunResponse, SWEbenchInstance, TestStatus,
)
from testbeds.swebench.constants import ResolvedStatus, APPLY_PATCH_FAIL, RUN_TESTS
from testbeds.swebench.log_parsers import parse_log
@@ -36,7 +36,7 @@ def __init__(
testbed_id: str,
instance_id: str | None = None,
instance: SWEbenchInstance | None = None,
dataset_name: str = "princeton-nlp/SWE-bench_Lite",
dataset_name: str | None = None,
run_id: str = "default",
base_url: str | None = None,
api_key: str | None = None,
@@ -94,13 +94,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def check_health(self, timeout: int = 30):
"""Check testbed health status."""
url = f"{self.base_url}/testbeds/{self.testbed_id}/health"
headers = self._generate_headers()

response = requests.get(url, headers=headers, timeout=timeout)
response.raise_for_status()

return response.status_code == 200 and response.json().get("status") == "OK"
try:
response = self._request("GET", "health", operation_timeout=timeout)
return response.get("status") == "OK"
except TestbedError as e:
if hasattr(e, 'response') and e.response.status_code in [503, 504]:
return False
raise

def _generate_traceparent(self):
return f"00-{self.trace_id}-{self.current_span_id or uuid.uuid4().hex[:16]}-01"
@@ -206,7 +206,10 @@ def wait_until_ready(self, timeout: float = 600):
logger.warning(f"Testbed {self.testbed_id} not ready yet, will retry (Tried for {wait_time} seconds)")
time.sleep(1)
except RequestException as e:
if e.response.status_code in [503, 504]:
if isinstance(e, requests.exceptions.ConnectionError):
logger.warning(f"Failed to connect to testbed {self.testbed_id}, will retry... (Tried for {wait_time} seconds)")
time.sleep(1)
elif e.response and e.response.status_code in [503, 504]:
logger.warning(f"Got response {e.response.status_code} indicating that the testbed {self.testbed_id} might not be ready yet, will retry... (Tried for {wait_time} seconds)")
time.sleep(1)
else:
@@ -271,7 +274,7 @@ def _generate_cache_key(self, test_files: list[str] | None, patch: str | None) -
return hashlib.sha256(combined.encode()).hexdigest()

def run_tests(
self, test_files: list[str] | None = None, patch: str | None = None
self, test_files: list[str] | None = None, patch: str | None = None, timeout: int = 1200
) -> TestRunResponse:
logger.debug(f"Executing run_tests with test_files={test_files} and patch={patch}")

@@ -285,50 +288,67 @@
if patch:
self.apply_patch(patch)

data = {"test_files": test_files} if test_files else {}
self._request("POST", "run-tests", json=data)
response = self.get_execution_status()

start_time = time.time()
while response.status == "running":
if time.time() - start_time > 1200:
raise TimeoutError("Execution timed out after 1200 seconds")
sleep(0.1)
test_results = []
for test_file in test_files:
data = {"test_files": [test_file]}
self._request("POST", "run-tests", json=data)
response = self.get_execution_status()

if self.log_dir:
datetime_str = time.strftime("%Y%m%d-%H%M%S")
with open(f"{self.log_dir}/{datetime_str}_run_tests.log", "a") as f:
f.write(f"Response:\n{response.output}\n")
start_time = time.time()
while response.status == "running":
if time.time() - start_time > timeout:
raise TimeoutError(f"Execution timed out after {timeout} seconds")
sleep(1.0)
response = self.get_execution_status()

if self.log_dir:
datetime_str = time.strftime("%Y%m%d-%H%M%S")
with open(f"{self.log_dir}/{datetime_str}_run_tests.log", "a") as f:
f.write(f"Response:\n{response.output}\n")

log = response.output.split(f"{RUN_TESTS}\n")[-1]
test_result = parse_log(log, self.test_spec.repo)
log = response.output.split(f"{RUN_TESTS}\n")[-1]
test_result = parse_log(log, self.test_spec.repo)

if len(test_result) == 1 and test_result[0].status == TestStatus.ERROR:
test_result[0].file_path = test_file

filtered_test_result = []
for test in test_result:
if test.file_path != test_file:
logger.info(f"Skipping test {test.method} in {test.file_path}. Expected {test_file}")
else:
filtered_test_result.append(test)

test_results.extend(filtered_test_result)
logger.info(f"Finished running {test_file} tests. Got {len(filtered_test_result)} test results.")

filtered_test_result = []

statuses = {}
tests_by_file = {}

ignored_tests = 0
for test in test_result:
for test in test_results:
if test.method in self.ignored_tests.get(test.file_path, []):
ignored_tests += 1
continue

filtered_test_result.append(test)

if test.status not in statuses:
statuses[test.status] = 0

statuses[test.status] += 1
# Track tests by file
if test.file_path not in tests_by_file:
tests_by_file[test.file_path] = {"count": 0, "statuses": {}}

tests_by_file[test.file_path]["count"] += 1
if test.status not in tests_by_file[test.file_path]["statuses"]:
tests_by_file[test.file_path]["statuses"][test.status] = 0
tests_by_file[test.file_path]["statuses"][test.status] += 1

if ignored_tests:
logger.info(
f"Did run {len(test_result)} tests, ignored {ignored_tests} tests. {statuses}"
)
else:
logger.info(f"Did run {len(test_result)} tests. {statuses}")
summary = [f"{file_path}: {stats['count']} tests. {stats['statuses']}"
for file_path, stats in tests_by_file.items()]
logger.info(f"Test summary by file: {' | '.join(summary)}")

result = TestRunResponse(test_results=filtered_test_result, output=response.output)
result = TestRunResponse(test_results=filtered_test_result)

if self.test_cache is not None:
cache_key = self._generate_cache_key(test_files, patch)
@@ -373,6 +393,11 @@ def run_evaluation(
response = self.get_execution_status()
sleep(1)

if self.log_dir:
datetime_str = time.strftime("%Y%m%d-%H%M%S")
with open(f"{self.log_dir}/{datetime_str}_run_tests.log", "a") as f:
f.write(f"Response:\n{response.output}\n")

if "APPLY_PATCH_FAIL" in response.output:
logger.error("Failed to apply patch")
return EvaluationResult(
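
run_tests now issues one run-tests request per test file, polls get_execution_status once a second until the status leaves "running" or the new timeout parameter (default 1200 seconds) is exceeded, and drops results that the parser attributes to other files. The polling part in isolation, as an illustrative distillation of the loop added above; poll stands in for TestbedClient.get_execution_status:

import time

def wait_for_completion(poll, timeout: int = 1200):
    # `poll` is assumed to return an object with a `status` attribute,
    # like CommandExecutionResponse.
    start = time.time()
    response = poll()
    while response.status == "running":
        if time.time() - start > timeout:
            raise TimeoutError(f"Execution timed out after {timeout} seconds")
        time.sleep(1.0)
        response = poll()
    return response
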
4 changes: 3 additions & 1 deletion testbeds/sdk/sdk.py
@@ -82,7 +82,8 @@ def get_or_create_testbed(
def create_client(self,
instance_id: str | None = None,
instance: dict | SWEbenchInstance | None = None,
dataset_name: str = "princeton-nlp/SWE-bench_Lite",
dataset_name: str | None = None,
log_dir: str = None,
run_id: str = "default") -> TestbedClient:
if not instance_id and not instance:
raise ValueError("Either instance_id or instance must be provided")
@@ -97,6 +98,7 @@ def create_client(self,
instance_id=instance_id,
instance=instance,
dataset_name=dataset_name,
log_dir=log_dir,
run_id=run_id,
base_url=self.base_url,
api_key=self.api_key,
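
create_client now accepts an optional log_dir (forwarded to the client so test-run output can be written to timestamped log files) and no longer defaults dataset_name to SWE-bench_Lite. A hedged usage sketch; the instance id, paths and SDK constructor arguments are placeholders, and the SDK may instead pick up its configuration from environment variables:

from testbeds.sdk import TestbedSDK

# All concrete values below are illustrative placeholders.
sdk = TestbedSDK(base_url="https://testbeds.example.com", api_key="secret")
with sdk.create_client(instance_id="django__django-11099", log_dir="logs") as client:
    client.wait_until_ready()
    result = client.run_tests(test_files=["tests/some_app/test_example.py"])
    for test in result.test_results:
        print(test.file_path, test.method, test.status)
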
30 changes: 28 additions & 2 deletions testbeds/swebench/log_parsers.py
@@ -72,6 +72,8 @@ def parse_log(log: str, repo: str) -> list[TestResult]:
traceback_result = parse_traceback(traceback)
if traceback_result:
test_results.append(traceback_result)
else:
logger.warning(f"No tests found in log: {log}")

# Skip testbed prefix in file paths
for result in test_results:
@@ -103,6 +105,7 @@ def parse_log_pytest(log: str) -> list[TestResult]:
(re.compile(r"ERROR at teardown of (.*) ___.*"), "teardown"),
(re.compile(r"ERROR (.*) ___.*"), "general"),
]
log = clean_log(log)

for line in log.split("\n"):
if "short test summary info" in line:
@@ -256,12 +259,20 @@

# Add failure outputs to corresponding failed or error tests
for test in test_results:
if test.status in [TestStatus.PASSED, TestStatus.SKIPPED]:
continue

if test.method in failure_outputs:
test.failure_output = "\n".join(failure_outputs[test.method])

if test.name in failure_outputs:
elif test.name in failure_outputs:
test.failure_output = "\n".join(failure_outputs[test.name])

# Truncate long outputs with teardown capture
if test.failure_output and len(test.failure_output.splitlines()) > 25:
teardown_idx = test.failure_output.find("--------------------------- Captured stdout teardown ---------------------------")
if teardown_idx != -1:
test.failure_output = test.failure_output[:teardown_idx].rstrip()

return test_results


@@ -686,6 +697,21 @@ def parse_log_matplotlib(log: str) -> list[TestResult]:
return test_results


def clean_log(log: str) -> str:
"""Remove ANSI color codes and escape sequences from log output"""
# Remove ANSI color codes like [0m[37m etc
log = re.sub(r'\x1b\[[0-9;]*[a-zA-Z]', '', log)

# Remove specific problematic control characters while preserving important ones
# \x0b: vertical tab
# \x0c: form feed
# \x1c-\x1f: file separators, group separators etc
control_chars = '\x0b\x0c\x1c\x1d\x1e\x1f'
log = log.translate(str.maketrans("", "", control_chars))

return log


MAP_REPO_TO_PARSER = {
"astropy/astropy": parse_log_pytest,
"django/django": parse_log_django,
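
The new clean_log helper strips ANSI colour sequences and a handful of control characters before the pytest output is parsed, which keeps escape codes from leaking into test names and failure output. A quick demonstration on a made-up log line:

from testbeds.swebench.log_parsers import clean_log

raw = "\x1b[32mPASSED\x1b[0m tests/test_example.py::test_ok\x0c"
print(clean_log(raw))  # -> PASSED tests/test_example.py::test_ok
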