typing: fix further mypy errors (#728)
* typing: fix mypy errors

* fix tests

* more fixes

* simplify code

* more fixes
adbar authored Oct 24, 2024
1 parent 52d21a6 commit 8f8d376
Showing 10 changed files with 127 additions and 86 deletions.
3 changes: 2 additions & 1 deletion tests/metadata_tests.py
@@ -272,7 +272,8 @@ def test_meta():
 
     # catch errors
     metadata = extract_metadata('')
-    assert all(getattr(metadata, a) is None for a in metadata.__slots__)
+    target_slots = set(metadata.__slots__) - {"body", "commentsbody"}
+    assert all(getattr(metadata, a) is None for a in target_slots)
     metadata = extract_metadata('<html><title></title></html>')
     assert metadata.sitename is None
     metadata = extract_metadata('<html><head><title>' + 'AAA'*10000 + '</title></head></html>')
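Note: the two element-valued fields are excluded here because Document now initializes them as empty lxml elements instead of None (see the settings.py hunk below). A minimal sketch of the resulting invariant, assuming extract_metadata is imported as in the test module:

from trafilatura.metadata import extract_metadata

# sketch: on empty input every metadata field stays None, except the
# element-valued ones, which now default to empty <body> elements
metadata = extract_metadata('')
assert metadata.title is None
assert metadata.body.tag == "body" and len(metadata.body) == 0
assert metadata.commentsbody.tag == "body" and len(metadata.commentsbody) == 0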
2 changes: 1 addition & 1 deletion trafilatura/cli_utils.py
@@ -399,7 +399,7 @@ def probe_homepage(args: Any) -> None:
             result = html2txt(result)
             if (
                 result
-                and len(result) > options.min_extracted_size
+                and len(result) > options.min_extracted_size  # type: ignore[attr-defined]
                 and any(c.isalpha() for c in result)
             ):
                 if (
63 changes: 31 additions & 32 deletions trafilatura/core.py
@@ -8,7 +8,7 @@
 from copy import copy, deepcopy
 from typing import Any, Dict, Optional, Set, Tuple, Union
 
-from lxml.etree import _Element, XPath, strip_tags
+from lxml.etree import _Element, Element, XPath, strip_tags
 from lxml.html import HtmlElement
 
 # own
@@ -119,7 +119,7 @@ def trafilatura_sequence(
     )
 
     # rescue: baseline extraction on original/dirty tree
-    if len_text < options.min_extracted_size and not options.focus == "precision":
+    if len_text < options.min_extracted_size and not options.focus == "precision":  # type: ignore[attr-defined]
         postbody, temp_text, len_text = baseline(deepcopy(tree_backup))
         LOGGER.debug("non-clean extracted length: %s (extraction)", len_text)
 
@@ -198,32 +198,31 @@ def bare_extraction(
     # PendingDeprecationWarning
     # )
 
-    # load data
-    try:
-        # regroup extraction options
-        if not options or not isinstance(options, Extractor):
-            options = Extractor(
-                config=config,
-                output_format=output_format,
-                fast=no_fallback,
-                precision=favor_precision,
-                recall=favor_recall,
-                comments=include_comments,
-                formatting=include_formatting,
-                links=include_links,
-                images=include_images,
-                tables=include_tables,
-                dedup=deduplicate,
-                lang=target_language,
-                max_tree_size=max_tree_size,
-                url=url,
-                with_metadata=with_metadata,
-                only_with_metadata=only_with_metadata,
-                author_blacklist=author_blacklist,
-                url_blacklist=url_blacklist,
-                date_params=date_extraction_params,
-            )
+    # regroup extraction options
+    if not options or not isinstance(options, Extractor):
+        options = Extractor(
+            config=config,
+            output_format=output_format,
+            fast=no_fallback,
+            precision=favor_precision,
+            recall=favor_recall,
+            comments=include_comments,
+            formatting=include_formatting,
+            links=include_links,
+            images=include_images,
+            tables=include_tables,
+            dedup=deduplicate,
+            lang=target_language,
+            max_tree_size=max_tree_size,
+            url=url,
+            with_metadata=with_metadata,
+            only_with_metadata=only_with_metadata,
+            author_blacklist=author_blacklist,
+            url_blacklist=url_blacklist,
+            date_params=date_extraction_params,
+        )
 
+    try:
         # load the HTML tree
         tree = load_html(filecontent)
         if tree is None:
@@ -282,7 +281,7 @@ def bare_extraction(
                 cleaned_tree, options
             )
         else:
-            commentsbody, temp_comments, len_comments = None, "", 0
+            commentsbody, temp_comments, len_comments = Element("body"), "", 0
         if options.focus == "precision":
             cleaned_tree = prune_unwanted_nodes(cleaned_tree, REMOVE_COMMENTS_XPATH)
 
@@ -305,11 +304,11 @@ def bare_extraction(
             )
             raise ValueError
         # size checks
-        if options.comments and len_comments < options.min_extracted_comm_size:
+        if options.comments and len_comments < options.min_extracted_comm_size:  # type: ignore[attr-defined]
             LOGGER.debug("not enough comments: %s", options.source)
         if (
-            len_text < options.min_output_size
-            and len_comments < options.min_output_comm_size
+            len_text < options.min_output_size  # type: ignore[attr-defined]
+            and len_comments < options.min_output_comm_size  # type: ignore[attr-defined]
         ):
             LOGGER.debug(
                 "text and comments not long enough: %s %s %s",
@@ -450,7 +449,7 @@ def extract(
     )
 
     # post-processing
-    if not document:
+    if not document or not isinstance(document, Document):
        return None
 
     if options.format not in TXT_FORMATS:
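Note: with comments disabled, bare_extraction now assigns an empty <body> element instead of None, so commentsbody is always an _Element and downstream code needs no None check. A rough sketch of the invariant, using plain lxml:

from lxml.etree import Element, tostring

# sketch: the "no comments" placeholder is a real, serializable element
commentsbody = Element("body")
assert len(commentsbody) == 0      # carries no extracted comments
print(tostring(commentsbody))      # b'<body/>'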
29 changes: 19 additions & 10 deletions trafilatura/downloads.py
@@ -208,15 +208,23 @@ def _send_urllib_request(
     return None
 
 
-def _handle_response(
-    url: str, response: Response, decode: bool, options: Extractor
-) -> Optional[Union[Response, str]]:  # todo: only return str
-    "Internal function to run safety checks on response result."
+def _is_suitable_response(url: str, response: Response, options: Extractor) -> bool:
+    "Check if the response conforms to formal criteria."
     lentest = len(response.html or response.data or "")
     if response.status != 200:
         LOGGER.error("not a 200 response: %s for URL %s", response.status, url)
+        return False
     # raise error instead?
-    elif is_acceptable_length(lentest, options):
+    if not is_acceptable_length(lentest, options):
+        return False
+    return True
+
+
+def _handle_response(
+    url: str, response: Response, decode: bool, options: Extractor
+) -> Optional[Union[Response, str]]:  # todo: only return str
+    "Internal function to run safety checks on response result."
+    if _is_suitable_response(url, response, options):
         return response.html if decode else response
     # catchall
     return None
@@ -244,7 +252,8 @@ def fetch_url(
     if response and response.data:
         if not options:
             options = Extractor(config=config)
-        return _handle_response(url, response, True, options)
+        if _is_suitable_response(url, response, options):
+            return response.html
     return None
 
 
@@ -290,14 +299,14 @@ def _pycurl_is_live_page(url: str) -> bool:
     curl.setopt(pycurl.SSL_VERIFYPEER, 0)
     curl.setopt(pycurl.SSL_VERIFYHOST, 0)
     # Set option to avoid getting the response body
-    curl.setopt(curl.NOBODY, True)
+    curl.setopt(curl.NOBODY, True)  # type: ignore[attr-defined]
     if PROXY_URL:
         curl.setopt(pycurl.PRE_PROXY, PROXY_URL)
     # Perform the request
     try:
         curl.perform()
         # Get the response code
-        page_exists = curl.getinfo(curl.RESPONSE_CODE) < 400
+        page_exists = curl.getinfo(curl.RESPONSE_CODE) < 400  # type: ignore[attr-defined]
     except pycurl.error as err:
         LOGGER.debug("pycurl HEAD error: %s %s", url, err)
         page_exists = False
@@ -351,7 +360,7 @@ def add_to_compressed_dict(
 
 def load_download_buffer(
     url_store: UrlStore, sleep_time: float = 5.0
-) -> Tuple[Optional[List[str]], UrlStore]:
+) -> Tuple[List[str], UrlStore]:
     """Determine threading strategy and draw URLs respecting domain-based back-off rules."""
     while True:
         bufferlist = url_store.get_download_urls(time_limit=sleep_time, max_urls=10**5)
@@ -442,7 +451,7 @@ def _send_pycurl_request(
     # ip_info = curl.getinfo(curl.PRIMARY_IP)
 
     resp = Response(
-        bufferbytes, curl.getinfo(curl.RESPONSE_CODE), curl.getinfo(curl.EFFECTIVE_URL)
+        bufferbytes, curl.getinfo(curl.RESPONSE_CODE), curl.getinfo(curl.EFFECTIVE_URL)  # type: ignore[attr-defined]
     )
     curl.close()
 
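Note: splitting the boolean check out of _handle_response lets fetch_url return a plain Optional[str] without the Union[Response, str] indirection. A hypothetical usage sketch of the new helper (import paths and the no-argument Extractor() call are assumptions, not taken from the diff; the Response constructor mirrors the call in _send_pycurl_request above):

from trafilatura.downloads import Response, _is_suitable_response
from trafilatura.settings import Extractor

# hypothetical: vet a response before using its decoded text
options = Extractor()
response = Response(b"<html><body>text</body></html>", 200, "https://example.org")
suitable = _is_suitable_response("https://example.org", response, options)
# decoding into response.html is handled by the download helpers themselves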
5 changes: 3 additions & 2 deletions trafilatura/external.py
@@ -35,10 +35,11 @@ def try_readability(htmlinput: HtmlElement) -> HtmlElement:
     try:
         doc = ReadabilityDocument(htmlinput, min_text_length=25, retry_length=250)
         # force conversion to utf-8 (see #319)
-        return fromstring_bytes(doc.summary())
+        summary = fromstring_bytes(doc.summary())
+        return summary if summary is not None else HtmlElement()
     except Exception as err:
         LOGGER.warning('readability_lxml failed: %s', err)
-        return HtmlElement('div')
+        return HtmlElement()
 
 
 def compare_extraction(tree: HtmlElement, backup_tree: HtmlElement, body: _Element, text: str, len_text: int, options: Any) -> Tuple[_Element, str, int]:
8 changes: 4 additions & 4 deletions trafilatura/metadata.py
@@ -334,9 +334,9 @@ def extract_metainfo(
 
 def examine_title_element(
     tree: HtmlElement,
-) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+) -> Tuple[str, Optional[str], Optional[str]]:
     """Extract text segments out of main <title> element."""
-    title = None
+    title = ""
     title_element = tree.find(".//head//title")
     if title_element is not None:
         title = trim(title_element.text_content())
@@ -355,8 +355,8 @@ def extract_title(tree: HtmlElement) -> Optional[str]:
     if title:
         return title
     # extract using x-paths
-    title = extract_metainfo(tree, TITLE_XPATHS)
-    if title is not None:
+    title = extract_metainfo(tree, TITLE_XPATHS) or ""
+    if title:
         return title
     # extract using title tag
     title, first, second = examine_title_element(tree)
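Note: the pattern throughout this hunk is to normalize Optional[str] to str with `or ""`, so truthiness checks replace `is not None` checks and the annotations stay simple. A small self-contained sketch of the idea:

from typing import Optional

def normalize(value: Optional[str]) -> str:
    # collapse None and "" into one falsy case so callers only
    # test truthiness and mypy sees a plain str
    return value or ""

assert normalize(None) == ""
assert normalize("Headline") == "Headline"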
5 changes: 2 additions & 3 deletions trafilatura/readability_lxml.py
@@ -154,9 +154,8 @@ def summary(self) -> str:
                 LOGGER.debug(
                     "Ruthless and lenient parsing did not work. Returning raw html"
                 )
-                article = self.doc.find("body")
-                if article is None:
-                    article = self.doc
+                body = self.doc.find("body")
+                article = body if body is not None else self.doc
 
             cleaned_article = self.sanitize(article, candidates)
             article_length = len(cleaned_article or "")
64 changes: 51 additions & 13 deletions trafilatura/settings.py
@@ -6,18 +6,18 @@
 from configparser import ConfigParser
 from datetime import datetime
 from html import unescape
-from typing import Any, Dict, Optional, Set
+from typing import Any, Dict, List, Optional, Set
 
 try:
     from os import sched_getaffinity
     CPU_COUNT = len(sched_getaffinity(0))
 except ImportError:
     from os import cpu_count
-    CPU_COUNT = cpu_count()
+    CPU_COUNT = cpu_count() or 1
 
 from pathlib import Path
 
-from lxml.etree import XPath
+from lxml.etree import _Element, Element, XPath
 
 from .utils import line_processing
 
@@ -59,6 +59,7 @@ def use_config(
 }
 
 
+# todo Python >= 3.10: use dataclass with slots=True
 class Extractor:
     "Defines a class to store all extraction options."
     __slots__ = [
@@ -203,6 +204,7 @@ def set_date_params(extensive: bool = True) -> Dict[str, Any]:
     }
 
 
+# todo Python >= 3.10: use dataclass with slots=True
 class Document:
     "Defines a class to store all necessary data and metadata fields for extracted information."
     __slots__ = [
@@ -230,16 +232,52 @@ class Document:
         # 'locale'?
     ]
 
-    def __init__(self) -> None:
-        for slot in self.__slots__:
-            setattr(self, slot, None)
-
-    def __getattr__(self, name: str) -> None:
-        raise AttributeError(f"{name} attribute not present in Document")
-
-    def __setattr__(self, name: str, value: Any) -> None:
-        if name in self.__slots__:
-            object.__setattr__(self, name, value)
+    def __init__(
+        self,
+        *,
+        title: Optional[str] = None,
+        author: Optional[str] = None,
+        url: Optional[str] = None,
+        hostname: Optional[str] = None,
+        description: Optional[str] = None,
+        sitename: Optional[str] = None,
+        date: Optional[str] = None,
+        categories: Optional[List[str]] = None,
+        tags: Optional[List[str]] = None,
+        fingerprint: Optional[str] = None,
+        idval: Optional[str] = None,
+        license_val: Optional[str] = None,
+        body: _Element = Element("body"),
+        comments: Optional[str] = None,
+        commentsbody: _Element = Element("body"),
+        raw_text: Optional[str] = None,
+        text: Optional[str] = None,
+        language: Optional[str] = None,
+        image: Optional[str] = None,
+        pagetype: Optional[str] = None,
+        filedate: Optional[str] = None,
+    ):
+        self.title: Optional[str] = title
+        self.author: Optional[str] = author
+        self.url: Optional[str] = url
+        self.hostname: Optional[str] = hostname
+        self.description: Optional[str] = description
+        self.sitename: Optional[str] = sitename
+        self.date: Optional[str] = date
+        self.categories: Optional[List[str]] = categories
+        self.tags: Optional[List[str]] = tags
+        self.fingerprint: Optional[str] = fingerprint
+        self.id: Optional[str] = idval
+        self.license: Optional[str] = license_val
+        self.body: _Element = body
+        self.comments: Optional[str] = comments
+        self.commentsbody: _Element = commentsbody
+        self.raw_text: Optional[str] = raw_text
+        self.text: Optional[str] = text
+        self.language: Optional[str] = language
+        self.image: Optional[str] = image
+        self.pagetype: Optional[str] = pagetype
+        self.filedate: Optional[str] = filedate
 
     @classmethod
     def from_dict(cls: Any, data: Dict[str, Any]) -> Any:
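Note: replacing the setattr loop with an explicit keyword-only __init__ gives mypy concrete attribute types. A brief usage sketch, assuming the class is imported from trafilatura.settings:

from trafilatura.settings import Document

# keyword-only construction; unspecified fields keep their typed defaults
doc = Document(title="Sample page", url="https://example.org", language="en")
assert doc.title == "Sample page"
assert doc.body.tag == "body"  # element-valued fields default to an empty <body>
# Document("positional")      # would raise TypeError: arguments are keyword-only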
4 changes: 2 additions & 2 deletions trafilatura/utils.py
@@ -91,7 +91,7 @@
 # COMMENTS_BLACKLIST = ('( Abmelden / Ändern )') # Fill in your details below|Trage deine Daten unten|Kommentar verfassen|Bitte logge dich|Hinterlasse einen Kommentar| to %s| mit %s)
 
 
-def handle_compressed_file(filecontent: bytes) -> Union[bytes, str]:
+def handle_compressed_file(filecontent: bytes) -> bytes:
     """
     Don't trust response headers and try to decompress a binary string
     with a cascade of installed packages. Use magic numbers when available.
@@ -289,7 +289,7 @@ def line_processing(line: str, preserve_space: bool = False, trailing_space: bool
     if not preserve_space:
         # remove newlines that are not related to punctuation or markup
         # remove non-printable chars and normalize space characters (including Unicode spaces)
-        new_line = trim(LINES_TRIMMING.sub(r" ", new_line))  # type: ignore[assignment]
+        new_line = trim(LINES_TRIMMING.sub(r" ", new_line))
         # prune empty lines
         if all(map(str.isspace, new_line)):
             new_line = None  # type: ignore[assignment]