-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathIndexRatings.py
39 lines (28 loc) · 1.09 KB
/
IndexRatings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import csv
from collections import deque
import elasticsearch
from elasticsearch import helpers
def readMovies():
    """Build a movieId -> title lookup from ``ml-latest-small/movies.csv``.

    The path is relative to the current working directory. The file must
    have a header row containing at least ``movieId`` and ``title`` columns.

    Returns:
        dict mapping each row's ``movieId`` (kept as the string read from
        the CSV) to that row's ``title``. If a movieId appears more than
        once, the last row wins.

    Raises:
        FileNotFoundError: if the CSV file does not exist.
        KeyError: if a required column is missing from the header.
    """
    # `with` guarantees the handle is closed even if parsing fails.
    # newline='' is what the csv module asks for so quoted fields containing
    # line breaks are parsed correctly; the encoding is pinned so titles with
    # non-ASCII characters do not depend on the platform's default codec.
    with open('ml-latest-small/movies.csv', 'r',
              encoding='utf-8', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        return {movie['movieId']: movie['title'] for movie in reader}
def readRatings():
    """Yield one rating document per row of ``ml-latest-small/ratings.csv``.

    The path is relative to the current working directory. Each row is
    joined with the movie title obtained from ``readMovies()``.

    Yields:
        dict with the keys ``user_id`` (int), ``movie_id`` (int),
        ``title`` (str), ``rating`` (float) and ``timestamp`` (int).

    Raises:
        FileNotFoundError: if either CSV file does not exist.
        KeyError: if a row's ``movieId`` has no entry in the title lookup,
            or a required column is missing.
        ValueError: if a numeric column cannot be converted.

    This is a generator: nothing is read until it is iterated, and the
    ratings file stays open until the generator is exhausted or closed.
    """
    # `with` closes the ratings file once iteration finishes (or the
    # generator is closed early) instead of leaking the handle.
    with open('ml-latest-small/ratings.csv', 'r',
              encoding='utf-8', newline='') as csvfile:
        titleLookup = readMovies()
        reader = csv.DictReader(csvfile)
        for line in reader:
            yield {
                'user_id': int(line['userId']),
                'movie_id': int(line['movieId']),
                # The lookup is keyed by the raw movieId string from the CSV.
                'title': titleLookup[line['movieId']],
                'rating': float(line['rating']),
                'timestamp': int(line['timestamp']),
            }
# Script body: rebuild the "ratings" index from the MovieLens CSV files.

# Client created with no arguments, so the connection target is whatever the
# installed elasticsearch client uses by default.
es = elasticsearch.Elasticsearch()

# Start from a clean slate. ignore=404 is passed so that a "ratings" index
# that does not exist yet is presumably not treated as an error.
es.indices.delete(index="ratings", ignore=404)

# NOTE: an earlier variant of the bulk call below also passed
# doc_type="rating"; the active call does not.
# parallel_bulk returns a generator, so it has to be iterated to completion
# for the documents to actually be sent; the per-document results are
# discarded.
for _ in helpers.parallel_bulk(es, readRatings(), index="ratings"):
    pass

# Called without an index argument, exactly as before.
es.indices.refresh()