Skip to content

Commit

Permalink
Add ZMQ_FILTER_TPRESULT_NOW_WITH_DATA
Browse files Browse the repository at this point in the history
Add a message type which not only publishes the TP properties but also the
raw AD data.
  • Loading branch information
t-b committed Feb 18, 2025
1 parent 9948189 commit 5f00ccf
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 31 deletions.
33 changes: 17 additions & 16 deletions Packages/MIES/MIES_Constants.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -1860,22 +1860,23 @@ StrConstant LOGBOOK_WAVE_TEMP_FOLDER = "Temp"
/// @name All available ZeroMQ message filters
/// @anchor ZeroMQMessageFilters
///@{
StrConstant IVS_PUB_FILTER = "ivscc"
StrConstant PRESSURE_STATE_FILTER = "pressure:state"
StrConstant PRESSURE_SEALED_FILTER = "pressure:sealed"
StrConstant PRESSURE_BREAKIN_FILTER = "pressure:break in"
StrConstant AUTO_TP_FILTER = "testpulse:autotune result"
StrConstant ZMQ_FILTER_TPRESULT_NOW = "testpulse:results live"
StrConstant ZMQ_FILTER_TPRESULT_1S = "testpulse:results 1s update"
StrConstant ZMQ_FILTER_TPRESULT_5S = "testpulse:results 5s update"
StrConstant ZMQ_FILTER_TPRESULT_10S = "testpulse:results 10s update"
StrConstant AMPLIFIER_CLAMP_MODE_FILTER = "amplifier:clamp mode"
StrConstant AMPLIFIER_AUTO_BRIDGE_BALANCE = "amplifier:auto bridge balance"
StrConstant ANALYSIS_FUNCTION_PB = "analysis function:pipette in bath"
StrConstant ANALYSIS_FUNCTION_SE = "analysis function:seal evaluation"
StrConstant ANALYSIS_FUNCTION_VM = "analysis function:true resting membrane potential"
StrConstant DAQ_TP_STATE_CHANGE_FILTER = "data acquisition:state change"
StrConstant ANALYSIS_FUNCTION_AR = "analysis function:access resistance smoke"
StrConstant IVS_PUB_FILTER = "ivscc"
StrConstant PRESSURE_STATE_FILTER = "pressure:state"
StrConstant PRESSURE_SEALED_FILTER = "pressure:sealed"
StrConstant PRESSURE_BREAKIN_FILTER = "pressure:break in"
StrConstant AUTO_TP_FILTER = "testpulse:autotune result"
StrConstant ZMQ_FILTER_TPRESULT_NOW = "testpulse:results live"
StrConstant ZMQ_FILTER_TPRESULT_1S = "testpulse:results 1s update"
StrConstant ZMQ_FILTER_TPRESULT_5S = "testpulse:results 5s update"
StrConstant ZMQ_FILTER_TPRESULT_10S = "testpulse:results 10s update"
StrConstant ZMQ_FILTER_TPRESULT_NOW_WITH_DATA = "testpulse:results live with data"
StrConstant AMPLIFIER_CLAMP_MODE_FILTER = "amplifier:clamp mode"
StrConstant AMPLIFIER_AUTO_BRIDGE_BALANCE = "amplifier:auto bridge balance"
StrConstant ANALYSIS_FUNCTION_PB = "analysis function:pipette in bath"
StrConstant ANALYSIS_FUNCTION_SE = "analysis function:seal evaluation"
StrConstant ANALYSIS_FUNCTION_VM = "analysis function:true resting membrane potential"
StrConstant DAQ_TP_STATE_CHANGE_FILTER = "data acquisition:state change"
StrConstant ANALYSIS_FUNCTION_AR = "analysis function:access resistance smoke"
///@}

/// which is sufficient to represent each sample point time with a distinctive number up to rates of 10 MHz.
Expand Down
12 changes: 6 additions & 6 deletions Packages/MIES/MIES_ForeignFunctionInterface.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ End
/// @sa PUB_GetJSONTemplate
Function/WAVE FFI_GetAvailableMessageFilters()

Make/FREE/T wv = {ZeroMQ_HEARTBEAT, IVS_PUB_FILTER, PRESSURE_STATE_FILTER, PRESSURE_SEALED_FILTER, \
PRESSURE_BREAKIN_FILTER, AUTO_TP_FILTER, AMPLIFIER_CLAMP_MODE_FILTER, \
AMPLIFIER_AUTO_BRIDGE_BALANCE, ANALYSIS_FUNCTION_PB, ANALYSIS_FUNCTION_SE, \
ANALYSIS_FUNCTION_VM, DAQ_TP_STATE_CHANGE_FILTER, \
ANALYSIS_FUNCTION_AR, ZMQ_FILTER_TPRESULT_NOW, ZMQ_FILTER_TPRESULT_1S, \
ZMQ_FILTER_TPRESULT_5S, ZMQ_FILTER_TPRESULT_10S}
Make/FREE/T wv = {ZeroMQ_HEARTBEAT, IVS_PUB_FILTER, PRESSURE_STATE_FILTER, PRESSURE_SEALED_FILTER, \
PRESSURE_BREAKIN_FILTER, AUTO_TP_FILTER, AMPLIFIER_CLAMP_MODE_FILTER, \
AMPLIFIER_AUTO_BRIDGE_BALANCE, ANALYSIS_FUNCTION_PB, ANALYSIS_FUNCTION_SE, \
ANALYSIS_FUNCTION_VM, DAQ_TP_STATE_CHANGE_FILTER, \
ANALYSIS_FUNCTION_AR, ZMQ_FILTER_TPRESULT_NOW, ZMQ_FILTER_TPRESULT_1S, \
ZMQ_FILTER_TPRESULT_5S, ZMQ_FILTER_TPRESULT_10S, ZMQ_FILTER_TPRESULT_NOW_WITH_DATA}

Note/K wv, "Heartbeat is sent every 5 seconds."

Expand Down
17 changes: 15 additions & 2 deletions Packages/MIES/MIES_Publish.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ static Function PUB_GetJSONTemplate(string device, variable headstage)
End

/// @brief Publish the given message as given by the JSON and the filter
threadsafe Function PUB_Publish(variable jsonID, string messageFilter, [variable releaseJSON])
threadsafe Function PUB_Publish(variable jsonID, string messageFilter, [variable releaseJSON, WAVE/Z additionalData])

variable err

releaseJSON = ParamIsDefault(releaseJSON) ? 1 : !!releaseJSON

if(ParamIsDefault(additionalData))
WAVE/ZZ additionalData
else
ASSERT_TS(IsWaveRefWave(additionalData), "Expected a wave reference wave")
endif

Make/T/FREE filter = {messageFilter}
Make/T/FREE payload = {JSON_Dump(jsonID)}
Make/FREE/WAVE wv = {filter, payload}
Expand All @@ -39,6 +45,10 @@ threadsafe Function PUB_Publish(variable jsonID, string messageFilter, [variable
JSON_Release(jsonID)
endif

if(WaveExists(additionalData))
Concatenate/NP=(ROWS) {additionalData}, wv
endif

AssertOnAndClearRTError()
try
zeromq_pub_send_multi(wv); AbortOnRTE
Expand Down Expand Up @@ -677,6 +687,7 @@ End
/// Filter: #ZMQ_FILTER_TPRESULT_1S
/// Filter: #ZMQ_FILTER_TPRESULT_5S
/// Filter: #ZMQ_FILTER_TPRESULT_10S
/// Filter: #ZMQ_FILTER_TPRESULT_NOW_WITH_DATA
///
/// Example:
///
Expand Down Expand Up @@ -768,7 +779,7 @@ End
/// }
///
/// \endrst
threadsafe Function PUB_TPResult(string device, WAVE tpData)
threadsafe Function PUB_TPResult(string device, WAVE tpData, WAVE additionalData)

string path
variable jsonId = JSON_New()
Expand Down Expand Up @@ -806,6 +817,8 @@ threadsafe Function PUB_TPResult(string device, WAVE tpData)
PUB_AddTPResultEntry(jsonId, path + "/instantaneous resistance", tpData[%INSTANTRES], "MΩ")

PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_NOW, releaseJSON = 0)
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_NOW_WITH_DATA, releaseJSON = 0, additionalData = additionalData)

if(PUB_CheckPublishingTime(ZMQ_FILTER_TPRESULT_1S, 1))
PUB_Publish(jsonID, ZMQ_FILTER_TPRESULT_1S, releaseJSON = 0)
endif
Expand Down
3 changes: 2 additions & 1 deletion Packages/MIES/MIES_TestPulse.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,8 @@ threadsafe Function/DF TP_TSAnalysis(DFREF dfrInp)
tpData[%PULSESTARTPOINTSDAC] = pulseStartPointsDAC
tpData[%SAMPLINGINTERVALDAC] = samplingIntervalDAC

PUB_TPResult(device, tpData)
Make/FREE/WAVE additionalData = {data}
PUB_TPResult(device, tpData, additionalData)

return dfrOut
End
Expand Down
27 changes: 25 additions & 2 deletions Packages/tests/Basic/UTF_ZeroMQPublishing.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -568,17 +568,40 @@ static Function CheckTPData(variable jsonId)
CHECK_EQUAL_STR(stv, "MΩ")
End

// IUTF_TD_GENERATOR DataGenerators#PUB_TPFilters
// IUTF_TD_GENERATOR DataGenerators#PUB_TPFiltersWithoutData
static Function CheckTPPublishing([string str])

variable jsonId

WAVE tpData = PrepareTPData()

TUFXOP_Clear/Z/N=(str)
PUB_TPResult("TestDevice", tpData)
Make/FREE/N=0/WAVE additionalData
PUB_TPResult("TestDevice", tpData, additionalData)

jsonId = FetchAndParseMessage(str)
CheckTPData(jsonId)
JSON_Release(jsonID)
End

static Function CheckTPPublishingWithData()

variable jsonId

WAVE tpData = PrepareTPData()

Make/FREE/N=(2, 3) data = (p + q)^2
Make/FREE/WAVE additionalData = {data}

PUB_TPResult("TestDevice", tpData, additionalData)

Make/FREE/WAVE/N=0 receivedData
jsonId = FetchAndParseMessage(ZMQ_FILTER_TPRESULT_NOW_WITH_DATA, additionalData = receivedData)
CheckTPData(jsonId)
// first two: filter and message
WAVE wv = receivedData[2]
Redimension/N=(2, 3)/E=1/S wv
CHECK_EQUAL_WAVES(data, wv, mode = WAVE_DATA)

JSON_Release(jsonID)
End
47 changes: 45 additions & 2 deletions Packages/tests/HardwareBasic/UTF_TestPulseAndTPDuringDAQ.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ End

static Function TestTPPublishing_REENTRY([string str])

variable sweepNo, jsonId, var, index, dimMarker, headstage
variable sweepNo, jsonId, var, index, dimMarker, headstage, i, foundHS0, foundHS1
string filter, stv, adUnit, daUnit

CHECK_EQUAL_VAR(GetSetVariable(str, "SetVar_Sweep"), 0)
Expand All @@ -1458,7 +1458,7 @@ static Function TestTPPublishing_REENTRY([string str])
WAVE tpStorage = GetTPStorage(str)
dimMarker = FindDimLabel(tpStorage, LAYERS, "TPMarker")

WAVE/T filters = DataGenerators#PUB_TPFilters()
WAVE/T filters = DataGenerators#PUB_TPFiltersWithoutData()
for(filter : filters)
jsonId = FetchAndParseMessage(filter)

Expand Down Expand Up @@ -1562,4 +1562,47 @@ static Function TestTPPublishing_REENTRY([string str])
stv = JSON_GetString(jsonID, "/results/instantaneous resistance/unit")
CHECK_EQUAL_STR(stv, "MΩ")
endfor

for(i = 0; i < 10; i += 1)
Make/FREE/N=0/WAVE receivedData
jsonId = FetchAndParseMessage(ZMQ_FILTER_TPRESULT_NOW_WITH_DATA, additionalData = receivedData)
CHECK_WAVE(receivedData, WAVE_WAVE | FREE_WAVE)

var = JSON_GetVariable(jsonID, "/properties/tp marker")
CHECK_NEQ_VAR(var, NaN)

WAVE/WAVE storedTP = TP_GetStoredTP(str, var, 0, 0)
CHECK_WAVE(storedTP, WAVE_WAVE | FREE_WAVE)

WAVE/WAVE tpAD = storedTP[0]
CHECK_WAVE(tpAD, NUMERIC_WAVE | FREE_WAVE)

Duplicate/FREE/RMD=[][0] tpAD, HS0
Duplicate/FREE/RMD=[][1] tpAD, HS1
Redimension/N=(-1, 0) HS0, HS1

WAVE data = receivedData[2]
Redimension/N=(DimSize(HS0, ROWS))/E=1/S data

var = JSON_GetVariable(jsonID, "/properties/headstage")
switch(var)
case 0:
CHECK(EqualWaves(data, HS0, WAVE_DATA))
foundHS0 = 1
break
case 1:
CHECK(EqualWaves(data, HS1, WAVE_DATA))
foundHS1 = 1
break
default:
FAIL()
endswitch

if(foundHS0 && foundHS1)
break
endif
endfor

CHECK(foundHS0)
CHECK(foundHS1)
End
2 changes: 1 addition & 1 deletion Packages/tests/UTF_DataGenerators.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ static Function/WAVE SF_TestOperationSelNoArg()
return wt
End

static Function/WAVE PUB_TPFilters()
static Function/WAVE PUB_TPFiltersWithoutData()

Make/FREE/T wv = {ZMQ_FILTER_TPRESULT_NOW, ZMQ_FILTER_TPRESULT_1S, ZMQ_FILTER_TPRESULT_5S, ZMQ_FILTER_TPRESULT_10S}
SetDimensionLabels(wv, "period_now;period_1s;period_5s;period_10s", ROWS)
Expand Down
8 changes: 7 additions & 1 deletion Packages/tests/UTF_HelperFunctions.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,19 @@ Function AdditionalExperimentCleanup()
KillOrMoveToTrash(wv = GetOverrideResults())
End

Function FetchAndParseMessage(string expectedFilter)
Function FetchAndParseMessage(string expectedFilter, [WAVE/Z/WAVE additionalData])

variable jsonID

[WAVE/WAVE receivedData, string msg, string filter] = FetchPublishedMessage(expectedFilter)
CHECK_WAVE(receivedData, WAVE_WAVE)

if(!ParamIsDefault(additionalData))
Redimension/N=(DimSize(receivedData, ROWS)) additionalData
additionalData[] = receivedData[p]
Duplicate/FREE receivedData, additionalData
endif

CHECK_PROPER_STR(msg)

jsonID = JSON_Parse(msg)
Expand Down

0 comments on commit 5f00ccf

Please sign in to comment.