#!/usr/bin/env python
# -*- coding: utf-8 -*-
from bs4 import BeautifulSoup
from bs4.element import Comment
import urllib.request, os, webbrowser, PyPDF2, nltk, pdfkit, re, wikipedia, json, sys, unicodedata, requests, time, signal
import wikipedia as wiki
sys.path.append
from knowledge_management.ProfileManager import *
# helper class for try_one
class Timeout(Exception):
    pass
def try_one(func, t, **kwargs):
    """
    Calls func with the given keyword arguments and, after t seconds, interrupts the call and moves on
    Parameters
    ----------
    func : function
        the function to be called
    t : int
        the number of seconds to wait before giving up
    **kwargs : keyword-arguments
        arguments you'd like to pass to func
    Returns
    -------
    func's return type
        func's return value, or None if the call timed out
    """
    # helper function: raise Timeout when the alarm fires
    # (SIGALRM-based timeouts only work on Unix, in the main thread)
    def timeout_handler(signum, frame):
        raise Timeout()
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(t)  # trigger the alarm in t seconds
    parsed_page = None
    try:
        t1 = time.perf_counter()
        parsed_page = func(**kwargs)
        t2 = time.perf_counter()
    except Timeout:
        print('{}() timed out after {} seconds'.format(func.__name__, t))
        return None
    finally:
        signal.signal(signal.SIGALRM, old_handler)
        signal.alarm(0)
    return parsed_page
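# Illustrative usage of try_one (a sketch, kept as a comment so importing this
# module has no side effects; the URL and the 10-second limit are arbitrary
# examples, not values used elsewhere in this module):
#
#     page = try_one(parse_single_page, 10,
#                    link="https://example.com/report.pdf",
#                    query_string="acme")
#     if page is None:
#         print("the call timed out or failed")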
def tag_visible(element):
    """
    Determines if an HTML element contributes visible text
    Parameters
    ----------
    element : BeautifulSoup element
        an HTML element
    Returns
    -------
    bool
        True if the element is visible, False otherwise
    """
    if element.parent.name in ['[document]', 'head', 'style', 'script', 'title', 'header', 'meta', 'footer']:
        return False
    if isinstance(element, Comment):
        return False
    if element.name in ['header', 'footer', 'button', 'nav']:
        return False
    return True
def text_from_html(body):
    """
    Gets all of the visible text from the body of an HTML document
    Parameters
    ----------
    body : bytes
        the raw HTML document (it is decoded as UTF-8 inside the function)
    Returns
    -------
    string
        the visible text in the body
    """
    soup = BeautifulSoup(body.decode("utf-8", "ignore"), 'lxml')
    texts = soup.findAll(text=True)
    visible_texts = filter(tag_visible, texts)
    return " ".join(t.strip() for t in visible_texts)
def get_PDF_content(query_string, link):
    """
    Gets all of the text from a PDF document
    Parameters
    ----------
    query_string : string
        the query that generated the PDF document
    link : string
        the URL for the document
    Returns
    -------
    string
        the visible text in the PDF
    """
    # read the PDF from the web
    content = urllib.request.urlopen(link).read()
    # name file and write to tmp directory
    file_name = query_string + link
    file_name = re.sub('[^A-Za-z0-9]+', '', file_name)
    if len(file_name) > 100:
        file_name = file_name[:100]
    file_name = file_name + ".pdf"
    fout = open(os.path.join("data/tmp", file_name), "wb")
    fout.write(content)
    fout.close()
    # convert PDF to text
    content = ""
    # load PDF into PyPDF2
    pdf = PyPDF2.PdfFileReader(os.path.join("data/tmp/", file_name))
    if pdf.isEncrypted:
        pdf.decrypt('')
    # iterate pages
    for i in range(pdf.getNumPages()):
        # extract text from page and add to content
        content += pdf.getPage(i).extractText() + "\n"
    content = " ".join(content.replace("\xa0", " ").strip().split())
    return content
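# Illustrative usage of get_PDF_content (a sketch, kept as a comment; the URL is
# a placeholder and the data/tmp directory is assumed to exist, since the
# function writes the downloaded PDF there):
#
#     pdf_text = get_PDF_content("acme", "https://example.com/annual_report.pdf")
#     print(pdf_text[:200])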
def parse_single_page(link, query_string = "test"):
    """
    Gets all of the text from a web page
    Parameters
    ----------
    link : string
        the URL for the document
    query_string : string
        the generating query, default is "test"
    Returns
    -------
    tuple (bytes, string)
        the source code (HTML/PDF) of the web page and the visible text, or None if the request fails
    """
    if link[-4:] != '.pdf':
        try:
            html = urllib.request.urlopen(link).read()
            return (html, bytes(text_from_html(html), 'utf-8').decode('utf-8', 'ignore'))
        except Exception as e:
            print(link + " threw the following exception " + str(e))
    else:
        try:
            html = urllib.request.urlopen(link).read()
            return (html, get_PDF_content(query_string, link))
        except Exception as e:
            print(link + " threw the following exception " + str(e))
def parser_iter(query_string, linkList):
    """
    Parses the URLs in linkList using a timeout of 60 seconds on each page (a la try_one) and yields them as dictionaries.
    Parameters
    ----------
    query_string : string
        the generating query
    linkList : list of strings
        list of URLs for the documents you would like to parse
    Returns
    -------
    iterator of dicts
        * dict['url'] (string) : the URL that was parsed
        * dict['query'] (string) : the generating query
        * dict['text'] (string) : the visible text on the web page
        * dict['html'] (bytes) : the HTML code of the page (if it is HTML based)
        * dict['pdf'] (bytes) : the PDF code of the page (if it is PDF based)
    """
    for i, link in enumerate(linkList):
        print("...{:.2f}% done, processing link {}: {}".format(((i + 1) / len(linkList)) * 100, i, link))
        doc = {'url': link, 'query': query_string}
        parsed_page = try_one(parse_single_page, 60, link=link, query_string=query_string)
        if parsed_page is not None:
            if link[-4:] != '.pdf':
                doc['html'] = parsed_page[0]
                doc['text'] = parsed_page[1]
            else:
                doc['pdf'] = parsed_page[0]
                doc['text'] = parsed_page[1]
            yield doc
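# Illustrative usage of parser_iter (a sketch, kept as a comment; the URLs are
# placeholders):
#
#     links = ["https://example.com/a.html", "https://example.com/b.pdf"]
#     for doc in parser_iter("acme", links):
#         print(doc['url'], len(doc['text']))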
def contain(sent, word_list):
    """
    Returns True if any entry of word_list appears as a substring of sent, False otherwise
    """
    for word in word_list:
        if word in sent:
            return True
    return False
def eightk_parser(link):
    """
    Parses an SEC document known as an 8-K
    Parameters
    ----------
    link : string
        the URL for the 8-K
    Returns
    -------
    string
        the important text from the 8-K
    """
    try:
        html = urllib.request.urlopen(link).read()
        text_list = nltk.sent_tokenize(text_from_html(html).replace("\n", "."))
        # print(text_list)
        start = False
        stop = False
        info = ''
        for sent in text_list:
            if stop:
                return info
            elif contain(sent, ['SIGNATURE']):
                # print('end')
                stop = True
            elif start:
                info += sent
            elif contain(sent, ['Item', 'ITEM']):
                # print('start')
                start = True
        return info
    except Exception as e:
        print('{} threw the following exception during 8-K parsing: {}'.format(link, str(e)))
def ex21_parser(link):
    """
    Parses an SEC document known as an EX-21
    Parameters
    ----------
    link : string
        the URL for the EX-21
    Returns
    -------
    list of strings
        the subsidiaries of the company listed in the EX-21
    """
    try:
        body = urllib.request.urlopen(link).read()
        soup = BeautifulSoup(body, 'lxml')
        table = soup.findAll('table')
        if table != []:
            sub_list = []
            for t in table:
                row = t.findAll('tr')
                for r in row[1:]:
                    division = r.findAll('td')
                    # for d in division[0]:
                    if len(division) > 0:
                        d = division[0]
                        desc = d.get_text().strip('\n')
                        sub_list.append(desc)
            if sub_list != []:
                for i in range(len(sub_list)):
                    sub_list[i] = sub_list[i].replace("\xa0", " ").replace("\n", "").strip()
                return sub_list
            else:
                html = urllib.request.urlopen(link).read()
                text_list = text_from_html(html).splitlines()
                for i in range(len(text_list)):
                    text_list[i] = re.sub(r'[\s][\s]+[\S]+', "", text_list[i].replace("\xa0", " ").replace("\n", "").replace("-", "").strip())
                while "" in text_list:
                    try:
                        text_list.remove("")
                    except:
                        pass
                return text_list
        else:
            html = urllib.request.urlopen(link).read()
            text_list = text_from_html(html).splitlines()
            for i in range(len(text_list)):
                text_list[i] = re.sub(r'[\s][\s]+[\S]+', "", text_list[i].replace("\xa0", " ").replace("\n", "").replace("-", "").strip())
            while "" in text_list:
                try:
                    text_list.remove("")
                except:
                    pass
            return text_list
    except Exception as e:
        print('{} threw the following exception during EX-21 parsing: {}'.format(link, str(e)))
def tenk_parser(link):
    """
    Parses an SEC document known as a 10-K
    Parameters
    ----------
    link : string
        the URL for the 10-K
    Returns
    -------
    string
        the important information in the 10-K
    """
    try:
        html = urllib.request.urlopen(link).read()
        text_list = nltk.sent_tokenize(text_from_html(html))
        start = False
        stop = False
        info = ''
        for sent in text_list:
            if contain(sent, ['PART I']) and contain(sent, ['Item 1']):
                start = True
            if contain(sent, ['Item 1A']) and contain(sent, ['PART I']):
                stop = True
            if stop:
                return info
            if start:
                info += sent
    except Exception as e:
        print('{} threw the following exception during 10-K parsing: {}'.format(link, str(e)))
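# Illustrative usage of the SEC filing parsers above (a sketch, kept as a
# comment; the EDGAR URLs are truncated placeholders, not real filing addresses):
#
#     item_text     = eightk_parser("https://www.sec.gov/Archives/.../example-8k.htm")
#     subsidiaries  = ex21_parser("https://www.sec.gov/Archives/.../example-ex21.htm")
#     business_text = tenk_parser("https://www.sec.gov/Archives/.../example-10k.htm")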
def wikiParser(company):
    """
    Searches Wikipedia for a company and gets the Wikipedia infobox
    together with all other page contents
    Parameters
    ----------
    company : str
        the company you would like to query Wikipedia for
    Returns
    -------
    tuple
        dict
            a dictionary of all other contents on the Wikipedia page, keyed by section
        dict
            a dictionary of the Wikipedia infobox
        str
            page title
        str
            page url
        bs4.element.Tag
            the Wikipedia infobox HTML
    """
    wiki_page = {}
    wiki_table = {}
    try:
        page = wiki.page(title=company)
    except:
        print("Reading the wiki page for {} was not possible".format(company))
        return (wiki_page, wiki_table, "", "", "<ul></ul>")
    secs = page.sections
    for sec in secs:
        wiki_page[sec] = page.section(sec)
    # Do the Wikipedia infobox table
    link = page.url
    body = urllib.request.urlopen(link).read()
    soup = BeautifulSoup(body, 'lxml')
    title = soup.find('title')
    if title is not None:
        title = str(title).replace("<title>", "").replace("</title>", "").replace("- Wikipedia", "").strip()
    try:
        table = soup.find('table', {'class': 'infobox vcard'})
        rows = table.find_all('tr')
        for row in rows:
            right = row.find_all('td')
            left = row.find_all('th')
            for head, elem in zip(left, right):
                filler = unicodedata.normalize("NFKD", head.get_text(strip=True))
                els = elem.find_all('li')
                if len(els) != 0:
                    temp_list = []
                    for el in els:
                        temp_list.append(unicodedata.normalize("NFKD", re.sub(r'\[[^()]*\]', "", el.get_text(strip=True))))
                    wiki_table[filler] = temp_list
                elif head.text == "Founded":
                    wiki_table[filler] = unicodedata.normalize("NFKD", elem.get_text(strip=True).split(";", 1)[0])
                elif elem.text != "":
                    wiki_table[filler] = unicodedata.normalize("NFKD", re.sub(r'\[[^()]*\]', "", elem.get_text(strip=True)))
    except:
        print("Wikipedia Table does not exist for {}".format(company))
    return (wiki_page, wiki_table, title, link, table)
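# Illustrative usage of wikiParser (a sketch, kept as a comment; which infobox
# keys appear, e.g. "Industry" or "Founded", depends entirely on the company's
# Wikipedia page):
#
#     sections, infobox, title, url, infobox_html = wikiParser("Apple Inc.")
#     print(title, url)
#     print(infobox.get("Industry"))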
def main():
    # pm = ProfileManager()
    # for company in pm:
    #     print("Now getting information for {}".format(company['name']))
    #     print(wiki_parser(company['name']))
    parse_single_page("http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0128193")
    # (wiki_page, wiki_table) = wikiParser_new('Apple Inc')
    # print(wiki_page)
    # print(wiki_table)

if __name__ == "__main__":
    main()