Skip to content

Commit

Permalink
Switched filters and interpolation ops to use peakdet routines.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbfrederick committed Jan 21, 2025
1 parent b41c04e commit d96c1a5
Showing 1 changed file with 62 additions and 83 deletions.
145 changes: 62 additions & 83 deletions physioqc/metrics/chest_belt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,86 +3,69 @@
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.patches import Polygon
from peakdet import operations
from peakdet import Physio, operations
from scipy.signal import resample, welch
from scipy.stats import pearsonr

from .. import references
from ..due import due
from .utils import butterbpfiltfilt, butterlpfiltfilt, hamming, physio_or_numpy
from .utils import hamming, physio_or_numpy


def respirationprefilter(
rawresp, Fs, lowerpass=0.01, upperpass=2.0, order=1, debug=False
):
def envelopedetect(data, upperpass=0.05, order=5):
"""
Apply a bandpass prefilter to the respiration signal
Parameters
----------
rawresp: ndarray
The raw respiration signal
Fs: float
The sampling frequency of the respiration signal in Hz
lowerpass: float
Lower pass cutoff frequency in Hz
upperpass: float
Upper pass cutoff frequency in Hz
order: int
Butterworth filter order
debug: bool
Print debug information and plot intermediate results
Returns
-------
filteredresp: ndarray
"""
if debug:
print(
f"respirationprefilter: Fs={Fs} order={order}, lowerpass={lowerpass}, upperpass={upperpass}"
)
return butterbpfiltfilt(Fs, lowerpass, upperpass, rawresp, order, debug=debug)
# return operations.filter_physio(rawresp, cutoffs=[lowerpass, upperpass], method='bandpass')


def respenvelopefilter(squarevals, Fs, upperpass=0.05, order=8, debug=False):
"""
Apply a lowepass filter to the respiration envelope signal to get the RMS amplitude
Parameters
----------
squarevals: ndarray
The square of the either the positive or negative portions of the respriation signal
Fs: float
The sampling frequency of the respiration signal in Hz
data
upperpass: float
Upper pass cutoff frequency in Hz
order: int
Butterworth filter order
debug: bool
Print debug information
Returns
-------
filteredenv: ndarray
The filtered respiration envelope signal
einferior: ndarray
The lower edge of the signal envelope
esuperior: ndarray
The upper edge of the signal envelope
"""
if debug:
print(f"respenvelopefilter: Fs={Fs} order={order}, upperpass={upperpass}")
return butterlpfiltfilt(Fs, upperpass, squarevals, order, debug=debug)
npdata = data.data
targetfs = data.fs
esuperior = (
2.0
* operations.filter_physio(
Physio(np.square(np.where(npdata > 0.0, npdata, 0.0)), fs=targetfs),
[upperpass],
"lowpass",
order=order,
).data
)
esuperior = np.sqrt(np.where(esuperior > 0.0, esuperior, 0.0))
einferior = (
2.0
* operations.filter_physio(
Physio(np.square(np.where(npdata < 0.0, npdata, 0.0)), fs=targetfs),
[upperpass],
"lowpass",
order=order,
).data
)
einferior = -np.sqrt(np.where(einferior > 0.0, einferior, 0.0))
return einferior, esuperior


@due.dcite(references.ROMANO_2023)
def respiratorysqi(
rawresp, Fs, envelopelpffreq=0.05, slidingfilterpctwidth=10.0, debug=False
rawresp, Fs, envelopelpffreq=0.075, slidingfilterpctwidth=10.0, debug=False
):
"""
Implementation of Romano's method from A Signal Quality Index for Improving the Estimation of
Breath-by-Breath Respiratory Rate During Sport and Exercise,
IEEE SENSORS JOURNAL, VOL. 23, NO. 24, 15 DECEMBER 2023
NB: In part A, Romano does not specify the upper pass frequency for the respiratory envelope filter.
0.05Hz looks pretty good.
0.075Hz looks pretty good.
In part B, the width of the sliding window bandpass filter is not specified. We use a range of +/- 5% of the
center frequency.
Expand All @@ -109,27 +92,29 @@ def respiratorysqi(
# rawresp = physio_or_numpy(rawresp)

# get the sample frequency down to around 25 Hz for respiratory waveforms
targetfreq = 25.0
if Fs > targetfreq:
dsfac = int(Fs / targetfreq)
print(f"downsampling by a factor of {dsfac}")
rawresp = rawresp[::dsfac] + 0.0
Fs /= dsfac
targetfs = 25.0
rawresp = Physio(rawresp, fs=Fs)
rawresp = operations.interpolate_physio(rawresp, target_fs=targetfs)

# A. Signal Preprocessing
# Apply first order Butterworth bandpass, 0.01-2Hz
prefiltered = respirationprefilter(rawresp, Fs, debug=debug)
# Apply third order Butterworth bandpass, 0.01-2Hz
prefiltered = operations.filter_physio(
rawresp,
[0.01, 2.0],
"bandpass",
order=3,
)
if debug:
plt.plot(rawresp)
plt.plot(prefiltered)
plt.plot(rawresp.data)
plt.plot(prefiltered.data)
plt.title("Raw and prefiltered respiratory signal")
plt.legend(["Raw", "Prefiltered"])
plt.show()
if debug:
print("prefiltered: ", prefiltered)
print("prefiltered: ", prefiltered.data)

# calculate the derivative
derivative = np.gradient(prefiltered, 1.0 / Fs)
derivative = np.gradient(prefiltered.data, 1.0 / targetfs)

# normalize the derivative to the range of ~-1 to 1
derivmax = np.max(derivative)
Expand All @@ -145,18 +130,9 @@ def respiratorysqi(
plt.show()

# amplitude correct by flattening the envelope function
esuperior = 2.0 * respenvelopefilter(
np.square(np.where(normderiv > 0.0, normderiv, 0.0)),
Fs,
upperpass=envelopelpffreq,
einferior, esuperior = envelopedetect(
Physio(normderiv, fs=targetfs), upperpass=envelopelpffreq, order=3
)
esuperior = np.sqrt(np.where(esuperior > 0.0, esuperior, 0.0))
einferior = 2.0 * respenvelopefilter(
np.square(np.where(normderiv < 0.0, normderiv, 0.0)),
Fs,
upperpass=envelopelpffreq,
)
einferior = -np.sqrt(np.where(einferior > 0.0, einferior, 0.0))
if debug:
plt.plot(normderiv)
plt.plot(esuperior)
Expand All @@ -173,9 +149,9 @@ def respiratorysqi(

# B. Detection of breaths in sliding window
seglength = 12.0
segsamples = int(seglength * Fs)
segsamples = int(seglength * targetfs)
segstep = 2.0
stepsamples = int(segstep * Fs)
stepsamples = int(segstep * targetfs)
totaltclength = len(rawresp)
numsegs = int((totaltclength - segsamples) // stepsamples)
if (totaltclength - segsamples) % segsamples != 0:
Expand All @@ -193,7 +169,7 @@ def respiratorysqi(
segment = rmsnormderiv[segstart:segend] + 0.0
segment *= hamming(segsamples)
segment -= np.mean(segment)
thex, they = welch(segment, Fs, nfft=4096)
thex, they = welch(segment, targetfs, nfft=4096)
peakfreqs[i] = thex[np.argmax(they)]
respfilterorder = 1
lowerfac = 1.0 - slidingfilterpctwidth / 200.0
Expand All @@ -202,9 +178,12 @@ def respiratorysqi(
upperpass = peakfreqs[i] * upperfac
if debug:
print(peakfreqs[i], lowerfac, lowerpass, upperfac, upperpass)
filteredsegment = butterlpfiltfilt(
Fs, upperpass, segment, respfilterorder, debug=False
)
filteredsegment = operations.filter_physio(
Physio(segment, fs=targetfs),
[upperpass],
"lowpass",
order=respfilterorder,
).data
filteredsegment -= np.mean(filteredsegment)
if i < numsegs - 1:
respfilteredderivs[
Expand All @@ -227,7 +206,7 @@ def respiratorysqi(
# C. Breaths segmentation
# The fastest credible breathing rate is 20 breaths/min -> 3 seconds/breath, so set the dist to be 50%
# of that in points
thedist = int((3.0 * Fs) / 2.0)
thedist = int((3.0 * targetfs) / 2.0)
peakinfo = operations.peakfind_physio(respfilteredderivs, thresh=0.05, dist=thedist)
if debug:
ax = operations.plot_physio(peakinfo)
Expand All @@ -245,9 +224,9 @@ def respiratorysqi(
breathinfo = {}
startpt = thepeaks[thisbreath]
endpt = thepeaks[thisbreath + 1]
thebreathlocs[thisbreath] = (endpt + startpt) / (Fs * 2.0)
breathinfo["starttime"] = startpt / Fs
breathinfo["endtime"] = endpt / Fs
thebreathlocs[thisbreath] = (endpt + startpt) / (targetfs * 2.0)
breathinfo["starttime"] = startpt / targetfs
breathinfo["endtime"] = endpt / targetfs
breathinfo["centertime"] = thebreathlocs[thisbreath]
thescaledbreaths[thisbreath, :] = resample(
respfilteredderivs[startpt:endpt], scaledpeaklength
Expand Down

0 comments on commit d96c1a5

Please sign in to comment.