-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathchunk.go
194 lines (166 loc) · 4.51 KB
/
chunk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package sqlog
import (
"sync"
"sync/atomic"
"time"
)
// Entry represents a formatted log entry
type Entry struct {
Time time.Time
Level int8
Content []byte
}
// Chunk stores up to 900 log entries that will be persisted in the storage
type Chunk struct {
mu sync.RWMutex // (next, Depth()) only
id int32 // The identifier of this chunk
cap int32 // Configured batch size
book int32 // Number of scheduled writes in this chunk
write int32 // Number of writes completed
size int64 // Size of content (in bytes)
epochStart int64 // First epoch
epochEnd int64 // Last epoch
retries int32 // Number of attempts to persist in the storage
locked atomic.Bool // Indicates if this chunk no longer accepts writes
next *Chunk // Pointer to the next chunk
entries [900]*Entry // The log entries in this chunk
}
// NewChunk creates a new chunk with the specified capacity
func NewChunk(cap int32) *Chunk {
return &Chunk{cap: min(max(1, cap), 900)}
}
// ID returns the identifier of this chunk
func (c *Chunk) ID() int32 {
return c.id
}
// Next returns the next chunk in the sequence
func (c *Chunk) Next() *Chunk {
c.Init(1) // ensures the next chunk is initialized
return c.next
}
// Size returns the size of this chunk (in bytes)
func (c *Chunk) Size() int64 {
return atomic.LoadInt64(&c.size)
}
// Size returns the number of flush retries for this chunk
func (c *Chunk) Retries() int32 {
return atomic.LoadInt32(&c.retries)
}
// Init initializes the next chunks
func (c *Chunk) Init(depth uint8) {
if depth > 0 {
if c.next == nil {
c.mu.Lock()
if c.cap <= 0 {
c.cap = 900
}
if c.next == nil {
c.next = &Chunk{cap: c.cap}
c.next.id = c.id + 1
}
c.mu.Unlock()
}
depth--
if depth > 0 {
c.next.Init(depth)
}
}
}
// Depth retrieves the number of non-empty chunks
func (c *Chunk) Depth() int {
if c.Empty() {
return 0
}
c.mu.RLock()
n := c.next
c.mu.RUnlock()
if n == nil {
return 1
}
return 1 + n.Depth()
}
// First retrieves the epoch of the first entry in this chunk
func (c *Chunk) First() int64 {
return c.epochStart
}
// Last retrieves the epoch of the last entry in this chunk
func (c *Chunk) Last() int64 {
return c.epochEnd
}
// TTL retrieves the age of the last log entry inserted in this chunk
func (c *Chunk) TTL() time.Duration {
index := atomic.LoadInt32(&c.write) - 1
if index < 0 || c.Empty() {
return 0
}
last := c.entries[index].Time
if last.IsZero() {
return 0
}
return time.Since(last)
}
// Ready indicates if this chunk is full and ready for a flush
func (c *Chunk) Ready() bool {
w := atomic.LoadInt32(&c.write)
if w == c.cap {
return true
}
b := atomic.LoadInt32(&c.book)
return (b > 0 && w == b && c.locked.Load())
}
// Empty indicates if no write attempts have been made
func (c *Chunk) Empty() bool {
return atomic.LoadInt32(&c.book) == 0
}
// Lock prevents further writes to this chunk.
// From this point on, writes will occur in the next chunk.
// This chunk will be ready for flushing after write confirmation.
func (c *Chunk) Lock() {
c.locked.Store(true)
}
// Locked checks if this chunk is locked for writing
func (c *Chunk) Locked() bool {
return c.locked.Load()
}
// List retrieves the list of written entries
func (c *Chunk) List() []*Entry {
return c.entries[:atomic.LoadInt32(&c.write)]
}
// Put attempts to write the log entry into this chunk.
// Returns the chunk that accepted the entry.
// If the chunk that accepted the entry is the same, it returns false in the second parameter.
func (c *Chunk) Put(e *Entry) (into *Chunk, isFull bool) {
if c.locked.Load() {
// chunk is locked
i, _ := c.Next().Put(e)
return i, true
}
index := (atomic.AddInt32(&c.book, 1) - 1)
if index > (c.cap - 1) {
// chunk is full
i, _ := c.Next().Put(e)
return i, true
}
defer atomic.AddInt32(&c.write, 1)
// safe write
c.entries[index] = e // @TODO: test to ensure it is safe
entryEpoch := e.Time.Unix()
if last := c.epochEnd; entryEpoch > last {
atomic.CompareAndSwapInt64(&c.epochEnd, last, entryEpoch)
}
if first := c.epochStart; (first == 0 || entryEpoch < first) && !atomic.CompareAndSwapInt64(&c.epochStart, first, entryEpoch) {
// unlikely to happen
for i := 0; i < 3; i++ {
first = c.epochStart
if entryEpoch < first {
if atomic.CompareAndSwapInt64(&c.epochStart, first, entryEpoch) {
break
}
} else {
break
}
}
}
atomic.AddInt64(&c.size, int64(len(e.Content)))
return c, false
}