import nltk, inspect, sys, hashlib, itertools
import numpy as np
from nltk.corpus import brown
# module for computing a Conditional Frequency Distribution
from nltk.probability import ConditionalFreqDist
# module for computing a Conditional Probability Distribution
from nltk.probability import ConditionalProbDist, LidstoneProbDist
from nltk.tag import map_tag
from adrive2 import trim_and_warn
assert map_tag('brown', 'universal', 'NR-TL') == 'NOUN', '''
Brown-to-Universal POS tag map is out of date.'''
class HMM:
def __init__(self, train_data):
"""
Initialise a new instance of the HMM.
:param train_data: The training dataset, a list of sentences with tags
:type train_data: list(list(tuple(str,str)))
"""
self.train_data = train_data
# Emission and transition probability distributions
self.emission_PD = None
self.transition_PD = None
self.states = []
self.viterbi = []
self.backpointer = []
# Q1
# Compute emission model using ConditionalProbDist with a LidstoneProbDist estimator.
# To achieve the latter, pass a function
# as the probdist_factory argument to ConditionalProbDist.
# This function should take 3 arguments
# and return a LidstoneProbDist initialised with +0.001 as gamma and an extra bin.
# See the documentation/help for ConditionalProbDist to see what arguments the
# probdist_factory function is called with.
def emission_model(self, train_data):
"""Compute an emission model based on labelled training data.
Don't forget to lowercase the observation, otherwise it won't match the test data.
:param train_data: The training dataset, a list of sentences with tags
:type train_data: list(list(tuple(str,str)))
:return: The emission probability distribution and a list of the states
:rtype: Tuple[ConditionalProbDist, list(str)]
"""
# Don't forget to lowercase the observation, otherwise it won't match the test data
# Do NOT add <s> or </s> to the input sentences
data = []
for s in train_data:
for (word, tag) in s:
data.append((tag, word.lower()))
if (tag not in self.states):
self.states.append(tag)
emission_FD = ConditionalFreqDist(data)
lidstone_PD = lambda fd: LidstoneProbDist(fd, 0.001, 1 + fd.B())  # gamma=+0.001, one extra bin
self.emission_PD = ConditionalProbDist(emission_FD, lidstone_PD)
return self.emission_PD, self.states
# Q1
# Access function for testing the emission model
# For example model.elprob('VERB','is') might be -1.4
def elprob(self, state, word):
"""
The log of the estimated probability of emitting a word from a state
:param state: the state name
:type state: str
:param word: the word
:type word: str
:return: log base 2 of the estimated emission probability
:rtype: float
"""
return self.emission_PD[state].logprob(word)
# Q2
# Compute transition model using ConditionalProbDist with the same
# estimator as above (but without the extra bin)
# See comments for emission_model above for details on the estimator.
def transition_model(self, train_data):
"""
Compute a transition model using a ConditionalProbDist based on
labelled data.
:param train_data: The training dataset, a list of sentences with tags
:type train_data: list(list(tuple(str,str)))
:return: The transition probability distribution
:rtype: ConditionalProbDist
"""
# The data object should be an array of tuples of conditions and observations,
# in our case the tuples will be of the form (tag_(i),tag_(i+1)).
# DON'T FORGET TO ADD THE START SYMBOL <s> AND THE END SYMBOL </s>
data = []
for s in train_data:
data.append(("<s>", s[0][1]))
for i in range(len(s) - 1):
data.append((s[i][1], s[i + 1][1]))
data.append((s[len(s)-1][1], "</s>"))
transition_FD = ConditionalFreqDist(data)
lidstone_PD = lambda fd: LidstoneProbDist(fd, 0.001)  # same estimator, but without the extra bin
self.transition_PD = ConditionalProbDist(transition_FD, lidstone_PD)
return self.transition_PD
# Q2
# Access function for testing the transition model
# For example model.tlprob('VERB','VERB') might be -2.4
def tlprob(self, state1, state2):
"""
The log of the estimated probability of a transition from one state to another
:param state1: the first state name
:type state1: str
:param state2: the second state name
:type state2: str
:return: log base 2 of the estimated transition probability
:rtype: float
"""
return self.transition_PD[state1].logprob(state2)
# Train the HMM
def train(self):
"""
Trains the HMM from the training data
"""
self.emission_model(self.train_data)
self.transition_model(self.train_data)
# Part B: Implementing the Viterbi algorithm.
# Q3
# Initialise data structures for tagging a new sentence.
# Describe the data structures with comments.
# Use the models stored in the variables: self.emission_PD and self.transition_PD
# Input: first word in the sentence to tag and the total number of observations.
def initialise(self, observation, number_of_observations):
"""
Initialise data structures self.viterbi and self.backpointer for tagging a new sentence.
:param observation: the first word in the sentence to tag
:type observation: str
:param number_of_observations: the number of observations
:type number_of_observations: int
"""
# Initialise step 0 of viterbi, including
# transition from <s> to observation
# use costs (- log-base-2 probabilities)
# Initialise step 0 of backpointer
viterbi = np.zeros((len(self.states), number_of_observations))
self.backpointer = [[""] * (number_of_observations) for _ in range(len(self.states))]
for i in range(len(self.states)):
viterbi[i][0] = - (self.tlprob('<s>', self.states[i]) + self.elprob(self.states[i], observation))
for i in range(len(self.states)):
self.backpointer[i][0] = 0
self.viterbi = viterbi
"""
self.viterbi is a 2D numpy array of numpy.float64 costs with one row per state and one
column per observation (step). To read an entry, first look up the state's index in
self.states; self.viterbi[index][step] then gives the Viterbi cost of that state at that step.
self.backpointer is a 2D list of the same shape (one row per state, one column per step).
Each entry holds the name of the best previous state, except at step 0, which holds a dummy
value because there is no previous step. Again, look up the state's index in self.states;
self.backpointer[index][step] then gives the backpointer for that state at that step.
"""
return
# Q3
# Access function for testing the viterbi data structure
# For example model.get_viterbi_value('VERB',2) might be 6.42
def get_viterbi_value(self, state, step):
"""
Return the current value from self.viterbi for
the state (tag) at a given step
:param state: A tag name
:type state: str
:param step: The (0-origin) number of a step: if negative,
counting backwards from the end, i.e. -1 means the last step
:type step: int
:return: The value (a cost) for state as of step
:rtype: float
"""
index = self.states.index(state)
# Cast numpy.float64 to a plain float so the sanity check in answers() sees a float.
return float(self.viterbi[index][step])
# Q3
# Access function for testing the backpointer data structure
# For example model.get_backpointer_value('VERB',2) might be 'NOUN'
def get_backpointer_value(self, state, step):
"""
Return the current backpointer from self.backpointer for
the state (tag) at a given step
:param state: A tag name
:type state: str
:param step: The (0-origin) number of a step: if negative,
counting backwards from the end, i.e. -1 means the last step
:type step: int
:return: The state name to go back to at step-1
:rtype: str
"""
index = self.states.index(state)
return self.backpointer[index][step]
# Q4a
# Tag a new sentence using the trained model and already initialised data structures.
# Use the models stored in the variables: self.emission_PD and self.transition_PD.
# Update the self.viterbi and self.backpointer data structures.
# Describe your implementation with comments.
def tag(self, observations):
"""
Tag a new sentence using the trained model and already initialised data structures.
:param observations: List of words (a sentence) to be tagged
:type observations: list(str)
:return: List of tags corresponding to each word of the input
"""
tags = []
# Use costs, not probabilities
for t in range(1, len(observations)):
for i in range(len(self.states)):
value = np.zeros(len(self.states))
for j in range(len(self.states)):
state_from = self.states[j]
state_to = self.states[i]
trans = self.viterbi[j][t - 1] - self.tlprob(state_from, state_to)
value[j] = trans - self.elprob(state_to, observations[t])
self.viterbi[i][t] = value.min()
self.backpointer[i][t] = self.states[value.argmin()]
# Add a termination step whose cost is based solely on the cost of the transition to </s>, the end-of-sentence marker.
value = np.zeros(len(self.states))
for i in range(len(self.states)):
state_from = self.states[i]
state_to = "</s>"
value[i] = (self.viterbi[i][len(observations)-1]) - self.tlprob(state_from, state_to)
best_path_pointer = self.states[value.argmin()]
tags.append(best_path_pointer)
# Reconstruct the tag sequence using the backpointers.
# Return the tag sequence corresponding to the best path as a list.
# The order should match that of the words in the sentence.
n = len(observations) - 1
while (n >= 1):
prev = self.get_backpointer_value(best_path_pointer, n)
tags.insert(0, prev)
best_path_pointer = prev
n -= 1
return tags
def tag_sentence(self, sentence):
"""
Initialise the HMM, lower case and tag a sentence. Returns a list of tags.
:param sentence: the sentence
:type sentence: list(str)
:rtype: list(str)
"""
for i in range(len(sentence)):
sentence[i] = sentence[i].lower()
self.initialise(sentence[0], len(sentence))
return self.tag(sentence)
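######
# Optional, illustrative sketch (not part of the marked interface): it shows how the
# HMM class above might be exercised on a tiny, made-up training set. The toy
# sentences, the helper name _demo_tiny_hmm and the test sentence are invented for
# illustration only; the real coursework data is the Brown corpus loaded in answers().
######
def _demo_tiny_hmm():
    toy_train = [
        [("the", "DET"), ("dog", "NOUN"), ("barks", "VERB")],
        [("a", "DET"), ("cat", "NOUN"), ("sleeps", "VERB")],
        [("the", "DET"), ("cat", "NOUN"), ("barks", "VERB")],
    ]
    model = HMM(toy_train)
    model.train()
    # Emission and transition estimates are log (base 2) probabilities, so both are <= 0.
    print("elprob('NOUN', 'cat')  =", model.elprob("NOUN", "cat"))
    # Lidstone smoothing (+0.001 with an extra bin) gives unseen words a small,
    # non-zero probability, so this is finite rather than -inf.
    print("elprob('NOUN', 'fish') =", model.elprob("NOUN", "fish"))
    print("tlprob('DET', 'NOUN')  =", model.tlprob("DET", "NOUN"))
    # initialise() and tag() work with costs (negative log probabilities); the unseen
    # word "walks" is handled by the smoothed emission model.
    print(model.tag_sentence(["the", "dog", "walks"]))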
def answer_question4b():
"""
Report a hand-chosen tagged sequence that is incorrect, correct it
and discuss
:rtype: list(tuple(str,str)), list(tuple(str,str)), str
:return: incorrectly tagged sequence, correctly tagged sequence and your answer [max 280 chars]
"""
# One sentence, i.e. a list of word/tag pairs, in two versions
# 1) As tagged by your HMM
# 2) With wrong tags corrected by hand
tagged_sequence = [('``', '.'), ('My', 'DET'), ('taste', 'NOUN'), ('is', 'VERB'), ('gaudy', 'ADV'), ('.', '.')]
correct_sequence = [('``', '.'), ('My', 'DET'), ('taste', 'NOUN'), ('is', 'VERB'), ('gaudy', 'ADJ'), ('.', '.')]
# Why do you think the tagger tagged this example incorrectly?
answer = inspect.cleandoc("""'gaudy' is probably rare as an ADJ in the training data, so self.elprob('ADJ', 'gaudy') is low. VERB is also followed by ADV more often than by ADJ in the training data, so self.tlprob('VERB', 'ADV') is higher than self.tlprob('VERB', 'ADJ') and the tagger prefers ADV.
""")
return tagged_sequence, correct_sequence, trim_and_warn("Q4b", 280, answer)
# Q5a
def hard_em(labeled_data, unlabeled_data, k):
"""
Run k iterations of hard EM on the labeled and unlabeled data.
Follow the pseudo-code in the coursework instructions.
:param labeled_data: the labelled training data, a list of tagged sentences
:type labeled_data: list(list(tuple(str, str)))
:param unlabeled_data: the unlabelled training data, a list of untagged sentences
:type unlabeled_data: list(list(str))
:param k: number of iterations
:type k: int
:return: HMM model trained with hard EM.
:rtype: HMM
"""
hmm = HMM(labeled_data)
hmm.train()
for i in range(k):
next_training_data = labeled_data.copy()
for s in unlabeled_data:
hmm_tagged_sentence = hmm.tag_sentence(s)
next_training_data.append(list(zip(s, hmm_tagged_sentence)))
hmm = HMM(next_training_data)
hmm.train()
return hmm
def answer_question5b():
"""
Sentence: In fact he seemed delighted to get rid of them .
Gold POS: ADP NOUN PRON VERB VERB PRT VERB ADJ ADP PRON .
T_0 : PRON VERB NUM ADP ADJ PRT VERB NUM ADP PRON .
T_k : PRON VERB PRON VERB ADJ PRT VERB NUM ADP NOUN .
1) T_0 erroneously tagged "he" as "NUM" and T_k correctly identifies it as "PRON".
Speculate why additional unlabeled data might have helped in that case.
Refer to the training data (inspect the 20 sentences!).
2) Where does T_k mislabel a word but T_0 is correct? Why do you think hard EM hurt in that case?
:rtype: str
:return: your answer [max 500 chars]
"""
return trim_and_warn("Q5b", 500, inspect.cleandoc("""The emission model for ‘he’ is the same for all states in the t0 since ‘he’ doesn’t appear in the training set. ‘he’ is tagged as ‘Num’ because of the bias transition model. After a few iterations, the transition model tlprob(‘VERB’, ‘PRON’) and the emission model elprob(‘PRON’, ‘he’) might be higher. The last word ‘them’ is mislabeled by tk. Harm EM might overfit the data. tlprob(NUM, ‘ADP’) and tlprob(‘NOUN’, ‘.’) might be much larger than elprob(‘PRON’, ‘them’). Therefore, mistag the label.
"""))
def answer_question6():
"""
Suppose you have a hand-crafted grammar that has 100% coverage on
constructions but less than 100% lexical coverage.
How could you use a POS tagger to ensure that the grammar
produces a parse for any well-formed sentence,
even when it doesn't recognise the words within that sentence?
:rtype: str
:return: your answer [max 500 chars]
"""
return trim_and_warn("Q6", 500, inspect.cleandoc("""We use the pos-tagger to tag the whole sentences and extract the tags for unseen words. We can calculate the emission probability from the tags to those words. For each unseen word, combine the emission probability of this word with the original probability of words in the same tags then normalise the probability."""))
def answer_question7():
"""
Why else, besides the speedup already mentioned above, do you think we
converted the original Brown Corpus tagset to the Universal tagset?
What do you predict would happen if we hadn't done that? Why?
:rtype: str
:return: your answer [max 500 chars]
"""
return trim_and_warn("Q7", 500, inspect.cleandoc("""\
We converted the Brown Corpus tagset to the Universal tagset because the Universal tagset has far fewer tags, and using more tags makes the data sparser. For example, a word with several possible tags in the Brown tagset may have only one or two in the Universal tagset, so under the Brown tagset its emission probability given each tag is lower. The Brown-tagset model would therefore be less confident when tagging such words and achieve lower accuracy.
"""))
def compute_acc(hmm, test_data, print_mistakes):
"""
Computes accuracy (0.0 - 1.0) of model on some data.
:param hmm: the HMM
:type hmm: HMM
:param test_data: the data to compute accuracy on.
:type test_data: list(list(tuple(str, str)))
:param print_mistakes: whether to print the first 10 model mistakes
:type print_mistakes: bool
:return: float
"""
correct = 0
incorrect = 0
count = 0
for sentence in test_data:
s = [word for (word, tag) in sentence]
tags = hmm.tag_sentence(s)
wrong = False
for ((word, gold), tag) in zip(sentence, tags):
if tag == gold:
correct += 1
else:
wrong = True
incorrect += 1
if print_mistakes and wrong and (count < 10):
print(s)
count += 1
return float(correct) / (correct + incorrect)
# Useful for testing
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
# http://stackoverflow.com/a/33024979
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
def answers():
global tagged_sentences_universal, test_data_universal, \
train_data_universal, model, test_size, train_size, ttags, \
correct, incorrect, accuracy, \
good_tags, bad_tags, answer4b, answer5, answer6, answer7, answer5b, \
t0_acc, tk_acc
# Load the Brown corpus with the Universal tag set.
tagged_sentences_universal = brown.tagged_sents(categories='news', tagset='universal')
# Divide corpus into train and test data.
test_size = 500
train_size = len(tagged_sentences_universal) - test_size
# tail test set
test_data_universal = tagged_sentences_universal[-test_size:] # [:test_size]
train_data_universal = tagged_sentences_universal[:train_size] # [test_size:]
if hashlib.md5(''.join(map(lambda x: x[0],
train_data_universal[0] + train_data_universal[-1] + test_data_universal[0] +
test_data_universal[-1])).encode(
'utf-8')).hexdigest() != '164179b8e679e96b2d7ff7d360b75735':
print('!!!test/train split (%s/%s) incorrect -- this should not happen, please contact a TA !!!' % (
len(train_data_universal), len(test_data_universal)), file=sys.stderr)
# Create instance of HMM class and initialise the training set.
model = HMM(train_data_universal)
# Train the HMM.
model.train()
# Some preliminary sanity checks
# Use these as a model for other checks
e_sample = model.elprob('VERB', 'is')
if not (type(e_sample) == float and e_sample <= 0.0):
print('elprob value (%s) must be a log probability' % e_sample, file=sys.stderr)
t_sample = model.tlprob('VERB', 'VERB')
if not (type(t_sample) == float and t_sample <= 0.0):
print('tlprob value (%s) must be a log probability' % t_sample, file=sys.stderr)
if not (type(model.states) == list and \
len(model.states) > 0 and \
type(model.states[0]) == str):
print('model.states value (%s) must be a non-empty list of strings' % model.states, file=sys.stderr)
print('states: %s\n' % model.states)
######
# Try the model, and test its accuracy [won't do anything useful
# until you've filled in the tag method]
######
s = 'the cat in the hat came back'.split()
ttags = model.tag_sentence(s)
print("Tagged a trial sentence:\n %s" % list(zip(s, ttags)))
v_sample = model.get_viterbi_value('VERB', 5)
if not (type(v_sample) == float and 0.0 <= v_sample):
print('viterbi value (%s) must be a cost' % v_sample, file=sys.stderr)
b_sample = model.get_backpointer_value('VERB', 5)
if not (type(b_sample) == str and b_sample in model.states):
print('backpointer value (%s) must be a state name' % b_sample, file=sys.stderr)
# check the model's accuracy (% correct) using the test set
accuracy = compute_acc(model, test_data_universal, print_mistakes=True)
print('\nTagging accuracy for test set of %s sentences: %.4f' % (test_size, accuracy))
# Tag the sentence again to put the results in memory for automarker.
model.tag_sentence(s)
# Question 5a
# Set aside the first 20 sentences of the training set
num_sentences = 20
semi_supervised_labeled = train_data_universal[:num_sentences] # type list(list(tuple(str, str)))
semi_supervised_unlabeled = [[word for (word, tag) in sent] for sent in train_data_universal[num_sentences:]] # type list(list(str))
print("Running hard EM for Q5a. This may take a while...")
t0 = hard_em(semi_supervised_labeled, semi_supervised_unlabeled, 0) # 0 iterations
tk = hard_em(semi_supervised_labeled, semi_supervised_unlabeled, 3)
print("done.")
t0_acc = compute_acc(t0, test_data_universal, print_mistakes=False)
tk_acc = compute_acc(tk, test_data_universal, print_mistakes=False)
print('\nTagging accuracy of T_0: %.4f' % (t0_acc))
print('\nTagging accuracy of T_k: %.4f' % (tk_acc))
########
# Print answers for 4b, 5b, 6 and 7.
bad_tags, good_tags, answer4b = answer_question4b()
print('\nA tagged-by-your-model version of a sentence:')
print(bad_tags)
print('The tagged version of this sentence from the corpus:')
print(good_tags)
print('\nDiscussion of the difference:')
print(answer4b)
answer5b = answer_question5b()
print("\nFor Q5b:")
print(answer5b)
answer6 = answer_question6()
print('\nFor Q6:')
print(answer6)
answer7 = answer_question7()
print('\nFor Q7:')
print(answer7)
if __name__ == '__main__':
if len(sys.argv) > 1 and sys.argv[1] == '--answers':
import adrive2
from autodrive_embed import run, carefulBind
with open("userErrs.txt", "w") as errlog:
run(globals(), answers, adrive2.a2answers, errlog)
else:
answers()