From 7ecfe9d754e7b7045554fdd0e3df2eb8c21a6899 Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 20 May 2020 11:02:42 +0200 Subject: [PATCH] Simplify waiting for initial value similar to websocket plugin --- meter/sma.go | 111 ++++++++++++++++++++++------------------------ meter/sma_test.go | 16 +++---- provider/mqtt.go | 41 +++++++++-------- 3 files changed, 83 insertions(+), 85 deletions(-) diff --git a/meter/sma.go b/meter/sma.go index 27cbf55e4a..08ab2180dc 100644 --- a/meter/sma.go +++ b/meter/sma.go @@ -1,7 +1,7 @@ package meter import ( - "errors" + "fmt" "sync" "time" @@ -15,22 +15,27 @@ const ( waitTimeout = 50 * time.Millisecond // interval when waiting for initial value ) -// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30 -type SMA struct { - log *util.Logger - uri string - serial string +// values bundles SMA readings +type values struct { power float64 energy float64 currentL1 float64 currentL2 float64 currentL3 float64 - powerO sma.Obis - energyO sma.Obis - updated time.Time - recv chan sma.Telegram - mux sync.Mutex - once sync.Once +} + +// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30 +type SMA struct { + log *util.Logger + uri string + serial string + values values + powerO sma.Obis + energyO sma.Obis + updated time.Time + recv chan sma.Telegram + mux sync.Mutex + once sync.Once } // NewSMAFromConfig creates a SMA Meter from generic config @@ -79,41 +84,25 @@ func NewSMA(uri, serial, power, energy string) api.Meter { return sm } -// waitForInitialValue makes sure we don't start with an error -func (sm *SMA) waitForInitialValue() { - sm.mux.Lock() - defer sm.mux.Unlock() - - if sm.updated.IsZero() { - sm.log.TRACE.Print("waiting for initial value") - - // wait for initial update - for sm.updated.IsZero() { - sm.mux.Unlock() - time.Sleep(waitTimeout) - sm.mux.Lock() - } - } -} - // update the actual meter data func (sm *SMA) updateMeterValues(msg sma.Telegram) { sm.mux.Lock() + defer sm.mux.Unlock() if sm.powerO != "" { // use user-defined obis if power, ok := msg.Values[sm.powerO]; ok { - sm.power = power + sm.values.power = power sm.updated = time.Now() } } else { - sm.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower] + sm.values.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower] sm.updated = time.Now() } if sm.energyO != "" { if energy, ok := msg.Values[sm.energyO]; ok { - sm.energy = energy + sm.values.energy = energy sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for energy") @@ -121,27 +110,25 @@ func (sm *SMA) updateMeterValues(msg sma.Telegram) { } if currentL1, ok := msg.Values[sma.CurrentL1]; ok { - sm.currentL1 = currentL1 + sm.values.currentL1 = currentL1 sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for currentL1") } if currentL2, ok := msg.Values[sma.CurrentL2]; ok { - sm.currentL2 = currentL2 + sm.values.currentL2 = currentL2 sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for currentL2") } if currentL3, ok := msg.Values[sma.CurrentL3]; ok { - sm.currentL3 = currentL3 + sm.values.currentL3 = currentL3 sm.updated = time.Now() } else { sm.log.WARN.Println("missing obis for currentL3") } - - sm.mux.Unlock() } // receive processes the channel message containing the multicast data @@ -155,30 +142,45 @@ func (sm *SMA) receive() { } } -// CurrentPower implements the Meter.CurrentPower interface -func (sm *SMA) CurrentPower() (float64, error) { - sm.once.Do(sm.waitForInitialValue) +// waitForInitialValue makes sure we don't start with an error +func (sm *SMA) waitForInitialValue() { sm.mux.Lock() defer sm.mux.Unlock() - if time.Since(sm.updated) > udpTimeout { - return 0, errors.New("recv timeout") - } + if sm.updated.IsZero() { + sm.log.TRACE.Print("waiting for initial value") - return sm.power, nil + // wait for initial update + for sm.updated.IsZero() { + sm.mux.Unlock() + time.Sleep(waitTimeout) + sm.mux.Lock() + } + } } -// Currents implements the MeterCurrent interface -func (sm *SMA) Currents() (float64, float64, float64, error) { +func (sm *SMA) hasValue() (values, error) { sm.once.Do(sm.waitForInitialValue) sm.mux.Lock() defer sm.mux.Unlock() - if time.Since(sm.updated) > udpTimeout { - return 0, 0, 0, errors.New("recv timeout") + if elapsed := time.Since(sm.updated); elapsed > udpTimeout { + return values{}, fmt.Errorf("recv timeout: %v", elapsed.Truncate(time.Second)) } - return sm.currentL1, sm.currentL2, sm.currentL3, nil + return sm.values, nil +} + +// CurrentPower implements the Meter.CurrentPower interface +func (sm *SMA) CurrentPower() (float64, error) { + values, err := sm.hasValue() + return values.power, err +} + +// Currents implements the MeterCurrent interface +func (sm *SMA) Currents() (float64, float64, float64, error) { + values, err := sm.hasValue() + return values.currentL1, sm.values.currentL2, sm.values.currentL3, err } // SMAEnergy decorates SMA with api.MeterEnergy interface @@ -188,13 +190,6 @@ type SMAEnergy struct { // TotalEnergy implements the api.MeterEnergy interface func (sm *SMAEnergy) TotalEnergy() (float64, error) { - sm.once.Do(sm.waitForInitialValue) - sm.mux.Lock() - defer sm.mux.Unlock() - - if time.Since(sm.updated) > udpTimeout { - return 0, errors.New("recv timeout") - } - - return sm.energy, nil + values, err := sm.hasValue() + return values.energy, err } diff --git a/meter/sma_test.go b/meter/sma_test.go index c78432bb9c..9012884598 100644 --- a/meter/sma_test.go +++ b/meter/sma_test.go @@ -56,20 +56,20 @@ func TestSMAUpdateMeterValues(t *testing.T) { } sm.updateMeterValues(tt.messsage) - if sm.power != tt.wantPower { - t.Errorf("Listener.processMessage() got Power %v, want %v", sm.power, tt.wantPower) + if sm.values.power != tt.wantPower { + t.Errorf("Listener.processMessage() got Power %v, want %v", sm.values.power, tt.wantPower) } - if sm.currentL1 != tt.wantCurrentL1 { - t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.currentL1, tt.wantCurrentL1) + if sm.values.currentL1 != tt.wantCurrentL1 { + t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.values.currentL1, tt.wantCurrentL1) } - if sm.currentL2 != tt.wantCurrentL2 { - t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.currentL2, tt.wantCurrentL2) + if sm.values.currentL2 != tt.wantCurrentL2 { + t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.values.currentL2, tt.wantCurrentL2) } - if sm.currentL3 != tt.wantCurrentL3 { - t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.currentL3, tt.wantCurrentL3) + if sm.values.currentL3 != tt.wantCurrentL3 { + t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.values.currentL3, tt.wantCurrentL3) } }) diff --git a/provider/mqtt.go b/provider/mqtt.go index 6f005d42e8..2477a8aa6c 100644 --- a/provider/mqtt.go +++ b/provider/mqtt.go @@ -249,21 +249,30 @@ func (h *msgHandler) waitForInitialValue() { } } -func (h *msgHandler) floatGetter() (float64, error) { +func (h *msgHandler) hasValue() (string, error) { h.once.Do(h.waitForInitialValue) h.mux.Lock() defer h.mux.Unlock() if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout { - return 0, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) + return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) } - val, err := strconv.ParseFloat(h.payload, 64) + return h.payload, nil +} + +func (h *msgHandler) floatGetter() (float64, error) { + v, err := h.hasValue() if err != nil { - return 0, fmt.Errorf("%s invalid: '%s'", h.topic, h.payload) + return 0, err } - return h.scale * val, nil + f, err := strconv.ParseFloat(v, 64) + if err != nil { + return 0, fmt.Errorf("%s invalid: '%s'", h.topic, v) + } + + return f * h.scale, nil } func (h *msgHandler) intGetter() (int64, error) { @@ -272,25 +281,19 @@ func (h *msgHandler) intGetter() (int64, error) { } func (h *msgHandler) stringGetter() (string, error) { - h.once.Do(h.waitForInitialValue) - h.mux.Lock() - defer h.mux.Unlock() - - if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout { - return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) + v, err := h.hasValue() + if err != nil { + return "", err } - return string(h.payload), nil + return string(v), nil } func (h *msgHandler) boolGetter() (bool, error) { - h.once.Do(h.waitForInitialValue) - h.mux.Lock() - defer h.mux.Unlock() - - if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout { - return false, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second)) + v, err := h.hasValue() + if err != nil { + return false, err } - return util.Truish(string(h.payload)), nil + return util.Truish(v), nil }