Source code for signalz.generators.ecgsyn

"""
.. versionadded:: 0.6

This functions produces a synthesized ECG signal. User can control following
parameters: mean heart rate, number of beats, sampling frequency,
waveform morphology (P, Q, R, S, and T timing, amplitude,and duration),
standard deviation of the RR interval, LF/HF ratio.

Copyright (c) 2003 by Patrick McSharry & Gari Clifford, All Rights Reserved 
:cite:`mcsharry2003dynamical`.

More information can be found in PhysionNet https://www.physionet.org/physiotools/ecgsyn/

Example Usage
===============

Simple example follows

.. code-block:: python

    x, peaks = ecgsyn(n=10, hrmean=50, hrstd=3, sfecg=128)
    
The returned variable `x` contains the synthetic ECG series.   

References
============

.. bibliography:: ecgsyn.bib
    :style: plain

Function Documentation
======================================
"""
import numpy as np

from signalz.misc import check_type_or_raise, ode45, rem

[docs]def derfunc(t, x, rr, sfint, ti, ai, bi): """ Derivations of the ECG function. """ xi = np.cos(ti) yi = np.sin(ti) ta = np.arctan2(x[1], x[0]) r0 = 1. a0 = 1. - np.sqrt((x[0]**2) + (x[1]**2)) / r0 ip = int(np.floor(t * sfint)) w0 = 2. * np.pi / rr[ip] fresp = 0.25 zbase = 0.005 * np.sin(2 * np.pi * fresp * t) dx1dt = a0*x[0] - w0*x[1] dx2dt = a0*x[1] + w0*x[0] dti = rem(ta - ti, 2. * np.pi) dx3dt = - np.sum(ai * dti * np.exp(-0.5*(dti / bi)**2)) - (x[2] - zbase) return np.array([dx1dt, dx2dt, dx3dt])
def rrprocess(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sfrr, n): w1 = 2. * np.pi * flo w2 = 2. * np.pi * fhi c1 = 2. * np.pi * flostd c2 = 2. * np.pi * fhistd sig2 = 1 sig1 = lfhfratio rrmean = 60 / hrmean rrstd = 60 * hrstd / float(hrmean * hrmean) df = sfrr / float(n) w = np.arange(0,n) * 2 * np.pi * df dw1 = w - w1 dw2 = w - w2 Hw1 = sig1 * np.exp(-0.5 * (dw1 / c1)**2) / np.sqrt(2.*np.pi*c1**2) Hw2 = sig2 * np.exp(-0.5 * (dw2 / c2)**2) / np.sqrt(2.*np.pi*c2**2) Hw = Hw1 + Hw2 Hw0 = np.concatenate([Hw[0:int(n/2)], Hw[int(n/2):0:-1]]) Sw = (sfrr / 2.) * np.sqrt(Hw0) ph0 = 2 * np.pi * np.random.uniform(0, 1, int(n/2)-1) ph = np.concatenate([[0], ph0, [0], -np.flipud(ph0)]) SwC = Sw * np.exp(ph*1j) x = (1 / float(n)) * np.real(np.fft.ifft(SwC)) xstd = np.std(x) ratio = rrstd / float(xstd) return rrmean + x * ratio
[docs]def annotate_peaks(x, thetap, sfecg): """ This function annotates PQRST peaks (P=1, Q=2, R=3, S=4, T=5). """ # find rough positions of peaks n = len(x) irpeaks = np.zeros(n) theta = np.arctan2(x[:,1], x[:,0]) ind0 = np.zeros(n) for i in range(0, n-1): a = (theta[i] <= thetap) & (thetap <= theta[i+1]) j = np.where(a==1)[0] if len(j) > 0: d1 = thetap[j] - theta[i] d2 = theta[i+1] - thetap[j] if d1[0] < d2[0]: ind0[i] = j[0]+1 else: ind0[i+1] = j[0]+1 # shift the peaks to correct position ind = np.zeros(n) z = x[:,2] extrema = np.array([np.argmax, np.argmin, np.argmax, np.argmin, np.argmax]) for i in range(0,5): ind1 = np.where(ind0==i+1)[0] for j in ind1: if j: surrounding = z[j-3:j+3] correction = extrema[i](surrounding) ind[j-3+correction] = i+1 return ind
[docs]def ecgsyn(sfecg=256, n=256, hrmean=60., hrstd=1, lfhfratio=0.5, sfint=512, ti=[-70, -15, 0, 15, 100], ai=[1.2, -5, 30, -7.5, 0.75], bi=[0.25, 0.1, 0.1, 0.1, 0.4]): """ ECGSYN - realistic ecg generator. Kwargs: * `sfecg` : ECG sampling frequency (int), it Hz * `N` : approaximate number of heart beats (int) * `hrmean` : mean heart rate (float) in beats per minute * `hrstd` : standard deviation of heart rate (float) in beats per minute * 'lfhfration : LF/HF ratio (float) * `sfint` : internal sampling frequency (int) in Hz * `ti` : angles of PQRST extrema (1d array of size 5) in degrees * `ai` : z-position of PQRST extrema (1d array of size 5) * `bi` : Gaussian width of peaks (1d array of size 5) Returns: * `x` : ECG values in mV * `peaks`: labels for PQRST peaks (P=1, Q=2, R=3, S=4, T=5 and 0 elsewhere) """ # check data types check_type_or_raise(sfecg, int, "sfecg") check_type_or_raise(n, int, "sfecg") check_type_or_raise(sfint, int, "sfint") # data cleaning ti = np.array(ti) hrmean = float(hrmean) ti = ti * np.pi / 180. ai = np.array(ai) bi = np.array(bi) # adjust extrema parameters for mean heart rate hrfact = np.sqrt(hrmean / 60.) hrfact2 = np.sqrt(hrfact) bi = hrfact * bi ti = np.array([hrfact2, hrfact, 1, hrfact, hrfact2]) * ti # check that sfint is an integer multiple of sfecg if sfint % sfecg != 0 or sfint < sfecg: raise ValueError("Sfint must be an integer multiple of sfecg") # define frequency parameters for rr process, flo and fhi correspond # to the Mayer waves and respiratory rate respectively flo = 0.1 fhi = 0.25 flostd = 0.01 fhistd = 0.01 # calculate time scales for rr and total output sampfreqrr = 1 trr = 1 / float(sampfreqrr) tstep = 1 / float(sfecg) rrmean = 60 / hrmean Nrr = 2**(np.ceil(np.log2(n * rrmean / trr))) # compute rr process rr0 = rrprocess(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sampfreqrr, Nrr) # upsample rr time series from 1 Hz to sfint Hz rrlin = np.arange(0, len(rr0)*sfint) / float(sfint) # upsample rr = np.interp(rrlin, np.arange(0, len(rr0)), rr0) # upsample # make the rrn time series dt = 1 / float(sfint) rrn = np.zeros(len(rr)) tecg = 0 i = 0 while i <= len(rr): tecg = tecg + rr[i] ip = int(np.round(tecg / dt)) rrn[i:ip+1] = rr[i] #+1? i = ip+1 Nt = ip # integrate system using fourth order Runge-Kutta x0 = np.array([1,0,0.04]) Tspan = np.arange(0, (Nt-1)*dt, dt) Tspan = Tspan[:len(rrn)] z = ode45(derfunc, Tspan, x0, rrn, sfint, ti, ai, bi) # resample (downsample) resample_factor = int(sfint / sfecg) x = z[::resample_factor] # annotate peaks ipeaks = annotate_peaks(x, ti, sfecg) # Scale signal to lie between -0.4 and 1.2 mV x = x[:,2] zmin = x.min() zmax = x.max() zrange = zmax - zmin out = (x - zmin) * 1.6 / zrange - 0.4 return out, ipeaks
if __name__ == "__main__": x, peaks = ecgsyn(n=10, hrmean=50, hrstd=3, sfecg=128)