From 68cceb840470954f0dbd6d2856cabbaf91c2a89a Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 20 May 2020 09:21:38 +0200 Subject: [PATCH 1/3] Add Websocket plugin --- README.md | 17 +++- provider/config.go | 8 ++ provider/socket.go | 217 +++++++++++++++++++++++++++++++++++++++++++++ util/jq/jq.go | 48 ++++++++++ 4 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 provider/socket.go diff --git a/README.md b/README.md index e7ee196324..89eb16c2e2 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ EVCC is an extensible EV Charge Controller with PV integration implemented in [G - [MQTT](#mqtt-readwrite) - [Script](#script-readwrite) - [HTTP](#http-readwrite) + - [Websocket](#websocket-read-only) - [Combined status](#combined-status-read-only) - [Developer information](#developer-information) - [Background](#background) @@ -374,7 +375,7 @@ Sample configuration: type: mqtt topic: mbmd/sdm1-1/Power timeout: 30s # don't accept values older than timeout -scale: 0.001 # floating point factor applied to result, e.g. for kW to W conversion +scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion ``` Sample write configuration: @@ -435,6 +436,20 @@ Sample write configuration: body: %v # only applicable for PUT or POST requests ``` +### Websocket (read only) + +The `websocket` plugin implements a web socket listener. Includes the ability to read and parse JSON using jq-like queries. It can for example be used to receive messages from Volkszähler's push server. + +Sample configuration (read only): + +```yaml +type: http +uri: ws:///socket +jq: .data | select(.uuid=="") .tuples[0][1] # parse message json +scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion +timeout: 30s # error if no update received in 30 seconds +``` + ### Combined status (read only) The `combined` status plugin is used to convert a mixed boolean status of plugged/charging into an EVCC-compatible charger status of A..F. It is typically used together with OpenWB MQTT integration. diff --git a/provider/config.go b/provider/config.go index 2f28e0fdf7..6e7e7d3f41 100644 --- a/provider/config.go +++ b/provider/config.go @@ -67,6 +67,8 @@ func NewFloatGetterFromConfig(log *util.Logger, config Config) (res FloatGetter) res = NewCalcFromConfig(log, config.Other) case "http": res = NewHTTPProviderFromConfig(log, config.Other).FloatGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).FloatGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.FloatGetter(pc.Topic, pc.Scale, pc.Timeout) @@ -90,6 +92,8 @@ func NewIntGetterFromConfig(log *util.Logger, config Config) (res IntGetter) { switch strings.ToLower(config.Type) { case "http": res = NewHTTPProviderFromConfig(log, config.Other).IntGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).IntGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.IntGetter(pc.Topic, int64(pc.Scale), pc.Timeout) @@ -113,6 +117,8 @@ func NewStringGetterFromConfig(log *util.Logger, config Config) (res StringGette switch strings.ToLower(config.Type) { case "http": res = NewHTTPProviderFromConfig(log, config.Other).StringGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).StringGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.StringGetter(pc.Topic, pc.Timeout) @@ -136,6 +142,8 @@ func NewBoolGetterFromConfig(log *util.Logger, config Config) (res BoolGetter) { switch strings.ToLower(config.Type) { case "http": res = NewHTTPProviderFromConfig(log, config.Other).BoolGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).BoolGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.BoolGetter(pc.Topic, pc.Timeout) diff --git a/provider/socket.go b/provider/socket.go new file mode 100644 index 0000000000..caa132be08 --- /dev/null +++ b/provider/socket.go @@ -0,0 +1,217 @@ +package provider + +import ( + "crypto/tls" + "fmt" + "math" + "net/http" + "strconv" + "sync" + "time" + + "github.com/andig/evcc/util" + "github.com/andig/evcc/util/jq" + "github.com/gorilla/websocket" + "github.com/itchyny/gojq" +) + +// Socket implements websocket request provider +type Socket struct { + *util.HTTPHelper + mux sync.Mutex + once sync.Once + url string + headers map[string]string + scale float64 + jq *gojq.Query + timeout time.Duration + val interface{} + updated time.Time +} + +// NewSocketProviderFromConfig creates a HTTP provider +func NewSocketProviderFromConfig(log *util.Logger, other map[string]interface{}) *Socket { + cc := struct { + URI string + Headers map[string]string + Jq string + Scale float64 + Insecure bool + Auth Auth + Timeout time.Duration + }{} + util.DecodeOther(log, other, &cc) + + logger := util.NewLogger("ws") + + p := &Socket{ + HTTPHelper: util.NewHTTPHelper(logger), + url: cc.URI, + headers: cc.Headers, + scale: cc.Scale, + timeout: cc.Timeout, + } + + // handle basic auth + if cc.Auth.Type != "" { + if p.headers == nil { + p.headers = make(map[string]string) + } + NewAuth(log, cc.Auth, p.headers) + } + + // ignore the self signed certificate + if cc.Insecure { + customTransport := http.DefaultTransport.(*http.Transport).Clone() + customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + p.HTTPHelper.Client.Transport = customTransport + } + + if cc.Jq != "" { + op, err := gojq.Parse(cc.Jq) + if err != nil { + log.FATAL.Fatalf("config: invalid jq query: %s", p.jq) + } + + p.jq = op + } + + go p.listen() + + return p +} + +func (p *Socket) listen() { + log := p.HTTPHelper.Log + + headers := make(http.Header) + for k, v := range p.headers { + headers.Set(k, v) + } + + for { + client, _, err := websocket.DefaultDialer.Dial(p.url, headers) + if err != nil { + log.ERROR.Println("dial:", err) + } + + for { + _, b, err := client.ReadMessage() + if err != nil { + log.TRACE.Println("read:", err) + _ = client.Close() + break + } + + log.TRACE.Printf("recv: %s", b) + + p.mux.Lock() + if p.jq != nil { + v, err := jq.Query(p.jq, b) + if err == nil { + p.val = v + p.updated = time.Now() + } else { + log.WARN.Printf("invalid: %s", string(b)) + } + } else { + p.val = string(b) + p.updated = time.Now() + } + p.mux.Unlock() + } + } +} + +func (p *Socket) waitForInitialValue() { + p.mux.Lock() + defer p.mux.Unlock() + + if p.updated.IsZero() { + p.HTTPHelper.Log.TRACE.Println("wait for initial value") + + // wait for initial update + for p.updated.IsZero() { + p.mux.Unlock() + time.Sleep(waitTimeout) + p.mux.Lock() + } + } +} + +func (p *Socket) hasValue() (interface{}, error) { + p.once.Do(p.waitForInitialValue) + p.mux.Lock() + defer p.mux.Unlock() + + if elapsed := time.Since(p.updated); p.timeout != 0 && elapsed > p.timeout { + return nil, fmt.Errorf("outdated: %v", elapsed.Truncate(time.Second)) + } + + return p.val, nil +} + +// StringGetter sends string request +func (p *Socket) StringGetter() (string, error) { + v, err := p.hasValue() + if err != nil { + return "", err + } + + return jq.String(v) +} + +// FloatGetter parses float from string getter +func (p *Socket) FloatGetter() (float64, error) { + v, err := p.hasValue() + if err != nil { + return 0, err + } + + // v is always string when jq not used + if p.jq == nil { + v, err = strconv.ParseFloat(v.(string), 64) + if err != nil { + return 0, err + } + } + + f, err := jq.Float64(v) + return f * p.scale, err +} + +// IntGetter parses int64 from float getter +func (p *Socket) IntGetter() (int64, error) { + v, err := p.hasValue() + if err != nil { + return 0, err + } + + // v is always string when jq not used + if p.jq == nil { + v, err = strconv.ParseInt(v.(string), 10, 64) + if err != nil { + return 0, err + } + } + + i, err := jq.Int64(v) + f := float64(i) * p.scale + + return int64(math.Round(f)), err +} + +// BoolGetter parses bool from string getter +func (p *Socket) BoolGetter() (bool, error) { + v, err := p.hasValue() + if err != nil { + return false, err + } + + // v is always string when jq not used + if p.jq == nil { + v = util.Truish(v.(string)) + } + + return jq.Bool(v) +} diff --git a/util/jq/jq.go b/util/jq/jq.go index 4902a4f13b..0bb0995565 100644 --- a/util/jq/jq.go +++ b/util/jq/jq.go @@ -2,6 +2,7 @@ package jq import ( "encoding/json" + "fmt" "github.com/itchyny/gojq" "github.com/pkg/errors" @@ -31,3 +32,50 @@ func Query(query *gojq.Query, input []byte) (interface{}, error) { return v, nil } + +// Float64 converts interface to float64 +func Float64(v interface{}) (float64, error) { + switch v := v.(type) { + case int: + return float64(v), nil + case float64: + return v, nil + default: + return 0, fmt.Errorf("unexpected float type: %T", v) + } +} + +// Int64 converts interface to int64 +func Int64(v interface{}) (int64, error) { + switch v := v.(type) { + case int: + return int64(v), nil + case float64: + if float64(int64(v)) == v { + return int64(v), nil + } + return 0, fmt.Errorf("unexpected int64: %v", v) + default: + return 0, fmt.Errorf("unexpected int64 type: %T", v) + } +} + +// String converts interface to string +func String(v interface{}) (string, error) { + switch v := v.(type) { + case string: + return v, nil + default: + return "", fmt.Errorf("unexpected string type: %T", v) + } +} + +// Bool converts interface to bool +func Bool(v interface{}) (bool, error) { + switch v := v.(type) { + case bool: + return v, nil + default: + return false, fmt.Errorf("unexpected bool type: %T", v) + } +} From b2d6f8c72420febb8c559071e80363e2911fc554 Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 20 May 2020 11:02:23 +0200 Subject: [PATCH 2/3] Use embedded logger --- provider/http.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/provider/http.go b/provider/http.go index 049e90acf2..0c6c833482 100644 --- a/provider/http.go +++ b/provider/http.go @@ -17,7 +17,6 @@ import ( // HTTP implements HTTP request provider type HTTP struct { - log *util.Logger *util.HTTPHelper url, method string headers map[string]string @@ -57,7 +56,6 @@ func NewHTTPProviderFromConfig(log *util.Logger, other map[string]interface{}) * logger := util.NewLogger("http") p := &HTTP{ - log: logger, HTTPHelper: util.NewHTTPHelper(logger), url: cc.URI, method: cc.Method, From 7ecfe9d754e7b7045554fdd0e3df2eb8c21a6899 Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 20 May 2020 11:02:42 +0200 Subject: [PATCH 3/3] Simplify waiting for initial value similar to websocket plugin --- meter/sma.go | 111 ++++++++++++++++++++++------------------------ meter/sma_test.go | 16 +++---- provider/mqtt.go | 41 +++++++++-------- 3 files changed, 83 insertions(+), 85 deletions(-) diff --git a/meter/sma.go b/meter/sma.go index 27cbf55e4a..08ab2180dc 100644 --- a/meter/sma.go +++ b/meter/sma.go @@ -1,7 +1,7 @@ package meter import ( - "errors" + "fmt" "sync" "time" @@ -15,22 +15,27 @@ const ( waitTimeout = 50 * time.Millisecond // interval when waiting for initial value ) -// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30 -type SMA struct { - log *util.Logger - uri string - serial string +// values bundles SMA readings +type values struct { power float64 energy float64 currentL1 float64 currentL2 float64 currentL3 float64 - powerO sma.Obis - energyO sma.Obis - updated time.Time - recv chan sma.Telegram - mux sync.Mutex - once sync.Once +} + +// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30 +type SMA struct { + log *util.Logger + uri string + serial string + values values + powerO sma.Obis + energyO sma.Obis + updated time.Time + recv chan sma.Telegram + mux sync.Mutex + once sync.Once } // NewSMAFromConfig creates a SMA Meter from generic config @@ -79,41 +84,25 @@ func NewSMA(uri, serial, power, energy string) api.Meter { return sm } -// waitForInitialValue makes sure we don't start with an error -func (sm *SMA) waitForInitialValue() { - sm.mux.Lock() - defer sm.mux.Unlock() - - if sm.updated.IsZero() { - sm.log.TRACE.Print("waiting for initial value") - - // wait for initial update - for sm.updated.IsZero() { - sm.mux.Unlock() - time.Sleep(waitTimeout) - sm.mux.Lock() - } - } -} - // update the actual meter data func (sm *SMA) updateMeterValues(msg sma.Telegram) { sm.mux.Lock() + defer sm.mux.Unlock() if sm.powerO != "" { // use user-defined obis if power, ok := msg.Values[sm.powerO]; ok { - sm.power = power + sm.values.power = power sm.updated = time.Now() } } else { - sm.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower] + sm.values.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower] sm.updated = time.Now() } if sm.energyO != "" { if energy, ok := msg.Values[sm.energyO]; ok { - sm.energy = energy + sm.values.energy = energy sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for energy") @@ -121,27 +110,25 @@ func (sm *SMA) updateMeterValues(msg sma.Telegram) { } if currentL1, ok := msg.Values[sma.CurrentL1]; ok { - sm.currentL1 = currentL1 + sm.values.currentL1 = currentL1 sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for currentL1") } if currentL2, ok := msg.Values[sma.CurrentL2]; ok { - sm.currentL2 = currentL2 + sm.values.currentL2 = currentL2 sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for currentL2") } if currentL3, ok := msg.Values[sma.CurrentL3]; ok { - sm.currentL3 = currentL3 + sm.values.currentL3 = currentL3 sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for currentL3") } - - sm.mux.Unlock() } // receive processes the channel message containing the multicast data @@ -155,30 +142,45 @@ func (sm *SMA) receive() { } } -// CurrentPower implements the Meter.CurrentPower interface -func (sm *SMA) CurrentPower() (float64, error) { - sm.once.Do(sm.waitForInitialValue) +// waitForInitialValue makes sure we don't start with an error +func (sm *SMA) waitForInitialValue() { sm.mux.Lock() defer sm.mux.Unlock() - if time.Since(sm.updated) > udpTimeout { - return 0, errors.New("recv timeout") - } + if sm.updated.IsZero() { + sm.log.TRACE.Print("waiting for initial value") - return sm.power, nil + // wait for initial update + for sm.updated.IsZero() { + sm.mux.Unlock() + time.Sleep(waitTimeout) + sm.mux.Lock() + } + } } -// Currents implements the MeterCurrent interface -func (sm *SMA) Currents() (float64, float64, float64, error) { +func (sm *SMA) hasValue() (values, error) { sm.once.Do(sm.waitForInitialValue) sm.mux.Lock() defer sm.mux.Unlock() - if time.Since(sm.updated) > udpTimeout { - return 0, 0, 0, errors.New("recv timeout") + if elapsed := time.Since(sm.updated); elapsed > udpTimeout { + return values{}, fmt.Errorf("recv timeout: %v", elapsed.Truncate(time.Second)) } - return sm.currentL1, sm.currentL2, sm.currentL3, nil + return sm.values, nil +} + +// CurrentPower implements the Meter.CurrentPower interface +func (sm *SMA) CurrentPower() (float64, error) { + values, err := sm.hasValue() + return values.power, err +} + +// Currents implements the MeterCurrent interface +func (sm *SMA) Currents() (float64, float64, float64, error) { + values, err := sm.hasValue() + return values.currentL1, sm.values.currentL2, sm.values.currentL3, err } // SMAEnergy decorates SMA with api.MeterEnergy interface @@ -188,13 +190,6 @@ type SMAEnergy struct { // TotalEnergy implements the api.MeterEnergy interface func (sm *SMAEnergy) TotalEnergy() (float64, error) { - sm.once.Do(sm.waitForInitialValue) - sm.mux.Lock() - defer sm.mux.Unlock() - - if time.Since(sm.updated) > udpTimeout { - return 0, errors.New("recv timeout") - } - - return sm.energy, nil + values, err := sm.hasValue() + return values.energy, err } diff --git a/meter/sma_test.go b/meter/sma_test.go index c78432bb9c..9012884598 100644 --- a/meter/sma_test.go +++ b/meter/sma_test.go @@ -56,20 +56,20 @@ func TestSMAUpdateMeterValues(t *testing.T) { } sm.updateMeterValues(tt.messsage) - if sm.power != tt.wantPower { - t.Errorf("Listener.processMessage() got Power %v, want %v", sm.power, tt.wantPower) + if sm.values.power != tt.wantPower { + t.Errorf("Listener.processMessage() got Power %v, want %v", sm.values.power, tt.wantPower) } - if sm.currentL1 != tt.wantCurrentL1 { - t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.currentL1, tt.wantCurrentL1) + if sm.values.currentL1 != tt.wantCurrentL1 { + t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.values.currentL1, tt.wantCurrentL1) } - if sm.currentL2 != tt.wantCurrentL2 { - t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.currentL2, tt.wantCurrentL2) + if sm.values.currentL2 != tt.wantCurrentL2 { + t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.values.currentL2, tt.wantCurrentL2) } - if sm.currentL3 != tt.wantCurrentL3 { - t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.currentL3, tt.wantCurrentL3) + if sm.values.currentL3 != tt.wantCurrentL3 { + t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.values.currentL3, tt.wantCurrentL3) } }) diff --git a/provider/mqtt.go b/provider/mqtt.go index 6f005d42e8..2477a8aa6c 100644 --- a/provider/mqtt.go +++ b/provider/mqtt.go @@ -249,21 +249,30 @@ func (h *msgHandler) waitForInitialValue() { } } -func (h *msgHandler) floatGetter() (float64, error) { +func (h *msgHandler) hasValue() (string, error) { h.once.Do(h.waitForInitialValue) h.mux.Lock() defer h.mux.Unlock() if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout { - return 0, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) + return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) } - val, err := strconv.ParseFloat(h.payload, 64) + return h.payload, nil +} + +func (h *msgHandler) floatGetter() (float64, error) { + v, err := h.hasValue() if err != nil { - return 0, fmt.Errorf("%s invalid: '%s'", h.topic, h.payload) + return 0, err } - return h.scale * val, nil + f, err := strconv.ParseFloat(v, 64) + if err != nil { + return 0, fmt.Errorf("%s invalid: '%s'", h.topic, v) + } + + return f * h.scale, nil } func (h *msgHandler) intGetter() (int64, error) { @@ -272,25 +281,19 @@ func (h *msgHandler) intGetter() (int64, error) { } func (h *msgHandler) stringGetter() (string, error) { - h.once.Do(h.waitForInitialValue) - h.mux.Lock() - defer h.mux.Unlock() - - if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout { - return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) + v, err := h.hasValue() + if err != nil { + return "", err } - return string(h.payload), nil + return string(v), nil } func (h *msgHandler) boolGetter() (bool, error) { - h.once.Do(h.waitForInitialValue) - h.mux.Lock() - defer h.mux.Unlock() - - if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout { - return false, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) + v, err := h.hasValue() + if err != nil { + return false, err } - return util.Truish(string(h.payload)), nil + return util.Truish(v), nil }