From da166fc1124c5e31432e0ab288413f5e39cdcf06 Mon Sep 17 00:00:00 2001 From: Andy Tsao Date: Fri, 24 Jan 2025 15:32:04 +0800 Subject: [PATCH 1/2] feat: convert fetch_group_emails to async function. --- eaia/gmail.py | 32 +++++++++++++++++++++----------- scripts/run_ingest.py | 2 +- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/eaia/gmail.py b/eaia/gmail.py index b848658..4bf8b3a 100644 --- a/eaia/gmail.py +++ b/eaia/gmail.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timedelta, time from pathlib import Path -from typing import Iterable +from typing import AsyncIterable import pytz import os @@ -10,6 +10,7 @@ from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build +import asyncio import base64 from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -165,12 +166,12 @@ def send_email( send_message(service, "me", response_message) -def fetch_group_emails( +async def fetch_group_emails( to_email, minutes_since: int = 30, gmail_token: str | None = None, gmail_secret: str | None = None, -) -> Iterable[EmailData]: +) -> AsyncIterable[EmailData]: creds = get_credentials(gmail_token, gmail_secret) service = build("gmail", "v1", credentials=creds) @@ -181,11 +182,12 @@ def fetch_group_emails( nextPageToken = None # Fetch messages matching the query while True: - results = ( - service.users() - .messages() - .list(userId="me", q=query, pageToken=nextPageToken) - .execute() + loop = asyncio.get_running_loop() + results = await loop.run_in_executor( + None, + lambda: service.users().messages().list( + userId="me", q=query, pageToken=nextPageToken + ).execute() ) if "messages" in results: messages.extend(results["messages"]) @@ -196,14 +198,22 @@ def fetch_group_emails( count = 0 for message in messages: try: - msg = ( - service.users().messages().get(userId="me", id=message["id"]).execute() + msg = await loop.run_in_executor( + None, + lambda: service.users().messages().get( + userId="me", id=message["id"] + ).execute() ) thread_id = msg["threadId"] payload = msg["payload"] headers = payload.get("headers") # Get the thread details - thread = service.users().threads().get(userId="me", id=thread_id).execute() + thread = await loop.run_in_executor( + None, + lambda: service.users().threads().get( + userId="me", id=thread_id + ).execute() + ) messages_in_thread = thread["messages"] # Check the last message in the thread last_message = messages_in_thread[-1] diff --git a/scripts/run_ingest.py b/scripts/run_ingest.py index 83e826e..f15f7a7 100644 --- a/scripts/run_ingest.py +++ b/scripts/run_ingest.py @@ -30,7 +30,7 @@ async def main( ) # TODO: This really should be async - for email in fetch_group_emails( + async for email in fetch_group_emails( email_address, minutes_since=minutes_since, gmail_token=gmail_token, From 1a4ac1f9aed93acdb79519ccca21510e86c2474a Mon Sep 17 00:00:00 2001 From: Andy Tsao Date: Fri, 24 Jan 2025 16:22:06 +0800 Subject: [PATCH 2/2] remove TODO tag in run_ingest.py --- scripts/run_ingest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/run_ingest.py b/scripts/run_ingest.py index f15f7a7..93f9aa9 100644 --- a/scripts/run_ingest.py +++ b/scripts/run_ingest.py @@ -29,7 +29,6 @@ async def main( url=url ) - # TODO: This really should be async async for email in fetch_group_emails( email_address, minutes_since=minutes_since,