简单的基于小波分解和独立分量分析的脑电信号降噪(Python)

2024-06-02 12:28

本文主要是介绍简单的基于小波分解和独立分量分析的脑电信号降噪(Python),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

脑电信号是一种典型的非平稳随机信号且存在一定的非高斯性和非线性。传统的分析处理方法是将脑电信号近似看做线性、准平稳、高斯分布的随机信号,这使得分析结果往往不能令人满意,实用性较差。现代的小波变换方法和独立分量分析方法的提出为有效地分析脑电信号提供了新的途径。由于所要提取的特征波频率不精确并受到噪声的影响,如果单独应用小波提取出的特征信号往往特征不够明显。独立分量分析是根据信号的多元统计特性进行分析处理,可以将多道混合信号进行独立分离。考虑到所要提取的特征波就是混合脑电信号中的一个独立分量,应用独立分量分析在一定程度上可以分离出特征波。

鉴于此,采用简单的小波分解和独立分量分析对脑电信号降噪,完整代码如下:

import numpy as np
import matplotlib.pyplot as plt
import pyedflib
# import sklearn.linear_model as slm
from sklearn import metrics
from statsmodels.tsa.ar_model import AutoReg
# import scipy
# import scipy.signal as signal
from sklearn.decomposition import FastICA
import pywtclass EEGSignalProcessing:def __init__(self) -> None:passdef read_signal(filename, number_of_samples = None, offset = 0):file = pyedflib.EdfReader(filename)if number_of_samples is None:number_of_samples = file.getNSamples()[0]number_of_signals = file.signals_in_filesignal = np.zeros((number_of_signals, number_of_samples))for i in range(number_of_signals):signal[i, :] = file.readSignal(i)[offset:offset + number_of_samples]file.close()return signaldef plot_signal(data, sampling_frequency, title, number_of_channels = None, channel_labels = None, yaxis_label = None, xaxis_label = None):       plt.rcParams['font.size'] = '16'fig = plt.figure()ax = fig.add_subplot(1,1,1)lenght = len(data)if number_of_channels is None:number_of_channels_useful = range(0, lenght)else:if isinstance(number_of_channels, str):number_of_channels_useful = range(0, lenght-1)else:number_of_channels_useful = number_of_channelsfor channel in number_of_channels_useful:if channel_labels is None:label = 'ch' + str(channel + 1)else:label = channel_labels[channel]limit = data[channel, :].sizex_values = [num/sampling_frequency for num in range(0, limit)]ax.plot(x_values, data[channel, :], label = label)fig.set_size_inches(15,5)plt.title(title)plt.legend()if yaxis_label is not None:plt.ylabel(yaxis_label)if xaxis_label is not None:plt.xlabel(xaxis_label)plt.show(block = True)def channel_desynchronize(data_1d, delay, value = 0):number_of_samples = len(data_1d)if delay > 0:for i in range(number_of_samples - 1, delay - 1, -1):data_1d[i] = data_1d[i - delay]for i in range(0, delay):data_1d[i] = valueif delay < 0:delay = -delayfor i in range(0, number_of_samples - delay):data_1d[i] = data_1d[i + delay]for i in range(number_of_samples - delay, number_of_samples):data_1d[i] = value        def all_channels_desynchronize(data, delay, value = 0):for i in range(0, len(data)):EEGSignalProcessing.channel_desynchronize(data[i], delay, value)        class NoiseReduction:def autoregression(data, delay):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))for i in range(0, signals_number):model = AutoReg(data[i], lags=delay)model_fit = model.fit()predictions = model_fit.predict(start=0, end=samples_number-1, dynamic=False)output[i, :samples_number] = predictionsreturn outputdef wavelet(linear_array):name = 'bior3.1'# Create wavelet object and define parameterswav = pywt.Wavelet(name)max_level = pywt.dwt_max_level(len(linear_array) + 1, wav.dec_len)# print("Maximum level is " + str(max_level))threshold = 0.04  # Threshold for filtering# Decompose into wavelet components, to the level selected:coeffs = pywt.wavedec(linear_array, name, level=5)plt.figure()for i in range(1, len(coeffs)):plt.subplot(max_level, 1, i)plt.plot(coeffs[i])coeffs[i] = pywt.threshold(coeffs[i], threshold * max(coeffs[i]))plt.plot(coeffs[i])plt.show()datarec = pywt.waverec(coeffs, name)return np.array(datarec)def wavelet_all_channels(data):output = []for c in data:output.append(EEGSignalProcessing.NoiseReduction.wavelet(c))return np.stack(output)def ica(data, mask=None):# maska do wyboru składowychreduce_level = [True, True, True, True, True, True, True, True, True]reduce_level[7] = Falseif mask is not None:reduce_level = masksigT = data.Tn = data.shape[0]# obliczanie ICAica = FastICA(n_components=n)sig_ica = ica.fit_transform(sigT)# Macierz mmieszaniaA_ica = ica.mixing_# Przycięcie macierzy mieszającej, aby odrzucić najmniej znaczące wartościA_ica_reduced = A_icasig_ica = sig_ica[:, reduce_level]X_reduced = np.dot(sig_ica, A_ica_reduced.T[reduce_level, :]) + ica.mean_ica_reconstruct = X_reduced.Treturn ica_reconstructclass Noise:def add_uniform_noise(data, low, high, seed=None):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))if seed is not None:np.random.seed(seed)for i in range(signals_number):if isinstance(low, str):if low == "min_value":low = min(data[i])if isinstance(high, str):if high == "max_value":high = max(data[i])noise = np.random.uniform(low, high, samples_number)output[i] = data[i] + noisereturn outputdef add_normal_noise(data, mean, std, amplitude=1, seed=None):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))if seed is not None:np.random.seed(seed)for i in range(signals_number):noise = np.random.normal(mean, std, samples_number)output[i] = data[i] + noisereturn amplitude*outputdef add_triangular_noise(data, left, peak, right, seed=None):signals_number = len(data)samples_number = len(data[0])output = np.zeros((signals_number, samples_number))if seed is not None:np.random.seed(seed)for i in range(signals_number):noise = np.random.triangular(left, peak, right, samples_number)output[i] = data[i] + noisereturn outputclass Metrics:def __init__(self) -> None:passdef evaluate_signal(signal, prediction, cut_left=100, cut_right=100):signal_cut = signal[cut_left:-cut_right]predicted_cut = prediction[cut_left:-cut_right]# metryki z sklearnmae = metrics.mean_absolute_error(signal_cut, predicted_cut)mse = metrics.mean_squared_error(signal_cut, predicted_cut)# wyświetlanieprint('MAE z biblioteki sklearn: {}'.format(round(mae, 2)))print('MSE z biblioteki sklearn: {}'.format(round(mse, 2)))def differantial(sigA, sigB, cutleft=100, cutright=100):differential = sigA[:,cutleft:-cutright] - sigB[:,cutleft:-cutright]return differentialdef main():channels_to_plot = [0,1,2,3,4]# signal = EEGSignalProcessing.read_signal(filename = "Subject00_2.edf", number_of_samples=1000)# signal = EEGSignalProcessing.read_signal(filename = "Subject00_1.edf", number_of_samples=1000)signal = EEGSignalProcessing.read_signal(filename = "rsvp_10Hz_08b.edf", number_of_samples=1000)EEGSignalProcessing.plot_signal(signal,sampling_frequency= 2048, title = "Orginalne sygnały EEG", number_of_channels = channels_to_plot,yaxis_label='Wartosc sygnalu', xaxis_label='Czas [s]')low = 2high = 4sig_noise_uniform = EEGSignalProcessing.Noise.add_uniform_noise(signal, low=low, high=high, seed=100)EEGSignalProcessing.plot_signal(sig_noise_uniform, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Jednostajny Low={}, High={})".format(low, high), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')mean = 0std = 2ampl = 2sig_noise_normal = EEGSignalProcessing.Noise.add_normal_noise(signal, mean=mean, std=std, amplitude=ampl, seed=100)EEGSignalProcessing.plot_signal(sig_noise_normal, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Normalny Low={}, High={})".format(mean, std), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')sig_n3_left = 0sig_n3_peak = 4sig_n3_right = 6sig_noise_triangular = EEGSignalProcessing.Noise.add_triangular_noise(signal, left=sig_n3_left, peak=sig_n3_peak, right=sig_n3_right, seed=100)EEGSignalProcessing.plot_signal(sig_noise_triangular, title="Zaszumiony sygnał 5 kanałów EEG (Rozkład Trójkątny Left={}, Peak={}, High={})".format(sig_n3_left, sig_n3_peak, sig_n3_right), sampling_frequency=2048, number_of_channels=channels_to_plot, yaxis_label='Wartość sygnału', xaxis_label='Czas [s]')# Odszumianie sygnałów# AutoregresjaAR_lag = 10signal_autoregresion = EEGSignalProcessing.NoiseReduction.autoregression(sig_noise_triangular, delay = AR_lag)EEGSignalProcessing.plot_signal(signal_autoregresion,title="5 odszumionych kanałów EEG - regresja liniowa delay={}".format(AR_lag), sampling_frequency=2048, number_of_channels=channels_to_plot,yaxis_label='wartość sygnału', xaxis_label='czas [s]')# Waveletsignal_wavelet = EEGSignalProcessing.NoiseReduction.wavelet_all_channels(sig_noise_triangular)EEGSignalProcessing.plot_signal(signal_wavelet,title="5 odszumionych kanałów EEG - Wavelet", sampling_frequency=2048, number_of_channels=channels_to_plot,yaxis_label='wartość sygnału', xaxis_label='czas [s]')# ICAsignal_ICA = EEGSignalProcessing.NoiseReduction.ica(sig_noise_triangular)EEGSignalProcessing.plot_signal(signal_ICA,title="5 odszumionych kanałów EEG - ICA", sampling_frequency=2048, number_of_channels=channels_to_plot,yaxis_label='wartość sygnału', xaxis_label='czas [s]')# Metrykich = 4noise_signal = sig_noise_normalprint('\nORYGINALNY')Metrics.evaluate_signal(signal[ch], signal[ch])print('\nNIEODSZUMIONY, dodano szum rozkład normalny')Metrics.evaluate_signal(signal[ch], noise_signal[ch])print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem autoregresja')Metrics.evaluate_signal(signal[ch], signal_autoregresion[ch])print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem ICA')Metrics.evaluate_signal(signal[ch], signal_ICA[ch])print('\nODSZUMIONY, najpierw dodano szum rozkład normalny, potem wavelet')Metrics.evaluate_signal(signal[ch], signal_wavelet[ch])# Sygnał różnicowydifferential_noise = Metrics.differantial(sig_noise_normal, signal)differential_AR = Metrics.differantial(signal_autoregresion, signal)differential_ICA = Metrics.differantial(signal_ICA, signal)differential_Wavelet = Metrics.differantial(signal_wavelet, signal)EEGSignalProcessing.plot_signal(differential_noise, title="Sygnał różnicowy, zaszumiony-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")EEGSignalProcessing.plot_signal(differential_AR, title="Sygnał różnicowy, AR-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")EEGSignalProcessing.plot_signal(differential_ICA, title="Sygnał różnicowy, ICA-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")EEGSignalProcessing.plot_signal(differential_Wavelet, title="Sygnał różnicowy, wavelet-orginalny",sampling_frequency=2048, number_of_channels=[ch], yaxis_label="Wartość sygnału",xaxis_label="Czas [s]")
完整代码:https://mbd.pub/o/bread/mbd-ZZWYlJlpif __name__ == '__main__':main()

图片

图片

图片

图片

工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

这篇关于简单的基于小波分解和独立分量分析的脑电信号降噪(Python)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1023966

相关文章

一文详解如何在Python中从字符串中提取部分内容

《一文详解如何在Python中从字符串中提取部分内容》:本文主要介绍如何在Python中从字符串中提取部分内容的相关资料,包括使用正则表达式、Pyparsing库、AST(抽象语法树)、字符串操作... 目录前言解决方案方法一:使用正则表达式方法二:使用 Pyparsing方法三:使用 AST方法四:使用字

Python列表去重的4种核心方法与实战指南详解

《Python列表去重的4种核心方法与实战指南详解》在Python开发中,处理列表数据时经常需要去除重复元素,本文将详细介绍4种最实用的列表去重方法,有需要的小伙伴可以根据自己的需要进行选择... 目录方法1:集合(set)去重法(最快速)方法2:顺序遍历法(保持顺序)方法3:副本删除法(原地修改)方法4:

Python运行中频繁出现Restart提示的解决办法

《Python运行中频繁出现Restart提示的解决办法》在编程的世界里,遇到各种奇怪的问题是家常便饭,但是,当你的Python程序在运行过程中频繁出现“Restart”提示时,这可能不仅仅是令人头疼... 目录问题描述代码示例无限循环递归调用内存泄漏解决方案1. 检查代码逻辑无限循环递归调用内存泄漏2.

Python中判断对象是否为空的方法

《Python中判断对象是否为空的方法》在Python开发中,判断对象是否为“空”是高频操作,但看似简单的需求却暗藏玄机,从None到空容器,从零值到自定义对象的“假值”状态,不同场景下的“空”需要精... 目录一、python中的“空”值体系二、精准判定方法对比三、常见误区解析四、进阶处理技巧五、性能优化

使用Python构建一个Hexo博客发布工具

《使用Python构建一个Hexo博客发布工具》虽然Hexo的命令行工具非常强大,但对于日常的博客撰写和发布过程,我总觉得缺少一个直观的图形界面来简化操作,下面我们就来看看如何使用Python构建一个... 目录引言Hexo博客系统简介设计需求技术选择代码实现主框架界面设计核心功能实现1. 发布文章2. 加

python logging模块详解及其日志定时清理方式

《pythonlogging模块详解及其日志定时清理方式》:本文主要介绍pythonlogging模块详解及其日志定时清理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录python logging模块及日志定时清理1.创建logger对象2.logging.basicCo

Python如何自动生成环境依赖包requirements

《Python如何自动生成环境依赖包requirements》:本文主要介绍Python如何自动生成环境依赖包requirements问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录生成当前 python 环境 安装的所有依赖包1、命令2、常见问题只生成当前 项目 的所有依赖包1、

如何将Python彻底卸载的三种方法

《如何将Python彻底卸载的三种方法》通常我们在一些软件的使用上有碰壁,第一反应就是卸载重装,所以有小伙伴就问我Python怎么卸载才能彻底卸载干净,今天这篇文章,小编就来教大家如何彻底卸载Pyth... 目录软件卸载①方法:②方法:③方法:清理相关文件夹软件卸载①方法:首先,在安装python时,下

python uv包管理小结

《pythonuv包管理小结》uv是一个高性能的Python包管理工具,它不仅能够高效地处理包管理和依赖解析,还提供了对Python版本管理的支持,本文主要介绍了pythonuv包管理小结,具有一... 目录安装 uv使用 uv 管理 python 版本安装指定版本的 Python查看已安装的 Python

使用Python开发一个带EPUB转换功能的Markdown编辑器

《使用Python开发一个带EPUB转换功能的Markdown编辑器》Markdown因其简单易用和强大的格式支持,成为了写作者、开发者及内容创作者的首选格式,本文将通过Python开发一个Markd... 目录应用概览代码结构与核心组件1. 初始化与布局 (__init__)2. 工具栏 (setup_t