Skip to content

Commit

Permalink
worker/game_process: make worker connection init more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
irgolic committed Jan 3, 2025
1 parent 12bf9d2 commit 9a063b8
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 55 deletions.
4 changes: 3 additions & 1 deletion dweam/game_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ async def main():

log.info("Parsed args", game_type=game_type, game_id=game_id, port=port)

log.info("Attempting to connect to parent", host='127.0.0.1', port=port)

try:
# Connect to parent process
reader, writer = await asyncio.open_connection(
Expand All @@ -168,7 +170,7 @@ async def main():
)
log.info("Connected to parent")
except Exception as e:
log.error("Failed to connect to parent", error=str(e))
log.exception("Failed to connect to parent")
raise

# Load the game implementation
Expand Down
118 changes: 64 additions & 54 deletions dweam/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,61 @@ async def _collect_process_output(self, process: Process) -> tuple[str | None, s

return stdout_str, stderr_str

async def attempt_connection(
    self,
    server,
    client_connected,
    *,
    base_timeout=5.0,
    max_retries=3,
):
    """Wait for the worker process to connect back, retrying with backoff.

    The server is opened and served exactly once for the whole retry
    sequence. Leaving ``async with server`` (or cancelling the
    ``serve_forever()`` task) closes the listener, so doing either between
    attempts would leave later attempts waiting on a closed server.

    Args:
        server: The listening ``asyncio`` server the worker connects to.
        client_connected: An ``asyncio.Event`` that is set once the client
            connection has been accepted.
        base_timeout: Timeout in seconds for the first attempt; each
            subsequent attempt doubles it.
        max_retries: Maximum number of attempts (must be at least 1).

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RuntimeError: If the worker process exits before connecting.
        asyncio.TimeoutError: If every attempt times out.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    async with server:
        # Server is only needed for initial connection acceptance.
        # Once reader/writer are established, the connection remains
        # active even after the server is closed.
        server_task = asyncio.create_task(server.serve_forever())

        try:
            for attempt in range(max_retries):
                timeout = base_timeout * (2 ** attempt)
                self.log.info("Attempting connection", attempt=attempt + 1, timeout=timeout)

                # Wait for either client connection or process termination
                waiters = [
                    asyncio.create_task(client_connected.wait()),
                    asyncio.create_task(self.process.wait()),
                ]
                try:
                    done, _pending = await asyncio.wait(
                        waiters,
                        timeout=timeout,
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                finally:
                    # Do not leak the helper tasks: cancel whatever is
                    # still pending and let the cancellations settle.
                    for waiter in waiters:
                        if not waiter.done():
                            waiter.cancel()
                    await asyncio.gather(*waiters, return_exceptions=True)

                # If process completed first, it means it crashed
                if self.process.returncode is not None:
                    stdout_str, stderr_str = await self._collect_process_output(self.process)
                    self.log.error(
                        "Process terminated before connection",
                        returncode=self.process.returncode,
                        stderr=stderr_str,
                        stdout=stdout_str,
                        pid=self.process.pid,
                    )
                    raise RuntimeError(
                        f"Process terminated with return code {self.process.returncode}"
                    )

                if done:
                    return  # Successfully connected

                # Nothing completed, so this attempt timed out
                self.log.warning(
                    "Connection attempt timed out",
                    pid=self.process.pid,
                    is_running=self.process.returncode is None,
                )
                if attempt == max_retries - 1:
                    self.log.error(
                        "Final connection attempt timed out",
                        pid=self.process.pid,
                        is_running=self.process.returncode is None,
                    )
                    raise asyncio.TimeoutError("Worker process failed to connect")
        finally:
            # Stop accepting new connections once we are done (either way)
            server_task.cancel()
            try:
                await server_task
            except asyncio.CancelledError:
                pass

async def start(self):
"""Start the worker process and establish communication"""
if sys.platform == "win32":
Expand Down Expand Up @@ -201,60 +256,15 @@ async def handle_client(reader, writer):

# Try to connect with timeout
try:
# Start serving (but don't block)
async with server:
self.log.debug("Starting server")
server_task = asyncio.create_task(server.serve_forever())

try:
# Wait for either client connection or process termination
done, pending = await asyncio.wait([
asyncio.create_task(client_connected.wait()),
asyncio.create_task(self.process.wait())
], timeout=5, return_when=asyncio.FIRST_COMPLETED)

# If process completed first, it means it crashed
if self.process.returncode is not None:
stdout_str, stderr_str = await self._collect_process_output(self.process)
self.log.error(
"Process terminated before connection",
returncode=self.process.returncode,
stderr=stderr_str,
stdout=stdout_str
)
raise RuntimeError(f"Process terminated with return code {self.process.returncode}")

# If we get here and nothing completed, it was a timeout
if not done:
raise asyncio.TimeoutError("Worker process failed to connect")
except asyncio.TimeoutError:
# If timeout occurs, collect stderr/stdout before raising
stdout_str, stderr_str = await self._collect_process_output(self.process)

self.log.error(
"Worker failed to connect",
returncode=self.process.returncode,
stdout=stdout_str,
stderr=stderr_str,
)
raise TimeoutError("Worker process failed to connect")
finally:
# Cancel server task
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
pass

if not self.reader or not self.writer:
raise RuntimeError("No connection received")

# Only start monitoring after successful connection
asyncio.create_task(self._monitor_process_output(self.process.stdout, "stdout"))
asyncio.create_task(self._monitor_process_output(self.process.stderr, "stderr"))

self.log.info("Client connected")

await self.attempt_connection(server, client_connected)
if not self.reader or not self.writer:
raise RuntimeError("No connection received")

# Only start monitoring after successful connection
asyncio.create_task(self._monitor_process_output(self.process.stdout, "stdout"))
asyncio.create_task(self._monitor_process_output(self.process.stderr, "stderr"))

self.log.info("Client connected")
return # Success!
except Exception:
self.log.exception("Error during connection")
Expand Down

0 comments on commit 9a063b8

Please sign in to comment.