Python环境下基于动态模态分解的股票价格预测

2024-04-16 08:44

本文主要是介绍Python环境下基于动态模态分解的股票价格预测,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

动态模态分解模型的基本思想是直接从数据模拟得到的流场中提取流动的动态信息,根据不同频率的流场变动寻找数据映射,基于动态非线性无穷维转化成动态线性有穷维的方式,采用了Arnoldi 方法以及奇异值分解SVD降维的思想,借鉴了ARIMA、SARIMA 以及季节模型等许多时间序列的关键特征,被广泛的使用在数学、物理、金融等领域。

动态模态分解按照频率对系统进行排序,提取系统特征频率,从而观察不同频率的流动结构对流场的贡献,同时动态模态分解模态特征值可以进行流场预测。因为动态模态分解算法理论的严密性、稳定性、简易性等优点,在不断被运用的同时,动态模态分解算法也在本来的基础之上不断被完善,如与SPA检验结合起来,以验证股票价格预测对比基准点的强有效性;以及通过联系动态模态分解算法和光谱研究的方式,模拟股票市场在循环经济当中的震动,均能够有效地采集分析数据,并最终得到结果。

import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt
import matplotlib as mpl#%% Load data
data = pd.read_csv('historical_stock_prices.csv')#%% User inputs
# Choose dates
start_date = '2014-03-18'
end_date = '2015-03-18'# Choose tickers
s1 = 'AEO'
s2 = 'ANF'
s3 = 'FL'
s4 = 'GPS'
s5 = 'SCVL'
s6 = 'RL'
s7 = 'URBN'
s8 = 'ROST'# Number of past days to build the DMD model on
mp = 7
# Number of future days to predict with DMD
mf = 1# Percentage of portfolio to sell off each day
sell_perc = 0.25# Initial capital
init_cap = 1e6#%% Functionsdef GetPrices(portfolio_size, bigX, current_day):'''Gets the day close prices of each company in the portfolio at the currentdayInputs:portfolio_size: int, the number of companies that we can trade withbigX: array (portfolio size * number of days), consisting of time series close prices along the columns and new companies along the rowscurrent_day: int, the last day considered in the DMD model construction inorder to make a prediction about the next dayreturns: day_close: array (portfolio size * 1), consisting of close prices for each company on the current day'''# Find prices on a given dayday_close = np.zeros(shape=(portfolio_size,1))for i in range(0,portfolio_size):day_close[i,0] = bigX[i,current_day-1]return day_closedef Trade(current_day, mp, mf, bigX, portfolio_size, stock_amounts, day_close, sell_perc):'''The core algorithm, executing trades and stepping forward days in time.Inputs:current_day: int, the last day considered in the DMD model construction inorder to make a prediction about the next daymp: int, number of historical days used to build the DMD modelmf: int, number of days to predict in the future with the DMD modelbigX: array (portfolio size * number of days), consisting of time series close prices along the columns and new companies along the rowsportfolio_size: int, the number of companies that we can trade withstock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the current dayday_close: array (portfolio size * 1), consisting of close prices for each company on the current daysell_perc: float, a user input defining which proportion of the portfolio valueshould be sold at the end of each dayreturns: stock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio after trades have been executedday_close: array (portfolio size * 1), consisting of the new day close prices after stepping forward one daycurrent_day: int, the next day after taking one step forward'''first_day = current_day - (mp-1)# Time vector spans mp+mf, DMD will extrapolate to make a prediction about mft = list(range(first_day,mp+first_day+1))# Form the DMD matricesX1 = bigX[:,(first_day-1):(current_day-1)]X2 = bigX[:,(first_day):current_day]# Snapshots separated by 1 trading daydt = 1# Conduct DMDPhi, b, omega = DMD(X1, X2, dt)# DMD reconstruction to predict price on current_day + 1price_predictions = DMDreconstruct(X1, t, b, omega, Phi, mp, mf)# Calculate increases in price between current_day and the following dayprice_increases = np.zeros(shape=(portfolio_size,1))for i in range(0,portfolio_size):price_increases[i,0] = (price_predictions[i] - bigX[i,current_day-1])/bigX[i,current_day-1]# Calculate current portfolio valueportfolio_value = np.zeros(shape=(portfolio_size,1))for i in range(0,portfolio_size):portfolio_value[i,0] = stock_amounts[i,0]*day_close[i,0]# Sell bottom 25% of portfoliocash, stock_amounts = Sell(portfolio_value, sell_perc, price_increases, portfolio_size, stock_amounts, day_close)# Buy best performing shares with cash from sales.stock_amounts = Buy(price_increases, cash, day_close, stock_amounts)# Increment daycurrent_day += 1# Get new day_close pricesday_close = GetPrices(portfolio_size, bigX, current_day)return stock_amounts, day_close, current_daydef Sell(portfolio_value, sell_perc, price_increases, portfolio_size, stock_amounts, day_close):'''Conducts the sale of a proportion of the stocks in the portfolio with theworst predicted next-day pricesInputs:portfolio_value: array (portfolio size * 1), calculating the total value of all stocks held in the portfolio according to the current day close pricessell_perc: float, a user input defining which proportion of the portfolio valueshould be sold at the end of each dayprice_increases: array (portfolio size * 1), calculating the predicted changesin price between the current day and the next-day prediction for each stock portfolio_size: int, the number of companies that we can trade withstock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the current dayday_close: array (portfolio size * 1), consisting of close prices for each company on the current dayreturns: cash: float, the amount of cash generated by the sale of the worst-performingstocksstock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the end of the sale'''sell_value = np.sum(portfolio_value)*sell_perccash = 0lowest = np.sort(price_increases,axis=None)for i in range(0,portfolio_size):# For each ticker, find location of lowest price in price_increases lowest_value = stock_amounts[price_increases == lowest[i]]*day_close[price_increases == lowest[i]]temp_cash = cash + lowest_valueif temp_cash < sell_value:stock_amounts[price_increases == lowest[i]] = 0cash = temp_cashelif temp_cash == sell_value:stock_amounts[price_increases == lowest[i]] = 0cash = temp_cashbreakelse:number_sold = (sell_value-cash)/day_close[price_increases == lowest[i]]stock_amounts[price_increases == lowest[i]] = stock_amounts[price_increases == lowest[i]] - number_soldnew_cash = number_sold*day_close[price_increases == lowest[i]]cash = new_cash + cashbreakreturn cash, stock_amountsdef Buy(price_increases, cash, day_close, stock_amounts):'''Purchases stocks using the cash generated by the sale of the bottom of the portfolio, with an even distribution between the top two performing stocks.Inputs:price_increases: array (portfolio size * 1), calculating the predicted changesin price between the current day and the next-day prediction for each stock cash: float, the amount of cash generated by the sale of the worst-performingstocksday_close: array (portfolio size * 1), consisting of close prices for each company on the current daystock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the end of the salereturns: stock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the end of the purchases'''best = np.sort(price_increases,axis=None)[::-1]number_bought1 = 0.5*cash/day_close[price_increases == best[0]]number_bought2 = 0.5*cash/day_close[price_increases == best[1]]    stock_amounts[price_increases == best[0]] = stock_amounts[price_increases == best[0]] + number_bought1stock_amounts[price_increases == best[1]] = stock_amounts[price_increases == best[1]] + number_bought2return stock_amountsdef DMD(X1, X2, dt):'''Conducts the DMD analysisInputs:X1: array (portfolio size * (mp-1)), the first DMD matrixX2: array (portfolio size * (mp-1)), the second DMD matrixdt: float, the time difference between snapshots of data (ie days)returns: Phi: array (portfolio size * (mp-1)), the DMD modesb: array ((mp-1) * 1), the DMD mode amplitudesomega: array ((mp-1) * 1), the DMD mode frequencies'''# SVD on X1U,S,V = np.linalg.svd(X1,full_matrices=0)Sigmar = np.diag(S)# Calculate AtildeAtilde = np.linalg.solve(Sigmar.T,(U.T @ X2 @ V.T).T).T# Eigendecomp of AtildeLambda, W = np.linalg.eig(Atilde)L = np.diag(Lambda)# DMD modesPhi = X2 @ np.linalg.solve(Sigmar.T,V).T @ W# DMD amplitudesalpha1 = Sigmar @ V[:,0]b = np.linalg.solve(W @ L,alpha1)# Frequencyomega = np.log(Lambda)/dtreturn Phi, b, omegadef DMDreconstruct(X1, t, b, omega, Phi, mp, mf):'''Conducts the DMD reconstruction in order to make a next-day price predictionInputs:X1: array (portfolio size * (mp-1)), the first DMD matrixt: list (length mp+mf), time vector used to reconstruct the data matrixb: array ((mp-1) * 1), the DMD mode amplitudesomega: array ((mp-1) * 1), the DMD mode frequenciesPhi: array (portfolio size * (mp-1)), the DMD modesmp: int, number of historical days used to build the DMD modelmf: int, number of days to predict in the future with the DMD modelreturns: price_predictions: array (portfolio size * 1), the DMD model of day close prices projected out mf day(s) into the future'''time_dynamics = np.zeros(shape=(X1.shape[1],len(t)),dtype=np.complex128)for i in range(0,len(t)):time_dynamics[:,i] = np.multiply(b,np.exp(omega*t[i]))X_dmd = Phi @ time_dynamicsprice_predictions = np.real(X_dmd[:,(mp)])return price_predictions#%% Set parameters and reduce table size
after_start_date = data['date'] >= start_date
before_end_date = data['date'] <= end_date
between_two_dates = after_start_date & before_end_datetabledates = data.loc[between_two_dates]tickers = [s1,s2,s3,s4,s5,s6,s7,s8]
portfolio_size = len(tickers)# Get retail_table in the specified date range
reduced_table = []
for i in tickers:ticker_loc = tabledates['ticker'] == ireduced_table.append(tabledates.loc[ticker_loc])retail_table = pd.concat(reduced_table)# Form the big data matrix.
# For each ticker, get all the close prices and store.
days = len(retail_table[retail_table['ticker'] == tickers[0]])
bigX = np.zeros(shape=(portfolio_size,days))
for i in range(0,portfolio_size):temp = retail_table[retail_table['ticker'] == tickers[i]]temp_price_vector =  temp['close'].values.tolist()bigX[i,:] = temp_price_vector#%% Initialise the trading
# Initialise at day 7, as DMD uses data on the previous 7 days to predict
# the price on the following day
current_day = 7# Initialise capital and date
init_each = 1e6/portfolio_size
init_day = datetime.datetime.strptime(start_date,'%Y-%m-%d') + datetime.timedelta(days = (mp-1))day_close = GetPrices(portfolio_size, bigX, current_day)# Evenly distribute stock
stock_amounts = np.zeros(shape=(portfolio_size,1))
for i in range(0,portfolio_size):stock_amounts[i,0] = init_each/day_close[i]#%% The trading
# Initialise portfolio value over time
valuet = np.zeros(shape=(1,days))# Trade
for i in range(0,days-mp-1):stock_amounts, day_close, current_day = Trade(current_day, mp, mf, bigX, portfolio_size, stock_amounts, day_close, sell_perc);# Calculate value of portfolio and store in valuetvalue = np.sum(stock_amounts*day_close)valuet[0,i] = value#%% Load S&P data
SP = pd.read_csv('S&Pretail_reduced.csv')#%% Average returns
returnDMD = valuet[0,0:days-(mp+mf)] - 1e6
avreturnDMD = np.mean(returnDMD)
returnSP = SP['close'][0:days-(mp+mf)] - 1e6
avreturnSP = np.mean(returnSP)
DMDperformance = avreturnDMD/avreturnSP
print('DMD produces average returns of',round(DMDperformance,1),'times the S&P index.')#%% Plot
axdates = pd.to_datetime(SP['date'][0:days-(mp+mf)],dayfirst=True)plt.figure()
mpl.rc('font',family='Times New Roman')
plt.plot(axdates,valuet[0,0:days-(mp+mf)]/1e6,linewidth=3,color="#0072BD")
plt.plot(axdates,SP['close'][0:days-(mp+mf)]/1e6,linewidth=3,color="#7E2F8E")
plt.ylabel('USD (millions)',fontsize=20)
plt.legend(['DMD Algorithm','S&P Retail Index'],)
plt.grid()
plt.show()

图片

知乎学术咨询:

哥廷根数学学派 - 知乎

工学博士,担任《Mechanical System and Signal Processing》,《中国电机工程学报》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

这篇关于Python环境下基于动态模态分解的股票价格预测的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/908310

相关文章

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python正则表达式匹配和替换的操作指南

《Python正则表达式匹配和替换的操作指南》正则表达式是处理文本的强大工具,Python通过re模块提供了完整的正则表达式功能,本文将通过代码示例详细介绍Python中的正则匹配和替换操作,需要的朋... 目录基础语法导入re模块基本元字符常用匹配方法1. re.match() - 从字符串开头匹配2.

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

通过Docker容器部署Python环境的全流程

《通过Docker容器部署Python环境的全流程》在现代化开发流程中,Docker因其轻量化、环境隔离和跨平台一致性的特性,已成为部署Python应用的标准工具,本文将详细演示如何通过Docker容... 目录引言一、docker与python的协同优势二、核心步骤详解三、进阶配置技巧四、生产环境最佳实践

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、