欢迎光临散文网 会员登陆 & 注册

序列数据处理学习(一)利用极值方法检测峰值学习

2022-10-31 15:10 作者:饼干快快快跑  | 我要投稿

最近有个时间序列异常值处理的任务,找一些源码学习一下思路:

一、Marcos Duarte编写的峰值查找算法

算法思路:

1.通过极值前后一阶导符号相反的条件,提取所有峰值的坐标: 

     ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]  

# np.hstack((0, dx)) 在dx前面插入0,通过与运算就可以判断先dx>0再dx<0 的位置为峰值点。

2.同时对于平顶峰,可以选择检测上下沿

ine = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]  

#先dx>0再dx<0或=0 的位置为峰值

通过极值点获得峰值坐标后就是一系列的功能性筛选:

3.NaN和NaN附近的值不能为峰值

ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)]

#测试峰值坐标是否在空数值的附近。5in1d:测试一维数组的每个元素是否也存在于第二个数组中,返回一个长度相同布尔数组,

4.去除处于首位和末尾的峰值点

    if ind.size and ind[0] == 0:

        ind = ind[1:]   #去除在首位的峰值

    if ind.size and ind[-1] == x.size - 1:

        ind = ind[:-1] #去除在末位的峰值

5.去除 高度 小于阈值mph的峰值

      ind = ind[x[ind] >= mph]  #去除 高度 小于阈值mph的峰值

6.剔除前后突变小于阈值的峰值

dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)

ind = np.delete(ind, np.where(dx < threshold)[0])

#计算峰值点前后的差值,上下组合在一起在取最小值

#np.vstack按垂直方向(行顺序)堆叠数组构成一个新的数组,np.min取两侧最小的突变,加入axis参数,当axis=0时会分别取每一列的最大值或最小值,axis=1时,会分别取每一行的最大值或最小值,且将所有取到的数据放在一个一维数组中。

7.剔除峰值间距离过近的峰值

idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd)

#搜索在当前峰,前后mpd区间里的其他峰,若有则给这些峰在idel里赋值1


算法风格:

这个算法的所有操作基本在峰值的索引序列上进行

先用逻辑语句获得布尔序列,再通过np.where函数取得索引序列

后面的功能判断,利用索引仅在极值点附近进行,简化了运算。


相关函数:

np.where:根据判断语句,提取索引

np.hstack,np.vstack :合并组合序列的方法

np.unique:合并后去除重复的函数

np.in1d:找数组B中元素,是否在数组A中存在,并标记位置

np.min(A, axis=0):求列最小

np.delete:判断语句配合np.where提取索引,精准删除

(np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0) :判断极值点的思路





#Marcos Duarte编写的峰值查找算法,功能比较强大,筛选参数比较多。算法源码如下:

def Marcos_detect_peaks(x, mph=None, mpd=1, threshold=0, edge='both',

                 kpsh=False, valley=False, show=False, ax=None):

    """Detect peaks in data based on their amplitude and other features.

    Parameters

    ----------

    x : 1D array_like

        data.

    mph : {None, number}, optional (default = None) #峰最小的高度

        detect peaks that are greater than minimum peak height.

    mpd : positive integer, optional (default = 1) #峰最小的宽度

        detect peaks that are at least separated by minimum peak distance (in

        number of data).

    threshold : positive number, optional (default = 0) 

        detect peaks (valleys) that are greater (smaller) than `threshold`

        in relation to their immediate neighbors.

    edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising') #平顶峰检测

        for a flat peak, keep only the rising edge ('rising'), only the

        falling edge ('falling'), both edges ('both'), or don't detect a

        flat peak (None).

    kpsh : bool, optional (default = False) #

        keep peaks with same height even if they are closer than `mpd`.

    valley : bool, optional (default = False)  #谷值检测

        if True (1), detect valleys (local minima) instead of peaks.

    show : bool, optional (default = False)

        if True (1), plot data in matplotlib figure.

    ax : a matplotlib.axes.Axes instance, optional (default = None).

    Returns

    -------

    ind : 1D array_like

        indeces of the peaks in `x`.

    Notes

    -----

    The detection of valleys instead of peaks is performed internally by simply

    negating the data: `ind_valleys = detect_peaks(-x)`


    The function can handle NaN's

    See this IPython Notebook [1]_.

    References

    ----------

    "Marcos Duarte, https://github.com/demotu/BMC"

    [1] http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb

    """


    x = np.atleast_1d(x).astype('float64') #一维数组,转换为浮点型

    if x.size < 3:

        return np.array([], dtype=int)  #输入数组长度小于3,返回空

    if valley:

        x = -x  #检测谷值,将数组沿x轴镜像

    # find indexes of all peaks

    dx = x[1:] - x[:-1] 

    ## x[1:]截取索引1(第二个值)到所有, x[:-1]截取除最后一个的全部,相减为梯度

    # handle NaN's

    indnan = np.where(np.isnan(x))[0]   

    #通过isnan函数判断是否为空值,通过where获得nan所在索引

    if indnan.size:

        x[indnan] = np.inf  #np.inf 表示+∞,是没有确切的数值的,类型为浮点型

        dx[np.where(np.isnan(dx))[0]] = np.inf  #若为无穷,对应位置梯度为无穷


    ##########定位峰值点##########

    ine, ire, ife = np.array([[], [], []], dtype=int)   #创建空数组,ine, ire, ife = 平顶峰检测

    if not edge:  #不进行平顶峰检测

        ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]  

        # np.hstack((0, dx)) 在dx前面插入0,通过与运算就可以判断先dx>0在dx小于0 的位置为峰值点。

    else:

        if edge.lower() in ['rising', 'both']:

            ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]  #上升沿后可以接下降沿也可以接平顶,返回数组坐标

        if edge.lower() in ['falling', 'both']:

            ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0]     ##平顶或者上升沿可以接下降沿,返回数组坐标

    #返回峰值点坐标

    ind = np.unique(np.hstack((ine, ire, ife))) 

    # 当同时检测上下沿时,去除其中重复的坐标 ,并按坐标由小到大返回一个新的无元素重复的元组

    # handle NaN's

    if ind.size and indnan.size:

        # NaN's and values close to NaN's cannot be peaks,NaN和NaN附近的值不能为峰值

        ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)] 

        #测试峰值坐标是否在空数值的附近。5in1d:测试一维数组的每个元素是否也存在于第二个数组中,返回一个长度相同布尔数组,


    #first and last values of x cannot be peaks  #去除第一个和最后一个峰值点

    if ind.size and ind[0] == 0:

        ind = ind[1:]   #去除在首位的峰值

    if ind.size and ind[-1] == x.size - 1:

        ind = ind[:-1] #去除在末位的峰值


    # remove peaks < minimum peak height #

    if ind.size and mph is not None:

        ind = ind[x[ind] >= mph]  #去除 高度 小于阈值mph的峰值

    # remove peaks - neighbors < threshold

    if ind.size and threshold > 0:   

        dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0) 

        #np.vstack按垂直方向(行顺序)堆叠数组构成一个新的数组,np.min取两侧最小的突变,加入axis参数,当axis=0时会分别取每一列的最大值或最小值,axis=1时,会分别取每一行的最大值或最小值,且将所有取到的数据放在一个一维数组中。

        ind = np.delete(ind, np.where(dx < threshold)[0])   #剔除突变小于阈值的峰值

    

    # detect small peaks closer than minimum peak distance #检测小于最小峰值距离的小峰值

    if ind.size and mpd > 1:

        ind = ind[np.argsort(x[ind])][::-1]  #np.argsort:将a中的元素从小到大排列,提取其在排列前对应的index(索引)输出。[::-1]:取从后向前(相反)的元素,b = a[i:j:s]这种格式呢,i,j与上面的一样,但s表示步进,缺省为1.

        idel = np.zeros(ind.size, dtype=bool) #zeros 在bool布尔里为0,idel数组只能true,false

        for i in range(ind.size):

            if not idel[i]:    # if ((not idel[i]) == ture) :    , not idel[i]是取反运算

                # keep peaks with the same height if kpsh is True  ,仅当峰间隔相同时保留峰,周期性峰值,下面'\'是换行标记

                #(ind >= ind[i] - mpd) & (ind <= ind[i] + mpd),搜索在当前峰,前后mpd区间里的其他峰,若有则给这些峰在idel里赋值1

                idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \

                       & (x[ind[i]] > x[ind] if kpsh else True)

                idel[i] = 0  # Keep current peak

        # remove the small peaks and sort back the indexes by their occurrence

        ind = np.sort(ind[~idel])  #把小峰标记的1取反,在ind中删去这些峰值,再次排序

    return ind

二、Janko Slavic 方法

和上一个方法差不多,基本就是换了一种求极值的写法,相比之前的复杂一点点。

#Janko Slavic 方法

def findpeaks(data, spacing=1, limit=None):

    """Finds peaks in `data` which are of `spacing` width and >=`limit`.

    :param data: values

    :param spacing: minimum spacing to the next peak (should be 1 or more),到下一个峰值的最小间距(应该是1或更多)

    :param limit: peaks should have value greater or equal ,高度阈值

    :return:

    """

    len = data.size

    x = np.zeros(len + 2 * spacing) #增加两个空位

    x[:spacing] = data[0] - 1.e-6  #0:1首位赋值

    x[-spacing:] = data[-1] - 1.e-6 #-1:-1,末尾赋值

    x[spacing:spacing + len] = data #中间的为原数据,相当于给首末位复制一个数

    peak_candidate = np.zeros(len)  #候选数组

    peak_candidate[:] = True #布尔

    for s in range(spacing):

        start = spacing - s - 1

        h_b = x[start: start + len]  

        # before,起始位0,b会有两个第一位,最后一位是倒数第二位

        start = spacing

        h_c = x[start: start + len]  # central,起始位1,等于原数据

        start = spacing + s + 1

        h_a = x[start: start + len]  # after,起始位2,b会有两个末位,第一位是第二位

        peak_candidate = np.logical_and(peak_candidate, np.logical_and(h_c > h_b, h_c > h_a))  #同样是通过极值前后一阶导符号相反的条件,提取所有峰值的坐标

    ind = np.argwhere(peak_candidate) 

    #np.argwhere与where的功能基本一样,但是返回值的结构更简单

    ind = ind.reshape(ind.size) #整形成数组

    if limit is not None:

        ind = ind[data[ind] > limit] #高度阈值

    return ind

三、Tony Beltramelli方法

相当于先做了个归一化再求极值

相关函数

  1. np.roll(ratios, 1) ,这个滚动函数挺有意思。

  2. 通过逻辑判断的布尔序列求索引,可以实现np.argwhere的功能

    for i in range(0, len(peaks)):

    if peaks[i]:

          peak_indexes.append(i)

    return peak_indexes


def detect_peaks(signal, threshold=0.5):

    """ Performs peak detection on three steps: root mean square, peak to

    average ratios and first order logic.

    threshold used to discard peaks too small """

    # compute root mean square

    root_mean_square = sqrt(np.sum(np.square(signal) / len(signal)))#均方根:平方,均值,开方

    # compute peak to average ratios

    ratios = np.array([pow(x / root_mean_square, 2) for x in signal]) #计算与均方根的比值

    # apply first order logic

    peaks = (ratios > np.roll(ratios, 1)) & (ratios > np.roll(ratios, -1)) & (ratios > threshold)

    # np.roll:意思是将a,沿着axis的方向,滚动shift长度。np.roll(ratios, 1)相当于在数组首位加了最后的一位,其他后延

    # optional: return peak indices

    peak_indexes = []

    for i in range(0, len(peaks)):

        if peaks[i]:

            peak_indexes.append(i)

    return peak_indexes


四、总结

三种方法的结果

1.极值方法求峰值的方法就先看到这里,基本就是利用极值定义判断得到布尔序列,在转换成索引。

2.其次不要瞎归一化,注意需要强调的点针对性地选择归一化方法

3.前后序列补的值也要考虑好,最好用标准的正常值,也不要随便用0、复制等办法





序列数据处理学习(一)利用极值方法检测峰值学习的评论 (共 条)

分享到微博请遵守国家法律