本文主要是介绍SARIMA初步研究,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
import pandas as pd import numpy as np import matplotlib.pyplot as plt from statsmodels.tsa.stattools import adfuller # 导入ADF检验函数 from statsmodels.tsa.seasonal import seasonal_decompose # 导入季节性分解函数,将数列分解为趋势、季节性和残差三部分 from statsmodels.stats.diagnostic import acorr_ljungbox # 导入白噪声检验函数 from statsmodels.graphics.tsaplots import plot_pacf, plot_acf # 导入自相关和偏自相关的绘图函数 from matplotlib.ticker import MaxNLocator # 导入自动查找到最佳的最大刻度函数 from statsmodels.tsa.arima_model import ARIMA # 导入ARIMA模型 from statsmodels.tsa.statespace.sarimax import SARIMAX from sklearn import svm# 单位根检验(test_stationarity,ADF检验),用于检验序列是否是平稳的 # 季节性分解函数(seasonal_decompose),通过分解后的趋势、季节性确认序列是否是平稳的 # 白噪声检验函数 # 自相关性和偏自相关性(acf_pacf),通过截尾或拖尾的lag值,初步确认p,q。也可以用来检验序列是否平稳# ADF检验:这是一种检查数据稳定性的统计测试。无效假设:时间序列是不稳定的。 # 测试结果由测试统计量和一些置信区间的临界值组成。 # 如果“测试统计量”少于“临界值”,我们可以拒绝无效假设,并认为序列是稳定的。 # 当p-value<0.05,且Test Statistic显著小于Critical Value (5%)时,数列稳定 # 主要看p-value,显著小于的判断不精确 def test_stationarity(timeseries, window=12):rolmean = timeseries.rolling(window=window, center=False).mean()rolstd = timeseries.rolling(window=window, center=False).std()# 旧版方法,即将被移除# rolmean = pd.rolling_mean(timeseries, window=window)# rolstd = pd.rolling_std(timeseries, window=window)# 设置原始图,移动平均图和标准差图的式样orig = plt.plot(timeseries, color='blue', label='Original')mean = plt.plot(rolmean, color='red', label='Rolling Mean')std = plt.plot(rolstd, color='black', label='Rolling Std')plt.legend(loc='best') # 使用自动最佳的图例显示位置plt.title('Rolling Mean & Standard Deviation')plt.show()print('ADF检验结果:')dftest = adfuller(timeseries, autolag='AIC') # 使用减小AIC的办法估算ADF测试所需的滞后数# 将ADF测试结果、显著性概率、所用的滞后数和所用的观测数打印出来dfoutput = pd.Series(dftest[0:4], index=['Test Statistic', 'p-value', 'Num Lags Used', 'Num Observations Used'])for key, value in dftest[4].items():dfoutput['Critical Value (%s)' % key] = valueprint(dfoutput)# 按照滑动均值将维经过指数转化的时间序列分为趋势(trend), 季节性(seasonality)和残差(residual)三部分 def decompose_plot(series, title=''):decomposition = seasonal_decompose(series) # 季节性分解函数trend = decomposition.trend # 分解出的趋势,包含NaN值,原因不明seasonal = decomposition.seasonal # 分解出的季节性residual = decomposition.resid # 分解出的残差,包含NaN值,原因不明fig = decomposition.plot()fig.set_size_inches(12, 6)fig.suptitle(title)fig.tight_layout()fig2 = acf_pacf(residual, title='Residuals', figsize=(12, 6)) # 分析残差的自相关,偏自相关# test_stationarity(residual.dropna()) # Dropna后才能做稳定性检验# 原数据的残差一般是不稳定的,经过差分后的数据残差可能是平稳的# 定义一个画自相关,偏自相关的函数 # series 输入的时间序列 # lags 自相关和偏自相关函数的滞后取值范围 def acf_pacf(series, lags=40, title=None, figsize=(12, 6)):# 求自相关函数fig = plt.figure(figsize=figsize) # figure指设置图形的特征。figsize为设置图形大小,单位为inchax1 = fig.add_subplot(211) # 子图2行1列的第一张,大于10后用逗号分隔,如(3,4,10)ax1.set_xlabel('lags') # 横坐标为滞后值ax1.set_ylabel('AutoCorrelation') # 纵坐标为自相关系数ax1.xaxis.set_major_locator(MaxNLocator(integer=True)) # 设置主坐标轴为自动设置最佳的整数型坐标标签plot_acf(series.dropna(), lags=lags, ax=ax1) # 没有title参数,需要删除title# 求偏自相关函数ax2 = fig.add_subplot(212)ax2.set_xlabel('lags')ax2.set_ylabel('Partial AutoCorrelation')ax2.xaxis.set_major_locator(MaxNLocator(integer=True))plot_pacf(series.dropna(), lags=lags, ax=ax2) # 没有title参数,需要删除titleplt.tight_layout() # 设置为紧凑布局plt.show()# #生成一个伪随机白噪声用于测试acorr_ljungbox的可靠性 # from random import gauss # from random import seed # from pandas import Series # # seed random number generator # # seed(1) #指定生成伪随机数的种子,指定后每次生成的随机数相同 # # create white noise series # whitenoise = Series([gauss(0.0, 1.0) for i in range(1000)])#创建一个高斯分布的白噪声 # acf_pacf(whitenoise, lags=40, title='White Noise Series') # print(u'白噪声检验结果为:', acorr_ljungbox(whitenoise, lags=1))#检验结果:平稳度,p-value。p-value>0.05为白噪声# 骑自行车人数预测 df = pd.read_csv('D:\\portland-oregon-average-monthly-.csv') print(df.head(), '\nindex type:\n', type(df.index)) df['Month'] = pd.to_datetime(df['Month'], format='%Y-%m') # 索引并resample为月 indexed_df = df.set_index('Month') ts = indexed_df['riders'] ts = ts.resample('M').sum() # ts.plot(title='Monthly Num. of Ridership') # print(ts.head(), '\nindex type:\n', type(ts.index))# #原始数据分析 # test_stationarity(ts) # decompose_plot(ts, title='ts decompose') # print(u'白噪声检验结果为:', acorr_ljungbox(ts, lags=1))# 是否为白噪声测试,p-value<0.05非白噪声。平稳且非白噪声函数,可用于时间序列建模 # acf_pacf(ts, lags=20, title='ts ACF&PACF')# 自相关和偏自相关初步确认p,q # 季节差分12个月 ts_sdiff = ts.diff(12) test_stationarity(ts_sdiff.dropna()) decompose_plot(ts_sdiff.dropna(), title='ts_sdiff decompose') print(u'白噪声检验结果为:', acorr_ljungbox(ts_sdiff.dropna(), lags=1)) # 是否为白噪声测试,p-value<0.05非白噪声。平稳且非白噪声函数,可用于时间序列建模 acf_pacf(ts_sdiff.dropna(), lags=20, title='ts_sdiff ACF&PACF') # 自相关和偏自相关初步确认p,q # 趋势差分1个月 ts_diff_of_sdiff = ts_sdiff.diff(1) test_stationarity(ts_diff_of_sdiff.dropna()) decompose_plot(ts_diff_of_sdiff.dropna(), title='ts_diff_of_sdiff decompose') print(u'白噪声检验结果为:', acorr_ljungbox(ts_diff_of_sdiff.dropna(), lags=1)) # 是否为白噪声测试,p-value<0.05非白噪声。平稳且非白噪声函数,可用于时间序列建模 acf_pacf(ts_diff_of_sdiff.dropna(), lags=20, title='ts_diff_of_sdiff ACF&PACF') # 自相关和偏自相关初步确认p,q # 得出ACF&PACF后,开始计算使用的p,q,P,Q # PACF(lag=k)=0,则p=k-1。如果PACF=0时,ACF仍然显著>0,则再增加一些p # ACF(lag=k)=0,则q=k-1。如果ACF=0时,PACF仍然显著>0,则再增加一些q # P: 如果季节差分后序列的ACF(lag=季节周期)显著为正, 考虑增加P # Q: 如果季节差分后序列的ACF(lag=季节周期)显著为负, 考虑增加Q # 绘出拟合后的ARIMA的拟合残差的ACF和PACF, 按上述规则调整p, q, P, Q # 通常P, Q最多为1 # 这里考虑p=0, q=0,季节周期=12, 季节差分后的ACF(12)显著为负, 可以考虑P=0, Q=1 pdq = (5, 1, 0) PDQ = (0, 1, 1, 12) # # SARIMA的参数: order = p, d, q, seasonal_order=P, D, Q, season的周期=12 # model = SARIMAX(ts, order=pdq,seasonal_order=PDQ).fit()# 已拟合的模型 # print(model.summary()) # ts.plot() # model.fittedvalues.plot(color='red') # residuals = pd.DataFrame(model.resid)# 拟合的数据和原始数据的残差 # acf_pacf(residuals, title='Residuals', figsize=(10,4)) # plt.title('RSS: %.4f'% sum((model.fittedvalues-ts)**2))# 计算拟残差平方和(RSS) # residuals.plot(kind='kde')# 残差的核密度估计 # print('\n\nResiduals Summary:\n', residuals.describe()) # plt.show()test_point = 30 # 测试点数 train_size = int(len(ts) - test_point) # 测试集大小 train, test = ts[:train_size], ts[train_size:] # 切分数据集 model_train = SARIMAX(train, order=pdq, seasonal_order=PDQ).fit() # 用于测试的已拟合的模型 predict_train = model_train.forecast(test_point) fitted_train = model_train.fittedvalues.append(predict_train) # 拟合曲线和预测曲线model_run = SARIMAX(ts, order=pdq, seasonal_order=PDQ).fit() # 用于测试的已拟合的模型 predict_run = model_run.forecast(test_point) fitted_run = model_run.fittedvalues.append(predict_run) # 拟合曲线和预测曲线# 通过支持向量机,将预测有偏差的残差再进行预测 residual = predict_train - test # for n in np.arange(3,15): #n从3到15测试最佳值 n = 13 residual_list = list() for i in np.arange(n - 1, len(residual)): # n-1之前的值无法计算for j in np.arange(-(n - 1), 1):residual_list.append(residual[i + j]) # df_residual=pd.DataFrame(np.repeat(np.array([np.nan]),n).reshape(1,n)) #组成n+1列的数据框,用前n列来预测n+1列的残差 df_residual = pd.DataFrame() df_residual = df_residual.append(pd.DataFrame(np.array(residual_list).reshape(len(residual) - (n - 1), n))) df_residual.index = residual[n - 1:].index matrix_residual = df_residual.as_matrix() # 训练SVM data_train = matrix_residual[:int(0.5 * len(matrix_residual)), :] data_test = matrix_residual[int(0.5 * len(matrix_residual)):, :] x_train = data_train[:, 0:n - 1] y_train = data_train[:, n - 1] # .astype(int) # 必须是离散的int型才能预测 x_test = data_test[:, 0:n - 1] y_test = data_test[:, n - 1] # .astype(int) # 必须是离散的int型才能预测 model_svm = svm.SVR(epsilon=0.2) # (C=1.0, kernel='rbf', gamma='auto') model_svm.fit(x_train, y_train) y_pred = model_svm.predict(x_test[0]) y_pred = model_svm.predict(x_test[1]) y_pred = model_svm.predict(x_test[2]) y_pred = model_svm.predict(x_test[3]) residual_pred = y_test - y_pred print(y_pred) print('n=', n, '残差预测误差均值=', np.mean(residual_pred), '残差预测误差标准差=', np.std(residual_pred)) # # SVM最终训练 # x_final = matrix_residual[:, 0:n-1] # y_final = matrix_residual[:, n-1].astype(int) #必须是离散的int型才能预测 # x_test = data_test.as_matrix()[:, 0:n-1] # y_test = data_test.as_matrix()[:, n-1].astype(int) #必须是离散的int型才能预测 # model_svm = svm.SVC() # (C=1.0, kernel='rbf', gamma='auto') # model_svm.fit(x_final, y_final) # matrix_residual_new=model_svm.predict(x_test) # print (n) # print(matrix_residual_new) print('end')predict_train[-len(y_pred):]=predict_train[-len(y_pred):]-y_pred fitted_train_svm = model_train.fittedvalues.append(predict_train) # 拟合曲线和预测曲线plt.figure(figsize=(12, 6)) plt.title('Riders') plt.plot(ts, 'o', label='observed') plt.plot(fitted_train, 'g', label='forecast', color='green') plt.plot(fitted_train_svm, 'g', label='forecast', color='pink') plt.plot(fitted_run, 'g', label='forecast', color='red') plt.legend(loc='upper left') plt.show() print('end')
这篇关于SARIMA初步研究的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!