-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue_test.go
249 lines (211 loc) · 5.06 KB
/
queue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
package bucket
import (
"fmt"
"runtime"
"sync"
"testing"
"golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"
)
// TestTxQueuePushPopSingle pushes one element onto an empty queue and checks
// that pop hands back that same element exactly once.
func TestTxQueuePushPopSingle(t *testing.T) {
	t.Parallel()

	var q txQueue

	// Enqueue a single no-op transaction function.
	want := q.push(func(Transaction) error { return nil })

	// The first pop is expected to return the element that push returned.
	if got := q.pop(); got == nil {
		t.Error("pushed element was lost")
	} else if got != want {
		t.Error("popped an element that we never pushed")
	}

	// Nothing else was pushed, so a second pop is expected to return nil.
	if got := q.pop(); got != nil {
		if got == want {
			t.Error("unexpectedly popped the pushed element later")
		} else {
			t.Error("popped an element that was never pushed when queue should be empty")
		}
	}
}
// TestTxQueuePushRemoveSingle pushes one element onto an empty queue, removes
// it again, and checks that a following pop finds nothing.
func TestTxQueuePushRemoveSingle(t *testing.T) {
	t.Parallel()

	var q txQueue

	// Enqueue a single no-op transaction function, then take it straight back out.
	removed := q.push(func(Transaction) error { return nil })
	q.remove(removed)

	// After the removal a pop is expected to return nil.
	if got := q.pop(); got != nil {
		if got == removed {
			t.Error("unexpectedly popped the pushed element after removal")
		} else {
			t.Error("popped an element that was never pushed when queue should be empty")
		}
	}
}
// TestTxQueuePushPopMany pushes many elements onto a queue, then pops them all
// and runs each one, verifying that they come back in the order in which they
// were pushed.
func TestTxQueuePushPopMany(t *testing.T) {
	t.Parallel()

	// Push many elements onto a queue.
	var queue txQueue
	var elems []*txQueueElem
	var last int // sequence number of the most recently run function (0 = none yet)
	for len(elems) < 100 {
		// seq is declared inside the loop body so that every closure captures
		// its own copy regardless of the Go version's loop-variable semantics.
		seq := len(elems) + 1
		elems = append(elems, queue.push(func(tx Transaction) error {
			// Check that this was run in the correct order: the function that
			// ran immediately before must carry the preceding sequence number.
			prev := last
			last = seq
			if prev != seq-1 {
				// Report prev, not last: last has already been overwritten.
				return fmt.Errorf("ran %d after %d", seq, prev)
			}
			return nil
		}))
	}

	// Pop everything off of the queue and verify the order.
	var popped int
	var nerrs int
	for {
		elem := queue.pop()
		if elem == nil {
			break
		}
		// done is deferred, so it runs for every popped element when the test
		// function returns (including the early return below).
		// NOTE(review): confirm whether done could instead be called per iteration.
		defer elem.done()
		if err := elem.fn(nil); err != nil {
			t.Error(err)
			nerrs++
			// Stop after ten failures to keep the output readable.
			if nerrs == 10 {
				t.Error("too many errors")
				return
			}
		}
		popped++
	}
	if popped != len(elems) {
		t.Errorf("pushed %d elements but popped %d", len(elems), popped)
	}
}
// TestTxQueuePushRemoveMany fills a queue with many elements and then removes
// every one of them in a shuffled (fixed-seed) order.
func TestTxQueuePushRemoveMany(t *testing.T) {
	t.Parallel()

	const count = 100

	// Fill the queue, remembering each element returned by push.
	var q txQueue
	var pending []*txQueueElem
	for k := 0; k < count; k++ {
		pending = append(pending, q.push(func(Transaction) error { return nil }))
	}

	// Shuffle the remembered elements; the seed is fixed so every run uses the same order.
	swap := func(a, b int) {
		pending[a], pending[b] = pending[b], pending[a]
	}
	rng := rand.New(rand.NewSource(4))
	rng.Shuffle(len(pending), swap)

	// Remove each element in the shuffled order and call done on it.
	for idx := range pending {
		elem := pending[idx]
		q.remove(elem)
		elem.done()
	}
}
// TestTxQueuePushPopConcurrent pushes elements from many goroutines while the
// test goroutine pops and runs them.
func TestTxQueuePushPopConcurrent(t *testing.T) {
	t.Parallel()

	var q txQueue
	var pushers sync.WaitGroup
	defer pushers.Wait()

	// remaining doubles as the number of pusher goroutines to start and as the
	// count of pushed functions that have not run yet. Each pushed function
	// decrements it; the popping loop below calls elem.fn on this goroutine
	// (presumably fn is the pushed function — confirm against txQueueElem).
	remaining := 100
	for i := 0; i < remaining; i++ {
		pushers.Add(1)
		go func() {
			defer pushers.Done()
			elem := q.push(func(Transaction) error {
				remaining--
				return nil
			})
			// Wait for a result to arrive on commitNotify, then call done.
			<-elem.commitNotify
			elem.done()
		}()
	}

	// Pop and run elements (the pushers may still be running) until every
	// pushed function has been called.
	for remaining > 0 {
		if elem := q.pop(); elem != nil {
			elem.commitNotify <- elem.fn(nil)
			continue
		}
		// Nothing queued yet: yield so the pushers get a chance to run.
		runtime.Gosched()
	}
}
// TestTxQueuePushRemoveConcurrent has many goroutines each push an element and
// then remove it again; pushes run unsynchronized while removals are
// serialized by a mutex.
func TestTxQueuePushRemoveConcurrent(t *testing.T) {
	t.Parallel()

	var (
		q       txQueue
		workers errgroup.Group
		lock    sync.Mutex
	)

	// Start all of the workers.
	for i := 0; i < 100; i++ {
		workers.Go(func() error {
			// The push happens outside the lock.
			elem := q.push(func(Transaction) error { return nil })

			// The removal and the done call happen with the lock held.
			lock.Lock()
			defer lock.Unlock()
			q.remove(elem)
			elem.done()
			return nil
		})
	}

	if err := workers.Wait(); err != nil {
		t.Error(err)
	}
}
// TestTxQueueConcurrentPushPopRemove pushes elements from many goroutines; each
// goroutine then either finds that its element was already popped, or removes
// it and pops the next element itself.
func TestTxQueueConcurrentPushPopRemove(t *testing.T) {
	t.Parallel()

	var (
		q       txQueue
		workers errgroup.Group
		lock    sync.Mutex
	)

	for i := 0; i < 100; i++ {
		workers.Go(func() error {
			// The push happens outside the lock.
			elem := q.push(func(Transaction) error { return nil })

			// Everything after the push is serialized by the mutex.
			lock.Lock()
			defer lock.Unlock()

			// A value already waiting on commitNotify means some other worker
			// popped this element and sent its result.
			select {
			case err := <-elem.commitNotify:
				elem.done()
				return err
			default:
			}

			// Not popped yet: take this element out of the queue.
			q.remove(elem)
			elem.done()

			// Pop whichever element is next, if any, run it, and send its result.
			next := q.pop()
			if next == nil {
				return nil
			}
			next.commitNotify <- next.fn(nil)
			return nil
		})
	}

	if err := workers.Wait(); err != nil {
		t.Error(err)
	}
}