diff --git a/tests/feeds_tests.py b/tests/feeds_tests.py
index 82883c2b..4d37e748 100644
--- a/tests/feeds_tests.py
+++ b/tests/feeds_tests.py
@@ -7,174 +7,276 @@ import sys
 
 from unittest.mock import patch
 
-from trafilatura import cli, feeds
+from courlan import get_hostinfo
+from trafilatura.cli import main
+from trafilatura.feeds import (
+    FeedParameters,
+    determine_feed,
+    extract_links,
+    find_feed_urls,
+    handle_link_list,
+)
 
 logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
 
 TEST_DIR = os.path.abspath(os.path.dirname(__file__))
-RESOURCES_DIR = os.path.join(TEST_DIR, 'resources')
+RESOURCES_DIR = os.path.join(TEST_DIR, "resources")
 XMLDECL = '\n'
 
 
 def test_atom_extraction():
-    '''Test link extraction from an Atom feed'''
-    assert feeds.extract_links(None, 'example.org', 'https://example.org', '') == []
-    assert len(feeds.extract_links('', 'example.org', 'https://example.org', '')) == 0
-    filepath = os.path.join(RESOURCES_DIR, 'feed1.atom')
-    with open(filepath) as f:
+    """Test link extraction from an Atom feed"""
+    params = FeedParameters("https://example.org", "example.org", "")
+    assert not extract_links(None, params)
+    assert len(extract_links("", params)) == 0
+
+    filepath = os.path.join(RESOURCES_DIR, "feed1.atom")
+    with open(filepath, "r", encoding="utf-8") as f:
         teststring = f.read()
-    assert len(feeds.extract_links(teststring, 'example.org', 'https://example.org', '')) > 0
+    assert len(extract_links(teststring, params)) > 0
+
+    params = FeedParameters("https://www.dwds.de", "dwds.de", "")
     assert (
         len(
-            feeds.extract_links(
+            extract_links(
                 f'{XMLDECL}',
-                'dwds.de',
-                'https://www.dwds.de',
-                '',
+                params,
             )
         )
         == 0
     )
+
+    params = FeedParameters("http://example.org", "example.org", "http://example.org")
     assert (
         len(
-            feeds.extract_links(
+            extract_links(
                 f'{XMLDECL}',
-                'example.org',
-                'http://example.org/',
-                'http://example.org',
+                params,
            )
        )
        == 0
    )
+
+    params = FeedParameters("https://example.org", "example.org", "")
     assert (
         len(
-            feeds.extract_links(
+            extract_links(
                 f'{XMLDECL}',
-                'example.org',
-                'https://example.org',
-                '',
+                params,
            )
        )
        == 0
    )
-    assert feeds.extract_links(
-        f'{XMLDECL}',
-        'example.org',
-        'http://example.org/',
-        'http://example.org',
-    ) == ['http://example.org/article1/']  # TODO: remove slash?
+
+    params = FeedParameters("http://example.org/", "example.org", "http://example.org")
+    assert extract_links(
+        f'{XMLDECL}', params
+    ) == [
+        "http://example.org/article1/"
+    ]  # TODO: remove slash?
 
 
 def test_rss_extraction():
-    '''Test link extraction from a RSS feed'''
+    """Test link extraction from a RSS feed"""
+    params = FeedParameters("http://example.org/", "example.org", "")
     assert (
         len(
-            feeds.extract_links(
-                f'{XMLDECL}http://example.org/article1/',
-                'example.org',
-                'http://example.org/',
-                '',
-            )
+            extract_links(f"{XMLDECL}http://example.org/article1/", params)
        )
        == 1
    )
     # CDATA
-    assert feeds.extract_links(
-        f'{XMLDECL}',
-        'example.org',
-        'http://example.org/',
-        '',
-    ) == ['http://example.org/article1/']  # TODO: remove slash?
+    assert extract_links(
+        f"{XMLDECL}", params
+    ) == [
+        "http://example.org/article1/"
+    ]  # TODO: remove slash?
+
     # spaces
-    assert len(feeds.extract_links(XMLDECL + '\r\n https://www.ak-kurier.de/akkurier/www/artikel/108815-sinfonisches-blasorchester-spielt-1500-euro-fuer-kinder-in-drk-krankenhaus-kirchen-ein ', 'ak-kurier.de', 'https://www.ak-kurier.de/', '')) == 1
+    params = FeedParameters("https://www.ak-kurier.de/", "ak-kurier.de", "")
     assert (
         len(
-            feeds.extract_links(
-                f'{XMLDECL}http://example.org/',
-                'example.org',
-                'http://example.org',
-                'http://example.org',
+            extract_links(
+                XMLDECL
+                + "\r\n https://www.ak-kurier.de/akkurier/www/artikel/108815-sinfonisches-blasorchester-spielt-1500-euro-fuer-kinder-in-drk-krankenhaus-kirchen-ein ",
+                params,
            )
        )
-        == 0
+        == 1
    )
+
+    params = FeedParameters("http://example.org", "example.org", "http://example.org")
+    assert len(extract_links(f"{XMLDECL}http://example.org/", params)) == 0
+
+    params = FeedParameters("http://example.org", "example.org", "")
+    assert len(extract_links(f"{XMLDECL}https://example.org", params)) == 0
+
+    params = FeedParameters("https://www.dwds.de", "dwds.de", "https://www.dwds.de")
+    assert extract_links(
+        f"{XMLDECL}/api/feed/themenglossar/Corona", params
+    ) == ["https://www.dwds.de/api/feed/themenglossar/Corona"]
+
+    params = FeedParameters("https://example.org", "example.org", "")
+    filepath = os.path.join(RESOURCES_DIR, "feed2.rss")
+    with open(filepath, "r", encoding="utf-8") as f:
+        teststring = f.read()
+    assert len(extract_links(teststring, params)) > 0
+
+
+def test_json_extraction():
+    """Test link extraction from a JSON feed"""
+    # find link
+    params = FeedParameters("https://www.jsonfeed.org", "jsonfeed.org", "")
     assert (
         len(
-            feeds.extract_links(
-                f'{XMLDECL}https://example.org',
-                'example.org',
-                'http://example.org/',
-                '',
+            determine_feed(
+                '>',
+                params,
            )
        )
-        == 0
+        == 1
    )
-    assert feeds.extract_links(
-        f'{XMLDECL}/api/feed/themenglossar/Corona',
-        'www.dwds.de',
-        'https://www.dwds.de',
-        'https://www.dwds.de',
-    ) == ['https://www.dwds.de/api/feed/themenglossar/Corona']
-    filepath = os.path.join(RESOURCES_DIR, 'feed2.rss')
-    with open(filepath) as f:
-        teststring = f.read()
-    assert len(feeds.extract_links(teststring, 'example.com', 'https://example.org', '')) > 0
-
-def test_json_extraction():
-    '''Test link extraction from a JSON feed'''
-    # find link
-    assert len(feeds.determine_feed('>', 'jsonfeed.org', 'https://www.jsonfeed.org')) == 1
     # extract data
-    filepath = os.path.join(RESOURCES_DIR, 'feed.json')
-    with open(filepath) as f:
+    assert not extract_links("{/}", params)
+
+    filepath = os.path.join(RESOURCES_DIR, "feed.json")
+    with open(filepath, "r", encoding="utf-8") as f:
         teststring = f.read()
-    links = feeds.extract_links(teststring, 'npr.org', 'https://npr.org', '')
+    params = FeedParameters("https://npr.org", "npr.org", "")
+    links = extract_links(teststring, params)
     assert len(links) == 25
+
     # id as a backup
-    links = feeds.extract_links(r'{"version":"https:\/\/jsonfeed.org\/version\/1","items":[{"id":"https://www.example.org/1","title":"Test"}]}', 'example.org', 'https://example.org', '')
+    params = FeedParameters("https://example.org", "example.org", "")
+    links = extract_links(
+        r'{"version":"https:\/\/jsonfeed.org\/version\/1","items":[{"id":"https://www.example.org/1","title":"Test"}]}',
+        params,
+    )
     assert len(links) == 1
 
 
 def test_feeds_helpers():
-    '''Test helper functions for feed extraction'''
+    """Test helper functions for feed extraction"""
+    params = FeedParameters("https://example.org", "example.org", "https://example.org")
+    domainname, baseurl = get_hostinfo("https://example.org")
+    assert domainname == params.domain and baseurl == params.base
+
     # nothing useful
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 0
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 0
+    assert len(determine_feed("", params)) == 0
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 0
+    )
     # useful
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 1
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 1
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 1
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 1
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 1
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 1
+    )
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 1
+    )
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 1
+    )
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 1
+    )
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 1
+    )
     # no comments wanted
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 0
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 0
+    )
+
     # invalid links
-    assert len(feeds.determine_feed('', 'example.org', 'https://example.org')) == 0
+    params = FeedParameters("example.org", "example.org", "https://example.org")  # fix
+    assert (
+        len(
+            determine_feed(
+                '',
+                params,
+            )
+        )
+        == 0
+    )
+
     # detecting in -elements
-    assert feeds.determine_feed('', 'example.org', 'https://example.org') == ['https://example.org/feed.xml']
-    assert feeds.determine_feed('', 'example.org', 'https://example.org') == ['https://example.org/feed.atom']
-    assert feeds.determine_feed('', 'example.org', 'https://example.org') == ['https://example.org/rss']
+    params = FeedParameters("https://example.org", "example.org", "https://example.org")
+    assert determine_feed(
+        '', params
+    ) == ["https://example.org/feed.xml"]
+    assert determine_feed(
+        '', params
+    ) == ["https://example.org/feed.atom"]
+    assert determine_feed(
+        '', params
+    ) == ["https://example.org/rss"]
     # feed discovery
-    assert feeds.find_feed_urls('http://') == []
-    assert feeds.find_feed_urls('https://httpbun.org/status/404') == []
+    assert not find_feed_urls("http://")
+    assert not find_feed_urls("https://httpbun.org/status/404")
     # Feedburner/Google links
-    assert feeds.handle_link_list(['https://feedproxy.google.com/ABCD'], 'example.org', 'https://example.org') == ['https://feedproxy.google.com/ABCD']
+    assert handle_link_list(["https://feedproxy.google.com/ABCD"], params) == [
+        "https://feedproxy.google.com/ABCD"
+    ]
     # override failed checks
-    assert feeds.handle_link_list(['https://feedburner.com/kat/1'], 'example.org', 'https://example.org') == ['https://feedburner.com/kat/1']
+    assert handle_link_list(["https://feedburner.com/kat/1"], params) == [
+        "https://feedburner.com/kat/1"
+    ]
     # diverging domain names
-    assert feeds.handle_link_list(['https://www.software.info/1'], 'example.org', 'https://example.org') == []
+    assert not handle_link_list(["https://www.software.info/1"], params)
 
 
 def test_cli_behavior():
-    '''Test command-line interface with respect to feeds'''
-    testargs = ['', '--list', '--feed', 'https://httpbun.org/xml']
-    with patch.object(sys, 'argv', testargs):
-        assert cli.main() is None
+    """Test command-line interface with respect to feeds"""
+    testargs = ["", "--list", "--feed", "https://httpbun.org/xml"]
+    with patch.object(sys, "argv", testargs):
+        assert main() is None
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     test_atom_extraction()
     test_rss_extraction()
     test_json_extraction()
diff --git a/trafilatura/feeds.py b/trafilatura/feeds.py
index 4172ed3b..8821c67c 100644
--- a/trafilatura/feeds.py
+++ b/trafilatura/feeds.py
@@ -8,144 +8,199 @@ import json
 import logging
 import re
+
 from itertools import islice
+from typing import List, Optional
 
-from courlan import (check_url, clean_url, filter_urls, fix_relative_urls,
-                     get_hostinfo, validate_url)
+from courlan import (
+    check_url,
+    clean_url,
+    filter_urls,
+    fix_relative_urls,
+    get_hostinfo,
+    validate_url,
+)
 
 from .downloads import fetch_url
 from .settings import MAX_LINKS
-from .utils import is_similar_domain, load_html
+from .utils import is_similar_domain, load_html, uniquify_list
 
 LOGGER = logging.getLogger(__name__)
 
-FEED_TYPES = {'application/atom+xml', 'application/json', 'application/rdf+xml', 'application/rss+xml', 'application/x.atom+xml', 'application/x-atom+xml', 'text/atom+xml', 'text/plain', 'text/rdf+xml', 'text/rss+xml', 'text/xml'}
-FEED_OPENING = re.compile(r'<(feed|rss|\?xml)')
+FEED_TYPES = {
+    "application/atom+xml",
+    "application/json",
+    "application/rdf+xml",
+    "application/rss+xml",
+    "application/x.atom+xml",
+    "application/x-atom+xml",
+    "text/atom+xml",
+    "text/plain",
+    "text/rdf+xml",
+    "text/rss+xml",
+    "text/xml",
+}
+
+FEED_OPENING = re.compile(r"<(feed|rss|\?xml)")
+
 LINK_ATTRS = re.compile(r'(?:\s*)(?:)?(?:\s*)')
-BLACKLIST = re.compile(r'\bcomments\b')  # no comment feed
+LINK_ELEMENTS = re.compile(
+    r"(?:\s*)(?:)?(?:\s*)"
+)
+
+BLACKLIST = re.compile(r"\bcomments\b")  # no comment feed
+
+LINK_VALIDATION_RE = re.compile(r"\.(?:atom|rdf|rss|xml)$|\b(?:atom|rss)\b")
 
 
-def handle_link_list(linklist, domainname, baseurl, target_lang=None, external=False):
-    '''Examine links to determine if they are valid and
-    lead to a web page'''
+class FeedParameters:
+    "Store necessary information to proceed a feed."
+    __slots__ = ["base", "domain", "ext", "lang", "ref"]
+
+    def __init__(
+        self,
+        baseurl: str,
+        domainname: str,
+        reference: str,
+        external: bool = False,
+        target_lang: Optional[str] = None,
+    ) -> None:
+        self.base: str = baseurl
+        self.domain: str = domainname
+        self.ext: bool = external
+        self.lang: Optional[str] = target_lang
+        self.ref: str = reference
+
+
+def handle_link_list(linklist: List[str], params: FeedParameters) -> List[str]:
+    """Examine links to determine if they are valid and
+    lead to a web page"""
     output_links = []
     # sort and uniq
     for item in sorted(set(linklist)):
         # fix and check
-        link = fix_relative_urls(baseurl, item)
+        link = fix_relative_urls(params.base, item)
         # control output for validity
-        checked = check_url(link, language=target_lang)
+        checked = check_url(link, language=params.lang)
         if checked is not None:
-            if not external and not "feed" in link and not is_similar_domain(domainname, checked[1]):
-                LOGGER.warning('Rejected, diverging domain names: %s %s', domainname, checked[1])
+            if (
+                not params.ext
+                and not "feed" in link
+                and not is_similar_domain(params.domain, checked[1])
+            ):
+                LOGGER.warning(
+                    "Rejected, diverging domain names: %s %s", params.domain, checked[1]
+                )
             else:
                 output_links.append(checked[0])
         # Feedburner/Google feeds
-        elif 'feedburner' in item or 'feedproxy' in item:
+        elif "feedburner" in item or "feedproxy" in item:
             output_links.append(item)
     return output_links
 
 
-def extract_links(feed_string, domainname, baseurl, reference, target_lang=None, external=False):
-    '''Extract links from Atom and RSS feeds'''
+def extract_links(feed_string: str, params: FeedParameters) -> List[str]:
+    """Extract links from Atom and RSS feeds"""
     feed_links = []
     # check if it's a feed
     if feed_string is None:
-        LOGGER.debug('Empty feed: %s', domainname)
+        LOGGER.debug("Empty feed: %s", params.domain)
         return feed_links
     feed_string = feed_string.strip()
     # typical first and second lines absent
-    if not FEED_OPENING.match(feed_string) and not \
-        ('' in feed_string:
+    elif "" in feed_string:
         feed_links.extend(
-            [m[1].strip() for m in islice(LINK_ELEMENTS.finditer(feed_string, re.DOTALL), MAX_LINKS)]
+            [
+                m[1].strip()
+                for m in islice(
+                    LINK_ELEMENTS.finditer(feed_string, re.DOTALL), MAX_LINKS
+                )
+            ]
        )
     # refine
-    output_links = handle_link_list(feed_links, domainname, baseurl, target_lang, external)
-    output_links = [l for l in output_links if l != reference and l.count('/') > 2]
+    output_links = handle_link_list(feed_links, params)
+    output_links = [l for l in output_links if l != params.ref and l.count("/") > 2]
     # log result
     if feed_links:
-        LOGGER.debug('Links found: %s of which %s valid', len(feed_links), len(output_links))
+        LOGGER.debug(
+            "Links found: %s of which %s valid", len(feed_links), len(output_links)
+        )
     else:
-        LOGGER.debug('Invalid feed for %s', domainname)
+        LOGGER.debug("Invalid feed for %s", params.domain)
     return output_links
 
 
-def determine_feed(htmlstring, baseurl, reference):
-    '''Try to extract the feed URL from the home page.
-    Adapted from http://www.aaronsw.com/2002/feedfinder/'''
+def determine_feed(htmlstring: str, params: FeedParameters) -> List[str]:
+    """Try to extract the feed URL from the home page.
+    Adapted from http://www.aaronsw.com/2002/feedfinder/"""
     # parse the page to look for feeds
     tree = load_html(htmlstring)
     # safeguard
     if tree is None:
-        LOGGER.debug('Invalid HTML/Feed page: %s', baseurl)
+        LOGGER.debug("Invalid HTML/Feed page: %s", params.base)
         return []
     feed_urls = []
-    for linkelem in tree.xpath('//link[@rel="alternate"]'):
-        # discard elements without links
-        if 'href' not in linkelem.attrib:
-            continue
-        # most common case
-        if 'type' in linkelem.attrib and linkelem.get('type') in FEED_TYPES:
-            feed_urls.append(linkelem.get('href'))
-        # websites like geo.de
-        elif 'atom' in linkelem.get('href') or 'rss' in linkelem.get('href'):
-            feed_urls.append(linkelem.get('href'))
+    for linkelem in tree.xpath('//link[@rel="alternate"][@href]'):
+        # most common case + websites like geo.de
+        if (
+            "type" in linkelem.attrib and linkelem.get("type") in FEED_TYPES
+        ) or LINK_VALIDATION_RE.search(linkelem.get("href", "")):
+            feed_urls.append(linkelem.get("href"))
     # backup
     if not feed_urls:
-        for linkelem in tree.xpath('//a[@href]'):
-            if linkelem.get('href')[-4:].lower() in ('.rss', '.rdf', '.xml'):
-                feed_urls.append(linkelem.get('href'))
-            elif linkelem.get('href')[-5:].lower() == '.atom':
-                feed_urls.append(linkelem.get('href'))
-            elif 'atom' in linkelem.get('href') or 'rss' in linkelem.get('href'):
-                feed_urls.append(linkelem.get('href'))
+        for linkelem in tree.xpath("//a[@href]"):
+            link = linkelem.get("href", "")
+            if LINK_VALIDATION_RE.search(link):
+                feed_urls.append(link)
     # refine
     output_urls = []
-    for link in sorted(set(feed_urls)):
-        link = fix_relative_urls(baseurl, link)
+    for link in uniquify_list(feed_urls):
+        link = fix_relative_urls(params.base, link)
         link = clean_url(link)
-        if link is None or link == reference or validate_url(link)[0] is False:
+        if link is None or link == params.ref or validate_url(link)[0] is False:
             continue
         if BLACKLIST.search(link):
             continue
         output_urls.append(link)
     # log result
-    LOGGER.debug('Feed URLs found: %s of which %s valid', len(feed_urls), len(output_urls))
+    LOGGER.debug(
+        "Feed URLs found: %s of which %s valid", len(feed_urls), len(output_urls)
+    )
     return output_urls
 
 
-def find_feed_urls(url, target_lang=None, external=False):
+def find_feed_urls(
+    url: str, target_lang: Optional[str] = None, external: bool = False
+) -> List[str]:
     """Try to find feed URLs.
 
     Args:
@@ -162,46 +217,49 @@ def find_feed_urls(url, target_lang=None, external=False):
     """
     domainname, baseurl = get_hostinfo(url)
     if domainname is None:
-        LOGGER.warning('Invalid URL: %s', url)
+        LOGGER.warning("Invalid URL: %s", url)
         return []
+    params = FeedParameters(baseurl, domainname, url, external, target_lang)
     urlfilter = None
     downloaded = fetch_url(url)
     if downloaded is not None:
         # assume it's a feed
-        feed_links = extract_links(downloaded, domainname, baseurl, url, target_lang, external)
+        feed_links = extract_links(downloaded, params)
         if len(feed_links) == 0:
             # assume it's a web page
-            for feed in determine_feed(downloaded, baseurl, url):
+            for feed in determine_feed(downloaded, params):
                 feed_string = fetch_url(feed)
-                feed_links.extend(extract_links(feed_string, domainname, baseurl, url, target_lang, external))
+                feed_links.extend(extract_links(feed_string, params))
         # filter triggered, prepare it
         if len(url) > len(baseurl) + 2:
             urlfilter = url
         # return links found
         if len(feed_links) > 0:
             feed_links = filter_urls(feed_links, urlfilter)
-            LOGGER.debug('%s feed links found for %s', len(feed_links), domainname)
+            LOGGER.debug("%s feed links found for %s", len(feed_links), domainname)
             return feed_links
-        LOGGER.debug('No usable feed links found: %s', url)
+        LOGGER.debug("No usable feed links found: %s", url)
     else:
-        LOGGER.error('Could not download web page: %s', url)
-        if url.strip('/') != baseurl:
+        LOGGER.error("Could not download web page: %s", url)
+        if url.strip("/") != baseurl:
             return try_homepage(baseurl, target_lang)
     # try alternative: Google News
     if target_lang is not None:
         downloaded = fetch_url(
-            f'https://news.google.com/rss/search?q=site:{baseurl}&hl={target_lang}&scoring=n&num=100'
+            f"https://news.google.com/rss/search?q=site:{baseurl}&hl={target_lang}&scoring=n&num=100"
        )
        if downloaded is not None:
-            feed_links = extract_links(downloaded, domainname, baseurl, url, target_lang, external)
+            feed_links = extract_links(downloaded, params)
             feed_links = filter_urls(feed_links, urlfilter)
-            LOGGER.debug('%s Google news links found for %s', len(feed_links), domainname)
+            LOGGER.debug(
+                "%s Google news links found for %s", len(feed_links), domainname
+            )
             return feed_links
     return []
 
 
-def try_homepage(baseurl, target_lang):
-    '''Shift into reverse and try the homepage instead of the particular feed
-    page that was given as input.'''
-    LOGGER.debug('Probing homepage for feeds instead: %s', baseurl)
+def try_homepage(baseurl: str, target_lang: Optional[str]) -> List[str]:
+    """Shift into reverse and try the homepage instead of the particular feed
+    page that was given as input."""
+    LOGGER.debug("Probing homepage for feeds instead: %s", baseurl)
     return find_feed_urls(baseurl, target_lang)