Skip to content

Commit

Permalink
Simplify parameter distribution (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored May 14, 2020
1 parent 40ea628 commit f45abfe
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 73 deletions.
58 changes: 20 additions & 38 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,13 @@ func checkVersion() {
}
}

var teeIsChained bool // controles piping of first channel in teed chain

// tee splits a tee channel from an input channel and starts a goroutine
// for duplicateing channel values to replacement input and tee channel
func tee(in chan core.Param) (chan core.Param, <-chan core.Param) {
gen := make(chan core.Param)
tee := make(chan core.Param)

go func(teeIsChained bool) {
for i := range gen {
if teeIsChained {
in <- i
}
tee <- i
// handle UI update requests
func handleUI(triggerChan <-chan struct{}, loadPoints []*core.LoadPoint) {
for range triggerChan {
for _, lp := range loadPoints {
lp.Update()
}
}(teeIsChained)

teeIsChained = true
return gen, tee
}
}

func run(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -174,8 +162,7 @@ func run(cmd *cobra.Command, args []string) {
loadPoints := loadConfig(conf, notificationChan)

// start broadcasting values
valueChan := make(chan core.Param)
triggerChan := make(chan struct{})
tee := &Tee{}

// setup influx
if viper.Get("influx") != nil {
Expand All @@ -187,28 +174,32 @@ func run(cmd *cobra.Command, args []string) {
conf.Influx.Password,
)

var teeChan <-chan core.Param
valueChan, teeChan = tee(valueChan)

// eliminate duplicate values
dedupe := server.NewDeduplicator(30*time.Minute, "socCharge")
teeChan = dedupe.Pipe(teeChan)
pipeChan := dedupe.Pipe(tee.Attach())

// reduce number of values written to influx
limiter := server.NewLimiter(5 * time.Second)
teeChan = limiter.Pipe(teeChan)
pipeChan = limiter.Pipe(pipeChan)

go influx.Run(teeChan)
go influx.Run(pipeChan)
}

// create webserver
socketHub := server.NewSocketHub()
httpd := server.NewHTTPd(uri, conf.Menu, loadPoints[0], socketHub)

var teeChan <-chan core.Param
valueChan, teeChan = tee(valueChan)
triggerChan := make(chan struct{})

go socketHub.Run(teeChan, triggerChan)
// handle UI update requests whenever browser connects
go handleUI(triggerChan, loadPoints)

// publish to UI
go socketHub.Run(tee.Attach(), triggerChan)

// setup values channel
valueChan := make(chan util.Param)
go tee.Run(valueChan)

// start all loadpoints
for _, lp := range loadPoints {
Expand All @@ -217,14 +208,5 @@ func run(cmd *cobra.Command, args []string) {
go lp.Run(conf.Interval)
}

// handle UI update requests whenever browser connects
go func() {
for range triggerChan {
for _, lp := range loadPoints {
lp.Update()
}
}
}()

log.FATAL.Println(httpd.ListenAndServe())
}
29 changes: 29 additions & 0 deletions cmd/tee.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cmd

import "github.com/andig/evcc/util"

// Tee distributed parameters to subscribers
type Tee struct {
recv []chan<- util.Param
}

// Attach creates a new receiver channel and attaches it to the tee
func (t *Tee) Attach() <-chan util.Param {
out := make(chan util.Param)
t.Add(out)
return out
}

// Add attaches a receiver channel to the tee
func (t *Tee) Add(out chan<- util.Param) {
t.recv = append(t.recv, out)
}

// Run starts parameter distribution
func (t *Tee) Run(in <-chan util.Param) {
for msg := range in {
for _, recv := range t.recv {
recv <- msg
}
}
}
7 changes: 0 additions & 7 deletions core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ func (n *nilVal) String() string {
return "—"
}

// Param is the broadcast channel data type
type Param struct {
LoadPoint string
Key string
Val interface{}
}

// Configuration is the loadpoint feature structure
type Configuration struct {
Mode string `json:"mode"`
Expand Down
8 changes: 4 additions & 4 deletions core/loadpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Config struct {
Enable, Disable ThresholdConfig
}

// ThresholdConfig defines enable/disable hysteresis paramters
// ThresholdConfig defines enable/disable hysteresis parameters
type ThresholdConfig struct {
Delay time.Duration
Threshold float64
Expand All @@ -72,7 +72,7 @@ type LoadPoint struct {
bus evbus.Bus // event bus
triggerChan chan struct{} // API updates
notificationChan chan<- push.Event // notifications
uiChan chan<- Param // client push messages
uiChan chan<- util.Param // client push messages

Config `mapstructure:",squash"` // exposed public configuration
ChargerHandler `mapstructure:",squash"` // handle charger state and current
Expand Down Expand Up @@ -169,7 +169,7 @@ func (lp *LoadPoint) notify(event string, attributes map[string]interface{}) {

// publish sends values to UI and databases
func (lp *LoadPoint) publish(key string, val interface{}) {
lp.uiChan <- Param{
lp.uiChan <- util.Param{
LoadPoint: lp.Name,
Key: key,
Val: val,
Expand Down Expand Up @@ -228,7 +228,7 @@ func (lp *LoadPoint) evChargeCurrentHandler(current int64) {
}

// Prepare loadpoint configuration by adding missing helper elements
func (lp *LoadPoint) Prepare(uiChan chan<- Param, notificationChan chan<- push.Event) {
func (lp *LoadPoint) Prepare(uiChan chan<- util.Param, notificationChan chan<- push.Event) {
lp.notificationChan = notificationChan
lp.uiChan = uiChan

Expand Down
3 changes: 2 additions & 1 deletion core/loadpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/andig/evcc/mock"
"github.com/andig/evcc/provider"
"github.com/andig/evcc/push"
"github.com/andig/evcc/util"
"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ func newLoadPoint(charger api.Charger, pv, gm, cm api.Meter) *LoadPoint {
lp.chargeMeter = cm
}

uiChan := make(chan Param)
uiChan := make(chan util.Param)
notificationChan := make(chan push.Event)

lp.Prepare(uiChan, notificationChan)
Expand Down
3 changes: 1 addition & 2 deletions server/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"sync"
"time"

"github.com/andig/evcc/core"
"github.com/andig/evcc/util"
influxdb "github.com/influxdata/influxdb1-client/v2"
)
Expand Down Expand Up @@ -131,7 +130,7 @@ func (m *Influx) asyncWriter(exit <-chan struct{}) <-chan struct{} {
}

// Run Influx publisher
func (m *Influx) Run(in <-chan core.Param) {
func (m *Influx) Run(in <-chan util.Param) {
exit := make(chan struct{}) // exit signals to stop writer
done := m.asyncWriter(exit) // done signals writer stopped

Expand Down
16 changes: 8 additions & 8 deletions server/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package server
import (
"time"

"github.com/andig/evcc/core"
"github.com/andig/evcc/util"
"github.com/benbjohnson/clock"
)

// Piper is the interface that data flow plugins must implement
type Piper interface {
Pipe(in <-chan core.Param) <-chan core.Param
Pipe(in <-chan util.Param) <-chan util.Param
}

type cacheItem struct {
Expand Down Expand Up @@ -41,7 +41,7 @@ func NewDeduplicator(interval time.Duration, filter ...string) Piper {
return l
}

func (l *Deduplicator) pipe(in <-chan core.Param, out chan<- core.Param) {
func (l *Deduplicator) pipe(in <-chan util.Param, out chan<- util.Param) {
for p := range in {
// use loadpoint + param.Key as lookup key to value cache
key := p.LoadPoint + "." + p.Key
Expand All @@ -58,8 +58,8 @@ func (l *Deduplicator) pipe(in <-chan core.Param, out chan<- core.Param) {
}

// Pipe creates a new filtered output channel for given input channel
func (l *Deduplicator) Pipe(in <-chan core.Param) <-chan core.Param {
out := make(chan core.Param)
func (l *Deduplicator) Pipe(in <-chan util.Param) <-chan util.Param {
out := make(chan util.Param)
go l.pipe(in, out)
return out
}
Expand All @@ -82,7 +82,7 @@ func NewLimiter(interval time.Duration) Piper {
return l
}

func (l *Limiter) pipe(in <-chan core.Param, out chan<- core.Param) {
func (l *Limiter) pipe(in <-chan util.Param, out chan<- util.Param) {
for p := range in {
// use loadpoint + param.Key as lookup key to value cache
key := p.LoadPoint + "." + p.Key
Expand All @@ -97,8 +97,8 @@ func (l *Limiter) pipe(in <-chan core.Param, out chan<- core.Param) {
}

// Pipe creates a new filtered output channel for given input channel
func (l *Limiter) Pipe(in <-chan core.Param) <-chan core.Param {
out := make(chan core.Param)
func (l *Limiter) Pipe(in <-chan util.Param) <-chan util.Param {
out := make(chan util.Param)
go l.pipe(in, out)
return out
}
20 changes: 10 additions & 10 deletions server/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"
"time"

"github.com/andig/evcc/core"
"github.com/andig/evcc/util"
"github.com/benbjohnson/clock"
)

Expand All @@ -14,10 +14,10 @@ func TestLimiter(t *testing.T) {
clck := clock.NewMock()
l.clock = clck

in := make(chan core.Param)
in := make(chan util.Param)
out := l.Pipe(in)

p := core.Param{Key: "k", Val: 1}
p := util.Param{Key: "k", Val: 1}
in <- p

if o := <-out; o.Key != p.Key || o.Val != p.Val {
Expand Down Expand Up @@ -57,39 +57,39 @@ func TestDeduplicator(t *testing.T) {
clck := clock.NewMock()
l.clock = clck

in := make(chan core.Param)
in := make(chan util.Param)
out := l.Pipe(in)

p := core.Param{Key: "k", Val: 1}
p := util.Param{Key: "k", Val: 1}
in <- p

if o := <-out; o.Key != p.Key || o.Val != p.Val {
t.Errorf("unexpected param %v", o)
}

p = core.Param{Key: "k", Val: 2}
p = util.Param{Key: "k", Val: 2}
in <- p

if o := <-out; o.Key != p.Key || o.Val != p.Val {
t.Errorf("unexpected param %v", o)
}

// allow nils
p = core.Param{Key: "k", Val: nil}
p = util.Param{Key: "k", Val: nil}
in <- p

if o := <-out; o.Key != p.Key || o.Val != p.Val {
t.Errorf("unexpected param %v", o)
}

p = core.Param{Key: "filtered", Val: 3}
p = util.Param{Key: "filtered", Val: 3}
in <- p

if o := <-out; o.Key != p.Key || o.Val != p.Val {
t.Errorf("unexpected param %v", o)
}

p = core.Param{Key: "filtered", Val: 4}
p = util.Param{Key: "filtered", Val: 4}
in <- p

if o := <-out; o.Key != p.Key || o.Val != p.Val {
Expand All @@ -115,7 +115,7 @@ func TestDeduplicator(t *testing.T) {
}

// allow nils
p = core.Param{Key: "filtered", Val: nil}
p = util.Param{Key: "filtered", Val: nil}
in <- p

if o := <-out; o.Key != p.Key || o.Val != p.Val {
Expand Down
6 changes: 3 additions & 3 deletions server/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"net/http"
"time"

"github.com/andig/evcc/core"
"github.com/andig/evcc/util"
"github.com/gorilla/websocket"
)

Expand Down Expand Up @@ -98,7 +98,7 @@ func encode(v interface{}) (string, error) {
return s, nil
}

func (h *SocketHub) broadcast(i core.Param) {
func (h *SocketHub) broadcast(i util.Param) {
if len(h.clients) > 0 {
val, err := encode(i.Val)
if err != nil {
Expand All @@ -118,7 +118,7 @@ func (h *SocketHub) broadcast(i core.Param) {
}

// Run starts data and status distribution
func (h *SocketHub) Run(in <-chan core.Param, triggerChan chan<- struct{}) {
func (h *SocketHub) Run(in <-chan util.Param, triggerChan chan<- struct{}) {
for {
select {
case client := <-h.register:
Expand Down
8 changes: 8 additions & 0 deletions util/param.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package util

// Param is the broadcast channel data type
type Param struct {
LoadPoint string
Key string
Val interface{}
}

0 comments on commit f45abfe

Please sign in to comment.