Python环境下基于动态模态分解的股票价格预测

2024-04-16 08:44

本文主要是介绍Python环境下基于动态模态分解的股票价格预测,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

动态模态分解模型的基本思想是直接从数据模拟得到的流场中提取流动的动态信息,根据不同频率的流场变动寻找数据映射,基于动态非线性无穷维转化成动态线性有穷维的方式,采用了Arnoldi 方法以及奇异值分解SVD降维的思想,借鉴了ARIMA、SARIMA 以及季节模型等许多时间序列的关键特征,被广泛的使用在数学、物理、金融等领域。

动态模态分解按照频率对系统进行排序,提取系统特征频率,从而观察不同频率的流动结构对流场的贡献,同时动态模态分解模态特征值可以进行流场预测。因为动态模态分解算法理论的严密性、稳定性、简易性等优点,在不断被运用的同时,动态模态分解算法也在本来的基础之上不断被完善,如与SPA检验结合起来,以验证股票价格预测对比基准点的强有效性;以及通过联系动态模态分解算法和光谱研究的方式,模拟股票市场在循环经济当中的震动,均能够有效地采集分析数据,并最终得到结果。

import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt
import matplotlib as mpl#%% Load data
data = pd.read_csv('historical_stock_prices.csv')#%% User inputs
# Choose dates
start_date = '2014-03-18'
end_date = '2015-03-18'# Choose tickers
s1 = 'AEO'
s2 = 'ANF'
s3 = 'FL'
s4 = 'GPS'
s5 = 'SCVL'
s6 = 'RL'
s7 = 'URBN'
s8 = 'ROST'# Number of past days to build the DMD model on
mp = 7
# Number of future days to predict with DMD
mf = 1# Percentage of portfolio to sell off each day
sell_perc = 0.25# Initial capital
init_cap = 1e6#%% Functionsdef GetPrices(portfolio_size, bigX, current_day):'''Gets the day close prices of each company in the portfolio at the currentdayInputs:portfolio_size: int, the number of companies that we can trade withbigX: array (portfolio size * number of days), consisting of time series close prices along the columns and new companies along the rowscurrent_day: int, the last day considered in the DMD model construction inorder to make a prediction about the next dayreturns: day_close: array (portfolio size * 1), consisting of close prices for each company on the current day'''# Find prices on a given dayday_close = np.zeros(shape=(portfolio_size,1))for i in range(0,portfolio_size):day_close[i,0] = bigX[i,current_day-1]return day_closedef Trade(current_day, mp, mf, bigX, portfolio_size, stock_amounts, day_close, sell_perc):'''The core algorithm, executing trades and stepping forward days in time.Inputs:current_day: int, the last day considered in the DMD model construction inorder to make a prediction about the next daymp: int, number of historical days used to build the DMD modelmf: int, number of days to predict in the future with the DMD modelbigX: array (portfolio size * number of days), consisting of time series close prices along the columns and new companies along the rowsportfolio_size: int, the number of companies that we can trade withstock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the current dayday_close: array (portfolio size * 1), consisting of close prices for each company on the current daysell_perc: float, a user input defining which proportion of the portfolio valueshould be sold at the end of each dayreturns: stock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio after trades have been executedday_close: array (portfolio size * 1), consisting of the new day close prices after stepping forward one daycurrent_day: int, the next day after taking one step forward'''first_day = current_day - (mp-1)# Time vector spans mp+mf, DMD will extrapolate to make a prediction about mft = list(range(first_day,mp+first_day+1))# Form the DMD matricesX1 = bigX[:,(first_day-1):(current_day-1)]X2 = bigX[:,(first_day):current_day]# Snapshots separated by 1 trading daydt = 1# Conduct DMDPhi, b, omega = DMD(X1, X2, dt)# DMD reconstruction to predict price on current_day + 1price_predictions = DMDreconstruct(X1, t, b, omega, Phi, mp, mf)# Calculate increases in price between current_day and the following dayprice_increases = np.zeros(shape=(portfolio_size,1))for i in range(0,portfolio_size):price_increases[i,0] = (price_predictions[i] - bigX[i,current_day-1])/bigX[i,current_day-1]# Calculate current portfolio valueportfolio_value = np.zeros(shape=(portfolio_size,1))for i in range(0,portfolio_size):portfolio_value[i,0] = stock_amounts[i,0]*day_close[i,0]# Sell bottom 25% of portfoliocash, stock_amounts = Sell(portfolio_value, sell_perc, price_increases, portfolio_size, stock_amounts, day_close)# Buy best performing shares with cash from sales.stock_amounts = Buy(price_increases, cash, day_close, stock_amounts)# Increment daycurrent_day += 1# Get new day_close pricesday_close = GetPrices(portfolio_size, bigX, current_day)return stock_amounts, day_close, current_daydef Sell(portfolio_value, sell_perc, price_increases, portfolio_size, stock_amounts, day_close):'''Conducts the sale of a proportion of the stocks in the portfolio with theworst predicted next-day pricesInputs:portfolio_value: array (portfolio size * 1), calculating the total value of all stocks held in the portfolio according to the current day close pricessell_perc: float, a user input defining which proportion of the portfolio valueshould be sold at the end of each dayprice_increases: array (portfolio size * 1), calculating the predicted changesin price between the current day and the next-day prediction for each stock portfolio_size: int, the number of companies that we can trade withstock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the current dayday_close: array (portfolio size * 1), consisting of close prices for each company on the current dayreturns: cash: float, the amount of cash generated by the sale of the worst-performingstocksstock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the end of the sale'''sell_value = np.sum(portfolio_value)*sell_perccash = 0lowest = np.sort(price_increases,axis=None)for i in range(0,portfolio_size):# For each ticker, find location of lowest price in price_increases lowest_value = stock_amounts[price_increases == lowest[i]]*day_close[price_increases == lowest[i]]temp_cash = cash + lowest_valueif temp_cash < sell_value:stock_amounts[price_increases == lowest[i]] = 0cash = temp_cashelif temp_cash == sell_value:stock_amounts[price_increases == lowest[i]] = 0cash = temp_cashbreakelse:number_sold = (sell_value-cash)/day_close[price_increases == lowest[i]]stock_amounts[price_increases == lowest[i]] = stock_amounts[price_increases == lowest[i]] - number_soldnew_cash = number_sold*day_close[price_increases == lowest[i]]cash = new_cash + cashbreakreturn cash, stock_amountsdef Buy(price_increases, cash, day_close, stock_amounts):'''Purchases stocks using the cash generated by the sale of the bottom of the portfolio, with an even distribution between the top two performing stocks.Inputs:price_increases: array (portfolio size * 1), calculating the predicted changesin price between the current day and the next-day prediction for each stock cash: float, the amount of cash generated by the sale of the worst-performingstocksday_close: array (portfolio size * 1), consisting of close prices for each company on the current daystock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the end of the salereturns: stock_amounts: array (portfolio size * 1), the number of stocks for each company held in the portfolio at the end of the purchases'''best = np.sort(price_increases,axis=None)[::-1]number_bought1 = 0.5*cash/day_close[price_increases == best[0]]number_bought2 = 0.5*cash/day_close[price_increases == best[1]]    stock_amounts[price_increases == best[0]] = stock_amounts[price_increases == best[0]] + number_bought1stock_amounts[price_increases == best[1]] = stock_amounts[price_increases == best[1]] + number_bought2return stock_amountsdef DMD(X1, X2, dt):'''Conducts the DMD analysisInputs:X1: array (portfolio size * (mp-1)), the first DMD matrixX2: array (portfolio size * (mp-1)), the second DMD matrixdt: float, the time difference between snapshots of data (ie days)returns: Phi: array (portfolio size * (mp-1)), the DMD modesb: array ((mp-1) * 1), the DMD mode amplitudesomega: array ((mp-1) * 1), the DMD mode frequencies'''# SVD on X1U,S,V = np.linalg.svd(X1,full_matrices=0)Sigmar = np.diag(S)# Calculate AtildeAtilde = np.linalg.solve(Sigmar.T,(U.T @ X2 @ V.T).T).T# Eigendecomp of AtildeLambda, W = np.linalg.eig(Atilde)L = np.diag(Lambda)# DMD modesPhi = X2 @ np.linalg.solve(Sigmar.T,V).T @ W# DMD amplitudesalpha1 = Sigmar @ V[:,0]b = np.linalg.solve(W @ L,alpha1)# Frequencyomega = np.log(Lambda)/dtreturn Phi, b, omegadef DMDreconstruct(X1, t, b, omega, Phi, mp, mf):'''Conducts the DMD reconstruction in order to make a next-day price predictionInputs:X1: array (portfolio size * (mp-1)), the first DMD matrixt: list (length mp+mf), time vector used to reconstruct the data matrixb: array ((mp-1) * 1), the DMD mode amplitudesomega: array ((mp-1) * 1), the DMD mode frequenciesPhi: array (portfolio size * (mp-1)), the DMD modesmp: int, number of historical days used to build the DMD modelmf: int, number of days to predict in the future with the DMD modelreturns: price_predictions: array (portfolio size * 1), the DMD model of day close prices projected out mf day(s) into the future'''time_dynamics = np.zeros(shape=(X1.shape[1],len(t)),dtype=np.complex128)for i in range(0,len(t)):time_dynamics[:,i] = np.multiply(b,np.exp(omega*t[i]))X_dmd = Phi @ time_dynamicsprice_predictions = np.real(X_dmd[:,(mp)])return price_predictions#%% Set parameters and reduce table size
after_start_date = data['date'] >= start_date
before_end_date = data['date'] <= end_date
between_two_dates = after_start_date & before_end_datetabledates = data.loc[between_two_dates]tickers = [s1,s2,s3,s4,s5,s6,s7,s8]
portfolio_size = len(tickers)# Get retail_table in the specified date range
reduced_table = []
for i in tickers:ticker_loc = tabledates['ticker'] == ireduced_table.append(tabledates.loc[ticker_loc])retail_table = pd.concat(reduced_table)# Form the big data matrix.
# For each ticker, get all the close prices and store.
days = len(retail_table[retail_table['ticker'] == tickers[0]])
bigX = np.zeros(shape=(portfolio_size,days))
for i in range(0,portfolio_size):temp = retail_table[retail_table['ticker'] == tickers[i]]temp_price_vector =  temp['close'].values.tolist()bigX[i,:] = temp_price_vector#%% Initialise the trading
# Initialise at day 7, as DMD uses data on the previous 7 days to predict
# the price on the following day
current_day = 7# Initialise capital and date
init_each = 1e6/portfolio_size
init_day = datetime.datetime.strptime(start_date,'%Y-%m-%d') + datetime.timedelta(days = (mp-1))day_close = GetPrices(portfolio_size, bigX, current_day)# Evenly distribute stock
stock_amounts = np.zeros(shape=(portfolio_size,1))
for i in range(0,portfolio_size):stock_amounts[i,0] = init_each/day_close[i]#%% The trading
# Initialise portfolio value over time
valuet = np.zeros(shape=(1,days))# Trade
for i in range(0,days-mp-1):stock_amounts, day_close, current_day = Trade(current_day, mp, mf, bigX, portfolio_size, stock_amounts, day_close, sell_perc);# Calculate value of portfolio and store in valuetvalue = np.sum(stock_amounts*day_close)valuet[0,i] = value#%% Load S&P data
SP = pd.read_csv('S&Pretail_reduced.csv')#%% Average returns
returnDMD = valuet[0,0:days-(mp+mf)] - 1e6
avreturnDMD = np.mean(returnDMD)
returnSP = SP['close'][0:days-(mp+mf)] - 1e6
avreturnSP = np.mean(returnSP)
DMDperformance = avreturnDMD/avreturnSP
print('DMD produces average returns of',round(DMDperformance,1),'times the S&P index.')#%% Plot
axdates = pd.to_datetime(SP['date'][0:days-(mp+mf)],dayfirst=True)plt.figure()
mpl.rc('font',family='Times New Roman')
plt.plot(axdates,valuet[0,0:days-(mp+mf)]/1e6,linewidth=3,color="#0072BD")
plt.plot(axdates,SP['close'][0:days-(mp+mf)]/1e6,linewidth=3,color="#7E2F8E")
plt.ylabel('USD (millions)',fontsize=20)
plt.legend(['DMD Algorithm','S&P Retail Index'],)
plt.grid()
plt.show()

图片

知乎学术咨询:

哥廷根数学学派 - 知乎

工学博士,担任《Mechanical System and Signal Processing》,《中国电机工程学报》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

这篇关于Python环境下基于动态模态分解的股票价格预测的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/908310

相关文章

Python实现终端清屏的几种方式详解

《Python实现终端清屏的几种方式详解》在使用Python进行终端交互式编程时,我们经常需要清空当前终端屏幕的内容,本文为大家整理了几种常见的实现方法,有需要的小伙伴可以参考下... 目录方法一:使用 `os` 模块调用系统命令方法二:使用 `subprocess` 模块执行命令方法三:打印多个换行符模拟

Python实现MQTT通信的示例代码

《Python实现MQTT通信的示例代码》本文主要介绍了Python实现MQTT通信的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 安装paho-mqtt库‌2. 搭建MQTT代理服务器(Broker)‌‌3. pytho

基于Python开发一个图像水印批量添加工具

《基于Python开发一个图像水印批量添加工具》在当今数字化内容爆炸式增长的时代,图像版权保护已成为创作者和企业的核心需求,本方案将详细介绍一个基于PythonPIL库的工业级图像水印解决方案,有需要... 目录一、系统架构设计1.1 整体处理流程1.2 类结构设计(扩展版本)二、核心算法深入解析2.1 自

从入门到进阶讲解Python自动化Playwright实战指南

《从入门到进阶讲解Python自动化Playwright实战指南》Playwright是针对Python语言的纯自动化工具,它可以通过单个API自动执行Chromium,Firefox和WebKit... 目录Playwright 简介核心优势安装步骤观点与案例结合Playwright 核心功能从零开始学习

Python 字典 (Dictionary)使用详解

《Python字典(Dictionary)使用详解》字典是python中最重要,最常用的数据结构之一,它提供了高效的键值对存储和查找能力,:本文主要介绍Python字典(Dictionary)... 目录字典1.基本特性2.创建字典3.访问元素4.修改字典5.删除元素6.字典遍历7.字典的高级特性默认字典

Python自动化批量重命名与整理文件系统

《Python自动化批量重命名与整理文件系统》这篇文章主要为大家详细介绍了如何使用Python实现一个强大的文件批量重命名与整理工具,帮助开发者自动化这一繁琐过程,有需要的小伙伴可以了解下... 目录简介环境准备项目功能概述代码详细解析1. 导入必要的库2. 配置参数设置3. 创建日志系统4. 安全文件名处

使用Python构建一个高效的日志处理系统

《使用Python构建一个高效的日志处理系统》这篇文章主要为大家详细讲解了如何使用Python开发一个专业的日志分析工具,能够自动化处理、分析和可视化各类日志文件,大幅提升运维效率,需要的可以了解下... 目录环境准备工具功能概述完整代码实现代码深度解析1. 类设计与初始化2. 日志解析核心逻辑3. 文件处

python生成随机唯一id的几种实现方法

《python生成随机唯一id的几种实现方法》在Python中生成随机唯一ID有多种方法,根据不同的需求场景可以选择最适合的方案,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习... 目录方法 1:使用 UUID 模块(推荐)方法 2:使用 Secrets 模块(安全敏感场景)方法

Windows环境下解决Matplotlib中文字体显示问题的详细教程

《Windows环境下解决Matplotlib中文字体显示问题的详细教程》本文详细介绍了在Windows下解决Matplotlib中文显示问题的方法,包括安装字体、更新缓存、配置文件设置及编码調整,并... 目录引言问题分析解决方案详解1. 检查系统已安装字体2. 手动添加中文字体(以SimHei为例)步骤

Java JDK1.8 安装和环境配置教程详解

《JavaJDK1.8安装和环境配置教程详解》文章简要介绍了JDK1.8的安装流程,包括官网下载对应系统版本、安装时选择非系统盘路径、配置JAVA_HOME、CLASSPATH和Path环境变量,... 目录1.下载JDK2.安装JDK3.配置环境变量4.检验JDK官网下载地址:Java Downloads