简单的基于小波分解和独立分量分析的脑电信号降噪(Python)

2024-06-02 12:28

本文主要是介绍简单的基于小波分解和独立分量分析的脑电信号降噪(Python),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

脑电信号是一种典型的非平稳随机信号且存在一定的非高斯性和非线性。传统的分析处理方法是将脑电信号近似看做线性、准平稳、高斯分布的随机信号,这使得分析结果往往不能令人满意,实用性较差。现代的小波变换方法和独立分量分析方法的提出为有效地分析脑电信号提供了新的途径。由于所要提取的特征波频率不精确并受到噪声的影响,如果单独应用小波提取出的特征信号往往特征不够明显。独立分量分析是根据信号的多元统计特性进行分析处理,可以将多道混合信号进行独立分离。考虑到所要提取的特征波就是混合脑电信号中的一个独立分量,应用独立分量分析在一定程度上可以分离出特征波。

鉴于此,采用简单的小波分解和独立分量分析对脑电信号降噪,完整代码如下:

import numpy as np
import matplotlib.pyplot as plt
import pyedflib
# import sklearn.linear_model as slm
from sklearn import metrics
from statsmodels.tsa.ar_model import AutoReg
# import scipy
# import scipy.signal as signal
from sklearn.decomposition import FastICA
import pywtclass EEGSignalProcessing:def __init__(self) -> None:passdef read_signal(filename, number_of_samples = None, offset = 0):file = pyedflib.EdfReader(filename)if number_of_samples is None:number_of_samples = file.getNSamples()[0]number_of_signals = file.signals_in_filesignal = np.zeros((number_of_signals, number_of_samples))for i in range(number_of_signals):signal[i, :] = file.readSignal(i)[offset:offset + number_of_samples]file.close()return signaldef plot_signal(data, sampling_frequency, title, number_of_channels = None, channel_labels = None, yaxis_label = None, xaxis_label = None):       plt.rcParams['font.size'] = '16'fig = plt.figure()ax = fig.add_subplot(1,1,1)lenght = len(data)if number_of_channels is None:number_of_channels_useful = range(0, lenght)else:if isinstance(number_of_channels, str):number_of_channels_useful = range(0, lenght-1)else:number_of_channels_useful = number_of_channelsfor channel in number_of_channels_useful:if channel_labels is None:label = 'ch' + str(channel + 1)else:label = channel_labels[channel]limit = data[channel, :].sizex_values = [num/sampling_frequency for num in range(0, limit)]ax.plot(x_values, data[channel, :], label = label)fig.set_size_inches(15,5)plt.title(title)plt.legend()if yaxis_label is not None:plt.ylabel(yaxis_label)if xaxis_label is not None:plt.xlabel(xaxis_label)plt.show(block = True)def channel_desynchronize(data_1d, delay, value = 0):number_of_samples = len(data_1d)if delay > 0:for i in range(number_of_samples - 1, delay - 1, -1):data_1d[i] = data_1d[i - delay]for i in range(0, delay):data_1d[i] = valueif delay < 0:delay = -delayfor i in range(0, number_of_samples - delay):data_1d[i] = data_1d[i + delay]for i in range(number_of_samples - delay, number_of_samples):data_1d[i] = value        def all_channels_desynchronize(data, delay, value = 0):for i in range(0, len(data)):EEGSignalProcessing.channel_desynchronize(data[i], delay, value)        class NoiseReduction:def autoregression(data, delay):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))for i in range(0, signals_number):model = AutoReg(data[i], lags=delay)model_fit = model.fit()predictions = model_fit.predict(start=0, end=samples_number-1, dynamic=False)output[i, :samples_number] = predictionsreturn outputdef wavelet(linear_array):name = 'bior3.1'# Create wavelet object and define parameterswav = pywt.Wavelet(name)max_level = pywt.dwt_max_level(len(linear_array) + 1, wav.dec_len)# print("Maximum level is " + str(max_level))threshold = 0.04  # Threshold for filtering# Decompose into wavelet components, to the level selected:coeffs = pywt.wavedec(linear_array, name, level=5)plt.figure()for i in range(1, len(coeffs)):plt.subplot(max_level, 1, i)plt.plot(coeffs[i])coeffs[i] = pywt.threshold(coeffs[i], threshold * max(coeffs[i]))plt.plot(coeffs[i])plt.show()datarec = pywt.waverec(coeffs, name)return np.array(datarec)def wavelet_all_channels(data):output = []for c in data:output.append(EEGSignalProcessing.NoiseReduction.wavelet(c))return np.stack(output)def ica(data, mask=None):# maska do wyboru składowychreduce_level = [True, True, True, True, True, True, True, True, True]reduce_level[7] = Falseif mask is not None:reduce_level = masksigT = data.Tn = data.shape[0]# obliczanie ICAica = FastICA(n_components=n)sig_ica = ica.fit_transform(sigT)# Macierz mmieszaniaA_ica = ica.mixing_# Przycięcie macierzy mieszającej, aby odrzucić najmniej znaczące wartościA_ica_reduced = A_icasig_ica = sig_ica[:, reduce_level]X_reduced = np.dot(sig_ica, A_ica_reduced.T[reduce_level, :]) + ica.mean_ica_reconstruct = X_reduced.Treturn ica_reconstructclass Noise:def add_uniform_noise(data, low, high, seed=None):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))if seed is not None:np.random.seed(seed)for i in range(signals_number):if isinstance(low, str):if low == "min_value":low = min(data[i])if isinstance(high, str):if high == "max_value":high = max(data[i])noise = np.random.uniform(low, high, samples_number)output[i] = data[i] + noisereturn outputdef add_normal_noise(data, mean, std, amplitude=1, seed=None):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))if seed is not None:np.random.seed(seed)for i in range(signals_number):noise = np.random.normal(mean, std, samples_number)output[i] = data[i] + noisereturn amplitude*outputdef add_triangular_noise(data, left, peak, right, seed=None):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))if seed is not None:np.random.seed(seed)for i in range(signals_number):noise = np.random.triangular(left, peak, right, samples_number)output[i] = data[i] + noisereturn outputclass Metrics:def __init__(self) -> None:passdef evaluate_signal(signal, prediction, cut_left=100, cut_right=100):signal_cut = signal[cut_left:-cut_right]predicted_cut = prediction[cut_left:-cut_right]# metryki z sklearnmae = metrics.mean_absolute_error(signal_cut, predicted_cut)mse = metrics.mean_squared_error(signal_cut, predicted_cut)# wyświetlanieprint('MAE z biblioteki sklearn: {}'.format(round(mae, 2)))print('MSE z biblioteki sklearn: {}'.format(round(mse, 2)))def differantial(sigA, sigB, cutleft=100, cutright=100):differential = sigA[:,cutleft:-cutright] - sigB[:,cutleft:-cutright]return differentialdef main():channels_to_plot = [0,1,2,3,4]# signal = EEGSignalProcessing.read_signal(filename = "Subject00_2.edf", number_of_samples=1000)# signal = EEGSignalProcessing.read_signal(filename = "Subject00_1.edf", number_of_samples=1000)signal = EEGSignalProcessing.read_signal(filename = "rsvp_10Hz_08b.edf", number_of_samples=1000)EEGSignalProcessing.plot_signal(signal,sampling_frequency= 2048, title = "Orginalne sygnały EEG", number_of_channels = channels_to_plot,yaxis_label='Wartosc sygnalu', xaxis_label='Czas [s]')low = 2high = 4sig_noise_uniform = EEGSignalProcessing.Noise.add_uniform_noise(signal, low=low, high=high, seed=100)EEGSignalProcessing.plot_signal(sig_noise_uniform, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Jednostajny Low={}, High={})".format(low, high), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')mean = 0std = 2ampl = 2sig_noise_normal = EEGSignalProcessing.Noise.add_normal_noise(signal, mean=mean, std=std, amplitude=ampl, seed=100)EEGSignalProcessing.plot_signal(sig_noise_normal, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Normalny Low={}, High={})".format(mean, std), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')sig_n3_left = 0sig_n3_peak = 4sig_n3_right = 6sig_noise_triangular = EEGSignalProcessing.Noise.add_triangular_noise(signal, left=sig_n3_left, peak=sig_n3_peak, right=sig_n3_right, seed=100)EEGSignalProcessing.plot_signal(sig_noise_triangular, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Trójkątny Left={}, Peak={}, High={})".format(sig_n3_left, sig_n3_peak, sig_n3_right), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')# Odszumianie sygnałów# AutoregresjaAR_lag = 10signal_autoregresion = EEGSignalProcessing.NoiseReduction.autoregression(sig_noise_triangular, delay = AR_lag)EEGSignalProcessing.plot_signal(signal_autoregresion,title="5 odszumionych kanałów EEG - regresja liniowa delay={}".format(AR_lag), sampling_frequency=2048, number_of_channels=channels_to_plot,yaxis_label='wartość sygnału', xaxis_label='czas [s]')# Waveletsignal_wavelet = EEGSignalProcessing.NoiseReduction.wavelet_all_channels(sig_noise_triangular)EEGSignalProcessing.plot_signal(signal_wavelet,title="5 odszumionych kanałów EEG - Wavelet", sampling_frequency=2048, number_of_channels=channels_to_plot,yaxis_label='wartość sygnału', xaxis_label='czas [s]')# ICAsignal_ICA = EEGSignalProcessing.NoiseReduction.ica(sig_noise_triangular)EEGSignalProcessing.plot_signal(signal_ICA,title="5 odszumionych kanałów EEG - ICA", sampling_frequency=2048, number_of_channels=channels_to_plot,yaxis_label='wartość sygnału', xaxis_label='czas [s]')# Metrykich = 4noise_signal = sig_noise_normalprint('\nORYGINALNY')Metrics.evaluate_signal(signal[ch], signal[ch])print('\nNIEODSZUMIONY, dodano szum rozkład normalny')Metrics.evaluate_signal(signal[ch], noise_signal[ch])print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem autoregresja')Metrics.evaluate_signal(signal[ch], signal_autoregresion[ch])print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem ICA')Metrics.evaluate_signal(signal[ch], signal_ICA[ch])print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem wavelet')Metrics.evaluate_signal(signal[ch], signal_wavelet[ch])# Sygnał różnicowydifferential_noise = Metrics.differantial(sig_noise_normal, signal)differential_AR = Metrics.differantial(signal_autoregresion, signal)differential_ICA = Metrics.differantial(signal_ICA, signal)differential_Wavelet = Metrics.differantial(signal_wavelet, signal)EEGSignalProcessing.plot_signal(differential_noise, title="Sygnał różnicowy, zaszumiony-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")EEGSignalProcessing.plot_signal(differential_AR, title="Sygnał różnicowy, AR-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")EEGSignalProcessing.plot_signal(differential_ICA, title="Sygnał różnicowy, ICA-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")EEGSignalProcessing.plot_signal(differential_Wavelet, title="Sygnał różnicowy, wavelet-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")
完整代码:https://mbd.pub/o/bread/mbd-ZZWYlJlpif __name__ == '__main__':main()

图片

图片

图片

图片

工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

这篇关于简单的基于小波分解和独立分量分析的脑电信号降噪(Python)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1023966

相关文章

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu2289(简单二分)

虽说是简单二分,但是我还是wa死了  题意:已知圆台的体积,求高度 首先要知道圆台体积怎么求:设上下底的半径分别为r1,r2,高为h,V = PI*(r1*r1+r1*r2+r2*r2)*h/3 然后以h进行二分 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#includ

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

usaco 1.3 Prime Cryptarithm(简单哈希表暴搜剪枝)

思路: 1. 用一个 hash[ ] 数组存放输入的数字,令 hash[ tmp ]=1 。 2. 一个自定义函数 check( ) ,检查各位是否为输入的数字。 3. 暴搜。第一行数从 100到999,第二行数从 10到99。 4. 剪枝。 代码: /*ID: who jayLANG: C++TASK: crypt1*/#include<stdio.h>bool h

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

uva 10387 Billiard(简单几何)

题意是一个球从矩形的中点出发,告诉你小球与矩形两条边的碰撞次数与小球回到原点的时间,求小球出发时的角度和小球的速度。 简单的几何问题,小球每与竖边碰撞一次,向右扩展一个相同的矩形;每与横边碰撞一次,向上扩展一个相同的矩形。 可以发现,扩展矩形的路径和在当前矩形中的每一段路径相同,当小球回到出发点时,一条直线的路径刚好经过最后一个扩展矩形的中心点。 最后扩展的路径和横边竖边恰好组成一个直

poj 1113 凸包+简单几何计算

题意: 给N个平面上的点,现在要在离点外L米处建城墙,使得城墙把所有点都包含进去且城墙的长度最短。 解析: 韬哥出的某次训练赛上A出的第一道计算几何,算是大水题吧。 用convexhull算法把凸包求出来,然后加加减减就A了。 计算见下图: 好久没玩画图了啊好开心。 代码: #include <iostream>#include <cstdio>#inclu

uva 10130 简单背包

题意: 背包和 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstring>#include <cmath>#include <stack>#include <vector>#include <queue>#include <map>