序列数据处理学习(一)利用极值方法检测峰值学习
最近有个时间序列异常值处理的任务,找一些源码学习一下思路:
一、Marcos Duarte编写的峰值查找算法
算法思路:
1.通过极值前后一阶导符号相反的条件,提取所有峰值的坐标:
ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]
# np.hstack((0, dx)) 在dx前面插入0,通过与运算就可以判断先dx>0再dx<0 的位置为峰值点。
2.同时对于平顶峰,可以选择检测上下沿
ine = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]
#先dx>0再dx<0或=0 的位置为峰值
通过极值点获得峰值坐标后就是一系列的功能性筛选:
3.NaN和NaN附近的值不能为峰值
ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)]
#测试峰值坐标是否在空数值的附近。5in1d:测试一维数组的每个元素是否也存在于第二个数组中,返回一个长度相同布尔数组,
4.去除处于首位和末尾的峰值点
if ind.size and ind[0] == 0:
ind = ind[1:] #去除在首位的峰值
if ind.size and ind[-1] == x.size - 1:
ind = ind[:-1] #去除在末位的峰值
5.去除 高度 小于阈值mph的峰值
ind = ind[x[ind] >= mph] #去除 高度 小于阈值mph的峰值
6.剔除前后突变小于阈值的峰值
dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)
ind = np.delete(ind, np.where(dx < threshold)[0])
#计算峰值点前后的差值,上下组合在一起在取最小值
#np.vstack按垂直方向(行顺序)堆叠数组构成一个新的数组,np.min取两侧最小的突变,加入axis参数,当axis=0时会分别取每一列的最大值或最小值,axis=1时,会分别取每一行的最大值或最小值,且将所有取到的数据放在一个一维数组中。
7.剔除峰值间距离过近的峰值
idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd)
#搜索在当前峰,前后mpd区间里的其他峰,若有则给这些峰在idel里赋值1
算法风格:
这个算法的所有操作基本在峰值的索引序列上进行
先用逻辑语句获得布尔序列,再通过np.where函数取得索引序列
后面的功能判断,利用索引仅在极值点附近进行,简化了运算。
相关函数:
np.where:根据判断语句,提取索引
np.hstack,np.vstack :合并组合序列的方法
np.unique:合并后去除重复的函数
np.in1d:找数组B中元素,是否在数组A中存在,并标记位置
np.min(A, axis=0):求列最小
np.delete:判断语句配合np.where提取索引,精准删除
(np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0) :判断极值点的思路
#Marcos Duarte编写的峰值查找算法,功能比较强大,筛选参数比较多。算法源码如下:
def Marcos_detect_peaks(x, mph=None, mpd=1, threshold=0, edge='both',
kpsh=False, valley=False, show=False, ax=None):
"""Detect peaks in data based on their amplitude and other features.
Parameters
----------
x : 1D array_like
data.
mph : {None, number}, optional (default = None) #峰最小的高度
detect peaks that are greater than minimum peak height.
mpd : positive integer, optional (default = 1) #峰最小的宽度
detect peaks that are at least separated by minimum peak distance (in
number of data).
threshold : positive number, optional (default = 0)
detect peaks (valleys) that are greater (smaller) than `threshold`
in relation to their immediate neighbors.
edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising') #平顶峰检测
for a flat peak, keep only the rising edge ('rising'), only the
falling edge ('falling'), both edges ('both'), or don't detect a
flat peak (None).
kpsh : bool, optional (default = False) #
keep peaks with same height even if they are closer than `mpd`.
valley : bool, optional (default = False) #谷值检测
if True (1), detect valleys (local minima) instead of peaks.
show : bool, optional (default = False)
if True (1), plot data in matplotlib figure.
ax : a matplotlib.axes.Axes instance, optional (default = None).
Returns
-------
ind : 1D array_like
indeces of the peaks in `x`.
Notes
-----
The detection of valleys instead of peaks is performed internally by simply
negating the data: `ind_valleys = detect_peaks(-x)`
The function can handle NaN's
See this IPython Notebook [1]_.
References
----------
"Marcos Duarte, https://github.com/demotu/BMC"
[1] http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb
"""
x = np.atleast_1d(x).astype('float64') #一维数组,转换为浮点型
if x.size < 3:
return np.array([], dtype=int) #输入数组长度小于3,返回空
if valley:
x = -x #检测谷值,将数组沿x轴镜像
# find indexes of all peaks
dx = x[1:] - x[:-1]
## x[1:]截取索引1(第二个值)到所有, x[:-1]截取除最后一个的全部,相减为梯度
# handle NaN's
indnan = np.where(np.isnan(x))[0]
#通过isnan函数判断是否为空值,通过where获得nan所在索引
if indnan.size:
x[indnan] = np.inf #np.inf 表示+∞,是没有确切的数值的,类型为浮点型
dx[np.where(np.isnan(dx))[0]] = np.inf #若为无穷,对应位置梯度为无穷
##########定位峰值点##########
ine, ire, ife = np.array([[], [], []], dtype=int) #创建空数组,ine, ire, ife = 平顶峰检测
if not edge: #不进行平顶峰检测
ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]
# np.hstack((0, dx)) 在dx前面插入0,通过与运算就可以判断先dx>0在dx小于0 的位置为峰值点。
else:
if edge.lower() in ['rising', 'both']:
ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0] #上升沿后可以接下降沿也可以接平顶,返回数组坐标
if edge.lower() in ['falling', 'both']:
ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0] ##平顶或者上升沿可以接下降沿,返回数组坐标
#返回峰值点坐标
ind = np.unique(np.hstack((ine, ire, ife)))
# 当同时检测上下沿时,去除其中重复的坐标 ,并按坐标由小到大返回一个新的无元素重复的元组
# handle NaN's
if ind.size and indnan.size:
# NaN's and values close to NaN's cannot be peaks,NaN和NaN附近的值不能为峰值
ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)]
#测试峰值坐标是否在空数值的附近。5in1d:测试一维数组的每个元素是否也存在于第二个数组中,返回一个长度相同布尔数组,
#first and last values of x cannot be peaks #去除第一个和最后一个峰值点
if ind.size and ind[0] == 0:
ind = ind[1:] #去除在首位的峰值
if ind.size and ind[-1] == x.size - 1:
ind = ind[:-1] #去除在末位的峰值
# remove peaks < minimum peak height #
if ind.size and mph is not None:
ind = ind[x[ind] >= mph] #去除 高度 小于阈值mph的峰值
# remove peaks - neighbors < threshold
if ind.size and threshold > 0:
dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)
#np.vstack按垂直方向(行顺序)堆叠数组构成一个新的数组,np.min取两侧最小的突变,加入axis参数,当axis=0时会分别取每一列的最大值或最小值,axis=1时,会分别取每一行的最大值或最小值,且将所有取到的数据放在一个一维数组中。
ind = np.delete(ind, np.where(dx < threshold)[0]) #剔除突变小于阈值的峰值
# detect small peaks closer than minimum peak distance #检测小于最小峰值距离的小峰值
if ind.size and mpd > 1:
ind = ind[np.argsort(x[ind])][::-1] #np.argsort:将a中的元素从小到大排列,提取其在排列前对应的index(索引)输出。[::-1]:取从后向前(相反)的元素,b = a[i:j:s]这种格式呢,i,j与上面的一样,但s表示步进,缺省为1.
idel = np.zeros(ind.size, dtype=bool) #zeros 在bool布尔里为0,idel数组只能true,false
for i in range(ind.size):
if not idel[i]: # if ((not idel[i]) == ture) : , not idel[i]是取反运算
# keep peaks with the same height if kpsh is True ,仅当峰间隔相同时保留峰,周期性峰值,下面'\'是换行标记
#(ind >= ind[i] - mpd) & (ind <= ind[i] + mpd),搜索在当前峰,前后mpd区间里的其他峰,若有则给这些峰在idel里赋值1
idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \
& (x[ind[i]] > x[ind] if kpsh else True)
idel[i] = 0 # Keep current peak
# remove the small peaks and sort back the indexes by their occurrence
ind = np.sort(ind[~idel]) #把小峰标记的1取反,在ind中删去这些峰值,再次排序
return ind
二、Janko Slavic 方法
和上一个方法差不多,基本就是换了一种求极值的写法,相比之前的复杂一点点。
#Janko Slavic 方法
def findpeaks(data, spacing=1, limit=None):
"""Finds peaks in `data` which are of `spacing` width and >=`limit`.
:param data: values
:param spacing: minimum spacing to the next peak (should be 1 or more),到下一个峰值的最小间距(应该是1或更多)
:param limit: peaks should have value greater or equal ,高度阈值
:return:
"""
len = data.size
x = np.zeros(len + 2 * spacing) #增加两个空位
x[:spacing] = data[0] - 1.e-6 #0:1首位赋值
x[-spacing:] = data[-1] - 1.e-6 #-1:-1,末尾赋值
x[spacing:spacing + len] = data #中间的为原数据,相当于给首末位复制一个数
peak_candidate = np.zeros(len) #候选数组
peak_candidate[:] = True #布尔
for s in range(spacing):
start = spacing - s - 1
h_b = x[start: start + len]
# before,起始位0,b会有两个第一位,最后一位是倒数第二位
start = spacing
h_c = x[start: start + len] # central,起始位1,等于原数据
start = spacing + s + 1
h_a = x[start: start + len] # after,起始位2,b会有两个末位,第一位是第二位
peak_candidate = np.logical_and(peak_candidate, np.logical_and(h_c > h_b, h_c > h_a)) #同样是通过极值前后一阶导符号相反的条件,提取所有峰值的坐标
ind = np.argwhere(peak_candidate)
#np.argwhere与where的功能基本一样,但是返回值的结构更简单
ind = ind.reshape(ind.size) #整形成数组
if limit is not None:
ind = ind[data[ind] > limit] #高度阈值
return ind
三、Tony Beltramelli方法
相当于先做了个归一化再求极值
相关函数
np.roll(ratios, 1) ,这个滚动函数挺有意思。
通过逻辑判断的布尔序列求索引,可以实现np.argwhere的功能
for i in range(0, len(peaks)):
if peaks[i]:
peak_indexes.append(i)
return peak_indexes
def detect_peaks(signal, threshold=0.5):
""" Performs peak detection on three steps: root mean square, peak to
average ratios and first order logic.
threshold used to discard peaks too small """
# compute root mean square
root_mean_square = sqrt(np.sum(np.square(signal) / len(signal)))#均方根:平方,均值,开方
# compute peak to average ratios
ratios = np.array([pow(x / root_mean_square, 2) for x in signal]) #计算与均方根的比值
# apply first order logic
peaks = (ratios > np.roll(ratios, 1)) & (ratios > np.roll(ratios, -1)) & (ratios > threshold)
# np.roll:意思是将a,沿着axis的方向,滚动shift长度。np.roll(ratios, 1)相当于在数组首位加了最后的一位,其他后延
# optional: return peak indices
peak_indexes = []
for i in range(0, len(peaks)):
if peaks[i]:
peak_indexes.append(i)
return peak_indexes
四、总结

1.极值方法求峰值的方法就先看到这里,基本就是利用极值定义判断得到布尔序列,在转换成索引。
2.其次不要瞎归一化,注意需要强调的点针对性地选择归一化方法
3.前后序列补的值也要考虑好,最好用标准的正常值,也不要随便用0、复制等办法

