Skip to content

Commit

Permalink
Simplify waiting for initial value similar to websocket plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
andig committed May 20, 2020
1 parent b2d6f8c commit 7ecfe9d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 85 deletions.
111 changes: 53 additions & 58 deletions meter/sma.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package meter

import (
"errors"
"fmt"
"sync"
"time"

Expand All @@ -15,22 +15,27 @@ const (
waitTimeout = 50 * time.Millisecond // interval when waiting for initial value
)

// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30
type SMA struct {
log *util.Logger
uri string
serial string
// values bundles SMA readings
type values struct {
power float64
energy float64
currentL1 float64
currentL2 float64
currentL3 float64
powerO sma.Obis
energyO sma.Obis
updated time.Time
recv chan sma.Telegram
mux sync.Mutex
once sync.Once
}

// SMA supporting SMA Home Manager 2.0 and SMA Energy Meter 30
type SMA struct {
log *util.Logger
uri string
serial string
values values
powerO sma.Obis
energyO sma.Obis
updated time.Time
recv chan sma.Telegram
mux sync.Mutex
once sync.Once
}

// NewSMAFromConfig creates a SMA Meter from generic config
Expand Down Expand Up @@ -79,69 +84,51 @@ func NewSMA(uri, serial, power, energy string) api.Meter {
return sm
}

// waitForInitialValue makes sure we don't start with an error
func (sm *SMA) waitForInitialValue() {
sm.mux.Lock()
defer sm.mux.Unlock()

if sm.updated.IsZero() {
sm.log.TRACE.Print("waiting for initial value")

// wait for initial update
for sm.updated.IsZero() {
sm.mux.Unlock()
time.Sleep(waitTimeout)
sm.mux.Lock()
}
}
}

// update the actual meter data
func (sm *SMA) updateMeterValues(msg sma.Telegram) {
sm.mux.Lock()
defer sm.mux.Unlock()

if sm.powerO != "" {
// use user-defined obis
if power, ok := msg.Values[sm.powerO]; ok {
sm.power = power
sm.values.power = power
sm.updated = time.Now()
}
} else {
sm.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower]
sm.values.power = msg.Values[sma.ImportPower] - msg.Values[sma.ExportPower]
sm.updated = time.Now()
}

if sm.energyO != "" {
if energy, ok := msg.Values[sm.energyO]; ok {
sm.energy = energy
sm.values.energy = energy
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for energy")
}
}

if currentL1, ok := msg.Values[sma.CurrentL1]; ok {
sm.currentL1 = currentL1
sm.values.currentL1 = currentL1
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for currentL1")
}

if currentL2, ok := msg.Values[sma.CurrentL2]; ok {
sm.currentL2 = currentL2
sm.values.currentL2 = currentL2
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for currentL2")
}

if currentL3, ok := msg.Values[sma.CurrentL3]; ok {
sm.currentL3 = currentL3
sm.values.currentL3 = currentL3
sm.updated = time.Now()
} else {
sm.log.WARN.Println("missing obis for currentL3")
}

sm.mux.Unlock()
}

// receive processes the channel message containing the multicast data
Expand All @@ -155,30 +142,45 @@ func (sm *SMA) receive() {
}
}

// CurrentPower implements the Meter.CurrentPower interface
func (sm *SMA) CurrentPower() (float64, error) {
sm.once.Do(sm.waitForInitialValue)
// waitForInitialValue makes sure we don't start with an error
func (sm *SMA) waitForInitialValue() {
sm.mux.Lock()
defer sm.mux.Unlock()

if time.Since(sm.updated) > udpTimeout {
return 0, errors.New("recv timeout")
}
if sm.updated.IsZero() {
sm.log.TRACE.Print("waiting for initial value")

return sm.power, nil
// wait for initial update
for sm.updated.IsZero() {
sm.mux.Unlock()
time.Sleep(waitTimeout)
sm.mux.Lock()
}
}
}

// Currents implements the MeterCurrent interface
func (sm *SMA) Currents() (float64, float64, float64, error) {
func (sm *SMA) hasValue() (values, error) {
sm.once.Do(sm.waitForInitialValue)
sm.mux.Lock()
defer sm.mux.Unlock()

if time.Since(sm.updated) > udpTimeout {
return 0, 0, 0, errors.New("recv timeout")
if elapsed := time.Since(sm.updated); elapsed > udpTimeout {
return values{}, fmt.Errorf("recv timeout: %v", elapsed.Truncate(time.Second))
}

return sm.currentL1, sm.currentL2, sm.currentL3, nil
return sm.values, nil
}

// CurrentPower implements the Meter.CurrentPower interface
func (sm *SMA) CurrentPower() (float64, error) {
values, err := sm.hasValue()
return values.power, err
}

// Currents implements the MeterCurrent interface
func (sm *SMA) Currents() (float64, float64, float64, error) {
values, err := sm.hasValue()
return values.currentL1, sm.values.currentL2, sm.values.currentL3, err
}

// SMAEnergy decorates SMA with api.MeterEnergy interface
Expand All @@ -188,13 +190,6 @@ type SMAEnergy struct {

// TotalEnergy implements the api.MeterEnergy interface
func (sm *SMAEnergy) TotalEnergy() (float64, error) {
sm.once.Do(sm.waitForInitialValue)
sm.mux.Lock()
defer sm.mux.Unlock()

if time.Since(sm.updated) > udpTimeout {
return 0, errors.New("recv timeout")
}

return sm.energy, nil
values, err := sm.hasValue()
return values.energy, err
}
16 changes: 8 additions & 8 deletions meter/sma_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ func TestSMAUpdateMeterValues(t *testing.T) {
}

sm.updateMeterValues(tt.messsage)
if sm.power != tt.wantPower {
t.Errorf("Listener.processMessage() got Power %v, want %v", sm.power, tt.wantPower)
if sm.values.power != tt.wantPower {
t.Errorf("Listener.processMessage() got Power %v, want %v", sm.values.power, tt.wantPower)
}

if sm.currentL1 != tt.wantCurrentL1 {
t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.currentL1, tt.wantCurrentL1)
if sm.values.currentL1 != tt.wantCurrentL1 {
t.Errorf("Listener.processMessage() got CurrentL1 %v, want %v", sm.values.currentL1, tt.wantCurrentL1)
}

if sm.currentL2 != tt.wantCurrentL2 {
t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.currentL2, tt.wantCurrentL2)
if sm.values.currentL2 != tt.wantCurrentL2 {
t.Errorf("Listener.processMessage() got CurrentL2 %v, want %v", sm.values.currentL2, tt.wantCurrentL2)
}

if sm.currentL3 != tt.wantCurrentL3 {
t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.currentL3, tt.wantCurrentL3)
if sm.values.currentL3 != tt.wantCurrentL3 {
t.Errorf("Listener.processMessage() got CurrentL3 %v, want %v", sm.values.currentL3, tt.wantCurrentL3)
}

})
Expand Down
41 changes: 22 additions & 19 deletions provider/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,21 +249,30 @@ func (h *msgHandler) waitForInitialValue() {
}
}

func (h *msgHandler) floatGetter() (float64, error) {
func (h *msgHandler) hasValue() (string, error) {
h.once.Do(h.waitForInitialValue)
h.mux.Lock()
defer h.mux.Unlock()

if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout {
return 0, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
}

val, err := strconv.ParseFloat(h.payload, 64)
return h.payload, nil
}

func (h *msgHandler) floatGetter() (float64, error) {
v, err := h.hasValue()
if err != nil {
return 0, fmt.Errorf("%s invalid: '%s'", h.topic, h.payload)
return 0, err
}

return h.scale * val, nil
f, err := strconv.ParseFloat(v, 64)
if err != nil {
return 0, fmt.Errorf("%s invalid: '%s'", h.topic, v)
}

return f * h.scale, nil
}

func (h *msgHandler) intGetter() (int64, error) {
Expand All @@ -272,25 +281,19 @@ func (h *msgHandler) intGetter() (int64, error) {
}

func (h *msgHandler) stringGetter() (string, error) {
h.once.Do(h.waitForInitialValue)
h.mux.Lock()
defer h.mux.Unlock()

if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout {
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
v, err := h.hasValue()
if err != nil {
return "", err
}

return string(h.payload), nil
return string(v), nil
}

func (h *msgHandler) boolGetter() (bool, error) {
h.once.Do(h.waitForInitialValue)
h.mux.Lock()
defer h.mux.Unlock()

if elapsed := time.Since(h.updated); h.timeout != 0 && elapsed > h.timeout {
return false, fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
v, err := h.hasValue()
if err != nil {
return false, err
}

return util.Truish(string(h.payload)), nil
return util.Truish(v), nil
}

0 comments on commit 7ecfe9d

Please sign in to comment.