Browse Source

added code for the heartbeat sensor

main
rra 11 months ago
parent
commit
ddd8da8465
  1. 20
      heartbeat.py
  2. 102
      pulsesensor.py

20
heartbeat.py

@ -0,0 +1,20 @@
from pulsesensor import Pulsesensor
import time
#this code assumes you have an MCP3008 with the analog heartbeat sensor on channel 0
p = Pulsesensor(channel=0, device=0)
p.startAsyncBPM()
try:
while True:
bpm = p.BPM
if bpm > 0:
print("BPM: %d" % bpm)
else:
print("No Heartbeat found")
time.sleep(1)
except:
p.stopAsyncBPM()

102
pulsesensor.py

@ -0,0 +1,102 @@
# extended from https://github.com/WorldFamousElectronics/PulseSensor_Amped_Arduino
import time
import threading
from gpiozero import MCP3008
class Pulsesensor:
def __init__(self, channel = 0, bus = 0, device = 0):
self.channel = channel
self.BPM = 0
self.adc = MCP3008(bus, device)
def getBPMLoop(self):
# init variables
rate = [0] * 10 # array to hold last 10 IBI values
sampleCounter = 0 # used to determine pulse timing
lastBeatTime = 0 # used to find IBI
P = 512 # used to find peak in pulse wave, seeded
T = 512 # used to find trough in pulse wave, seeded
thresh = 525 # used to find instant moment of heart beat, seeded
amp = 100 # used to hold amplitude of pulse waveform, seeded
firstBeat = True # used to seed rate array so we startup with reasonable BPM
secondBeat = False # used to seed rate array so we startup with reasonable BPM
IBI = 600 # int that holds the time interval between beats! Must be seeded!
Pulse = False # "True" when User's live heartbeat is detected. "False" when not a "live beat".
lastTime = int(time.time()*1000)
while not self.thread.stopped:
Signal = self.adc.raw_value
currentTime = int(time.time()*1000)
sampleCounter += currentTime - lastTime
lastTime = currentTime
N = sampleCounter - lastBeatTime
# find the peak and trough of the pulse wave
if Signal < thresh and N > (IBI/5.0)*3: # avoid dichrotic noise by waiting 3/5 of last IBI
if Signal < T: # T is the trough
T = Signal # keep track of lowest point in pulse wave
if Signal > thresh and Signal > P:
P = Signal
# signal surges up in value every time there is a pulse
if N > 250: # avoid high frequency noise
if Signal > thresh and Pulse == False and N > (IBI/5.0)*3:
Pulse = True # set the Pulse flag when we think there is a pulse
IBI = sampleCounter - lastBeatTime # measure time between beats in mS
lastBeatTime = sampleCounter # keep track of time for next pulse
if secondBeat: # if this is the second beat, if secondBeat == TRUE
secondBeat = False; # clear secondBeat flag
for i in range(len(rate)): # seed the running total to get a realisitic BPM at startup
rate[i] = IBI
if firstBeat: # if it's the first time we found a beat, if firstBeat == TRUE
firstBeat = False; # clear firstBeat flag
secondBeat = True; # set the second beat flag
continue
# keep a running total of the last 10 IBI values
rate[:-1] = rate[1:] # shift data in the rate array
rate[-1] = IBI # add the latest IBI to the rate array
runningTotal = sum(rate) # add upp oldest IBI values
runningTotal /= len(rate) # average the IBI values
self.BPM = 60000/runningTotal # how many beats can fit into a minute? that's BPM!
if Signal < thresh and Pulse == True: # when the values are going down, the beat is over
Pulse = False # reset the Pulse flag so we can do it again
amp = P - T # get amplitude of the pulse wave
thresh = amp/2 + T # set thresh at 50% of the amplitude
P = thresh # reset these for next time
T = thresh
if N > 2500: # if 2.5 seconds go by without a beat
thresh = 512 # set thresh default
P = 512 # set P default
T = 512 # set T default
lastBeatTime = sampleCounter # bring the lastBeatTime up to date
firstBeat = True # set these to avoid noise
secondBeat = False # when we get the heartbeat back
self.BPM = 0
time.sleep(0.005)
# Start getBPMLoop routine which saves the BPM in its variable
def startAsyncBPM(self):
self.thread = threading.Thread(target=self.getBPMLoop)
self.thread.stopped = False
self.thread.start()
return
# Stop the routine
def stopAsyncBPM(self):
self.thread.stopped = True
self.BPM = 0
return
Loading…
Cancel
Save