Drawing candlestick charts with Python

2024-02-08 08:10
Tags: python, plotting, candlestick


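1. Purpose: draw candlestick charts of futures data, for use in data analysis.
2. Features:
2.1. Panning
    Method 1: click, move the mouse elsewhere, then release
    Method 2: drag with the mouse button held down
    Method 3: left and right arrow keys
2.2. Zooming
    Method 1: mouse wheel
    Method 2: up and down arrow keys
2.3. Crosshair cursor: shows the date and prices at the cursor position
3. Limitations: redrawing is slow when there is a lot of data, and the crosshair is only shown on the main chart, not on the sub-chart. Suggestions from more experienced readers are welcome.
4. Data: daily bars (other periods work as well)
5. Implementation:
    TCursor: a cursor for a single axes
    Candle: the candlestick chart class
6. Usage:
    candle = Candle()
    candle.plot(df)
7. Notes:
v0.2: 2023/12/27
    Candle: fixed the error raised when the cursor moved outside the current data; removed the automatic pan when the cursor moved outside the data; moved the data (including the current window) into a global dictionary.
    TCursor: changed to use the global parameters.
v0.3: 2023/12/27
    Added RadioButtons, mainly to switch between periods: daily, weekly, monthly, quarterly, yearly.
    Adjusted the x-axis tick spacing.
v0.4: 2023/12/29
    Added CheckButtons for the following options:
    line=False,  # draw a line chart instead of candles
    ma=True,  # show moving averages
    vol=True,  # show volume
    vline=False,  # show vertical lines on the main chart (contract start day, contract settlement day)
    time=False  # show the contract's remaining days in the cursor text
v0.5: 2023/12/30
    Removed the global variable; renamed some functions.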
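
The pan and zoom features listed above come down to moving or resizing an index window [start, end) over the rows and keeping it inside the data. A minimal sketch of that arithmetic, assuming at least one row of data. The helper names `clamp_window`, `pan` and `zoom` are hypothetical and do not appear in the script; the 10% step follows the comment in the script's scroll handler, while the clamping rule here (keep the window width, slide it back inside) is a simplification rather than the author's exact logic.

```python
def clamp_window(start, end, n):
    """Fit the half-open window [start, end) inside [0, n], keeping its width if possible."""
    width = max(1, min(end - start, n))
    start = max(0, min(start, n - width))
    return start, start + width


def pan(start, end, n, dx):
    """Shift the window by dx rows (positive dx moves towards later rows)."""
    return clamp_window(start + dx, end + dx, n)


def zoom(start, end, n, factor=0.1, zoom_in=True):
    """Shrink or grow the window by `factor` of its width, split over both sides."""
    step = max(1, int((end - start) * factor / 2))
    if zoom_in:
        step = min(step, (end - start - 1) // 2)  # never collapse below one row
        return clamp_window(start + step, end - step, n)
    return clamp_window(start - step, end + step, n)


if __name__ == '__main__':
    n = 62000  # row count quoted in the article
    print(pan(100, 200, n, -150))           # pushed past the left edge, slides back
    print(zoom(100, 200, n, zoom_in=True))  # 5 rows trimmed from each side
    print(zoom(0, 100, n, zoom_in=False))   # grows to the right at the left edge
```

Keeping this logic in pure functions makes it testable without opening a figure; the event handlers then only translate mouse and key events into `dx` or a zoom direction.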

Raw data: only the date, open, high, low, close and volume columns are used. My data set has 62,000 rows.

date	codemon	varmon	open	high	low	close	volume	amount	position	settle	前结算	main	var
1994/9/15	A	A	2220	2224	2220	2220	0		3689	0		main	A
1994/9/16	A	A	2228	2228	2210	2210	0		3574	0		main	A
2004/11/3	A0501	A0501	2644	2647	2630	2636	51756	136508.12	137943			not	A
2004/11/4	A0501	A0501	2627	2630	2615	2623	44481	116643.29	136166			not	A
2004/11/5	A0501	A0501	2602	2622	2601	2621	42961	112022.78	136262			not	A
2004/11/8	A0501	A0501	2629	2640	2617	2629	57325	150813.26	140600			not	A
2004/11/9	A0501	A0501	2632	2640	2628	2634	32965	86836.74	139376			not	A
2004/11/10	A0501	A0501	2644	2652	2637	2643	61681	163124	133836			not	A
2004/11/11	A0501	A0501	2650	2660	2612	2635	71765	189398.07	134233			not	A
2023/7/28	A2405	A2405	4875	4877	4857	4865	272	1323.46	2160			not	A
2023/7/31	A2405	A2405	4857	4871	4830	4859	491	2380.46	2293			not	A
2023/8/1	A2405	A2405	4852	4868	4838	4850	272	1319.17	2407			not	A
2023/8/2	A2405	A2405	4852	4856	4832	4837	397	1923.4	2600			not	A
2023/8/3	A2405	A2405	4826	4849	4814	4840	345	1666.15	2678			not	A
2023/8/4	A2405	A2405	4849	4878	4849	4865	702	3416.92	2579			not	A
2023/8/7	A2405	A2405	4876	4930	4843	4915	924	4522.95	2388			not	A
2023/8/8	A2405	A2405	4907	4917	4888	4898	429	2101.7	2427			not	A
2023/8/9	A2405	A2405	4899	4999	4889	4996	1154	5724.85	2328			not	A
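
Only a few of the columns above are needed for plotting. A minimal sketch of pulling just those out of tab-separated text, written with the standard library so that it runs without the author's CSV file. `load_ohlcv` and the inline `SAMPLE` (two rows copied from the table, trimmed to the leading columns) are illustrative assumptions; the script itself reads the file with `pd.read_csv`.

```python
import csv
import io
from datetime import datetime

USED = ('open', 'high', 'low', 'close', 'volume')

SAMPLE = (
    "date\tcodemon\tvarmon\topen\thigh\tlow\tclose\tvolume\n"
    "2023/8/8\tA2405\tA2405\t4907\t4917\t4888\t4898\t429\n"
    "2023/8/9\tA2405\tA2405\t4899\t4999\t4889\t4996\t1154\n"
)


def load_ohlcv(text):
    """Return (date, {column: float}) pairs, keeping only the columns in USED."""
    rows = []
    for rec in csv.DictReader(io.StringIO(text), delimiter='\t'):
        # dates in the file are not zero-padded, e.g. 2023/8/8
        day = datetime.strptime(rec['date'], '%Y/%m/%d').date()
        rows.append((day, {name: float(rec[name]) for name in USED}))
    return rows


if __name__ == '__main__':
    for day, bar in load_ohlcv(SAMPLE):
        print(day.isoformat(), bar['open'], bar['close'], bar['volume'])
```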

Version v0.5

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.5
@Email:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
# import talib
import re  # used by adj_data
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor
from matplotlib.widgets import RadioButtons, CheckButtons

plt.rcParams['font.sans-serif'] = ['simHei']  # display Chinese text in SimHei
plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign '-' from rendering as a box in saved figures


class _TMultiCursor(object):
    """
    A crosshair cursor for several subplots (side by side or stacked) that can
    appear on all of them at once.

    single=0: only one subplot shows the horizontal line and all subplots show
              the vertical line; for vertically stacked subplots
    single=1: only one subplot shows the vertical line and all subplots show
              the horizontal line; for side-by-side subplots

    Note: keep a reference to the cursor (for example in a variable) so that it
    keeps responding to events.

    Usage:
        import matplotlib.pyplot as plt
        import numpy as np

        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        cursor = _TMultiCursor(fig, (ax1, ax2), single=0, color='w', lw=0.5)
        plt.show()

        # crosshair cursor on a single subplot
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2 * np.pi * t))
        ax2.plot(t, np.sin(4 * np.pi * t))
        cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        plt.show()

        # crosshair cursor shown on several subplots at once
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        # cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,
                             horizOn=True, vertOn=True, color='r', lw=0.5)
        plt.show()
    """

    def __init__(self, fig, axes, single=0, **lineprops):
        self.canvas = fig.canvas
        self.axes = axes
        self.single = single
        self.text = self.axes[0].text(0.72, 0.9, '', transform=self.axes[0].transAxes)
        if not lineprops:
            lineprops = dict(color='gray', lw=0.5, ls='dashdot')
        if single not in [0, 1]:
            raise ValueError('Unrecognized single value: ' +
                             str(single) +
                             ', must be 0 or 1')
        xmin, xmax = axes[0].get_xlim()
        ymin, ymax = axes[0].get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.background = None
        self.needclear = False
        lineprops['animated'] = True  # for blit
        self.lines = [[ax.axhline(ymid, visible=False, **lineprops) for ax in axes],
                      [ax.axvline(xmid, visible=False, **lineprops) for ax in axes]]
        self.canvas.mpl_connect('motion_notify_event', self.onmove)
        self.canvas.mpl_connect('draw_event', self.clear)

    def clear(self, event):
        self.background = (
            self.canvas.copy_from_bbox(self.canvas.figure.bbox))
        for line in self.lines[0] + self.lines[1]:
            line.set_visible(False)
        self.text.set_visible(False)

    def onmove(self, event):
        if event.inaxes is None:
            return
        if not self.canvas.widgetlock.available(self):
            return
        self.needclear = True
        for i in range(len(self.axes)):
            if event.inaxes == self.axes[i]:
                if self.single == 0:
                    for line in self.lines[1]:
                        line.set_xdata((event.xdata, event.xdata))
                        line.set_visible(True)
                    line = self.lines[0][i]
                    line.set_ydata((event.ydata, event.ydata))
                    line.set_visible(True)
                else:
                    for line in self.lines[0]:
                        line.set_ydata((event.ydata, event.ydata))
                        line.set_visible(True)
                    line = self.lines[1][i]
                    line.set_xdata((event.xdata, event.xdata))
                    line.set_visible(True)
            else:
                self.lines[self.single][i].set_visible(False)
        if self.background is not None:
            # self.text.set_visible(False)
            self.text.set_visible(True)
            x, y = event.xdata, event.ydata
            self.text.set_text('x=%1.2f, y=%1.2f' % (x, y))
            self.canvas.restore_region(self.background)
            for lines in self.lines:
                for line in lines:
                    if line.get_visible():
                        line.axes.draw_artist(line)
            self.canvas.blit()


class TCursor:
    """A faster cursor for a single subplot."""

    def __init__(self, ax, data=None):
        self.ax = ax
        self.background = None
        self.data = data
        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')
        self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')
        # text position in axes coordinates
        bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,
                            verticalalignment='top', bbox=bbox, clip_on=False)
        self._creating_background = False
        ax.figure.canvas.mpl_connect('draw_event', self.on_draw)

    def on_draw(self, event):
        self.create_new_background()

    def set_cross_hair_visible(self, visible):
        need_redraw = self.horizontal_line.get_visible() != visible
        self.horizontal_line.set_visible(visible)
        self.vertical_line.set_visible(visible)
        self.text.set_visible(visible)
        return need_redraw

    def create_new_background(self):
        if not self._creating_background:
            self._creating_background = True
            self.set_cross_hair_visible(False)
            self.ax.figure.canvas.draw()
            self.background = self.ax.figure.canvas.copy_from_bbox(self.ax.bbox)
            self.set_cross_hair_visible(True)
            self._creating_background = False

    def _get_text_(self, idx):
        text = '\n'.join((
            r'open: %.1f' % self.data['cur_data']['open'].iloc[idx],
            r'high: %.1f' % self.data['cur_data']['high'].iloc[idx],
            r'low: %.1f' % self.data['cur_data']['low'].iloc[idx],
            r'close: %.1f' % self.data['cur_data']['close'].iloc[idx],
            r'date:%s' % self.data['cur_data'].index[idx].strftime('%Y%m%d')))
        if self.data['ndays']:
            text += '\nndays: %s' % self.data['cur_data']['剩余天数'].iloc[idx]
        return text

    def on_mouse_move(self, event):
        if self.background is None:
            self.create_new_background()
        if not event.inaxes:
            need_redraw = self.set_cross_hair_visible(False)
            if need_redraw:
                self.ax.figure.canvas.restore_region(self.background)
                self.ax.figure.canvas.blit(self.ax.bbox)
        else:
            if self.data['cur_data'] is None:
                return
            idx = int(event.xdata)
            # cursor is outside the data range: show no text
            if idx < 0 or idx >= self.data['cur_n']:
                return
            self.set_cross_hair_visible(True)
            # update the line positions
            x, y = int(event.xdata), int(event.ydata)
            xmin, xmax = self.ax.get_xlim()
            ymin, ymax = self.ax.get_ylim()
            xmid = 0.5 * (xmin + xmax)
            ymid = 0.5 * (ymin + ymax)
            self.horizontal_line.set_ydata([y])
            self.vertical_line.set_xdata([x])
            _text = self._get_text_(idx)
            self.text.set_text(_text)
            self.ax.figure.canvas.restore_region(self.background)
            self.ax.draw_artist(self.horizontal_line)
            self.ax.draw_artist(self.vertical_line)
            self.ax.draw_artist(self.text)
            self.ax.figure.canvas.blit(self.ax.bbox)


class Candle:
    @classmethod
    def get_font(cls):
        return {
            '常用': 'Times New Roman',
            # Chinese fonts
            '黑体': 'SimHei',
            '微软雅黑': 'Microsoft YaHei',
            '微软正黑体': 'Microsoft JhengHei',
            '新宋体': 'NSimSun',
            '新细明体': 'PMingLiU',
            '细明体': 'MingLiU',
            '华文新魏': 'STXinwei',
            '华文行楷': 'STXingkai',
            '华文隶书': 'STLiti',
            '华文琥珀': 'STHupo',
            '华文彩云': 'STCaiyun',
            '方正姚体': 'FZYaoti',
            '方正舒体': 'FZShuTi',
            '标楷体': 'DFKai-SB',
            '华文仿宋': 'STFangsong',
            '华文中宋': 'STZhongsong',
            '华文宋体': 'STSong',
            '华文楷体': 'STKaiti',
            '华文细黑': 'STXihei',
            '幼圆': 'YouYuan',
            '隶书': 'LiSu',
            '楷体_GB2312': 'KaiTi_GB2312',
            '仿宋_GB2312': 'FangSong_GB2312',
            '仿宋': 'FangSong'}

    @classmethod
    def _get_mystyle_(cls):
        my_color = mpf.make_marketcolors(up='r', down='g', edge='inherit',
                                         wick='inherit', volume='inherit')
        color = '(0.82, 0.83, 0.85)'
        return mpf.make_mpf_style(
            marketcolors=my_color, figcolor=color, gridcolor=color)

    def _set_title_(self, title='kline '):
        freqs = {
            'd': 'day',
            'w': 'week',
            'm': 'mon',
            'q': 'quarter',
            'y': 'year'}
        title += freqs.get(self.data['freq'], 'day')
        self.fig.suptitle(title)
        # self.fig.text(0.50, 0.94, 'kline day', **font)

    def _set_fig_(self):
        if self.fig is not None:
            self.fig.clear()
        font = {'fontname': 'simHei', 'size': '16', 'color': 'black',
                'weight': 'bold', 'va': 'bottom', 'ha': 'center'}
        fs, fc = (12, 8), (0.82, 0.83, 0.85)
        self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)

    def _set_axis_(self):
        if self.axes:
            for k, axis in self.axes.items():
                axis.set_visible(False)
        rect = [0.06, 0.25, 0.90, 0.70]  # left, bottom, width, height
        if self.data['vol']:
            axkline = self.fig.add_axes(rect)
            rect = [0.06, 0.15, 0.90, 0.10]
            axvol = self.fig.add_axes(rect, sharex=axkline)
            axvol.set_ylabel('vol')
            self.axes = {'axkline': axkline, 'axvol': axvol}
        else:
            rect = [0.06, 0.15, 0.90, 0.80]
            self.axes = {'axkline': self.fig.add_axes(rect)}

    def _set_btn_(self):
        def set_radio(names):
            axcolor = 'lightgoldenrodyellow'
            rect = [0.01, 0.50, 0.05, 0.20]
            self.axradio = self.fig.add_axes(rect, facecolor='none')
            for name in names:  # set the border colour and transparency
                self.axradio.spines[name].set(alpha=0.1, color='g')
            labels = ('d', 'w', 'm', 'q', 'y')
            self.radio_btn = RadioButtons(self.axradio, labels)  # facecolor='none'
            for _text in self.radio_btn.labels:
                _text.set(alpha=0.5, color='g')
            self.radio_btn.on_clicked(self.on_radiobtn_clicked)

        def set_check(names):
            rect = [0.01, 0.33, 0.05, 0.17]
            self.axcheck = self.fig.add_axes(rect, facecolor='none')
            for name in names:
                self.axcheck.spines[name].set(alpha=0.1, color='g')
            labels = ['line', 'ma', 'vol', 'vline', 'ndays']  # ndays: days left on the contract
            visibility = [v for k, v in self.data.items() if k in labels]
            self.check_btn = CheckButtons(self.axcheck, labels, visibility)
            for _text in self.check_btn.labels:
                _text.set(alpha=0.5, color='g')
            self.check_btn.on_clicked(self.on_checkbtn_clicked)

        names = ['bottom', 'top', 'left', 'right']
        set_radio(names)
        set_check(names)

    def _set_mpfargs_(self):
        self.mpfargs['volume'] = self.axes.get('axvol', False)
        self.mpfargs['mav'] = [5, 10] if self.data['ma'] else []
        self.mpfargs['addplot'] = []
        self.mpfargs['type'] = 'line' if self.data['line'] else 'candle'

    def _init_(self):
        self._set_fig_()
        self._set_btn_()
        self._set_axis_()
        self._set_mpfargs_()
        self.fig.canvas.mpl_connect('axes_enter_event', self.on_enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.on_leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_btnpress)
        # drag: press, move to a new position, then release
        self.fig.canvas.mpl_connect('button_release_event', self.on_btnrelease)
        # drag with the button held down
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # zoom in and out
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)
        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.axcheck = None
        self.axradio = None
        self.cursor = None
        self.radio_btn = None
        self.check_btn = None
        self.btnpressed = False
        self.mpfargs = {}
        self.data = dict(
            data=None,  # input data
            freq='',  # display frequency
            df=None,  # input data converted to the chosen frequency
            n=0,  # length of df
            cur_data=None,  # data currently shown
            cur_n=0,  # length of the current data
            cur_start=None,  # start index of the current data
            cur_end=None,  # end index of the current data
            cur_x=-1,  # current cursor position
            line=False,  # draw a line chart instead of candles
            ma=True,  # show moving averages
            vol=True,  # show volume
            vline=False,  # show vertical lines on the main chart (contract start day, contract settlement day)
            ndays=False  # show the contract's remaining days in the cursor text
        )
        self._init_()

    def _set_tick_(self):
        def get_locator():
            if self.data['freq'] == 'y':
                return 1
            elif self.data['freq'] == 'q':
                return 3
            elif self.data['cur_n'] <= 90:
                return 5
            else:
                return int(self.data['cur_n'] / 18)

        def format_date(x, pos):
            if x < 0 or x > len(self.data['cur_data'].index) - 1:
                return ''
            return self.data['cur_data'].index[int(x)]

        fmt = tk.FuncFormatter(format_date)
        self.axes['axkline'].xaxis.set_major_formatter(fmt)
        val = get_locator()
        self.axes['axkline'].xaxis.set_major_locator(tk.MultipleLocator(val))

    def _plot_(self):
        """Redraw the whole chart from the latest parameters."""
        # def get_ma(cols=[5, 10]):
        #     def op(x): return talib.MA(self.data['cur_data']['close'], timeperiod=x)
        #     d = {('MA%s' % v): op(v) for v in cols}
        #     return pd.DataFrame(d)
        # data = get_ma(cols=[5, 10])
        # ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        self._set_curdata_()
        self._set_tick_()
        # draw the chart
        mpf.plot(self.data['cur_data'],
                 ax=self.axes['axkline'],
                 volume=self.mpfargs['volume'],
                 mav=self.mpfargs['mav'],
                 addplot=self.mpfargs['addplot'],
                 type=self.mpfargs['type'],
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=60, warn_too_much_data=self.data['n'])
        self._set_title_()
        plt.show()

    def plot(self, df):
        """Redraw the whole chart from the latest parameters."""
        def set_idx_startend():
            if self.data['cur_start'] is None or self.data['cur_end'] is None:
                if self.data['n'] <= 200:
                    self.data['cur_start'] = 0
                    self.data['cur_end'] = self.data['n']
                elif 200 < self.data['n']:
                    self.data['cur_start'] = 100
                    self.data['cur_end'] = self.data['cur_start'] + 100
            if self.data['cur_start'] >= self.data['cur_end']:
                self.data['cur_start'] = 0

        self._set_data_(self.data['freq'], df)
        set_idx_startend()
        self._plot_()

    def _set_data_(self, freq, df=None):
        if df is not None:
            self.data['data'] = df
        freqs = {'w': 'W-FRI', 'm': 'M', 'q': 'Q', 'y': 'Y'}
        if freq not in freqs:
            self.data['df'] = self.data['data']
        else:
            # this is an instance method since v0.5, so call to_freq via self (not cls)
            self.data['df'] = self.to_freq(self.data['data'], freqs[freq])
        self.data['n'] = len(self.data['df'])

    def _set_curdata_(self, start=None, end=None):
        if start is None:
            start = self.data['cur_start']
        if end is None:
            end = self.data['cur_end']
        end = self.data['n'] if end > self.data['n'] else end
        start = 0 if (start < 0 or start >= end) else start
        if start == end:
            start, end = 0, self.data['n']
        self.data['cur_start'], self.data['cur_end'] = start, end
        self.data['cur_data'] = self.data['df'].iloc[start: end]
        self.data['cur_n'] = len(self.data['cur_data'])

    def _clear_(self):
        for axis in self.axes.values():
            axis.clear()
        #     ax.cla()
        #     # plt.clf()

    def on_radiobtn_clicked(self, label):
        self.data['freq'] = label
        self._set_data_(self.data['freq'])
        self._clear_()
        self._plot_()

    def on_checkbtn_clicked(self, label):
        names = ['line', 'ma', 'vol', 'vline', 'ndays']
        d = dict(zip(names, self.check_btn.get_status()))
        self.data[label] = d[label]
        if label == 'vol':
            self._set_axis_()
        self._set_mpfargs_()
        self._clear_()
        self._plot_()

    def on_enter_axes(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'], self.data)
            self.fig.canvas.mpl_connect(
                'motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
        event.canvas.draw_idle()

    def on_leave_axes(self, event):
        if event.inaxes == self.axes['axkline']:
            self.cursor = None
            event.canvas.draw_idle()
            self._clear_()
            self._plot_()

    def on_btnpress(self, event):
        # The candles only move once the button is released, so record where
        # the button was pressed and flag it as down.
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        if event.button != 1:
            return
        if event.xdata is None:
            self.data['cur_x'] = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            self.data['cur_x'] = int(event.xdata)

    # mouse drag: press, move to a new position, then release
    def on_btnrelease(self, event):
        # On release, the distance is the release position minus the press
        # position. Shift the start and end of the displayed window (90 days by
        # default) by that distance, fetch the data again, then clear the axes
        # and redraw.
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        event.canvas.draw_idle()
        if event.xdata is not None:
            diff = int(event.xdata) - self.data['cur_x']
            start = self.data['cur_start'] - diff
            end = self.data['cur_end'] - diff
            self._set_curdata_(start, end)
            self._clear_()
            self._plot_()
            self.btnpressed = False
            self.data['cur_x'] = int(event.xdata)
        else:
            self.data['cur_x'] = -1

    def on_motion(self, event):  # button down: drag; button up: move the cursor
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if not self.btnpressed:  # show the cursor
            if event.xdata is None:
                self.data['cur_x'] = -1
                return
            idx = int(event.xdata)
            # cursor is outside the data range: show no text
            if idx < 0 or idx >= self.data['cur_n']:
                return
            else:  # show the cursor text
                if self.data['cur_start'] < 0:
                    self.data['cur_start'] = 0
                if self.data['cur_end'] > self.data['n']:
                    self.data['cur_end'] = self.data['n']
                self._set_curdata_()
                if self.cursor is None:
                    self.cursor = TCursor(self.axes['axkline'], self.data)
                    self.fig.canvas.mpl_connect(
                        'motion_notify_event', self.cursor.on_mouse_move)
                    self.fig.canvas.mpl_connect(
                        'draw_event', self.cursor.on_draw)
        else:
            # panning is bounded on the left and right; past a bound, stop panning
            self.cursor = None
            if event.xdata is not None:
                dx = int(event.xdata - self.data['cur_x'])
                start = self.data['cur_start'] - dx
                end = self.data['cur_end'] - dx
                self._set_curdata_(start, end)
                self._clear_()
                self._plot_()

    # 90 days are shown by default. Each zoom step changes the current window
    # by 10%; the data for the new start and end is then fetched, the axes are
    # cleared, and the new data is drawn.
    def on_scroll(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        diff = int(self.data['cur_n'] * 0.1 / 2)
        if event.button == 'down':
            start = self.data['cur_start'] + diff
            end = self.data['cur_end'] - diff
            self._set_curdata_(start, end)
        elif event.button == 'up':
            start = self.data['cur_start'] - diff
            end = self.data['cur_end'] + diff
            self._set_curdata_(start, end)
        self._clear_()
        self._plot_()

    # key press handling
    def on_key_press(self, event):
        self.cursor = None
        if event.key in ['up', 'down', 'left', 'right']:
            diff = int(self.data['cur_n'] * 0.1 / 2)
            diff = 1 if diff == 0 else diff
            start, end = 0, 0
            if event.key == 'up':  # zoom
                start = self.data['cur_start'] - diff
                end = self.data['cur_end'] + diff
            elif event.key == 'down':  # zoom
                start = self.data['cur_start'] + diff
                end = self.data['cur_end'] - diff
            elif event.key == 'left':  # pan
                start = self.data['cur_start'] - 1
                end = self.data['cur_end'] - 1
            elif event.key == 'right':  # pan
                start = self.data['cur_start'] + 1
                end = self.data['cur_end'] + 1
            self._set_curdata_(start, end)
            # clear every existing axes; 'axvol' does not exist when volume is hidden
            self._clear_()
            self._plot_()

    @classmethod
    def to_freq(cls, df, freq='W-FRI', cols=None):
        # type:(pd.DataFrame,str,list)->pd.DataFrame
        """
        Purpose: convert the data frequency
        Parameters:
            freq: str
                * S seconds; * T, min minutes; * H hours
                * B business day; * D calendar day
                * W-FRI weekly (ending Friday)
                * M month end; * SM semi-month end (15th and month end)
                * Q quarter end
                * A(S)-Jun annual, anchored at the end of June
                * A, * Y year-end frequency
            cols: list-like = None
                names of the columns to return
        """
        if cols is None:
            cols = ['date', 'name', 'open', 'high', 'low', 'close',
                    'volume', '是合约开始日', '是合约结束日', '剩余天数']
        cols = [v for v in cols if v in df.columns]
        df = df[cols]
        rdf = pd.DataFrame()
        if 'name' in cols:
            rdf['name'] = df['name'].resample(freq).first()
        rdf['open'] = df['open'].resample(freq).first()
        rdf['high'] = df['high'].resample(freq).max()
        rdf['low'] = df['low'].resample(freq).min()
        rdf['close'] = df['close'].resample(freq).last()
        if 'date' in cols:
            rdf['start'] = df['date'].resample(freq).first()
            rdf['end'] = df['date'].resample(freq).last()
        if 'volume' in cols:
            # sum() would give the total of the daily volumes over the period
            rdf['volume'] = df['volume'].resample(freq).last()
        if '是合约开始日' in cols:
            rdf['是合约开始日'] = df['是合约开始日'].resample(freq).max()
        if '是合约结束日' in cols:
            rdf['是合约结束日'] = df['是合约结束日'].resample(freq).max()
        if '剩余天数' in cols:
            rdf['剩余天数'] = df['剩余天数'].resample(freq).last()
        # g1.resample('W', on='date').first()
        return rdf


def adj_data(df):
    df = df.sort_values(['varmon', 'date'])
    g = df.groupby('varmon')
    dfdst = pd.DataFrame()
    now = pd.Timestamp.now()
    today = now.round(freq='D')
    y2, m2 = str(today.year)[-2:], today.strftime('%m')
    for varmon in g.groups:
        df1 = g.get_group(varmon).copy()
        df1['是合约开始日'] = False
        df1['是合约结束日'] = False
        df1['剩余天数'] = -1
        i0 = df1.columns.get_loc('是合约开始日')
        i = df1.columns.get_loc('是合约结束日')
        val = df1['main'].iloc[0]
        if val != 'main' and val != 'index':
            df1.iloc[0, i0] = True
            ts = df1['date'].iloc[-1]
            ym = re.sub(r'\D+', '', varmon)
            _y2, _m2 = ym[:2], ym[-2:]
            data = [v for v in range(len(df1) - 1, -1, -1)]
            if _y2 < y2:
                df1.iloc[-1, i] = True
                df1['剩余天数'] = data
            else:
                if _m2 < m2:
                    df1.iloc[-1, i] = True
                    df1['剩余天数'] = data
                elif _m2 == m2:
                    if ts.day < 15:
                        start = len(df1) - 1 + 15 - ts.day
                        # one value per row, counting down from start; the
                        # original range(start, -1, -1) had start + 1 values,
                        # which does not match len(df1)
                        data = list(range(start, start - len(df1), -1))
                        df1.iloc[-1, i] = False
                        df1['剩余天数'] = data
                    elif ts.day >= 15:
                        df1.iloc[-1, i] = True
                        df1['剩余天数'] = data
                else:
                    ts2 = pd.Timestamp(
                        year=int('20' + _y2),
                        month=int(_m2),
                        day=15)
                    start = len(df1) - 1 + (ts2 - ts).days
                    # one value per row, as above
                    data = list(range(start, start - len(df1), -1))
                    df1.iloc[-1, i] = False
                    df1['剩余天数'] = data
        dfdst = pd.concat([dfdst, df1])
    return dfdst


if __name__ == '__main__':
    # read the sample data
    df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')
    df['date'] = pd.to_datetime(df['date'])
    df = adj_data(df)
    df = df.set_index('date')
    candle = Candle()
    candle.plot(df)
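
`to_freq` above builds each coarser bar from the first open, the highest high, the lowest low and the last close in the period. A dependency-free sketch of the same rule, grouping by calendar month. `to_monthly` is a hypothetical helper, the rows are taken from the sample table with dates rewritten in ISO form, and unlike the script (which keeps the last daily volume) this sketch sums the volume, which is an assumption about what is wanted.

```python
from itertools import groupby

# (date, open, high, low, close, volume), sorted by date
BARS = [
    ('2023-07-28', 4875, 4877, 4857, 4865, 272),
    ('2023-07-31', 4857, 4871, 4830, 4859, 491),
    ('2023-08-01', 4852, 4868, 4838, 4850, 272),
    ('2023-08-02', 4852, 4856, 4832, 4837, 397),
]


def to_monthly(bars):
    """Aggregate date-sorted daily bars into one bar per calendar month."""
    out = []
    for month, group in groupby(bars, key=lambda b: b[0][:7]):
        rows = list(group)
        out.append({
            'month': month,
            'open': rows[0][1],                 # first open of the month
            'high': max(r[2] for r in rows),    # highest high
            'low': min(r[3] for r in rows),     # lowest low
            'close': rows[-1][4],               # last close of the month
            'volume': sum(r[5] for r in rows),  # total volume (assumption)
        })
    return out


if __name__ == '__main__':
    for bar in to_monthly(BARS):
        print(bar)
```

With pandas the same aggregation is usually expressed through `resample` on a datetime index, as the script does column by column.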

Version v0.4

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.4
@Email:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
# import talib
# import re
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor
from matplotlib.widgets import RadioButtons, CheckButtons

plt.rcParams['font.sans-serif'] = ['simHei']  # display Chinese text in SimHei
plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign '-' from rendering as a box in saved figures

DATA = dict(
    data=None,  # input data
    freq='',  # display frequency
    df=None,  # input data converted to the chosen frequency
    n=0,  # length of df
    cur_data=None,  # data currently shown
    cur_n=0,  # length of the current data
    cur_start=None,  # start index of the current data
    cur_end=None,  # end index of the current data
    cur_x=-1,  # current cursor position
    line=False,  # draw a line chart instead of candles
    ma=True,  # show moving averages
    vol=True,  # show volume
    vline=False,  # show vertical lines on the main chart (contract start day, contract settlement day)
    time=False  # show the contract's remaining days in the cursor text
)


class _TMultiCursor(object):
    """
    A crosshair cursor for several subplots (side by side or stacked) that can
    appear on all of them at once.

    single=0: only one subplot shows the horizontal line and all subplots show
              the vertical line; for vertically stacked subplots
    single=1: only one subplot shows the vertical line and all subplots show
              the horizontal line; for side-by-side subplots

    Note: keep a reference to the cursor (for example in a variable) so that it
    keeps responding to events.

    Usage:
        import matplotlib.pyplot as plt
        import numpy as np

        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        cursor = _TMultiCursor(fig, (ax1, ax2), single=0, color='w', lw=0.5)
        plt.show()

        # crosshair cursor on a single subplot
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2 * np.pi * t))
        ax2.plot(t, np.sin(4 * np.pi * t))
        cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        plt.show()

        # crosshair cursor shown on several subplots at once
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        # cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,
                             horizOn=True, vertOn=True, color='r', lw=0.5)
        plt.show()
    """

    def __init__(self, fig, axes, single=0, **lineprops):
        self.canvas = fig.canvas
        self.axes = axes
        self.single = single
        self.text = self.axes[0].text(0.72, 0.9, '', transform=self.axes[0].transAxes)
        if not lineprops:
            lineprops = dict(color='gray', lw=0.5, ls='dashdot')
        if single not in [0, 1]:
            raise ValueError('Unrecognized single value: ' +
                             str(single) +
                             ', must be 0 or 1')
        xmin, xmax = axes[0].get_xlim()
        ymin, ymax = axes[0].get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.background = None
        self.needclear = False
        lineprops['animated'] = True  # for blit
        self.lines = [[ax.axhline(ymid, visible=False, **lineprops) for ax in axes],
                      [ax.axvline(xmid, visible=False, **lineprops) for ax in axes]]
        self.canvas.mpl_connect('motion_notify_event', self.onmove)
        self.canvas.mpl_connect('draw_event', self.clear)

    def clear(self, event):
        self.background = (
            self.canvas.copy_from_bbox(self.canvas.figure.bbox))
        for line in self.lines[0] + self.lines[1]:
            line.set_visible(False)
        self.text.set_visible(False)

    def onmove(self, event):
        if event.inaxes is None:
            return
        if not self.canvas.widgetlock.available(self):
            return
        self.needclear = True
        for i in range(len(self.axes)):
            if event.inaxes == self.axes[i]:
                if self.single == 0:
                    for line in self.lines[1]:
                        line.set_xdata((event.xdata, event.xdata))
                        line.set_visible(True)
                    line = self.lines[0][i]
                    line.set_ydata((event.ydata, event.ydata))
                    line.set_visible(True)
                else:
                    for line in self.lines[0]:
                        line.set_ydata((event.ydata, event.ydata))
                        line.set_visible(True)
                    line = self.lines[1][i]
                    line.set_xdata((event.xdata, event.xdata))
                    line.set_visible(True)
            else:
                self.lines[self.single][i].set_visible(False)
        if self.background is not None:
            # self.text.set_visible(False)
            self.text.set_visible(True)
            x, y = event.xdata, event.ydata
            self.text.set_text('x=%1.2f, y=%1.2f' % (x, y))
            self.canvas.restore_region(self.background)
            for lines in self.lines:
                for line in lines:
                    if line.get_visible():
                        line.axes.draw_artist(line)
            self.canvas.blit()


class TCursor:
    """A faster cursor for a single subplot."""

    def __init__(self, ax):
        self.ax = ax
        self.background = None
        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')
        self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')
        # text position in axes coordinates
        bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,
                            verticalalignment='top', bbox=bbox, clip_on=False)
        self._creating_background = False
        ax.figure.canvas.mpl_connect('draw_event', self.on_draw)

    def on_draw(self, event):
        self.create_new_background()

    def set_cross_hair_visible(self, visible):
        need_redraw = self.horizontal_line.get_visible() != visible
        self.horizontal_line.set_visible(visible)
        self.vertical_line.set_visible(visible)
        self.text.set_visible(visible)
        return need_redraw

    def create_new_background(self):
        if not self._creating_background:
            self._creating_background = True
            self.set_cross_hair_visible(False)
            self.ax.figure.canvas.draw()
            self.background = self.ax.figure.canvas.copy_from_bbox(self.ax.bbox)
            self.set_cross_hair_visible(True)
            self._creating_background = False

    def on_mouse_move(self, event):
        if self.background is None:
            self.create_new_background()
        if not event.inaxes:
            need_redraw = self.set_cross_hair_visible(False)
            if need_redraw:
                self.ax.figure.canvas.restore_region(self.background)
                self.ax.figure.canvas.blit(self.ax.bbox)
        else:
            if DATA['cur_data'] is None:
                return
            idx = int(event.xdata)
            # cursor is outside the data range: show no text
            if idx < 0 or idx >= DATA['cur_n']:
                return
            self.set_cross_hair_visible(True)
            # update the line positions
            x, y = int(event.xdata), int(event.ydata)
            xmin, xmax = self.ax.get_xlim()
            ymin, ymax = self.ax.get_ylim()
            xmid = 0.5 * (xmin + xmax)
            ymid = 0.5 * (ymin + ymax)
            self.horizontal_line.set_ydata([y])
            self.vertical_line.set_xdata([x])
            _text = '\n'.join((
                r'open: %.1f' % DATA['cur_data']['open'].iloc[idx],
                r'high: %.1f' % DATA['cur_data']['high'].iloc[idx],
                r'low: %.1f' % DATA['cur_data']['low'].iloc[idx],
                r'close: %.1f' % DATA['cur_data']['close'].iloc[idx],
                r'date:%s' % DATA['cur_data'].index[idx].strftime('%Y%m%d')))
            if DATA['time']:
                _text += '\nndays: %s' % DATA['cur_data']['剩余天数'].iloc[idx]
            self.text.set_text(_text)
            self.ax.figure.canvas.restore_region(self.background)
            self.ax.draw_artist(self.horizontal_line)
            self.ax.draw_artist(self.vertical_line)
            self.ax.draw_artist(self.text)
            self.ax.figure.canvas.blit(self.ax.bbox)


class Candle:
    @classmethod
    def get_font(cls):
        return {
            '常用': 'Times New Roman',
            # Chinese fonts
            '黑体': 'SimHei',
            '微软雅黑': 'Microsoft YaHei',
            '微软正黑体': 'Microsoft JhengHei',
            '新宋体': 'NSimSun',
            '新细明体': 'PMingLiU',
            '细明体': 'MingLiU',
            '华文新魏': 'STXinwei',
            '华文行楷': 'STXingkai',
            '华文隶书': 'STLiti',
            '华文琥珀': 'STHupo',
            '华文彩云': 'STCaiyun',
            '方正姚体': 'FZYaoti',
            '方正舒体': 'FZShuTi',
            '标楷体': 'DFKai-SB',
            '华文仿宋': 'STFangsong',
            '华文中宋': 'STZhongsong',
            '华文宋体': 'STSong',
            '华文楷体': 'STKaiti',
            '华文细黑': 'STXihei',
            '幼圆': 'YouYuan',
            '隶书': 'LiSu',
            '楷体_GB2312': 'KaiTi_GB2312',
            '仿宋_GB2312': 'FangSong_GB2312',
            '仿宋': 'FangSong'}

    @classmethod
    def _get_mystyle_(cls):
        my_color = mpf.make_marketcolors(up='r', down='g', edge='inherit',
                                         wick='inherit', volume='inherit')
        color = '(0.82, 0.83, 0.85)'
        return mpf.make_mpf_style(
            marketcolors=my_color, figcolor=color, gridcolor=color)

    def set_title(self, title='kline '):
        freqs = {
            'd': 'day',
            'w': 'week',
            'm': 'mon',
            'q': 'quarter',
            'y': 'year'}
        title += freqs.get(DATA['freq'], 'day')
        self.fig.suptitle(title)
        # self.fig.text(0.50, 0.94, 'kline day', **font)

    def set_fig(self):
        if self.fig is not None:
            self.fig.clear()
        font = {'fontname': 'simHei', 'size': '16', 'color': 'black',
                'weight': 'bold', 'va': 'bottom', 'ha': 'center'}
        fs, fc = (12, 8), (0.82, 0.83, 0.85)
        self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)

    def set_axis(self):
        if self.axes:
            for k, axis in self.axes.items():
                axis.set_visible(False)
        rect = [0.06, 0.25, 0.90, 0.70]  # left, bottom, width, height
        if DATA['vol']:
            axkline = self.fig.add_axes(rect)
            rect = [0.06, 0.15, 0.90, 0.10]
            axvol = self.fig.add_axes(rect, sharex=axkline)
            axvol.set_ylabel('vol')
            self.axes = {'axkline': axkline, 'axvol': axvol}
        else:
            rect = [0.06, 0.15, 0.90, 0.80]
            self.axes = {'axkline': self.fig.add_axes(rect)}

    def set_btn(self):
        def set_radio(names):
            axcolor = 'lightgoldenrodyellow'
            rect = [0.01, 0.50, 0.05, 0.20]
            self.axradio = self.fig.add_axes(rect, facecolor='none')
            for name in names:  # set the border colour and transparency
                self.axradio.spines[name].set(alpha=0.1, color='g')
            labels = ('d', 'w', 'm', 'q', 'y')
            self.radio_btn = RadioButtons(self.axradio, labels)  # facecolor='none'
            for _text in self.radio_btn.labels:
                _text.set(alpha=0.5, color='g')
            self.radio_btn.on_clicked(self.radiobtn_on_clicked)

        def set_check(names):
            rect = [0.01, 0.33, 0.05, 0.17]
            self.axcheck = self.fig.add_axes(rect, facecolor='none')
            for name in names:
                self.axcheck.spines[name].set(alpha=0.1, color='g')
            labels = ['line', 'ma', 'vol', 'vline', 'time']  # time: days left on the contract
            visibility = [v for k, v in DATA.items() if k in labels]
            self.check_btn = CheckButtons(self.axcheck, labels, visibility)
            for _text in self.check_btn.labels:
                _text.set(alpha=0.5, color='g')
            self.check_btn.on_clicked(self.checkbtn_on_clicked)

        names = ['bottom', 'top', 'left', 'right']
        set_radio(names)
        set_check(names)

    def set_mpfargs(self):
        self.mpfargs['volume'] = self.axes.get('axvol', False)
        self.mpfargs['mav'] = [5, 10] if DATA['ma'] else []
        self.mpfargs['addplot'] = []
        self.mpfargs['type'] = 'line' if DATA['line'] else 'candle'

    def _init_(self):
        self.set_fig()
        self.set_btn()
        self.set_axis()
        self.set_mpfargs()
        self.fig.canvas.mpl_connect('axes_enter_event', self.enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_butpress)
        # drag: press, move to a new position, then release
        self.fig.canvas.mpl_connect('button_release_event', self.on_butrelease)
        # drag with the button held down
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # zoom in and out
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)
        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.axcheck = None
        self.axradio = None
        self.cursor = None
        self.radio_btn = None
        self.check_btn = None
        self.btnpressed = False
        self.mpfargs = {}
        self._init_()

    def _plot_(self):
        """Redraw the whole chart from the latest parameters."""
        # def get_ma(cols=[5, 10]):
        #     def op(x): return talib.MA(DATA['cur_data']['close'], timeperiod=x)
        #     d = {('MA%s' % v): op(v) for v in cols}
        #     return pd.DataFrame(d)
        def get_locator():
            if DATA['freq'] == 'y':
                return 1
            elif DATA['freq'] == 'q':
                return 3
            elif DATA['cur_n'] <= 90:
                return 5
            else:
                return int(DATA['cur_n'] / 18)

        def format_date(x, pos):
            if x < 0 or x > len(DATA['cur_data'].index) - 1:
                return ''
            return DATA['cur_data'].index[int(x)]

        self._set_curdata_()
        # data = get_ma(cols=[5, 10])
        # ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        fmt = tk.FuncFormatter(format_date)
        self.axes['axkline'].xaxis.set_major_formatter(fmt)
        val = get_locator()
        self.axes['axkline'].xaxis.set_major_locator(tk.MultipleLocator(val))
        # draw the chart
        mpf.plot(DATA['cur_data'],
                 ax=self.axes['axkline'],
                 volume=self.mpfargs['volume'],
                 mav=self.mpfargs['mav'],
                 addplot=self.mpfargs['addplot'],
                 type=self.mpfargs['type'],
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=60, warn_too_much_data=DATA['n'])
        self.set_title()
        plt.show()

    def plot(self, df):
        """Redraw the whole chart from the latest parameters."""
        def set_idx_startend():
            if DATA['cur_start'] is None or DATA['cur_end'] is None:
                if DATA['n'] <= 200:
                    DATA['cur_start'] = 0
                    DATA['cur_end'] = DATA['n']
                elif 200 < DATA['n']:
                    DATA['cur_start'] = 100
                    DATA['cur_end'] = DATA['cur_start'] + 100
            if DATA['cur_start'] >= DATA['cur_end']:
                DATA['cur_start'] = 0

        self._set_data_(DATA['freq'], df)
        set_idx_startend()
        self._plot_()

    @classmethod
    def _set_data_(cls, freq, df=None):
        if df is not None:
            DATA['data'] = df
        freqs = {'w': 'W-FRI', 'm': 'M', 'q': 'Q', 'y': 'Y'}
        if freq not in freqs:
            DATA['df'] = DATA['data']
        else:
            DATA['df'] = cls.to_freq(DATA['data'], freqs[freq])
        DATA['n'] = len(DATA['df'])

    def radiobtn_on_clicked(self, label):
        DATA['freq'] = label
        self._set_data_(DATA['freq'])
        self._clear_()
        self._plot_()

    def checkbtn_on_clicked(self, label):
        names = ['line', 'ma', 'vol', 'vline', 'time']
        d = dict(zip(names, self.check_btn.get_status()))
        DATA[label] = d[label]
        if label == 'vol':
            self.set_axis()
        self.set_mpfargs()
        self._clear_()
        self._plot_()

    @classmethod
    def _set_curdata_(cls, start=None, end=None):
        if start is None:
            start = DATA['cur_start']
        if end is None:
            end = DATA['cur_end']
        end = DATA['n'] if end > DATA['n'] else end
        start = 0 if (start < 0 or start >= end) else start
        if start == end:
            start, end = 0, DATA['n']
        DATA['cur_start'], DATA['cur_end'] = start, end
        DATA['cur_data'] = DATA['df'].iloc[start: end]
        DATA['cur_n'] = len(DATA['cur_data'])

    def enter_axes(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'])
            self.fig.canvas.mpl_connect(
                'motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
        event.canvas.draw_idle()

    def leave_axes(self, event):
        if event.inaxes == self.axes['axkline']:
            self.cursor = None
            event.canvas.draw_idle()
            self._clear_()
            self._plot_()

    def on_butpress(self, event):
        # The candles only move once the button is released, so record where
        # the button was pressed and flag it as down.
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        if event.button != 1:
            return
        if event.xdata is None:
            DATA['cur_x'] = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            DATA['cur_x'] = int(event.xdata)

    # mouse drag: press, move to a new position, then release
    def on_butrelease(self, event):
        # On release, the distance is the release position minus the press
        # position. Shift the start and end of the displayed window (90 days by
        # default) by that distance, fetch the data again, then clear the axes
        # and redraw.
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        event.canvas.draw_idle()
        if event.xdata is not None:
            diff = int(event.xdata) - DATA['cur_x']
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
            self._clear_()
            self._plot_()
            self.btnpressed = False
            DATA['cur_x'] = int(event.xdata)
        else:
            DATA['cur_x'] = -1

    def on_motion(self, event):  # button down: drag; button up: move the cursor
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if not self.btnpressed:  # show the cursor
            if event.xdata is None:
                DATA['cur_x'] = -1
                return
            idx = int(event.xdata)
            # cursor is outside the data range: show no text
            if idx < 0 or idx >= DATA['cur_n']:
                return
            else:  # show the cursor text
                if DATA['cur_start'] < 0:
                    DATA['cur_start'] = 0
                if DATA['cur_end'] > DATA['n']:
                    DATA['cur_end'] = DATA['n']
                self._set_curdata_()
                if self.cursor is None:
                    self.cursor = TCursor(self.axes['axkline'])
                    self.fig.canvas.mpl_connect(
                        'motion_notify_event', self.cursor.on_mouse_move)
                    self.fig.canvas.mpl_connect(
                        'draw_event', self.cursor.on_draw)
        else:
            # panning is bounded on the left and right; past a bound, stop panning
            self.cursor = None
            if event.xdata is not None:
                dx = int(event.xdata - DATA['cur_x'])
                start = DATA['cur_start'] - dx
                end = DATA['cur_end'] - dx
                self._set_curdata_(start, end)
                self._clear_()
                self._plot_()

    # 90 days are shown by default. Each zoom step changes the current window
    # by 10%; the data for the new start and end is then fetched, the axes are
    # cleared, and the new data is drawn.
    def on_scroll(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        diff = int(DATA['cur_n'] * 0.1 / 2)
        if event.button == 'down':
            start = DATA['cur_start'] + diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
        elif event.button == 'up':
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] + diff
            self._set_curdata_(start, end)
        self._clear_()
        self._plot_()

    def _clear_(self):
        for axis in self.axes.values():
            axis.clear()
        #     ax.cla()
        #     # plt.clf()

    # key press handling
    def on_key_press(self, event):
        self.cursor = None
        if event.key in ['up', 'down', 'left', 'right']:
            diff = int(DATA['cur_n'] * 0.1 / 2)
            diff = 1 if diff == 0 else diff
            start, end = 0, 0
            if event.key == 'up':  # zoom
                start = DATA['cur_start'] - diff
                end = DATA['cur_end'] + diff
            elif event.key == 'down':  # zoom
                start = DATA['cur_start'] + diff
                end = DATA['cur_end'] - diff
            elif event.key == 'left':  # pan
                start = DATA['cur_start'] - 1
                end = DATA['cur_end'] - 1
            elif event.key == 'right':  # pan
                start = DATA['cur_start'] + 1
                end = DATA['cur_end'] + 1
            self._set_curdata_(start, end)
            # clear every existing axes; 'axvol' does not exist when volume is hidden
            self._clear_()
            self._plot_()

    @classmethod
    def to_freq(cls, df, freq='W-FRI', cols=None):
        # type:(pd.DataFrame,str,list)->pd.DataFrame
        """
        Purpose: convert the data frequency
        Parameters:
            freq: str
                * S seconds; * T, min minutes; * H hours
                * B business day; * D calendar day
                * W-FRI weekly (ending Friday)
                * M month end; * SM semi-month end (15th and month end)
                * Q quarter end
                * A(S)-Jun annual, anchored at the end of June
                * A, * Y year-end frequency
            cols: list-like = None
                names of the columns to return
        """
        if cols is None:
            cols = ['date', 'name', 'open', 'high', 'low', 'close',
                    'volume', '是合约开始日', '是合约结束日', '剩余天数']
        cols = [v for v in cols if v in df.columns]
        df = df[cols]
        rdf = pd.DataFrame()
        if 'name' in cols:
            rdf['name'] = 
df['name'].resample(freq).first()rdf['open'] = df['open'].resample(freq).first()rdf['high'] = df['high'].resample(freq).max()rdf['low'] = df['low'].resample(freq).min()rdf['close'] = df['close'].resample(freq).last()if 'date' in cols:rdf['start'] = df['date'].resample(freq).first()rdf['end'] = df['date'].resample(freq).last()if 'volume' in cols:rdf['volume'] = df['volume'].resample(freq).last()  # sum()这一月的每天成交量的和if '是合约开始日' in cols:rdf['是合约开始日'] = df['是合约开始日'].resample(freq).max()if '是合约结束日' in cols:rdf['是合约结束日'] = df['是合约结束日'].resample(freq).max()if '剩余天数' in cols:rdf['剩余天数'] = df['剩余天数'].resample(freq).last()# g1.resample('W', on='date').first()return rdfdef adj_data(df):df = df.sort_values(['varmon', 'date'])g = df.groupby('varmon')dfdst = pd.DataFrame()now = pd.Timestamp.now()today = now.round(freq='D')y2, m2 = str(today.year)[-2:], today.strftime('%m')for varmon in g.groups:df1 = g.get_group(varmon).copy()df1['是合约开始日'] = Falsedf1['是合约结束日'] = Falsedf1['剩余天数'] = -1i0 = df1.columns.get_loc('是合约开始日')i = df1.columns.get_loc('是合约结束日')val = df1['main'].iloc[0]if val != 'main' and val != 'index':df1.iloc[0, i0] = Truets = df1['date'].iloc[-1]ym = re.sub(r'\D+', '', varmon)_y2, _m2 = ym[:2], ym[-2:]data = [v for v in range(len(df1) - 1, -1, -1)]if _y2 < y2:df1.iloc[-1, i] = Truedf1['剩余天数'] = dataelse:if _m2 < m2:df1.iloc[-1, i] = Truedf1['剩余天数'] = dataelif _m2 == m2:if ts.day < 15:start = len(df1) - 1 + 15 - ts.daydata = [v for v in range(start, -1, -1)]df1.iloc[-1, i] = Falsedf1['剩余天数'] = dataelif ts.day >= 15:df1.iloc[-1, i] = Truedf1['剩余天数'] = dataelse:ts2 = pd.Timestamp(year=int('20' + _y2),month=int(_m2),day=15)start = len(df1) - 1 + (ts2 - ts).daysdata = [v for v in range(start, -1, -1)]df1.iloc[-1, i] = Falsedf1['剩余天数'] = datadfdst = pd.concat([dfdst, df1])return dfdstif __name__ == '__main__':import re# 读取示例数据df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')df['date'] = pd.to_datetime(df['date'])df = adj_data(df)df = df.set_index('date')candle = 
Candle()candle.plot(df)
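The frequency conversion in to_freq can also be written as a single aggregation. The following is only a minimal sketch, not the code used above: the helper name resample_ohlcv and the sample values are made up for illustration, and it sums the volume over each period (the alternative mentioned in the to_freq comment) instead of taking the last value.

```python
import pandas as pd


def resample_ohlcv(df, rule="W-FRI"):
    """Aggregate daily bars to a coarser frequency (hypothetical helper)."""
    agg = {
        "open": "first",   # first open of the period
        "high": "max",     # highest high
        "low": "min",      # lowest low
        "close": "last",   # last close
        "volume": "sum",   # assumption: total volume, not the last day's volume
    }
    out = df.resample(rule).agg(agg)
    # Periods without any trading day have no open price; drop them.
    return out.dropna(subset=["open"])


# Ten business days starting Monday 2023-07-31 (made-up prices).
idx = pd.date_range("2023-07-31", periods=10, freq="B")
opens = list(range(10, 20))
daily = pd.DataFrame(
    {
        "open": opens,
        "high": [v + 2 for v in opens],
        "low": [v - 1 for v in opens],
        "close": [v + 1 for v in opens],
        "volume": [100] * 10,
    },
    index=idx,
)

weekly = resample_ohlcv(daily)
print(weekly)
```

Each weekly bar is labelled with the Friday that closes it, which matches the 'W-FRI' rule used for the 'w' radio button.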

 

Version v0.3

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.3
@Email:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
# import talib
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor
from matplotlib.widgets import RadioButtons

plt.rcParams['font.sans-serif'] = ['simHei']  # render Chinese text with the SimHei font
plt.rcParams['axes.unicode_minus'] = False  # stop the minus sign "-" rendering as a box in saved figures

DATA = dict(data=None, freq='', df=None, n=0, cur_data=None,
            cur_n=0, cur_start=None, cur_end=None, cur_x=-1)


class _TMultiCursor(object):
    """
    A crosshair cursor for several subplots (in a row or in a column) that can
    appear on all of them at once.
    single=0: only one subplot shows the horizontal line and all subplots show
              the vertical line; for vertically stacked subplots
    single=1: only one subplot shows the vertical line and all subplots show
              the horizontal line; for side-by-side subplots
    Note: keep a reference to the cursor (for example in a variable), otherwise
    it stops responding to events.

    Usage:
        import matplotlib.pyplot as plt
        import numpy as np
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        cursor = _TMultiCursor(fig, (ax1, ax2), single=0, color='w', lw=0.5)
        plt.show()

        # Crosshair cursor on a single subplot
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2 * np.pi * t))
        ax2.plot(t, np.sin(4 * np.pi * t))
        cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        plt.show()

        # Crosshair cursor on several subplots at once
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        # cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,
                             horizOn=True, vertOn=True, color='r', lw=0.5)
        plt.show()
    """

    def __init__(self, fig, axes, single=0, **lineprops):
        self.canvas = fig.canvas
        self.axes = axes
        self.single = single
        self.text = self.axes[0].text(0.72, 0.9, '', transform=self.axes[0].transAxes)
        if not lineprops:
            lineprops = dict(color='gray', lw=0.5, ls='dashdot')
        if single not in [0, 1]:
            raise ValueError('Unrecognized single value: ' +
                             str(single) +
                             ', must be 0 or 1')
        xmin, xmax = axes[0].get_xlim()
        ymin, ymax = axes[0].get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.background = None
        self.needclear = False
        lineprops['animated'] = True  # for blit
        self.lines = [[ax.axhline(ymid, visible=False, **lineprops) for ax in axes],
                      [ax.axvline(xmid, visible=False, **lineprops) for ax in axes]]
        self.canvas.mpl_connect('motion_notify_event', self.onmove)
        self.canvas.mpl_connect('draw_event', self.clear)

    def clear(self, event):
        self.background = (self.canvas.copy_from_bbox(self.canvas.figure.bbox))
        for line in self.lines[0] + self.lines[1]:
            line.set_visible(False)
        self.text.set_visible(False)

    def onmove(self, event):
        if event.inaxes is None:
            return
        if not self.canvas.widgetlock.available(self):
            return
        self.needclear = True
        for i in range(len(self.axes)):
            if event.inaxes == self.axes[i]:
                if self.single == 0:
                    for line in self.lines[1]:
                        line.set_xdata((event.xdata, event.xdata))
                        line.set_visible(True)
                    line = self.lines[0][i]
                    line.set_ydata((event.ydata, event.ydata))
                    line.set_visible(True)
                else:
                    for line in self.lines[0]:
                        line.set_ydata((event.ydata, event.ydata))
                        line.set_visible(True)
                    line = self.lines[1][i]
                    line.set_xdata((event.xdata, event.xdata))
                    line.set_visible(True)
            else:
                self.lines[self.single][i].set_visible(False)
        if self.background is not None:
            # self.text.set_visible(False)
            self.text.set_visible(True)
            x, y = event.xdata, event.ydata
            self.text.set_text('x=%1.2f, y=%1.2f' % (x, y))
            self.canvas.restore_region(self.background)
            for lines in self.lines:
                for line in lines:
                    if line.get_visible():
                        line.axes.draw_artist(line)
            self.canvas.blit()


class TCursor:
    """A faster crosshair cursor for a single subplot."""

    def __init__(self, ax):
        self.ax = ax
        self.background = None
        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')
        self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')
        # Text position in axes coordinates
        bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,
                            verticalalignment='top', bbox=bbox, clip_on=False)
        self._creating_background = False
        ax.figure.canvas.mpl_connect('draw_event', self.on_draw)

    def on_draw(self, event):
        self.create_new_background()

    def set_cross_hair_visible(self, visible):
        need_redraw = self.horizontal_line.get_visible() != visible
        self.horizontal_line.set_visible(visible)
        self.vertical_line.set_visible(visible)
        self.text.set_visible(visible)
        return need_redraw

    def create_new_background(self):
        if not self._creating_background:
            self._creating_background = True
            self.set_cross_hair_visible(False)
            self.ax.figure.canvas.draw()
            self.background = self.ax.figure.canvas.copy_from_bbox(self.ax.bbox)
            self.set_cross_hair_visible(True)
            self._creating_background = False

    def on_mouse_move(self, event):
        if self.background is None:
            self.create_new_background()
        if not event.inaxes:
            need_redraw = self.set_cross_hair_visible(False)
            if need_redraw:
                self.ax.figure.canvas.restore_region(self.background)
                self.ax.figure.canvas.blit(self.ax.bbox)
        else:
            if DATA['cur_data'] is None:
                return
            idx = int(event.xdata)
            # Cursor outside the data range: do not show the text
            if idx < 0 or idx >= DATA['cur_n']:
                return
            self.set_cross_hair_visible(True)
            # update the line positions
            x, y = int(event.xdata), int(event.ydata)
            xmin, xmax = self.ax.get_xlim()
            ymin, ymax = self.ax.get_ylim()
            xmid = 0.5 * (xmin + xmax)
            ymid = 0.5 * (ymin + ymax)
            self.horizontal_line.set_ydata([y])
            self.vertical_line.set_xdata([x])
            _text = '\n'.join((
                r'open: %.1f' % DATA['cur_data']["open"].iloc[idx],
                r'high: %.1f' % DATA['cur_data']["high"].iloc[idx],
                r'low: %.1f' % DATA['cur_data']["low"].iloc[idx],
                r'close: %.1f' % DATA['cur_data']["close"].iloc[idx],
                r'date:%s' % DATA['cur_data'].index[idx].strftime('%Y%m%d')))
            self.text.set_text(_text)
            self.ax.figure.canvas.restore_region(self.background)
            self.ax.draw_artist(self.horizontal_line)
            self.ax.draw_artist(self.vertical_line)
            self.ax.draw_artist(self.text)
            self.ax.figure.canvas.blit(self.ax.bbox)


class Candle:
    @classmethod
    def to_freq(cls, df, freq='W-FRI', cols=None):
        # type:(pd.DataFrame,str,list)->pd.DataFrame
        """
        Purpose: convert the data to another frequency.
        Parameters:
            freq: str
                *S second; *T, min minute; *H hour
                *B business day; *D calendar day
                *W-FRI weekly (anchored on Friday)
                *M month end; *SM semi-month end (15th and month end)
                *Q quarter end
                *A(S)-Jun annual, anchored at the end of June
                *A, *Y year end
            cols: list-like = None
                names of the columns to return
        """
        if cols is None:
            cols = ['date', 'name', 'open', 'high', 'low', 'close', 'volume']
        cols = [v for v in cols if v in df.columns]
        df = df[cols]
        rdf = pd.DataFrame()
        if 'name' in cols:
            rdf['name'] = df['name'].resample(freq).first()
        rdf['open'] = df['open'].resample(freq).first()
        rdf['high'] = df['high'].resample(freq).max()
        rdf['low'] = df['low'].resample(freq).min()
        rdf['close'] = df['close'].resample(freq).last()
        if 'date' in cols:
            rdf['start'] = df['date'].resample(freq).first()
            rdf['end'] = df['date'].resample(freq).last()
        if 'volume' in cols:
            # sum() would give the total of the daily volumes over the period
            rdf['volume'] = df['volume'].resample(freq).last()
        # g1.resample('W', on='date').first()
        return rdf

    @classmethod
    def get_font(cls):
        return {
            '常用': 'Times New Roman',
            # Chinese fonts
            '黑体': 'SimHei',
            '微软雅黑': 'Microsoft YaHei',
            '微软正黑体': 'Microsoft JhengHei',
            '新宋体': 'NSimSun',
            '新细明体': 'PMingLiU',
            '细明体': 'MingLiU',
            '华文新魏': 'STXinwei',
            '华文行楷': 'STXingkai',
            '华文隶书': 'STLiti',
            '华文琥珀': 'STHupo',
            '华文彩云': 'STCaiyun',
            '方正姚体': 'FZYaoti',
            '方正舒体': 'FZShuTi',
            '标楷体': 'DFKai-SB',
            '华文仿宋': 'STFangsong',
            '华文中宋': 'STZhongsong',
            '华文宋体': 'STSong',
            '华文楷体': 'STKaiti',
            '华文细黑': 'STXihei',
            '幼圆': 'YouYuan',
            '隶书': 'LiSu',
            '楷体_GB2312': 'KaiTi_GB2312',
            '仿宋_GB2312': 'FangSong_GB2312',
            '仿宋': 'FangSong'}

    @classmethod
    def _get_mystyle_(cls):
        my_color = mpf.make_marketcolors(up='r', down='g', edge='inherit',
                                         wick='inherit', volume='inherit')
        color = '(0.82, 0.83, 0.85)'
        return mpf.make_mpf_style(marketcolors=my_color, figcolor=color, gridcolor=color)

    def set_title(self, title='kline '):
        freqs = {'d': 'day', 'w': 'week', 'm': 'mon', 'q': 'quarter', 'y': 'year'}
        title += freqs.get(DATA['freq'], 'other')
        self.fig.suptitle(title)

    def _init_(self):
        def set_fig():
            font = {'fontname': 'simHei', 'size': '16', 'color': 'black',
                    'weight': 'bold', 'va': 'bottom', 'ha': 'center'}
            fs, fc = (12, 8), (0.82, 0.83, 0.85)
            self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)
            # self.fig.text(0.50, 0.94, 'kline day', **font)

        def set_axis():
            axkline = self.fig.add_axes([0.06, 0.25, 0.90, 0.70])
            axvol = self.fig.add_axes([0.06, 0.15, 0.90, 0.10], sharex=axkline)
            axvol.set_ylabel('vol')
            self.axes = {'axkline': axkline, 'axvol': axvol}
            axcolor = 'lightgoldenrodyellow'
            self.axradio = self.fig.add_axes([0.01, 0.50, 0.05, 0.20], facecolor='none')
            # Set the border color and transparency
            names = ['bottom', 'top', 'left', 'right']
            for name in names:
                self.axradio.spines[name].set(alpha=0.1, color='g')
            self.radio = RadioButtons(self.axradio, ('d', 'w', 'm', 'q', 'y'))  # facecolor='none'
            for _text in self.radio.labels:
                _text.set(alpha=0.5, color='g')
            self.radio.on_clicked(self.radiobtn_on_clicked)

        set_fig()
        set_axis()
        self.fig.canvas.mpl_connect('axes_enter_event', self.enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_butpress)
        # Drag: press, move to a new position, then release
        self.fig.canvas.mpl_connect('button_release_event', self.on_butrelease)
        # Drag while the button is held down
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # zoom in/out
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)
        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.cursor = None
        self.btnpressed = False
        self._init_()

    def _plot_(self):
        """Redraw the whole chart using the latest parameters."""
        # def get_ma(cols=[5, 10]):
        #     def op(x): return talib.MA(DATA['cur_data']['close'], timeperiod=x)
        #     d = {('MA%s' % v): op(v) for v in cols}
        #     return pd.DataFrame(d)

        def get_locator():
            if DATA['freq'] == 'y':
                return 1
            elif DATA['freq'] == 'q':
                return 3
            elif DATA['cur_n'] <= 90:
                return 5
            else:
                return int(DATA['cur_n'] / 18)

        def format_date(x, pos):
            if x < 0 or x > len(DATA['cur_data'].index) - 1:
                return ''
            return DATA['cur_data'].index[int(x)]

        self._set_curdata_()
        # data = get_ma(cols=[5, 10])
        # ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        fmt = tk.FuncFormatter(format_date)
        self.axes['axkline'].xaxis.set_major_formatter(fmt)
        val = get_locator()
        self.axes['axkline'].xaxis.set_major_locator(tk.MultipleLocator(val))
        # Draw the chart
        mpf.plot(DATA['cur_data'],
                 ax=self.axes['axkline'],
                 volume=self.axes['axvol'],
                 mav=[5, 10],
                 # addplot=ap,
                 type='candle',
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=60, warn_too_much_data=DATA['n'])
        self.set_title()
        plt.show()

    def plot(self, df):
        """Redraw the whole chart using the latest parameters."""

        def set_idx_startend():
            if DATA['cur_start'] is None or DATA['cur_end'] is None:
                if DATA['n'] <= 200:
                    DATA['cur_start'] = 0
                    DATA['cur_end'] = DATA['n']
                elif 200 < DATA['n']:
                    DATA['cur_start'] = 100
                    DATA['cur_end'] = DATA['cur_start'] + 100
            if DATA['cur_start'] >= DATA['cur_end']:
                DATA['cur_start'] = 0

        self._set_data_(DATA['freq'], df)
        set_idx_startend()
        self._plot_()

    @classmethod
    def _set_data_(cls, freq, df=None):
        if df is not None:
            DATA['data'] = df
        freqs = {'w': 'W-FRI', 'm': 'M', 'q': 'Q', 'y': 'Y'}
        if freq not in freqs:
            DATA['df'] = DATA['data']
        else:
            DATA['df'] = cls.to_freq(DATA['data'], freqs[freq])
        DATA['n'] = len(DATA['df'])

    def radiobtn_on_clicked(self, label):
        DATA['freq'] = label
        self._set_data_(DATA['freq'])
        self._clear_()
        self._plot_()

    @classmethod
    def _set_curdata_(cls, start=None, end=None):
        if start is None:
            start = DATA['cur_start']
        if end is None:
            end = DATA['cur_end']
        end = DATA['n'] if end > DATA['n'] else end
        start = 0 if (start < 0 or start >= end) else start
        if start == end:
            start, end = 0, DATA['n']
        DATA['cur_start'], DATA['cur_end'] = start, end
        DATA['cur_data'] = DATA['df'].iloc[start: end]
        DATA['cur_n'] = len(DATA['cur_data'])

    def enter_axes(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'])
            self.fig.canvas.mpl_connect('motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
        event.canvas.draw_idle()

    def leave_axes(self, event):
        if event.inaxes == self.axes['axkline']:
            self.cursor = None
            event.canvas.draw_idle()
            self._clear_()
            self._plot_()

    def on_butpress(self, event):
        # The candles only move once the button is released, so record the
        # x position at press time and flag that the button is down
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        if event.button != 1:
            return
        if event.xdata is None:
            DATA['cur_x'] = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            DATA['cur_x'] = int(event.xdata)

    # Mouse drag: press, move to a new position, then release
    def on_butrelease(self, event):
        # On release, the drag distance is the release position minus the position
        # recorded at press time. Shift the start/end of the displayed window
        # (90 days by default) by that distance, re-slice the data, then clear
        # the axes and redraw
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        event.canvas.draw_idle()
        if event.xdata is not None:
            diff = int(event.xdata) - DATA['cur_x']
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
            self._clear_()
            self._plot_()
            self.btnpressed = False
            DATA['cur_x'] = int(event.xdata)
        else:
            DATA['cur_x'] = -1

    def on_motion(self, event):  # button held: drag; button up: show the cursor
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if not self.btnpressed:  # show the cursor
            if event.xdata is None:
                DATA['cur_x'] = -1
                return
            idx = int(event.xdata)
            # Cursor outside the data range: do not show the text
            # (the auto-pan below is disabled)
            if idx < 0 or idx >= DATA['cur_n']:
                return
                # if idx < 0:
                #     start = DATA['cur_start'] - 1
                #     end = DATA['cur_end'] - 1
                # else:  # idx >= DATA['cur_n']:
                #     start = DATA['cur_start'] + 1
                #     end = DATA['cur_end'] + 1
                # self.cursor = None
                # self._set_curdata_(start, end)
                # self._clear_()
                # self._plot_()
            else:  # show the cursor text
                if DATA['cur_start'] < 0:
                    DATA['cur_start'] = 0
                if DATA['cur_end'] > DATA['n']:
                    DATA['cur_end'] = DATA['n']
                self._set_curdata_()
                if self.cursor is None:
                    self.cursor = TCursor(self.axes['axkline'])
                    self.fig.canvas.mpl_connect('motion_notify_event', self.cursor.on_mouse_move)
                    self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
        else:
            # Left/right pan limits: _set_curdata_ clips a window that would pass them
            self.cursor = None
            if event.xdata is not None:
                dx = int(event.xdata - DATA['cur_x'])
                start = DATA['cur_start'] - dx
                end = DATA['cur_end'] - dx
                self._set_curdata_(start, end)
                self._clear_()
                self._plot_()

    # 90 days of data are shown by default. Each zoom step changes the current
    # window by 10%, then the data is re-sliced with the new start/end and the
    # canvas is cleared and redrawn.
    def on_scroll(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        diff = int(DATA['cur_n'] * 0.1 / 2)
        if event.button == 'down':
            start = DATA['cur_start'] + diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
        elif event.button == 'up':
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] + diff
            self._set_curdata_(start, end)
        self._clear_()
        self._plot_()

    def _clear_(self):
        self.axes['axkline'].clear()
        self.axes['axvol'].clear()
        # for ax in self.axes.values():
        #     ax.cla()
        #     # plt.clf()

    # Key press handling
    def on_key_press(self, event):
        self.cursor = None
        if event.key in ['up', 'down', 'left', 'right']:
            diff = int(DATA['cur_n'] * 0.1 / 2)
            diff = 1 if diff == 0 else diff
            start, end = 0, 0
            if event.key == 'up':  # zoom
                start = DATA['cur_start'] - diff
                end = DATA['cur_end'] + diff
            elif event.key == 'down':  # zoom
                start = DATA['cur_start'] + diff
                end = DATA['cur_end'] - diff
            elif event.key == 'left':  # pan
                start = DATA['cur_start'] - 1
                end = DATA['cur_end'] - 1
            elif event.key == 'right':  # pan
                start = DATA['cur_start'] + 1
                end = DATA['cur_end'] + 1
            self._set_curdata_(start, end)
            self.axes['axkline'].clear()
            self.axes['axvol'].clear()
            self._plot_()


if __name__ == '__main__':
    # Load the sample data
    df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')
    candle = Candle()
    candle.plot(df)
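The pan and zoom handlers all reduce to computing a new (start, end) window and clipping it to the data. The sketch below isolates that arithmetic as pure functions so it can be checked without a figure. The names clamp_window, pan and zoom are hypothetical and do not appear in the code above; the clipping rules are meant to mirror _set_curdata_, and the minimum step of 1 follows on_key_press (on_scroll has no such minimum).

```python
def clamp_window(start, end, n):
    """Clip a [start, end) window to a data set of n rows."""
    end = min(end, n)
    if start < 0 or start >= end:
        start = 0
    if start == end:
        # Degenerate window: fall back to showing everything.
        start, end = 0, n
    return start, end


def pan(start, end, n, dx):
    """Shift the window by dx bars; dragging right (dx > 0) reveals older bars."""
    return clamp_window(start - dx, end - dx, n)


def zoom(start, end, n, zoom_in, ratio=0.1):
    """Shrink or grow the window by `ratio` of its width, half on each side."""
    step = max(1, int((end - start) * ratio / 2))
    if zoom_in:
        return clamp_window(start + step, end - step, n)
    return clamp_window(start - step, end + step, n)


print(pan(100, 200, 1000, 10))       # window moved 10 bars towards the start
print(zoom(100, 200, 1000, True))    # 5 bars trimmed from each side
print(zoom(100, 200, 1000, False))   # 5 bars added on each side
print(clamp_window(950, 1050, 1000)) # right edge clipped to the data length
```

Keeping this logic free of matplotlib state makes the edge cases (window past either end, empty window) easy to test separately from the event handlers.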

Version v0.2

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.2
@Email:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
import talib
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor
from matplotlib.widgets import RadioButtonsplt.rcParams['font.sans-serif'] = ['simHei']  # 以黑体显示中文
plt.rcParams['axes.unicode_minus'] = False  # 解决保存图像符号“-”显示为放块的问题DATA = dict(data=None,freq='',df=None, n=0, cur_data=None,cur_n=0,cur_start=None, cur_end=None, cur_x=-1)class _TMultiCursor(object):"""一个用于多个子图(横排或者竖排)的十字星光标,可以在多个子图上同时出现single=0表示仅仅一个子图显示水平线,所有子图显示垂直线,用于竖排的子图single=1表示仅仅一个子图显示垂直线,所有子图显示水平线,用于横排的子图注意:为了能让光标响应事件处理,必须保持对它的引用(比如有个变量保存)用法:import matplotlib.pyplot as pltimport numpy as npfig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)t = np.arange(0.0, 2.0, 0.01)ax1.plot(t, np.sin(2*np.pi*t))ax2.plot(t, np.sin(4*np.pi*t))cursor = TMultiCursor(fig.canvas, (ax1, ax2), single=0, color='w', lw=0.5)plt.show()#单子图上面的十字星光标fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)t = np.arange(0.0, 2.0, 0.01)ax1.plot(t, np.sin(2 * np.pi * t))ax2.plot(t, np.sin(4 * np.pi * t))cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)plt.show()#多子图上面同时出现十字星光标fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)t = np.arange(0.0, 2.0, 0.01)ax1.plot(t, np.sin(2*np.pi*t))ax2.plot(t, np.sin(4*np.pi*t))# cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,horizOn=True, vertOn=True, color='r', lw=0.5)plt.show()"""def __init__(self, fig, axes, single=0, **lineprops):self.canvas = fig.canvasself.axes = axesself.single = singleself.text = self.axes[0].text(0.72, 0.9, '', transform=self.axes[0].transAxes)if not lineprops:lineprops = dict(color='gray', lw=0.5, ls='dashdot')if single not in [0, 1]:raise ValueError('Unrecognized single value: ' +str(single) +', must be 0 or 1')xmin, xmax = axes[0].get_xlim()ymin, ymax = axes[0].get_ylim()xmid = 0.5 * (xmin + xmax)ymid = 0.5 * (ymin + ymax)self.background = Noneself.needclear = Falselineprops['animated'] = True  # for bltself.lines = [[ax.axhline(ymid, visible=False, **lineprops) for ax in axes],[ax.axvline(xmid, visible=False, **lineprops) for ax in axes]]self.canvas.mpl_connect('motion_notify_event', self.onmove)self.canvas.mpl_connect('draw_event', self.clear)def clear(self, 
event):self.background = (self.canvas.copy_from_bbox(self.canvas.figure.bbox))for line in self.lines[0] + self.lines[1]:line.set_visible(False)self.text.set_visible(False)def onmove(self, event):if event.inaxes is None:returnif not self.canvas.widgetlock.available(self):returnself.needclear = Truefor i in range(len(self.axes)):if event.inaxes == self.axes[i]:if self.single == 0:for line in self.lines[1]:line.set_xdata((event.xdata, event.xdata))line.set_visible(True)line = self.lines[0][i]line.set_ydata((event.ydata, event.ydata))line.set_visible(True)else:for line in self.lines[0]:line.set_ydata((event.ydata, event.ydata))line.set_visible(True)line = self.lines[1][i]line.set_xdata((event.xdata, event.xdata))line.set_visible(True)else:self.lines[self.single][i].set_visible(False)if self.background is not None:# self.text.set_visible(False)self.text.set_visible(True)x, y = event.xdata, event.ydataself.text.set_text('x=%1.2f, y=%1.2f' % (x, y))self.canvas.restore_region(self.background)for lines in self.lines:for line in lines:if line.get_visible():line.axes.draw_artist(line)self.canvas.blit()class TCursor:"""单子图更快的光标"""def __init__(self, ax):self.ax = axself.background = Nonexmin, xmax = ax.get_xlim()ymin, ymax = ax.get_ylim()xmid = 0.5 * (xmin + xmax)ymid = 0.5 * (ymin + ymax)self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')# 轴坐标中的文本位置bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,verticalalignment='top', bbox=bbox, clip_on=False)self._creating_background = Falseax.figure.canvas.mpl_connect('draw_event', self.on_draw)def on_draw(self, event):self.create_new_background()def set_cross_hair_visible(self, visible):need_redraw = self.horizontal_line.get_visible() != visibleself.horizontal_line.set_visible(visible)self.vertical_line.set_visible(visible)self.text.set_visible(visible)return need_redrawdef 
create_new_background(self):if not self._creating_background:self._creating_background = Trueself.set_cross_hair_visible(False)self.ax.figure.canvas.draw()self.background = self.ax.figure.canvas.copy_from_bbox(self.ax.bbox)self.set_cross_hair_visible(True)self._creating_background = Falsedef on_mouse_move(self, event):if self.background is None:self.create_new_background()if not event.inaxes:need_redraw = self.set_cross_hair_visible(False)if need_redraw:self.ax.figure.canvas.restore_region(self.background)self.ax.figure.canvas.blit(self.ax.bbox)else:if DATA['cur_data'] is None:returnidx = int(event.xdata)# 光标超出范围平移数据,不在显示文本if idx < 0 or idx >= DATA['cur_n']:returnself.set_cross_hair_visible(True)# update the line positionsx, y = int(event.xdata), int(event.ydata)xmin, xmax = self.ax.get_xlim()ymin, ymax = self.ax.get_ylim()xmid = 0.5 * (xmin + xmax)ymid = 0.5 * (ymin + ymax)self.horizontal_line.set_ydata([y])self.vertical_line.set_xdata([x])_text = '\n'.join((r'open: %.1f' % DATA['cur_data']["open"].iloc[idx],r'high: %.1f' % DATA['cur_data']["high"].iloc[idx],r'low %.1f' % DATA['cur_data']["low"].iloc[idx],r'close: %.1f' % DATA['cur_data']["close"].iloc[idx],r'date:%s' % DATA['cur_data'].index[idx].strftime('%Y%m%d')))self.text.set_text(_text)self.ax.figure.canvas.restore_region(self.background)self.ax.draw_artist(self.horizontal_line)self.ax.draw_artist(self.vertical_line)self.ax.draw_artist(self.text)self.ax.figure.canvas.blit(self.ax.bbox)class Candle:@classmethoddef to_freq(cls, df, freq='W-FRI', cols=None):# type:(pd.DataFrame,str,list)->pd.DataFrame"""用途:转换数据频率参数:freq:str*S秒;*T,min;*H小时*B工作日;*D日历日*W-FRI周频(星期五)*M月末;*SM 半月结束(15日和月末)*Q季末*A(S)-Jun 年度,锚定于6月底*A,*Y 年终频率cols:list-like =None要返回的列名"""if cols is None:cols = ['date', 'name', 'open', 'high', 'low', 'close', 'volume']cols = [v for v in cols if v in df.columns]df = df[cols]rdf = pd.DataFrame()if 'name' in cols:rdf['name'] = df['name'].resample(freq).first()rdf['open'] = 
df['open'].resample(freq).first()rdf['high'] = df['high'].resample(freq).max()rdf['low'] = df['low'].resample(freq).min()rdf['close'] = df['close'].resample(freq).last()if 'date' in cols:rdf['start'] = df['date'].resample(freq).first()rdf['end'] = df['date'].resample(freq).last()if 'volume' in cols:rdf['volume'] = df['volume'].resample(freq).last()  # sum()这一月的每天成交量的和# g1.resample('W', on='date').first()return rdf@classmethoddef get_font(cls):return {'常用': 'Times New Roman',# 中文字体'黑体': 'SimHei','微软雅黑': 'Microsoft YaHei','微软正黑体': 'Microsoft JhengHei','新宋体': 'NSimSun','新细明体': 'PMingLiU','细明体': 'MingLiU','华文新魏': 'STXinwei','华文行楷': 'STXingkai','华文隶书': 'STLliti','花纹琥珀': 'STHupo','华文彩云': 'STCaiyun','方正姚体': 'FZYaoti','方正舒体': 'FZShuTi','标楷体': 'DFKai-SB','华文仿宋': 'STFangsong','华文中宋': 'STZhongsong','华文宋体': 'STSong','华文楷体': 'STKaiti','华文细黑': 'STXihei','幼圆': 'YouYuan','隶书': 'LiSu','楷体_GB 2313': 'Kaiti_GB2313','仿宋_GB2313': 'FangSong_GB2313','仿宋': 'FangSong'}@classmethoddef _get_mystyle_(cls):my_color = mpf.make_marketcolors(up='r', down='g', edge='inherit',wick='inherit', volume='inherit')color = '(0.82, 0.83, 0.85)'return mpf.make_mpf_style(marketcolors=my_color, figcolor=color, gridcolor=color)def _init_(self):def set_fig():font = {'fontname': 'simHei', 'size': '16', 'color': 'black','weight': 'bold', 'va': 'bottom', 'ha': 'center'}fs, fc = (12, 8), (0.82, 0.83, 0.85)self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)# self.fig.text(0.50, 0.94, 'kline day', **font)self.fig.suptitle('kline day')def set_axis():axkline = self.fig.add_axes([0.06, 0.25, 0.90, 0.70])axvol = self.fig.add_axes([0.06, 0.15, 0.90, 0.10], sharex=axkline)axvol.set_ylabel('vol')self.axes = {'axkline': axkline, 'axvol': axvol}axcolor = 'lightgoldenrodyellow'self.axradio = self.fig.add_axes([0.01, 0.50, 0.05, 0.20], facecolor='none')self.radio = RadioButtons(self.axradio, ('d', 'w', 'm', 'q', 'y'))  # facecolor='none'for _text in self.radio.labels:_text.set(alpha=0.5, 
                                color='g')
            self.radio.on_clicked(self.radiobtn_on_clicked)

        set_fig()
        set_axis()
        self.fig.canvas.mpl_connect('axes_enter_event', self.enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_butpress)
        # Drag: press the button, move to a new position, then release
        self.fig.canvas.mpl_connect('button_release_event', self.on_butrelease)
        # Drag while the button is held down
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # zoom in/out
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)
        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.cursor = None
        self.btnpressed = False
        self._init_()

    def _plot_(self):
        """Redraw the whole chart using the latest parameters."""
        def get_ma(cols=[5, 10]):
            def op(x): return talib.MA(DATA['cur_data']['close'], timeperiod=x)
            d = {('MA%s' % v): op(v) for v in cols}
            return pd.DataFrame(d)

        def format_date(x, pos):
            if x < 0 or x > len(DATA['cur_data'].index) - 1:
                return ''
            return DATA['cur_data'].index[int(x)]

        self._update_curdata_()
        data = get_ma(cols=[5, 10])
        ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        self.axes['axkline'].xaxis.set_major_formatter(tk.FuncFormatter(format_date))
        self.axes['axkline'].xaxis.set_major_locator(
            tk.MultipleLocator(5 if DATA['cur_n'] <= 90 else int(DATA['cur_n'] / 18)))
        # Draw the chart
        mpf.plot(DATA['cur_data'],
                 ax=self.axes['axkline'],
                 volume=self.axes['axvol'],
                 addplot=ap,
                 type='candle',
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=45, warn_too_much_data=DATA['n'])
        plt.show()

    def plot(self, df):
        """Load df, pick the initial window, then draw the chart."""
        def set_idx_startend():
            if DATA['cur_start'] is None or DATA['cur_end'] is None:
                if DATA['n'] <= 200:
                    DATA['cur_start'] = 0
                    DATA['cur_end'] = DATA['n']
                elif 200 < DATA['n']:
                    DATA['cur_start'] = 100
                    DATA['cur_end'] = DATA['cur_start'] + 100

        self._set_data_(DATA['freq'], df)
        set_idx_startend()
        self._plot_()

    @classmethod
    def _set_data_(cls, freq, df=None):
        if df is not None:
            DATA['data'] = df
        freqs = {'w': 'W-FRI', 'm': 'M', 'q': 'Q', 'y': 'Y'}
        if freq not in freqs:
            DATA['df'] = DATA['data']
        else:
            DATA['df'] = cls.to_freq(DATA['data'], freqs[freq])
        DATA['n'] = len(DATA['df'])

    def radiobtn_on_clicked(self, label):
        pass
        # DATA['freq']=label
        # self._set_data_(DATA['freq'])
        # self._plot_()

    @classmethod
    def _update_curdata_(cls):
        DATA['cur_data'] = DATA['df'].iloc[DATA['cur_start']: DATA['cur_end']]
        DATA['cur_n'] = len(DATA['cur_data'])

    @classmethod
    def _set_curdata_(cls, start, end):
        start = 0 if start < 0 else start
        end = DATA['n'] if end > DATA['n'] else end
        DATA['cur_start'] = start
        DATA['cur_end'] = end
        DATA['cur_data'] = DATA['df'].iloc[start: end]
        DATA['cur_n'] = len(DATA['cur_data'])

    def enter_axes(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'])
            self.fig.canvas.mpl_connect('motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
            event.canvas.draw_idle()

    def leave_axes(self, event):
        if event.inaxes == self.axes['axkline']:
            self.cursor = None
            event.canvas.draw_idle()
            self._clear_()
            self._plot_()

    def on_butpress(self, event):
        # The K-line only moves once the button is released, so record the
        # x position at press time and flag that the button is down.
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        if event.button != 1:
            return
        if event.xdata is None:
            DATA['cur_x'] = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            DATA['cur_x'] = int(event.xdata)

    # Mouse drag: press the button, move to a new position, then release
    def on_butrelease(self, event):
        # On release, take the distance between the release position and the
        # press position, shift the start/end of the displayed window (90 days
        # by default) by that distance, re-slice the data, then clear the
        # axes and redraw.
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        event.canvas.draw_idle()
        if event.xdata is not None:
            diff = int(event.xdata) - DATA['cur_x']
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
            self._clear_()
            self._plot_()
            self.btnpressed = False
            DATA['cur_x'] = int(event.xdata)
        else:
            DATA['cur_x'] = -1

    def on_motion(self, event):  # button down: drag; button up: show the cursor
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if not self.btnpressed:  # show the cursor
            if event.xdata is None:
                DATA['cur_x'] = -1
                return
            idx = int(event.xdata)
            # Cursor outside the data range: no text is shown
            # (the automatic pan below is disabled)
            if idx < 0 or idx >= DATA['cur_n']:
                return
                # if idx < 0:
                #     start = DATA['cur_start'] - 1
                #     end = DATA['cur_end'] - 1
                # else:  # idx >= DATA['cur_n']:
                #     start = DATA['cur_start'] + 1
                #     end = DATA['cur_end'] + 1
                # self.cursor = None
                # self._set_curdata_(start, end)
                # self._clear_()
                # self._plot_()
            else:  # show the cursor text
                if DATA['cur_start'] < 0:
                    DATA['cur_start'] = 0
                if DATA['cur_end'] > DATA['n']:
                    DATA['cur_end'] = DATA['n']
                self._update_curdata_()
                if self.cursor is None:
                    self.cursor = TCursor(self.axes['axkline'])
                    self.fig.canvas.mpl_connect('motion_notify_event', self.cursor.on_mouse_move)
                    self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
        else:
            # Pan limits: _set_curdata_ clamps the window at the left/right bounds
            self.cursor = None
            if event.xdata is not None:
                dx = int(event.xdata - DATA['cur_x'])
                start = DATA['cur_start'] - dx
                end = DATA['cur_end'] - dx
                self._set_curdata_(start, end)
                self._clear_()
                self._plot_()

    # 90 days of data are shown by default. Each zoom step changes the window
    # by 10% of its current size; the data is then re-sliced with the new
    # start/end, the axes are cleared and the chart is redrawn.
    def on_scroll(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        diff = int(DATA['cur_n'] * 0.1 / 2)
        if event.button == 'down':
            start = DATA['cur_start'] + diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
        elif event.button == 'up':
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] + diff
            self._set_curdata_(start, end)
        self._clear_()
        self._plot_()

    def _clear_(self):
        self.axes['axkline'].clear()
        self.axes['axvol'].clear()
        # for ax in self.axes.values():
        #     ax.cla()
        #     # plt.clf()

    # Key press handling
    def on_key_press(self, event):
        self.cursor = None
        if event.key in ['up', 'down', 'left', 'right']:
            diff = int(DATA['cur_n'] * 0.1 / 2)
            diff = 1 if diff == 0 else diff
            start, end = 0, 0
            if event.key == 'up':  # zoom: widen the window
                start = DATA['cur_start'] - diff
                end = DATA['cur_end'] + diff
            elif event.key == 'down':  # zoom: narrow the window
                start = DATA['cur_start'] + diff
                end = DATA['cur_end'] - diff
            elif event.key == 'left':  # pan
                start = DATA['cur_start'] - 1
                end = DATA['cur_end'] - 1
            elif event.key == 'right':  # pan
                start = DATA['cur_start'] + 1
                end = DATA['cur_end'] + 1
            self._set_curdata_(start, end)
            self.axes['axkline'].clear()
            self.axes['axvol'].clear()
            self._plot_()


if __name__ == '__main__':
    # Load the sample data
    df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')
    # df1=Candle.to_freq(df)
    # print(df1)
    candle = Candle()
    candle.plot(df)
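
The scroll, arrow-key and drag handlers above all reduce to the same index arithmetic on a [start, end) window over the data. A minimal standalone sketch of that arithmetic, with no plotting involved, might look like the following. The helper names `clamp_window`, `zoom` and `pan` are hypothetical and are not part of the Candle class; the 10% step and the clamping are modelled on `on_scroll`, `on_key_press` and `_set_curdata_`, and the guard against an empty window is an addition that the original code does not have.

```python
def clamp_window(start, end, n):
    """Clamp a [start, end) window to the valid index range [0, n]."""
    return max(start, 0), min(end, n)


def zoom(start, end, n, zoom_in):
    """Shrink or grow the window by about 10% of its current length."""
    # Same step as the handlers: 5% on each side, but at least one bar.
    step = max(int((end - start) * 0.1 / 2), 1)
    if zoom_in:
        # Added guard (not in the original): never zoom in to an empty window.
        if end - start <= 2 * step:
            return start, end
        return clamp_window(start + step, end - step, n)
    return clamp_window(start - step, end + step, n)


def pan(start, end, n, bars):
    """Shift the window by `bars`; negative values move toward older data."""
    return clamp_window(start + bars, end + bars, n)


if __name__ == '__main__':
    n = 62000                      # row count mentioned for the author's data
    window = (100, 200)            # initial window chosen by plot()
    window = zoom(*window, n, zoom_in=True)
    print(window)                  # (105, 195)
    window = pan(*window, n, bars=-1)
    print(window)                  # (104, 194)
```

As in the original `_set_curdata_`, each end is clamped independently, so a window dragged past either edge of the data gets shorter instead of stopping at the edge.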

Version v0.1

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.1
@Email:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
import talib
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor

plt.rcParams['font.sans-serif'] = ['simHei']  # render Chinese text in SimHei
plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign "-" from rendering as a box in saved figures
CURDATA = None


class _TMultiCursor(object):
    """
    A crosshair cursor for several subplots (in a row or in a column) that
    can appear on all of them at once.
    single=0: only one subplot shows the horizontal line and all subplots
              show the vertical line; for vertically stacked subplots
    single=1: only one subplot shows the vertical line and all subplots
              show the horizontal line; for subplots placed side by side
    Note: keep a reference to the cursor (e.g. in a variable), otherwise it
    stops responding to events.
    Usage:
        import matplotlib.pyplot as plt
        import numpy as np
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        cursor = _TMultiCursor(fig, (ax1, ax2), single=0, color='w', lw=0.5)
        plt.show()

        # Crosshair cursor on a single subplot
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2 * np.pi * t))
        ax2.plot(t, np.sin(4 * np.pi * t))
        cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        plt.show()

        # Crosshair cursor on several subplots at once
        fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
        t = np.arange(0.0, 2.0, 0.01)
        ax1.plot(t, np.sin(2*np.pi*t))
        ax2.plot(t, np.sin(4*np.pi*t))
        # cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
        cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,
                             horizOn=True, vertOn=True, color='r', lw=0.5)
        plt.show()
    """

    def __init__(self, fig, axes, single=0, **lineprops):
        self.canvas = fig.canvas
        self.axes = axes
        self.single = single
        self.text = self.axes[0].text(0.72, 0.9, '', transform=self.axes[0].transAxes)
        if not lineprops:
            lineprops = dict(color='gray', lw=0.5, ls='dashdot')
        if single not in [0, 1]:
            raise ValueError('Unrecognized single value: ' +
                             str(single) +
                             ', must be 0 or 1')
        xmin, xmax = axes[0].get_xlim()
        ymin, ymax = axes[0].get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.background = None
        self.needclear = False
        lineprops['animated'] = True  # for blit
        # lines[0] holds the horizontal lines, lines[1] the vertical lines
        self.lines = [[ax.axhline(ymid, visible=False, **lineprops) for ax in axes],
                      [ax.axvline(xmid, visible=False, **lineprops) for ax in axes]]
        self.canvas.mpl_connect('motion_notify_event', self.onmove)
        self.canvas.mpl_connect('draw_event', self.clear)

    def clear(self, event):
        self.background = (self.canvas.copy_from_bbox(self.canvas.figure.bbox))
        for line in self.lines[0] + self.lines[1]:
            line.set_visible(False)
        self.text.set_visible(False)

    def onmove(self, event):
        if event.inaxes is None:
            return
        if not self.canvas.widgetlock.available(self):
            return
        self.needclear = True
        for i in range(len(self.axes)):
            if event.inaxes == self.axes[i]:
                if self.single == 0:
                    for line in self.lines[1]:
                        line.set_xdata((event.xdata, event.xdata))
                        line.set_visible(True)
                    line = self.lines[0][i]
                    line.set_ydata((event.ydata, event.ydata))
                    line.set_visible(True)
                else:
                    for line in self.lines[0]:
                        line.set_ydata((event.ydata, event.ydata))
                        line.set_visible(True)
                    line = self.lines[1][i]
                    line.set_xdata((event.xdata, event.xdata))
                    line.set_visible(True)
            else:
                self.lines[self.single][i].set_visible(False)
        if self.background is not None:
            # self.text.set_visible(False)
            self.text.set_visible(True)
            x, y = event.xdata, event.ydata
            self.text.set_text('x=%1.2f, y=%1.2f' % (x, y))
            self.canvas.restore_region(self.background)
        for lines in self.lines:
            for line in lines:
                if line.get_visible():
                    line.axes.draw_artist(line)
        self.canvas.blit()


class TCursor:
    """A faster cursor for a single subplot."""

    def __init__(self, ax):
        self.ax = ax
        self.background = None
        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')
        self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')
        # Text position in axes coordinates
        bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,
                            verticalalignment='top', bbox=bbox, clip_on=False)
        self._creating_background = False
        ax.figure.canvas.mpl_connect('draw_event', self.on_draw)

    def on_draw(self, event):
        self.create_new_background()

    def set_cross_hair_visible(self, visible):
        need_redraw = self.horizontal_line.get_visible() != visible
        self.horizontal_line.set_visible(visible)
        self.vertical_line.set_visible(visible)
        self.text.set_visible(visible)
        return need_redraw

    def create_new_background(self):
        # The flag stops the draw() call below from re-entering this method
        # through the draw_event callback.
        if not self._creating_background:
            self._creating_background = True
            self.set_cross_hair_visible(False)
            self.ax.figure.canvas.draw()
            self.background = self.ax.figure.canvas.copy_from_bbox(self.ax.bbox)
            self.set_cross_hair_visible(True)
            self._creating_background = False

    def on_mouse_move(self, event):
        if self.background is None:
            self.create_new_background()
        if not event.inaxes:
            need_redraw = self.set_cross_hair_visible(False)
            if need_redraw:
                self.ax.figure.canvas.restore_region(self.background)
                self.ax.figure.canvas.blit(self.ax.bbox)
        else:
            self.set_cross_hair_visible(True)
            # update the line positions
            x, y = int(event.xdata), int(event.ydata)
            xmin, xmax = self.ax.get_xlim()
            ymin, ymax = self.ax.get_ylim()
            xmid = 0.5 * (xmin + xmax)
            ymid = 0.5 * (ymin + ymax)
            self.horizontal_line.set_ydata([y])
            self.vertical_line.set_xdata([x])
            idx = int(event.xdata)
            _text = '\n'.join((
                r'open: %.1f' % CURDATA["open"].iloc[idx],
                r'high: %.1f' % CURDATA["high"].iloc[idx],
                r'low: %.1f' % CURDATA["low"].iloc[idx],
                r'close: %.1f' % CURDATA["close"].iloc[idx],
                r'date:%s' % CURDATA.index[idx].strftime('%Y%m%d')))
            self.text.set_text(_text)
            self.ax.figure.canvas.restore_region(self.background)
            self.ax.draw_artist(self.horizontal_line)
            self.ax.draw_artist(self.vertical_line)
            self.ax.draw_artist(self.text)
            self.ax.figure.canvas.blit(self.ax.bbox)


class Candle:
    @classmethod
    def get_font(cls):
        return {
            '常用': 'Times New Roman',
            # Chinese fonts
            '黑体': 'SimHei',
            '微软雅黑': 'Microsoft YaHei',
            '微软正黑体': 'Microsoft JhengHei',
            '新宋体': 'NSimSun',
            '新细明体': 'PMingLiU',
            '细明体': 'MingLiU',
            '华文新魏': 'STXinwei',
            '华文行楷': 'STXingkai',
            '华文隶书': 'STLiti',
            '华文琥珀': 'STHupo',
            '华文彩云': 'STCaiyun',
            '方正姚体': 'FZYaoti',
            '方正舒体': 'FZShuTi',
            '标楷体': 'DFKai-SB',
            '华文仿宋': 'STFangsong',
            '华文中宋': 'STZhongsong',
            '华文宋体': 'STSong',
            '华文楷体': 'STKaiti',
            '华文细黑': 'STXihei',
            '幼圆': 'YouYuan',
            '隶书': 'LiSu',
            '楷体_GB2312': 'KaiTi_GB2312',
            '仿宋_GB2312': 'FangSong_GB2312',
            '仿宋': 'FangSong'}

    @classmethod
    def _get_mystyle_(cls):
        my_color = mpf.make_marketcolors(up='r', down='g', edge='inherit',
                                         wick='inherit', volume='inherit')
        color = '(0.82, 0.83, 0.85)'
        return mpf.make_mpf_style(marketcolors=my_color, figcolor=color, gridcolor=color)

    def _init_(self):
        def set_fig():
            font = {'fontname': 'simHei', 'size': '16', 'color': 'black',
                    'weight': 'bold', 'va': 'bottom', 'ha': 'center'}
            fs, fc = (12, 8), (0.82, 0.83, 0.85)
            self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)
            self.fig.text(0.50, 0.94, 'kline day', **font)

        def set_axis():
            axkline = self.fig.add_axes([0.08, 0.25, 0.88, 0.60])
            axvol = self.fig.add_axes([0.08, 0.15, 0.88, 0.10], sharex=axkline)
            axvol.set_ylabel('vol')
            self.axes = {'axkline': axkline, 'axvol': axvol}

        set_fig()
        set_axis()
        self.fig.canvas.mpl_connect('axes_enter_event', self.enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_butpress)
        # Drag: press the button, move to a new position, then release
        self.fig.canvas.mpl_connect('button_release_event', self.on_butrelease)
        # Drag while the button is held down
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # zoom in/out
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)
        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.cursor = None
        self.df = None
        self.n_data = 0
        self.curdata = None
        self.n_curdata = 0
        self.idx_start = None
        self.idx_end = None
        self.x_curdata = -1
        self.btnpressed = False
        self._init_()

    def _plot_(self):
        """Redraw the whole chart using the latest parameters."""
        def get_ma(cols=[5, 10]):
            def op(x): return talib.MA(self.curdata['close'], timeperiod=x)
            d = {('MA%s' % v): op(v) for v in cols}
            return pd.DataFrame(d)

        def format_date(x, pos):
            if x < 0 or x > len(self.curdata.index) - 1:
                return ''
            return self.curdata.index[int(x)]

        global CURDATA
        self.curdata = self.df.iloc[self.idx_start: self.idx_end]
        CURDATA = self.curdata
        self.n_curdata = len(self.curdata)
        data = get_ma(cols=[5, 10])
        ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        self.axes['axkline'].xaxis.set_major_formatter(tk.FuncFormatter(format_date))
        self.axes['axkline'].xaxis.set_major_locator(
            tk.MultipleLocator(5 if self.n_curdata <= 90 else int(self.n_curdata / 18)))
        # Draw the chart
        mpf.plot(self.curdata,
                 ax=self.axes['axkline'],
                 volume=self.axes['axvol'],
                 addplot=ap,
                 type='candle',
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=45, warn_too_much_data=self.n_data)
        # self.draw_cross()
        plt.show()

    def plot(self, df):
        """Store df, pick the initial window, then draw the chart."""
        def set_idx_startend():
            if self.idx_start is None or self.idx_end is None:
                if self.n_data <= 200:
                    self.idx_start = 0
                    self.idx_end = self.n_data
                elif 200 < self.n_data:
                    self.idx_start = 100
                    self.idx_end = self.idx_start + 100

        self.df = df
        self.n_data = len(self.df)
        set_idx_startend()
        self._plot_()

    def enter_axes(self, event):
        ax = event.inaxes
        if ax is None:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'])
            self.fig.canvas.mpl_connect('motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
            event.canvas.draw_idle()

    def leave_axes(self, event):
        self.cursor = None
        event.canvas.draw_idle()

    def on_butpress(self, event):
        # The K-line only moves once the button is released, so record the
        # x position at press time and flag that the button is down.
        self.cursor = None
        event.canvas.draw_idle()
        if not (event.inaxes == self.axes['axkline']):
            return
        if event.button != 1:
            return
        if event.xdata is None:
            self.x_curdata = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            self.x_curdata = int(event.xdata)

    # Mouse drag: press the button, move to a new position, then release
    def on_butrelease(self, event):
        # On release, take the distance between the release position and the
        # press position, shift the start/end of the displayed window (90 days
        # by default) by that distance, re-slice the data, then clear the
        # axes and redraw.
        self.cursor = None
        event.canvas.draw_idle()
        if event.xdata is None:
            return
        diff = int(event.xdata) - self.x_curdata
        self.idx_start -= diff
        self.idx_end -= diff
        if self.idx_start < 0:
            self.idx_start = 0
        if self.idx_end > self.n_data:
            self.idx_end = self.n_data
        for ax in self.axes.values():
            ax.cla()
        # plt.clf()
        self._plot_()
        self.btnpressed = False
        self.x_curdata = -1

    def on_motion(self, event):  # drag while the button is held down
        if not self.btnpressed:
            ax = event.inaxes
            if ax is None:
                return
            # idx = int(event.xdata)
            # if idx < 0 or idx >= self.n_curdata:
            #     return
            if self.cursor is None:
                self.cursor = TCursor(self.axes['axkline'])
                self.fig.canvas.mpl_connect('motion_notify_event', self.cursor.on_mouse_move)
                self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
        else:
            self.cursor = None
            if not event.inaxes == self.axes['axkline']:
                return
            if event.xdata is None:
                return
            dx = int(event.xdata - self.x_curdata)
            self.idx_start -= dx
            self.idx_end -= dx
            # Pan limits: clamp the window at the left/right bounds
            if self.idx_start <= 0:
                self.idx_start = 0
            if self.idx_end > self.n_data:
                self.idx_end = self.n_data
            self.axes['axkline'].clear()
            self.axes['axvol'].clear()
            self._plot_()

    # 90 days of data are shown by default. Each zoom step changes the window
    # by 10% of its current size; the data is then re-sliced with the new
    # start/end, the axes are cleared and the chart is redrawn.
    def on_scroll(self, event):
        ax = event.inaxes
        if ax is None:
            return
        diff = int(self.n_curdata * 0.1 / 2)
        if event.button == 'down':
            self.idx_start += diff
            self.idx_end -= diff
        elif event.button == 'up':
            self.idx_start -= diff
            self.idx_end += diff
        if self.idx_end > self.n_data:
            self.idx_end = self.n_data
        if self.idx_start < 0:
            self.idx_start = 0
        self.curdata = self.df.iloc[self.idx_start: self.idx_end]
        global CURDATA
        CURDATA = self.curdata
        for ax in self.axes.values():
            ax.cla()
        # plt.clf()
        self._plot_()
        self.cursor = None
        event.canvas.draw_idle()

    # Key press handling
    def on_key_press(self, event):
        self.cursor = None
        event.canvas.draw_idle()
        diff = int(self.n_curdata * 0.1 / 2)
        if diff == 0:
            diff = 1
        if event.key == 'up':  # up: widen the window
            self.idx_start -= diff
            self.idx_end += diff
        elif event.key == 'down':  # down: narrow the window
            self.idx_start += diff
            self.idx_end -= diff
        elif event.key == 'left':
            self.idx_start -= 1
            self.idx_end -= 1
        elif event.key == 'right':
            self.idx_start += 1
            self.idx_end += 1
        if self.idx_start < 0:
            self.idx_start = 0
        if self.idx_end > self.n_data:
            self.idx_end = self.n_data
        self.axes['axkline'].clear()
        self.axes['axvol'].clear()
        self._plot_()


if __name__ == '__main__':
    # Load the sample data
    df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')
    candle = Candle()
    candle.plot(df)
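
Both versions compute the MA5 and MA10 overlays with `talib.MA`, whose default moving-average type is a simple average. For readers who want to check the overlay values without installing TA-Lib, a minimal pure-Python sketch of the same calculation follows. The helper `simple_moving_average` is hypothetical and not part of the original code, and the sketch assumes the default (simple) average is what the author intended. The input is the close column of the seven A0501 sample rows shown at the top of the article.

```python
import math


def simple_moving_average(values, period):
    """Return the simple moving average; the first period-1 entries are NaN."""
    if period < 1:
        raise ValueError('period must be >= 1')
    out = [math.nan] * len(values)
    window_sum = 0.0
    for i, value in enumerate(values):
        window_sum += value
        if i >= period:
            # Drop the value that just left the window.
            window_sum -= values[i - period]
        if i >= period - 1:
            out[i] = window_sum / period
    return out


# Close prices of the A0501 sample rows (2004/11/3 to 2004/11/11).
closes = [2636, 2623, 2621, 2629, 2634, 2643, 2635]
ma5 = simple_moving_average(closes, 5)
print(ma5[4:])  # [2628.6, 2630.0, 2632.4]
```

The leading NaN entries explain why the moving-average lines start a few bars after the left edge of the visible window: `get_ma` is computed on the current slice only, so every pan or zoom restarts the warm-up period.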

 




http://www.chinasem.cn/article/690420
