-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathq1-consumer.py
69 lines (64 loc) · 2.87 KB
/
q1-consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import pulsar, _pulsar
from datetime import datetime
import time
PULSAR_IP = '192.168.2.139'
if __name__ == '__main__':
#Pulsar setup
client = pulsar.Client('pulsar://' + PULSAR_IP + ':6650')
consumer = client.subscribe('DE2-lang', subscription_name='DE-Q1', consumer_type=_pulsar.ConsumerType.Shared)
agg_producer = client.create_producer('DE2-agg')
agg_producer_name = agg_producer.producer_name()
#language list
language = {}
##List of producers to the listening topic
producer_list = []
#Aggregation message
agg_msg = {}
agg_msg['type'] = 'Q1'
msg_count = 0
frequency = 100 #frequency of printing top list/send update
continue_flag = True
while continue_flag:
msg = consumer.receive()
msg_count += 1
now = datetime.now().strftime("%Y/%m/%d,%H:%M:%S")
try:
producer_name = msg.properties()['producer']
content = msg.data().decode('utf-8')
#Receive finish signal from producer
if content == 'finish':
if producer_name in producer_list:
print("[%s] A producer finished its job: %s" %(now, producer_name))
producer_list.remove(producer_name) #Remove finished producer
#If no producer is working
if not producer_list:
agg_msg['result'] = language
#Update the latest result to the aggregation server
agg_producer.send(str(agg_msg).encode('utf-8'), properties={'producer': agg_producer_name})
continue_flag = False
else:
if producer_name not in producer_list:
print("[%s] New producer: %s" %(now, producer_name))
producer_list.append(producer_name)
if content in language.keys():
language[content] += 1
else:
language[content] = 1
#Periodically print out list of languages and project counts
if msg_count % frequency == 1:
print("[%s] Current list of language count from %d messages:" %(now,msg_count))
print(language, "\n")
#Craft message to the aggregation server
agg_msg['result'] = language
#Send aggregation message
agg_producer.send(str(agg_msg).encode('utf-8'), properties={'producer': agg_producer_name})
consumer.acknowledge(msg)
except:
consumer.negative_acknowledge(msg)
time.sleep(1)
#Send ending signal to aggregation server
agg_producer.send("finish".encode('utf-8'), properties={'producer': agg_producer_name})
print("Fisnished all available jobs! Quitting...")
# Destroy pulsar client
agg_producer.close()
client.close()