-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathbench.py
150 lines (125 loc) · 3.6 KB
/
bench.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from peewee import *
# Default backend: a throwaway in-memory SQLite database.
db = SqliteDatabase(':memory:')
# Alternative backends -- swap in one of these to run the same benchmarks
# through peewee's PostgresqlDatabase class instead:
#db = PostgresqlDatabase('peewee_test', host='127.0.0.1', port=26257, user='root')
#db = PostgresqlDatabase('peewee_test', host='127.0.0.1', user='postgres')
class Base(Model):
    """Shared parent model that binds its subclasses to the module-level ``db``."""

    class Meta:
        database = db
class Register(Base):
    """Table with a single integer column, ``value``."""

    value = IntegerField()
class Collection(Base):
    """Table with a single text column, ``name``."""

    name = TextField()
class Item(Base):
    """Named row that references a ``Collection`` through a foreign key.

    The foreign key declares ``backref='items'`` on ``Collection``.
    """

    collection = ForeignKeyField(Collection, backref='items')
    name = TextField()
import functools
import time
def timed(fn):
    """Decorate ``fn`` so that one call runs it ten times and prints the mean.

    The wrapped function is invoked as ``fn(run_index, *args, **kwargs)`` with
    ``run_index`` going from 0 to 9.  After the last run a single line of the
    form ``'<mean seconds> ... <function name>'`` is printed.  The wrapper
    itself returns ``None``; return values of ``fn`` are discarded.
    """
    @functools.wraps(fn)
    def inner(*args, **kwargs):
        N = 10
        elapsed = []
        for run_index in range(N):
            began = time.perf_counter()
            fn(run_index, *args, **kwargs)
            elapsed.append(time.perf_counter() - began)
        mean = round(sum(elapsed) / N, 3)
        print('%0.3f ... %s' % (mean, fn.__name__))
    return inner
def populate_register(s, n):
    """Create one ``Register`` row for every integer in ``range(s, n)``."""
    for value in range(s, n):
        Register.create(value=value)
def populate_collections(n, n_i):
    """Create ``n`` collections, each with ``n_i`` items.

    Collections and items are named after their loop index (as a string).
    """
    for collection_index in range(n):
        parent = Collection.create(name=str(collection_index))
        for item_index in range(n_i):
            Item.create(collection=parent, name=str(item_index))
@timed
def insert(i):
    """Insert the ``i``-th block of 1000 register rows inside ``db.atomic()``.

    ``i`` is the run index passed in by ``timed``; it selects the value range
    ``[i * 1000, (i + 1) * 1000)``.
    """
    lower = i * 1000
    upper = (i + 1) * 1000
    with db.atomic():
        populate_register(lower, upper)
@timed
def batch_insert(i):
    """Insert the ``i``-th block of 1000 values one row at a time.

    The values are iterated through ``db.batch_commit`` with a batch size of
    100, and each one is written with its own ``INSERT`` query.
    """
    base = i * 1000
    values = range(base, base + 1000)
    for value in db.batch_commit(values, 100):
        Register.insert(value=value).execute()
@timed
def bulk_insert(i):
    """Insert the ``i``-th block of 1000 values using ``insert_many``.

    Within one ``db.atomic()`` block, the values are split into ten chunks of
    100 one-element tuples, and each chunk is sent as one ``insert_many`` call.
    """
    first = i * 1000
    with db.atomic():
        for chunk_start in range(first, first + 1000, 100):
            rows = [(value,)
                    for value in range(chunk_start, chunk_start + 100)]
            Register.insert_many(rows, fields=[Register.value]).execute()
@timed
def bulk_create(i):
    """Insert the ``i``-th block of 1000 values via ``Register.bulk_create``.

    Unsaved ``Register`` instances are built first and then handed to
    ``bulk_create`` with ``batch_size=100``, all inside ``db.atomic()``.
    """
    base = i * 1000
    with db.atomic():
        instances = [Register(value=value)
                     for value in range(base, base + 1000)]
        Register.bulk_create(instances, batch_size=100)
@timed
def select(i):
    """Iterate over every row of ``Register.select()``; ``i`` is unused."""
    for _ in Register.select():
        pass
@timed
def select_related_dbapi_raw(i):
    """Run the Item/Collection join through ``db.execute`` and drain the cursor.

    The rows are read straight from the object ``db.execute`` returns rather
    than by iterating the query itself.  ``i`` is unused.
    """
    joined = Item.select(Item, Collection).join(Collection)
    for _ in db.execute(joined):
        pass
@timed
def insert_related(i):
    """Create 30 collections of 60 items each inside ``db.atomic()``.

    ``i`` is unused, so every run inserts the same names again.
    """
    n_collections, items_per_collection = 30, 60
    with db.atomic():
        populate_collections(n_collections, items_per_collection)
@timed
def select_related(i):
    """Iterate the Item query joined to Collection; ``i`` is unused."""
    for _ in Item.select(Item, Collection).join(Collection):
        pass
@timed
def select_related_left(i):
    """Iterate Collection left-outer-joined to Item; ``i`` is unused."""
    left_joined = Collection.select(Collection, Item).join(
        Item, JOIN.LEFT_OUTER)
    for _ in left_joined:
        pass
@timed
def select_related_dicts(i):
    """Iterate the Item/Collection join with ``.dicts()``; ``i`` is unused."""
    joined = Item.select(Item, Collection).join(Collection)
    for _ in joined.dicts():
        pass
@timed
def select_related_objects(i):
    """Iterate the Item/Collection join with ``.objects()``; ``i`` is unused."""
    joined = Item.select(Item, Collection).join(Collection)
    for _ in joined.objects():
        pass
@timed
def select_prefetch(i):
    """Walk every collection and its ``items`` using ``prefetch``.

    ``prefetch`` is called with its default settings.  ``i`` is unused.
    """
    collections = prefetch(Collection.select(), Item)
    for collection in collections:
        for _ in collection.items:
            pass
@timed
def select_prefetch_join(i):
    """Walk every collection and its ``items`` using a JOIN-type ``prefetch``.

    Same traversal as a default ``prefetch``, but with
    ``prefetch_type=PREFETCH_TYPE.JOIN``.  ``i`` is unused.
    """
    collections = prefetch(
        Collection.select(),
        Item,
        prefetch_type=PREFETCH_TYPE.JOIN,
    )
    for collection in collections:
        for _ in collection.items:
            pass
if __name__ == '__main__':
    # Benchmark driver: create the schema, run the write benchmarks (clearing
    # Register between variants so each one starts from an empty table), then
    # run the read benchmarks against the data the last writers left behind.
    db.create_tables([Register, Collection, Item])
    try:
        insert()
        insert_related()

        # Each timed writer runs 10 times x 1000 rows, hence the 10000 checks.
        Register.delete().execute()
        batch_insert()
        assert Register.select().count() == 10000

        Register.delete().execute()
        bulk_insert()
        assert Register.select().count() == 10000

        Register.delete().execute()
        bulk_create()
        assert Register.select().count() == 10000

        select()
        select_related()
        select_related_left()
        select_related_objects()
        select_related_dicts()
        select_related_dbapi_raw()
        select_prefetch()
        select_prefetch_join()
    finally:
        # Always drop the tables, even when a benchmark or sanity check
        # raises, so a persistent backend is not left with stale tables.
        db.drop_tables([Register, Collection, Item])