-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathshared_buffer_test.go
127 lines (102 loc) · 2.01 KB
/
shared_buffer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package channels
import "testing"
func TestSharedBufferSingleton(t *testing.T) {
buf := NewSharedBuffer(3)
ch := buf.NewChannel()
for i := 0; i < 5; i++ {
ch.In() <- (*int)(nil)
ch.In() <- (*int)(nil)
ch.In() <- (*int)(nil)
select {
case ch.In() <- (*int)(nil):
t.Error("Wrote to full shared-buffer")
default:
}
<-ch.Out()
<-ch.Out()
<-ch.Out()
select {
case <-ch.Out():
t.Error("Read from empty shared-buffer")
default:
}
}
ch.Close()
buf.Close()
}
func TestSharedBufferMultiple(t *testing.T) {
buf := NewSharedBuffer(3)
ch1 := buf.NewChannel()
ch2 := buf.NewChannel()
ch1.In() <- (*int)(nil)
ch1.In() <- (*int)(nil)
ch1.In() <- (*int)(nil)
select {
case ch2.In() <- (*int)(nil):
t.Error("Wrote to full shared-buffer")
case <-ch2.Out():
t.Error("Read from empty channel")
default:
}
<-ch1.Out()
for i := 0; i < 10; i++ {
ch2.In() <- (*int)(nil)
select {
case ch1.In() <- (*int)(nil):
t.Error("Wrote to full shared-buffer")
case ch2.In() <- (*int)(nil):
t.Error("Wrote to full shared-buffer")
default:
}
<-ch2.Out()
}
<-ch1.Out()
<-ch1.Out()
ch1.Close()
ch2.Close()
buf.Close()
}
func TestSharedBufferConcurrent(t *testing.T) {
const threads = 10
const iters = 200
buf := NewSharedBuffer(3)
done := make(chan bool)
for i := 0; i < threads; i++ {
go func() {
ch := buf.NewChannel()
for i := 0; i < iters; i++ {
ch.In() <- i
val := <-ch.Out()
if val.(int) != i {
t.Error("Mismatched value out of channel")
}
}
ch.Close()
done <- true
}()
}
for i := 0; i < threads; i++ {
<-done
}
close(done)
buf.Close()
}
func ExampleSharedBuffer() {
// never more than 3 elements in the pipeline at once
buf := NewSharedBuffer(3)
ch1 := buf.NewChannel()
ch2 := buf.NewChannel()
// or, instead of a straight pipe, implement your pipeline step
Pipe(ch1, ch2)
// inputs
go func() {
for i := 0; i < 20; i++ {
ch1.In() <- i
}
ch1.Close()
}()
for _ = range ch2.Out() {
// outputs
}
buf.Close()
}