-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttp.go
154 lines (136 loc) · 4.02 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package gotelem
import (
"encoding/json"
"fmt"
"math"
"net/http"
)
var DefaultHTTPPublisher *HTTPPublisher = NewHTTPPublisher(300)
// HTTPPublisher receives observations and stores the N latest ones of
// each series. It implements ServeHTTP and will expose the available
// time series as JSON data.
type HTTPPublisher struct {
// baseURL is prepended to the per-series links produced by
// RespondAvailableSeries; it is set through SetBaseURL.
baseURL string
// inbox carries incoming observations to the processInbox goroutine.
inbox chan *Observation
// keep is the number of latest observations retained per series.
keep int
// series maps an observation name to the queue holding its retained
// observations. It is written by processInbox and read by the Respond*
// methods.
series map[string]*observationFIFOQueue
}
// ServeHTTP implements http.Handler. It routes "/" to
// RespondAvailableSeries and "/series" to RespondSelectedSeries, both
// labelled as application/json. Any other path is answered with
// 404 Not Found.
func (h *HTTPPublisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Debug trace of every incoming request URL on stdout.
	fmt.Println(r.URL)

	switch r.URL.Path {
	case "/":
		w.Header().Set("Content-Type", "application/json")
		h.RespondAvailableSeries(w, r)
	case "/series":
		w.Header().Set("Content-Type", "application/json")
		h.RespondSelectedSeries(w, r)
	default:
		// Unknown paths used to fall through and produce an empty
		// 200 response labelled as JSON; report them as missing instead.
		http.NotFound(w, r)
	}
}
// receiverChannel returns the publisher's inbox restricted to a
// send-only view, so callers can hand observations to the publisher
// without being able to receive from the channel.
func (h *HTTPPublisher) receiverChannel() chan<- *Observation {
	var sendOnly chan<- *Observation = h.inbox
	return sendOnly
}
// timeseries is one entry of the JSON index written by
// RespondAvailableSeries. It is built there with a positional literal,
// so the field order must not change.
type timeseries struct {
// Name is the series name, i.e. the key in HTTPPublisher.series.
Name string
// URL is the link to the series data: baseURL + "/series?q=" + Name.
URL string
}
func (h *HTTPPublisher) RespondAvailableSeries(w http.ResponseWriter, r *http.Request) {
series := make([]*timeseries, len(h.series))
i := 0
fmt.Println("url=", r.URL.String())
baseUrl := h.baseURL + "/series?q="
for k, _ := range h.series {
series[i] = ×eries{k, baseUrl + k}
i++
}
encoder := json.NewEncoder(w)
encoder.Encode(series)
}
// TimeSeries maps a series name to its data points. RespondSelectedSeries
// encodes a map of this same shape as its JSON response.
type TimeSeries map[string][]TimeSeriesPoint
// TimeSeriesPoint is a single sample in the JSON output of
// RespondSelectedSeries. It is built there with a positional literal,
// so the field order must not change.
type TimeSeriesPoint struct {
// Timestamp is filled from the observation's Timestamp.UnixNano();
// presumably nanoseconds since the Unix epoch — confirm against the
// Observation type.
Timestamp int64
// Value is the observation's Value.
Value float64
}
// RespondSelectedSeries writes a JSON object mapping each series named
// by a "q" query parameter to its retained points (timestamp and value).
// Names that do not match a known series are left out of the response.
//
// If the result cannot be encoded (encoding/json rejects NaN and
// infinite float values, for example) the request is answered with
// 500 Internal Server Error instead of a silent empty body.
//
// NOTE(review): h.series and the queues are read here without
// synchronization while processInbox may be updating them from its own
// goroutine.
func (h *HTTPPublisher) RespondSelectedSeries(w http.ResponseWriter, r *http.Request) {
	selected := r.URL.Query()["q"]
	// Debug trace of the requested series names on stdout.
	fmt.Println("selected=", selected)

	result := make(map[string][]TimeSeriesPoint, len(selected))
	for _, name := range selected {
		queue := h.series[name]
		if queue == nil {
			continue // unknown series: omit from the response
		}
		observations := queue.values()
		points := make([]TimeSeriesPoint, len(observations))
		for i, o := range observations {
			points[i] = TimeSeriesPoint{o.Timestamp.UnixNano(), o.Value}
		}
		result[name] = points
	}

	// Encode marshals the whole value before writing, so on a marshalling
	// error nothing has been sent yet and the status can still be set.
	if err := json.NewEncoder(w).Encode(result); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
// NewHTTPPublisher builds a publisher that retains the keep latest
// observations of each series. The inbox is buffered for 256 pending
// observations, and a goroutine running processInbox is started before
// the publisher is returned.
func NewHTTPPublisher(keep int) *HTTPPublisher {
	h := new(HTTPPublisher)
	h.keep = keep
	h.inbox = make(chan *Observation, 256)
	h.series = map[string]*observationFIFOQueue{}

	go h.processInbox()
	return h
}
// SetBaseURL stores the prefix that RespondAvailableSeries puts in front
// of "/series?q=<name>" when it builds the link for each series.
func (h *HTTPPublisher) SetBaseURL(url string) {
	h.baseURL = url
}
// processInbox drains the inbox, appending every observation to the
// queue of its series and creating that queue on first use. It runs
// until the inbox channel is closed and then returns.
//
// Nil observations are skipped: previously a nil value (or the zero
// value received from a closed channel) was dereferenced and crashed
// the goroutine.
//
// NOTE(review): h.series is mutated here without synchronization while
// the HTTP handlers read it; guarding it needs a lock shared with them.
func (h *HTTPPublisher) processInbox() {
	for o := range h.inbox {
		if o == nil {
			continue
		}
		queue := h.series[o.Name]
		if queue == nil {
			queue = newObservationRingBuffer(h.keep)
			h.series[o.Name] = queue
		}
		queue.update(o)
	}
}
// newObservationRingBuffer creates a queue retaining keep observations,
// using the default capacity multiplier of 1.3 for its backing slice.
func newObservationRingBuffer(keep int) *observationFIFOQueue {
	const defaultCapacityMultiplier = 1.3
	return newObservationRingBufferWithCapacity(keep, defaultCapacityMultiplier)
}
// newObservationRingBufferWithCapacity creates an empty queue retaining
// keep observations. Its backing slice is allocated once with capacity
// ceil(keep * capacityMultiplier); the slack above keep is what lets
// update slide the window forward before it has to compact.
//
// It panics if capacityMultiplier is below 1.0.
func newObservationRingBufferWithCapacity(keep int, capacityMultiplier float64) *observationFIFOQueue {
	if capacityMultiplier < 1.0 {
		panic("newObservationRingBuffer: capacityMultiplier cannot be < 1.0")
	}

	slots := int(math.Ceil(float64(keep) * capacityMultiplier))

	q := new(observationFIFOQueue)
	q.keep = keep
	q.store = make([]*Observation, 0, slots)
	return q
}
// observationFIFOQueue is a FIFO queue with a maximum size. It reuses
// the same slice for the lifetime of the queue to avoid generating
// garbage.
type observationFIFOQueue struct {
// store is the backing slice; the retained observations are
// store[oldestAt:], oldest first.
store []*Observation
// keep is the maximum number of observations retained.
keep int
// oldestAt is the index in store of the oldest retained observation.
oldestAt int
}
// values returns the retained observations, oldest first, in a freshly
// allocated slice, so the caller may modify the result without touching
// the queue's backing store. The result is non-nil even when empty.
func (rb *observationFIFOQueue) values() []*Observation {
	retained := len(rb.store) - rb.oldestAt
	snapshot := make([]*Observation, retained)
	copy(snapshot, rb.store[rb.oldestAt:])
	return snapshot
}
// update appends o to the queue, discarding the oldest observation once
// keep observations are already retained. The retained window is always
// store[oldestAt:].
//
// While the backing slice has spare capacity the window simply slides
// forward (oldestAt is incremented). When the slice is full, the
// surviving observations are moved to the front so the same slice keeps
// being reused.
//
// A queue with keep <= 0 retains nothing; update is then a no-op.
// Previously that case panicked with a slice-bounds error on the first
// call.
func (rb *observationFIFOQueue) update(o *Observation) {
	if rb.keep <= 0 {
		return
	}

	if len(rb.store) == cap(rb.store) {
		// Buffer is full: move the live data to the start of the buffer
		// and reset oldestAt. Slicing from oldestAt+1 drops the oldest
		// value as part of the move, so the window is one short of keep
		// afterwards and the discard step below is skipped.
		moved := copy(rb.store, rb.store[rb.oldestAt+1:])
		rb.store = rb.store[:moved]
		rb.oldestAt = 0
	}

	// Window already holds keep observations: discard the oldest by
	// advancing the start of the window.
	if len(rb.store) >= rb.keep {
		rb.oldestAt++
	}

	rb.store = append(rb.store, o)
}