-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch.go
155 lines (130 loc) · 3.43 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package bucket
import "io"
// batchTx is a transaction implementation used to implement batching.
// It adds an extra layer of overlay over a normal transaction: Set and Delete
// record their changes in the overlay, and Get and Each consult the overlay
// before falling back to the base transaction.
type batchTx struct {
	// base is the base transaction.
	// The methods of batchTx treat a nil base as a transaction that has
	// already been committed/aborted and panic when they see one.
	base *tx
	// version is a version number used for sanity checking.
	// Set and Delete increment it on every call; Each panics if it observes
	// a change while iterating.
	version uint64
	// overlay holds updated key-value pairs.
	// An empty value represents a delete.
	overlay map[string]string
}

// Compile-time assertion that *batchTx satisfies the Transaction interface.
var _ Transaction = (*batchTx)(nil)
// Get returns the value stored under key, preferring the overlay over the
// base transaction.
//
// An empty key, or a key of 1 GiB or more, is not representable and is
// reported as absent ("" with a nil error) without any lookup. An overlay
// entry is returned as-is, so a key deleted through this transaction yields
// "" because its deletion marker is an empty value.
//
// Get panics if the transaction has already been committed or aborted.
func (tx *batchTx) Get(key string) (string, error) {
	if tx.base == nil {
		// Use after commit/abort is a programming error.
		panic("access to finished transaction")
	}
	if key == "" || len(key) >= 1<<30 {
		// Unrepresentable keys can be assumed to not be present.
		return "", nil
	}
	// Pending writes and deletes shadow the base transaction.
	if pending, found := tx.overlay[key]; found {
		return pending, nil
	}
	// Nothing pending for this key; defer to the base transaction.
	return tx.base.Get(key)
}
// Each calls fn for every key-value pair visible through the transaction.
//
// Pairs from the base transaction are visited first, skipping every key that
// has an overlay entry. The overlay entries are visited afterwards, skipping
// deletion markers (empty values). Overlay entries are visited in Go map
// iteration order, which is unspecified.
//
// The first non-nil error returned by fn (or by the base transaction's Each)
// stops the iteration and is returned.
//
// Each panics if the transaction has already been committed or aborted, or if
// it notices that the version number changed while iterating (that is, Set or
// Delete was called on this transaction in the meantime).
func (tx *batchTx) Each(fn func(key, value string) error) error {
	if tx.base == nil {
		// Use after commit/abort is a programming error.
		panic("access to finished transaction")
	}

	// Remember the version we started with so modifications can be detected.
	initial := tx.version
	assertUnmodified := func() {
		if tx.version != initial {
			panic("tx modified during each")
		}
	}

	// First pass: pairs of the base transaction that the overlay leaves alone.
	visitBase := func(key, value string) error {
		assertUnmodified()
		if _, shadowed := tx.overlay[key]; shadowed {
			// Updated or deleted in the overlay; the second pass handles it.
			return nil
		}
		return fn(key, value)
	}
	if err := tx.base.Each(visitBase); err != nil {
		return err
	}

	// Second pass: pairs written through the overlay.
	for key, value := range tx.overlay {
		assertUnmodified()
		if value != "" {
			// Not a deletion marker, so the pair is live.
			if err := fn(key, value); err != nil {
				return err
			}
		}
	}

	// Catch a modification made by the very last callback.
	assertUnmodified()
	return nil
}
// WriteTo writes every pair visible through the transaction (as enumerated by
// Each) to w, using the writer returned by startTx. The signature matches
// io.WriterTo.
//
// If enumerating or writing a pair fails, the writer is aborted and the error
// is returned together with the count reported by abort. Otherwise the writer
// is finished and its count and error are returned.
// NOTE(review): the counts are presumably bytes written to w; confirm against
// the writer returned by startTx.
func (tx *batchTx) WriteTo(w io.Writer) (int64, error) {
	out := startTx(w)
	if eachErr := tx.Each(out.writePair); eachErr != nil {
		// Give up on the output and report what abort says was written.
		aborted := out.abort()
		return int64(aborted), eachErr
	}
	written, err := out.finish()
	return int64(written), err
}
// Set records key=value in the overlay, shadowing whatever the base
// transaction holds for key.
//
// It returns errEmptyKey or errEmptyValue when the key or value is empty (an
// empty value is reserved as the overlay's deletion marker), and
// errKeyTooLarge or errValueTooLarge when the key or value is 1 GiB or more.
//
// The version number is bumped on every call, including rejected ones, so
// that Each can detect calls made while it is iterating.
//
// Set panics if the transaction has already been committed or aborted.
func (tx *batchTx) Set(key, value string) error {
	// Update the version number.
	tx.version++

	switch {
	case tx.base == nil:
		// This transaction is being used after it has been committed/aborted.
		panic("access to finished transaction")
	case key == "":
		// An empty key is not representable.
		return errEmptyKey
	case value == "":
		// An empty value is not representable.
		return errEmptyValue
	case len(key) >= 1<<30:
		// The key is too large to be represented.
		return errKeyTooLarge
	case len(value) >= 1<<30:
		// The value is too large to be represented.
		return errValueTooLarge
	}

	// Writing to a nil map panics, while Get and Each already tolerate a nil
	// overlay. Allocate lazily so a batchTx built without an overlay works.
	if tx.overlay == nil {
		tx.overlay = make(map[string]string)
	}

	// Update the overlay.
	tx.overlay[key] = value
	return nil
}
// Delete marks key as deleted by storing a deletion marker (an empty value)
// in the overlay, shadowing whatever the base transaction holds for key.
//
// An empty key, or a key of 1 GiB or more, is not representable and therefore
// cannot be present; deleting it is a no-op that returns nil.
//
// The version number is bumped on every call, including no-op ones, so that
// Each can detect calls made while it is iterating.
//
// Delete panics if the transaction has already been committed or aborted.
func (tx *batchTx) Delete(key string) error {
	// Update the version number.
	tx.version++

	switch {
	case tx.base == nil:
		// This transaction is being used after it has been committed/aborted.
		panic("access to finished transaction")
	case key == "":
		// An empty key is not representable.
		return nil
	case len(key) >= 1<<30:
		// The key is too large to be represented.
		return nil
	}

	// Writing to a nil map panics, while Get and Each already tolerate a nil
	// overlay. Allocate lazily so a batchTx built without an overlay works.
	if tx.overlay == nil {
		tx.overlay = make(map[string]string)
	}

	// Insert a deletion marker in the overlay.
	tx.overlay[key] = ""
	return nil
}