extract_from_dump.py
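
"""
Extracts articles, templates and template redirects from a bzip2-compressed MediaWiki
XML dump (DUMP_PATH in config.py) and writes the parsed results, one JSON object per
line, to the paths given in WRITE_PATHS.
"""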
from multiprocessing import Queue, Process, cpu_count
from bz2 import BZ2File
from lxml.etree import iterparse, Element
from typing import Union
from timeit import default_timer as timer
import json
import wikitextparser as wtp
from config import DUMP_PATH, PROCESS_TEMPLATES, PROCESS_ARTICLES
from constants import WRITE_PATHS, BROKEN_ARTICLES
from src.data import Article, Template, TemplateRedirect
from src.utils.etc import empty_file
from src.utils.xml import is_article_title_ru, is_template, is_template_title_ru, is_redirect, is_article, \
is_article_title_proper, is_element_page, get_page_id, get_raw_wiki, get_page_title, get_redirect_title
from src.utils.wiki import find_ru_section, parse_ru_section, clean_template_name, parse_template_page, \
remove_no_include, is_obscene, has_ru_homonyms
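
# Process layout (as wired up in the __main__ block below): the main process reads the
# XML dump (parse_dump), NUM_PROCESSES worker processes parse wiki text (parse_wiki),
# and one more process writes the results (write_result); that is presumably why more
# than four CPU cores are required.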
assert cpu_count() > 4
NUM_PROCESSES = 4


def process_element(element: Element) -> Union[Article, Template, None]:
"""
Parses an individual XML element. Returns an instance of a corresponding
class if successful, None otherwise.
    Additionally, clears the processed element and deletes already-processed preceding elements up the tree to conserve memory.
:param element: XML element
:return: an instance of Article or Template class or None
"""
# we're only interested in `page` elements in this dump
if is_element_page(element):
cur_id: int = get_page_id(element)
cur_title: str = get_page_title(element)
cur_wiki: str = get_raw_wiki(element)
# filter out incorrectly formatted articles
if cur_title in BROKEN_ARTICLES:
wiki_page = None
elif (is_article(element)
and is_article_title_ru(cur_title)
and not is_redirect(element)
and PROCESS_ARTICLES):
is_proper: bool = is_article_title_proper(cur_title)
wiki_page = Article(id_=cur_id,
title=cur_title,
raw_wiki=cur_wiki,
is_proper=is_proper)
elif (is_template(element)
and is_template_title_ru(cur_title)
and not is_redirect(element)
and PROCESS_TEMPLATES):
cur_title: str = clean_template_name(cur_title)
wiki_page = Template(id_=cur_id,
title=cur_title,
raw_wiki=cur_wiki)
elif (is_template(element)
and is_template_title_ru(cur_title)
and is_redirect(element)
and PROCESS_TEMPLATES):
cur_title: str = clean_template_name(cur_title)
redirect_title: str = get_redirect_title(element)
redirect_title: str = clean_template_name(redirect_title)
wiki_page = TemplateRedirect(id_=cur_id,
title=cur_title,
raw_wiki=cur_wiki,
redirect_title=redirect_title)
# not interested in other cases
else:
wiki_page = None
# remove data from current element
element.clear()
# clear all preceding elements
for ancestor in element.xpath('ancestor-or-self::*'):
while ancestor.getprevious() is not None:
del ancestor.getparent()[0]
return wiki_page


def parse_dump(dump_path: str, conn: Queue) -> None:
"""
Iteratively goes through a compressed XML dump. Extracts information
from page elements. If extraction is successful (i.e. not None), then
the processed information is passed on to an output queue.
:param dump_path: path leading to the compressed XML dump
:param conn: output queue
"""
with BZ2File(dump_path) as bz_file:
for index, (_, elem) in enumerate(iterparse(bz_file)):
if index % 100000 == 0:
print("\r" + f"Processed {index} XML elements...", end="")
processed_page = process_element(elem)
if processed_page:
conn.put(processed_page)
# we need to stop each individual process down the line
# after we're done parsing the dump
for _ in range(NUM_PROCESSES):
conn.put('STOP')
    # total number of elements processed, for reference
    print("\n" + f"Total elements processed: {index + 1}")


def parse_wiki(in_conn: Queue, out_conn: Queue) -> None:
"""
    Processes the actual wiki text of a given page. Receives data from an input queue
and puts results into an output queue.
Meant to run in multiple parallel processes.
:param in_conn: input queue
:param out_conn: output queue
"""
    # runs in an infinite loop until it receives a stop signal
while True:
        data: Union[Article, Template, TemplateRedirect, str] = in_conn.get()
        # upon receiving the stop signal,
        # pass it further down the line and break out of the loop
if data == "STOP":
out_conn.put(data)
break
# parse the article type
if isinstance(data, Article):
wiki_data = wtp.parse(data.raw_wiki)
wiki_data = find_ru_section(wiki_data)
# proceed if Russian language section is found
if wiki_data:
# parse found section
parsed_wiki_data = parse_ru_section(wiki_data)
# proceed if morphological information and segmentation data were found
if parsed_wiki_data:
parsed_wiki_data["id"] = data.id_
parsed_wiki_data["title"] = data.title
parsed_wiki_data["type"] = "article"
parsed_wiki_data["is_proper"] = data.is_proper
parsed_wiki_data["is_obscene"] = is_obscene(data.raw_wiki)
parsed_wiki_data["has_homonyms"] = has_ru_homonyms(data.raw_wiki)
out_conn.put(parsed_wiki_data)
# Template class contains data from a page dedicated to a particular template
elif isinstance(data, Template):
wiki_data = wtp.parse(remove_no_include(data.raw_wiki))
if wiki_data:
parsed_wiki_data = parse_template_page(wiki_data)
if parsed_wiki_data:
parsed_wiki_data["id"] = data.id_
parsed_wiki_data["title"] = data.title
parsed_wiki_data["type"] = "template"
out_conn.put(parsed_wiki_data)
        # we're interested only in template redirects;
        # those are essentially aliases for template names,
        # even though only the redirect target page contains the actual data
elif isinstance(data, TemplateRedirect):
# here we don't actually need any wiki data from the page body
parsed_wiki_data = {
"id": data.id_,
"title": data.title,
"redirect_title": data.redirect_title,
"type": "template_redirect"
}
out_conn.put(parsed_wiki_data)
# not interested in any other cases
else:
pass


def write_result(paths: dict, conn: Queue) -> None:
"""
    Receives data from the input queue and writes each record as a single line
    in a file. The file path depends on the record's data type.
:param paths: dictionary containing data types as keys and corresponding
paths as values.
:param conn: input queue
"""
for _, path in paths.items():
empty_file(path)
while True:
data: Union[dict, str] = conn.get()
if data == "STOP":
break
data_type = data["type"]
path = paths.get(data_type, None)
if path:
with open(path, "a") as file:
file.write(json.dumps(data, ensure_ascii=False) + "\n")
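
# Hypothetical example of the output format: write_result emits one JSON object per
# line (JSON Lines). Apart from "id", "title" and "type", the exact keys depend on
# parse_ru_section / parse_template_page, so the record below is illustrative only:
#   {"id": 123, "title": "...", "type": "article", "is_proper": false, "is_obscene": false}
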
if __name__ == "__main__":
# to check total time elapsed
start = timer()
# create necessary queues
task_queue = Queue()
write_queue = Queue()
# wiki pages are completely independent of each other (except morphological templates, but we'll deal
# with them separately), so it makes sense to parallelize their parsing
wiki_process = [Process(target=parse_wiki, args=(task_queue, write_queue)) for _ in range(NUM_PROCESSES)]
    # this can also be done independently of the XML and wiki parsing
write_process = Process(target=write_result, args=(WRITE_PATHS, write_queue))
# start all the processes
for process in wiki_process:
process.start()
write_process.start()
    # parse the dump in the main process
parse_dump(DUMP_PATH, task_queue)
    # wait for the worker and writer processes to finish
for process in wiki_process:
process.join()
write_process.join()
end = timer()
print("\n" + f"Time elapsed: {end - start:.4f}")