fix: sqlite thread safety
ajskateboarder committed Jan 23, 2024
1 parent dd9ff49 commit 3cb4cd0
Showing 3 changed files with 64 additions and 68 deletions.
103 changes: 41 additions & 62 deletions scripts/generate_reviews.py
@@ -3,93 +3,72 @@
 from __future__ import annotations
 
 import os
-import sqlite3
 import sys
 from functools import partial
-from threading import Lock
+from uuid import uuid4
 import logging
 
+from sqlite3worker.sqlite3worker import Sqlite3Worker
+
 from crawling import bestsellers_reviews
 from crawling.dicts import Reviews
 
 from wordsmyth import rate
 
-lock = Lock()
-
 logging.basicConfig(
     format="[%(levelname)s] %(asctime)s: %(message)s",
     datefmt="%d-%b-%y %H:%M:%S",
     level=logging.DEBUG,
     # filename="something.log",
 )
 logging.getLogger("selenium").setLevel(logging.WARNING)
 logging.getLogger("urllib3").setLevel(logging.WARNING)
 
 
-class LockedSqliteConnection:
-    """https://stackoverflow.com/a/41206801"""
-
-    def __init__(self, dburi: str) -> None:
-        self.lock = Lock()
-        self.connection = sqlite3.connect(dburi, check_same_thread=False)
-        self.cursor: sqlite3.Cursor = None  # type: ignore
-
-    def __enter__(self) -> LockedSqliteConnection:
-        self.lock.acquire()
-        self.cursor = self.connection.cursor()
-        return self
-
-    def __exit__(self, *_) -> None:
-        self.lock.release()
-        self.connection.commit()
-        if self.cursor is not None:
-            self.cursor.close()
-            self.cursor = None  # type: ignore
-
-
-def process_reviews(reviews: Reviews, db: LockedSqliteConnection) -> None:
+def process_reviews(reviews: Reviews, db: Sqlite3Worker) -> None:
     productId = reviews["productId"]
-    with lock:
-        for review in reviews["items"]:
-            if review["reviewText"].strip() == "":
-                return
-            with db:
-                db.cursor.execute(
-                    f"CREATE TABLE IF NOT EXISTS {productId}(text, actual, prediction, flags)"
-                )
+    for review in reviews["items"]:
+        if review["reviewText"].strip() == "":
+            return
+        db.execute(
+            f"CREATE TABLE IF NOT EXISTS {productId}(text, actual, prediction, flags)"
+        )
 
-                try:
-                    prediction, flags = rate(
-                        review["reviewText"]
-                        .replace(
-                            " The media could not be loaded.\n ",
-                            "",
-                        )
-                        .strip()
-                    )
-                except Exception:
-                    return
-                try:
-                    db.cursor.execute(
-                        f"INSERT INTO {productId} VALUES(?, ?, ?, ?)",
-                        (
-                            review["reviewText"],
-                            review["overall"],
-                            prediction,
-                            ",".join(flags),
-                        ),
-                    )
-                except AttributeError:
-                    db.cursor = db.connection.cursor()
-                    db.cursor.execute(
-                        f"INSERT INTO {productId} VALUES(?, ?, ?)",
-                        (review["reviewText"], review["overall"], prediction),
-                    )
+        try:
+            prediction, flags = rate(
+                review["reviewText"]
+                .replace(
+                    " The media could not be loaded.\n ",
+                    "",
+                )
+                .strip(),
+                flags=True,
+            )
+        except Exception:
+            return
+        try:
+            db.execute(
+                f"INSERT INTO {productId} VALUES(?, ?, ?, ?)",
+                (
+                    review["reviewText"],
+                    review["overall"],
+                    prediction,
+                    ",".join(flags),
+                ),
+            )
+        except AttributeError:
+            db.execute(
+                f"INSERT INTO {productId} VALUES(?, ?, ?, ?)",
+                (review["reviewText"], review["overall"], prediction, flags),
+            )
 
 
 def main() -> None:
     HEADLESS = False
 
-    db = LockedSqliteConnection(sys.argv[1])
+    location = f"{sys.argv[1].split('.')[0]}{str(uuid4())}.sqlite"
+    db = Sqlite3Worker(location)
+    logging.info("Writing reviews to %s", location)
 
     scraper = bestsellers_reviews(partial(process_reviews, db=db), HEADLESS)
     scraper(os.environ["EMAIL"], os.environ["PASSWORD"])
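
A note on the fix itself: the removed LockedSqliteConnection shared one sqlite3 connection between scraper threads and serialized access with a Lock, leaving cursor state and commits to race; Sqlite3Worker instead queues every statement to a single worker thread that owns the connection. Below is a minimal sketch of that usage pattern, assuming the execute()/close() interface of the sqlite3worker package and importing it the same way the script above does; the file name, table, and thread setup are illustrative, not taken from this repository.

import threading

from sqlite3worker.sqlite3worker import Sqlite3Worker

db = Sqlite3Worker("demo.sqlite")  # a single worker thread owns the connection
db.execute("CREATE TABLE IF NOT EXISTS reviews(text, rating)")

def writer(n: int) -> None:
    # execute() may be called from any thread; statements are queued and
    # run one at a time by the worker, so no external Lock is needed.
    db.execute("INSERT INTO reviews VALUES(?, ?)", (f"review {n}", n % 5))

threads = [threading.Thread(target=writer, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

db.close()  # drains the queue and closes the underlying connection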
4 changes: 3 additions & 1 deletion src/crawling/generator.py
@@ -30,17 +30,19 @@ def bestsellers_reviews(callback: Callable, headless: bool) -> Scraper:
 
     def scraper(email: str, password: str) -> None:
         logging.info("Starting product ID gatherer")
+
         with AmazonScraper(headless) as products:
             logging.info("Collecting product IDs")
             product_ids = products.get_bestselling()
             logging.info("Collected following IDs: %s", ",".join(product_ids))
 
         logging.info("Initializing review gatherer")
-        with AmazonScraper(headless) as prop:
+
         with ParallelAmazonScraper(headless) as scrapers:
             scrapers.captcha_hook = kitty_captcha
             logging.info("Logging scrapers in")
             scrapers.login(email, password)
+
             for product_id in product_ids:
                 logging.info("Initiating scrape process for: %s", product_id)
                 logging.info("\tCollecting review proportions")
25 changes: 20 additions & 5 deletions src/crawling/threaded_reviews.py
@@ -148,6 +148,7 @@ def select_reviews(content: Any) -> list[Review]:
             reviews.append(
                 cast(Review, {"reviewText": body.strip(), "overall": rating})
             )
+        logging.debug("Selected %s", reviews)
         return reviews
 
     def _scrape_single(
@@ -159,13 +160,21 @@ def _scrape_single(
         limit: Optional[int] = None,
     ) -> None:
         map_star = {1: "one", 2: "two", 3: "three", 4: "four", 5: "five"}
-        if limit:
-            counter = count(0)
+        counter = count(0)
 
         logging.debug(
-            "Fetching %s reviews in %s star category", limit, map_star[category]
+            "Fetching %s reviews in %s star category for product %s",
+            limit,
+            map_star[category],
+            asin,
         )
         for page in range(1, 11):
+            logging.debug(
+                "Fetching %s star reviews in page %s for product %s",
+                map_star[category],
+                page,
+                asin,
+            )
             browser.get(
                 f"https://www.amazon.com/product-reviews/{asin}/"
                 f"?ie=UTF8&reviewerType=all_reviews&pageNumber={page}&filterByStar={map_star[category]}_star"
@@ -174,10 +183,16 @@
             content = soup.select("div[data-hook='review']")
             items = []
             for item in self.select_reviews(content):
-                if next(counter) >= limit:  # type: ignore
+                if limit is not None and next(counter) >= limit:  # type: ignore
                     return
                 items.append(item)
-            callback({"items": items, "productId": asin})
+            logging.debug("Got %s items", len(items))
+            try:
+                callback({"items": items, "productId": asin})
+            except Exception as exc:
+                logging.error(
+                    "Callback for product %s received exception: %s", asin, exc
+                )
 
     def scrape(
         self,
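
Beyond the extra logging, the _scrape_single change fixes a latent crash: the counter was previously created only when limit was truthy, yet next(counter) ran for every review, so a call without a limit would hit a NameError; the new guard also skips the comparison entirely when limit is None. The callback is likewise wrapped so an exception from one product is logged instead of ending the scrape. A small self-contained sketch of the corrected counter/limit pattern follows; the function name and sample data are illustrative, not from the repository.

from itertools import count
from typing import Iterable, Optional

def take_until_limit(items: Iterable[str], limit: Optional[int] = None) -> list:
    # The counter is created unconditionally (unlike the old `if limit:` branch),
    # and `limit` is only compared against when the caller actually provided one.
    counter = count(0)
    kept = []
    for item in items:
        if limit is not None and next(counter) >= limit:
            break  # stop once `limit` items have been kept
        kept.append(item)
    return kept

print(take_until_limit(["a", "b", "c"]))           # ['a', 'b', 'c']  (no limit)
print(take_until_limit(["a", "b", "c"], limit=2))  # ['a', 'b']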
