Skip to content

Commit

Permalink
Merge pull request #171 from andig/feature/websocket
Browse files Browse the repository at this point in the history
Add websocket handler
  • Loading branch information
andig authored May 20, 2020
2 parents 4f033fb + 7ecfe9d commit 7bc8727
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 88 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ EVCC is an extensible EV Charge Controller with PV integration implemented in [G
- [MQTT](#mqtt-readwrite)
- [Script](#script-readwrite)
- [HTTP](#http-readwrite)
- [Websocket](#websocket-read-only)
- [Combined status](#combined-status-read-only)
- [Developer information](#developer-information)
- [Background](#background)
Expand Down Expand Up @@ -374,7 +375,7 @@ Sample configuration:
type: mqtt
topic: mbmd/sdm1-1/Power
timeout: 30s # don't accept values older than timeout
scale: 0.001 # floating point factor applied to result, e.g. for kW to W conversion
scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion
```

Sample write configuration:
Expand Down Expand Up @@ -435,6 +436,20 @@ Sample write configuration:
body: %v # only applicable for PUT or POST requests
```

### Websocket (read only)

The `websocket` plugin implements a web socket listener. Includes the ability to read and parse JSON using jq-like queries. It can for example be used to receive messages from Volkszähler's push server.

Sample configuration (read only):

```yaml
type: http
uri: ws://<volkszaehler host:port>/socket
jq: .data | select(.uuid=="<uuid>") .tuples[0][1] # parse message json
scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion
timeout: 30s # error if no update received in 30 seconds
```

### Combined status (read only)

The `combined` status plugin is used to convert a mixed boolean status of plugged/charging into an EVCC-compatible charger status of A..F. It is typically used together with OpenWB MQTT integration.
Expand Down
111 changes: 53 additions & 58 deletions meter/sma.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package meter

import (
"errors"
"fmt"
"sync"
"time"

Expand All @@ -15,22 +15,27 @@ const (
waitTimeout = 50 * time.Millisecond // interval when waiting for initial value
)

// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30
type SMA struct {
log *util.Logger
uri string
serial string
// values bundles SMA readings
type values struct {
power float64
energy float64
currentL1 float64
currentL2 float64
currentL3 float64
powerO sma.Obis
energyO sma.Obis
updated time.Time
recv chan sma.Telegram
mux sync.Mutex
once sync.Once
}

// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30
type SMA struct {
log *util.Logger
uri string
serial string
values values
powerO sma.Obis
energyO sma.Obis
updated time.Time
recv chan sma.Telegram
mux sync.Mutex
once sync.Once
}

// NewSMAFromConfig creates a SMA Meter from generic config
Expand Down Expand Up @@ -79,69 +84,51 @@ func NewSMA(uri, serial, power, energy string) api.Meter {
return sm
}

// waitForInitialValue makes sure we don't start with an error
func (sm *SMA) waitForInitialValue() {
sm.mux.Lock()
defer sm.mux.Unlock()

if sm.updated.IsZero() {
sm.log.TRACE.Print("waiting for initial value")

// wait for initial update
for sm.updated.IsZero() {
sm.mux.Unlock()
time.Sleep(waitTimeout)
sm.mux.Lock()
}
}
}

// update the actual meter data
func (sm *SMA) updateMeterValues(msg sma.Telegram) {
sm.mux.Lock()
defer sm.mux.Unlock()

if sm.powerO != "" {
// use user-defined obis
if power, ok := msg.Values[sm.powerO]; ok {
sm.power = power
sm.values.power = power
sm.updated = time.Now()
}
} else {
sm.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower]
sm.values.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower]
sm.updated = time.Now()
}

if sm.energyO != "" {
if energy, ok := msg.Values[sm.energyO]; ok {
sm.energy = energy
sm.values.energy = energy
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for energy")
}
}

if currentL1, ok := msg.Values[sma.CurrentL1]; ok {
sm.currentL1 = currentL1
sm.values.currentL1 = currentL1
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for currentL1")
}

if currentL2, ok := msg.Values[sma.CurrentL2]; ok {
sm.currentL2 = currentL2
sm.values.currentL2 = currentL2
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for currentL2")
}

if currentL3, ok := msg.Values[sma.CurrentL3]; ok {
sm.currentL3 = currentL3
sm.values.currentL3 = currentL3
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for currentL3")
}

sm.mux.Unlock()
}

// receive processes the channel message containing the multicast data
Expand All @@ -155,30 +142,45 @@ func (sm *SMA) receive() {
}
}

// CurrentPower implements the Meter.CurrentPower interface
func (sm *SMA) CurrentPower() (float64, error) {
sm.once.Do(sm.waitForInitialValue)
// waitForInitialValue makes sure we don't start with an error
func (sm *SMA) waitForInitialValue() {
sm.mux.Lock()
defer sm.mux.Unlock()

if time.Since(sm.updated) > udpTimeout {
return 0, errors.New("recv timeout")
}
if sm.updated.IsZero() {
sm.log.TRACE.Print("waiting for initial value")

return sm.power, nil
// wait for initial update
for sm.updated.IsZero() {
sm.mux.Unlock()
time.Sleep(waitTimeout)
sm.mux.Lock()
}
}
}

// Currents implements the MeterCurrent interface
func (sm *SMA) Currents() (float64, float64, float64, error) {
func (sm *SMA) hasValue() (values, error) {
sm.once.Do(sm.waitForInitialValue)
sm.mux.Lock()
defer sm.mux.Unlock()

if time.Since(sm.updated) > udpTimeout {
return 0, 0, 0, errors.New("recv timeout")
if elapsed := time.Since(sm.updated); elapsed > udpTimeout {
return values{}, fmt.Errorf("recv timeout: %v", elapsed.Truncate(time.Second))
}

return sm.currentL1, sm.currentL2, sm.currentL3, nil
return sm.values, nil
}

// CurrentPower implements the Meter.CurrentPower interface
func (sm *SMA) CurrentPower() (float64, error) {
values, err := sm.hasValue()
return values.power, err
}

// Currents implements the MeterCurrent interface
func (sm *SMA) Currents() (float64, float64, float64, error) {
values, err := sm.hasValue()
return values.currentL1, sm.values.currentL2, sm.values.currentL3, err
}

// SMAEnergy decorates SMA with api.MeterEnergy interface
Expand All @@ -188,13 +190,6 @@ type SMAEnergy struct {

// TotalEnergy implements the api.MeterEnergy interface
func (sm *SMAEnergy) TotalEnergy() (float64, error) {
sm.once.Do(sm.waitForInitialValue)
sm.mux.Lock()
defer sm.mux.Unlock()

if time.Since(sm.updated) > udpTimeout {
return 0, errors.New("recv timeout")
}

return sm.energy, nil
values, err := sm.hasValue()
return values.energy, err
}
16 changes: 8 additions & 8 deletions meter/sma_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ func TestSMAUpdateMeterValues(t *testing.T) {
}

sm.updateMeterValues(tt.messsage)
if sm.power != tt.wantPower {
t.Errorf("Listener.processMessage() got Power %v, want %v", sm.power, tt.wantPower)
if sm.values.power != tt.wantPower {
t.Errorf("Listener.processMessage() got Power %v, want %v", sm.values.power, tt.wantPower)
}

if sm.currentL1 != tt.wantCurrentL1 {
t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.currentL1, tt.wantCurrentL1)
if sm.values.currentL1 != tt.wantCurrentL1 {
t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.values.currentL1, tt.wantCurrentL1)
}

if sm.currentL2 != tt.wantCurrentL2 {
t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.currentL2, tt.wantCurrentL2)
if sm.values.currentL2 != tt.wantCurrentL2 {
t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.values.currentL2, tt.wantCurrentL2)
}

if sm.currentL3 != tt.wantCurrentL3 {
t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.currentL3, tt.wantCurrentL3)
if sm.values.currentL3 != tt.wantCurrentL3 {
t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.values.currentL3, tt.wantCurrentL3)
}

})
Expand Down
8 changes: 8 additions & 0 deletions provider/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func NewFloatGetterFromConfig(log *util.Logger, config Config) (res FloatGetter)
res = NewCalcFromConfig(log, config.Other)
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).FloatGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).FloatGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.FloatGetter(pc.Topic, pc.Scale, pc.Timeout)
Expand All @@ -90,6 +92,8 @@ func NewIntGetterFromConfig(log *util.Logger, config Config) (res IntGetter) {
switch strings.ToLower(config.Type) {
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).IntGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).IntGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.IntGetter(pc.Topic, int64(pc.Scale), pc.Timeout)
Expand All @@ -113,6 +117,8 @@ func NewStringGetterFromConfig(log *util.Logger, config Config) (res StringGette
switch strings.ToLower(config.Type) {
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).StringGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).StringGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.StringGetter(pc.Topic, pc.Timeout)
Expand All @@ -136,6 +142,8 @@ func NewBoolGetterFromConfig(log *util.Logger, config Config) (res BoolGetter) {
switch strings.ToLower(config.Type) {
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).BoolGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).BoolGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.BoolGetter(pc.Topic, pc.Timeout)
Expand Down
2 changes: 0 additions & 2 deletions provider/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

// HTTP implements HTTP request provider
type HTTP struct {
log *util.Logger
*util.HTTPHelper
url, method string
headers map[string]string
Expand Down Expand Up @@ -57,7 +56,6 @@ func NewHTTPProviderFromConfig(log *util.Logger, other map[string]interface{}) *
logger := util.NewLogger("http")

p := &HTTP{
log: logger,
HTTPHelper: util.NewHTTPHelper(logger),
url: cc.URI,
method: cc.Method,
Expand Down
41 changes: 22 additions & 19 deletions provider/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,21 +249,30 @@ func (h *msgHandler) waitForInitialValue() {
}
}

func (h *msgHandler) floatGetter() (float64, error) {
func (h *msgHandler) hasValue() (string, error) {
h.once.Do(h.waitForInitialValue)
h.mux.Lock()
defer h.mux.Unlock()

if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout {
return 0, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
}

val, err := strconv.ParseFloat(h.payload, 64)
return h.payload, nil
}

func (h *msgHandler) floatGetter() (float64, error) {
v, err := h.hasValue()
if err != nil {
return 0, fmt.Errorf("%s invalid: '%s'", h.topic, h.payload)
return 0, err
}

return h.scale * val, nil
f, err := strconv.ParseFloat(v, 64)
if err != nil {
return 0, fmt.Errorf("%s invalid: '%s'", h.topic, v)
}

return f * h.scale, nil
}

func (h *msgHandler) intGetter() (int64, error) {
Expand All @@ -272,25 +281,19 @@ func (h *msgHandler) intGetter() (int64, error) {
}

func (h *msgHandler) stringGetter() (string, error) {
h.once.Do(h.waitForInitialValue)
h.mux.Lock()
defer h.mux.Unlock()

if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout {
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
v, err := h.hasValue()
if err != nil {
return "", err
}

return string(h.payload), nil
return string(v), nil
}

func (h *msgHandler) boolGetter() (bool, error) {
h.once.Do(h.waitForInitialValue)
h.mux.Lock()
defer h.mux.Unlock()

if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout {
return false, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
v, err := h.hasValue()
if err != nil {
return false, err
}

return util.Truish(string(h.payload)), nil
return util.Truish(v), nil
}
Loading

0 comments on commit 7bc8727

Please sign in to comment.