From ddd8da8465f6fdffa0ad18cb61e96b8d642a39a6 Mon Sep 17 00:00:00 2001 From: rra Date: Thu, 26 Nov 2020 13:55:57 +0100 Subject: [PATCH] added code for the heartbeat sensor --- heartbeat.py | 20 ++++++++++ pulsesensor.py | 102 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 heartbeat.py create mode 100644 pulsesensor.py diff --git a/heartbeat.py b/heartbeat.py new file mode 100644 index 0000000..7213a0e --- /dev/null +++ b/heartbeat.py @@ -0,0 +1,20 @@ +from pulsesensor import Pulsesensor +import time + + +#this code assumes you have an MCP3008 with the analog heartbeat sensor on channel 0 + +p = Pulsesensor(channel=0, device=0) +p.startAsyncBPM() + +try: + while True: + bpm = p.BPM + if bpm > 0: + print("BPM: %d" % bpm) + else: + print("No Heartbeat found") + time.sleep(1) +except: + p.stopAsyncBPM() + diff --git a/pulsesensor.py b/pulsesensor.py new file mode 100644 index 0000000..cd39596 --- /dev/null +++ b/pulsesensor.py @@ -0,0 +1,102 @@ +# extended from https://github.com/WorldFamousElectronics/PulseSensor_Amped_Arduino + +import time +import threading +from gpiozero import MCP3008 + +class Pulsesensor: + def __init__(self, channel = 0, bus = 0, device = 0): + self.channel = channel + self.BPM = 0 + self.adc = MCP3008(bus, device) + + def getBPMLoop(self): + # init variables + rate = [0] * 10 # array to hold last 10 IBI values + sampleCounter = 0 # used to determine pulse timing + lastBeatTime = 0 # used to find IBI + P = 512 # used to find peak in pulse wave, seeded + T = 512 # used to find trough in pulse wave, seeded + thresh = 525 # used to find instant moment of heart beat, seeded + amp = 100 # used to hold amplitude of pulse waveform, seeded + firstBeat = True # used to seed rate array so we startup with reasonable BPM + secondBeat = False # used to seed rate array so we startup with reasonable BPM + + IBI = 600 # int that holds the time interval between beats! Must be seeded! + Pulse = False # "True" when User's live heartbeat is detected. "False" when not a "live beat". + lastTime = int(time.time()*1000) + + while not self.thread.stopped: + Signal = self.adc.raw_value + currentTime = int(time.time()*1000) + + sampleCounter += currentTime - lastTime + lastTime = currentTime + + N = sampleCounter - lastBeatTime + + # find the peak and trough of the pulse wave + if Signal < thresh and N > (IBI/5.0)*3: # avoid dichrotic noise by waiting 3/5 of last IBI + if Signal < T: # T is the trough + T = Signal # keep track of lowest point in pulse wave + + if Signal > thresh and Signal > P: + P = Signal + + # signal surges up in value every time there is a pulse + if N > 250: # avoid high frequency noise + if Signal > thresh and Pulse == False and N > (IBI/5.0)*3: + Pulse = True # set the Pulse flag when we think there is a pulse + IBI = sampleCounter - lastBeatTime # measure time between beats in mS + lastBeatTime = sampleCounter # keep track of time for next pulse + + if secondBeat: # if this is the second beat, if secondBeat == TRUE + secondBeat = False; # clear secondBeat flag + for i in range(len(rate)): # seed the running total to get a realisitic BPM at startup + rate[i] = IBI + + if firstBeat: # if it's the first time we found a beat, if firstBeat == TRUE + firstBeat = False; # clear firstBeat flag + secondBeat = True; # set the second beat flag + continue + + # keep a running total of the last 10 IBI values + rate[:-1] = rate[1:] # shift data in the rate array + rate[-1] = IBI # add the latest IBI to the rate array + runningTotal = sum(rate) # add upp oldest IBI values + + runningTotal /= len(rate) # average the IBI values + self.BPM = 60000/runningTotal # how many beats can fit into a minute? that's BPM! + + if Signal < thresh and Pulse == True: # when the values are going down, the beat is over + Pulse = False # reset the Pulse flag so we can do it again + amp = P - T # get amplitude of the pulse wave + thresh = amp/2 + T # set thresh at 50% of the amplitude + P = thresh # reset these for next time + T = thresh + + if N > 2500: # if 2.5 seconds go by without a beat + thresh = 512 # set thresh default + P = 512 # set P default + T = 512 # set T default + lastBeatTime = sampleCounter # bring the lastBeatTime up to date + firstBeat = True # set these to avoid noise + secondBeat = False # when we get the heartbeat back + self.BPM = 0 + + time.sleep(0.005) + + + # Start getBPMLoop routine which saves the BPM in its variable + def startAsyncBPM(self): + self.thread = threading.Thread(target=self.getBPMLoop) + self.thread.stopped = False + self.thread.start() + return + + # Stop the routine + def stopAsyncBPM(self): + self.thread.stopped = True + self.BPM = 0 + return +