【RNN从入门到实战】GRU入门到实战——使用GRU预测股票。

网友投稿 1906 2022-05-28

摘要

GRU是LSTM网络的一种效果很好的变体,它较LSTM网络的结构更加简单,而且效果也很好,因此也是当前非常流形的一种网络。GRU既然是LSTM的变体,因此也是可以解决RNN网络中的长依赖问题。

在LSTM中引入了三个门函数:输入门、遗忘门和输出门来控制输入值、记忆值和输出值。而在GRU模型中只有两个门:分别是更新门和重置门。具体结构如下图所示:

图中的update gate和reset gate分别表示更新门和重置门。

更新门的作用类似于LSTM的遗忘和输入门。它决定前一时刻的状态信息哪些被丢弃和哪些被更新新信息,更新门的值越大说明前一时刻的状态信息带入越多。

重置门控制前一状态有多少信息被写入到当前的候选集

【RNN从入门到实战】GRU入门到实战——使用GRU预测股票。

h

t

~

\tilde{h_{t}}

ht ~ 上,重置门越小,前一状态的信息被写入的越少。

详见下图:

GRU和LSTM的区别

GRU使用门控机制学习长期依赖关系的基本思想和 LSTM 一致,但还是有一些关键区别:

1、GRU 有两个门(重置门与更新门),而 LSTM 有三个门(输入门、遗忘门和输出门)。

2、GRU 并不会控制并保留内部记忆(c_t),且没有 LSTM 中的输出门。

3、LSTM 中的输入与遗忘门对应于 GRU 的更新门,重置门直接作用于前面的隐藏状态。

4、在计算输出时并不应用二阶非线性。

如何选择?

由于 GRU 参数更少,收敛速度更快,因此其实际花费时间要少很多,这可以大大加速了我们的迭代过程。 而从表现上讲,二者之间孰优孰劣并没有定论,这要依据具体的任务和数据集而定,而实际上,二者之间的 performance 差距往往并不大,远没有调参所带来的效果明显,我通常的做法,首先选择GRU作为基本的单元,因为其收敛速度快,可以加速试验进程,快速迭代,而我认为快速迭代这一特点很重要。如果实现没其余优化技巧,才会尝试将 GRU 换为 LSTM,看看有没有变化。

实战——使用GRU预测股票

制作数据集

数据的下载地址:http://quotes.money.163.com/trade/lsjysj_zhishu_000001.html

首先选择查询数据的范围,然后选择下载数据:

点击下载数据后,弹出个框,这个框里可以看到起止日期,数据的列,默认勾选全部,然后单击下载。

将下载后的数据放到工程的根目录下面,然后做一些处理。

第一步 读入csv文件,编码方式设置为gbk。

第二步 将日期列转为“年-月-日”这样的格式。

第三步 根据日期排序,数据默认是从降序,改为升序。

最后 保存到“sh.csv”中。

代码如下:

import pandas as pd df = pd.read_csv("000001.csv", encoding='gbk') df["日期"] = pd.to_datetime(df["日期"], format="%Y-%m-%d") df.sort_values(by=["日期"], ascending=True) df = df.sort_values(by=["日期"], ascending=True) df.to_csv("sh.csv", index=False, sep=',')

到这里数据集制作完成了

读入数据集,构建时序数据。

generate_df_affect_by_n_days函数,通过一个序列来生成一个矩阵(用于处理时序的数据)。就是把当天的前n天作为参数,当天的数据作为label。

readData中的文件名为:sh.csv ,参数column,代表选用的数据,本次预测只用了一列数据,列名就是column,参数n就是之前模型中所说的n,代表column前n天的数据。train_end表示的是后面多少个数据作为测试集。

import pandas as pd import matplotlib.pyplot as plt import datetime import torch import torch.nn as nn import numpy as np from torch.utils.data import Dataset, DataLoader def generate_df_affect_by_n_days(series, n, index=False): if len(series) <= n: raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n)) df = pd.DataFrame() for i in range(n): df['c%d' % i] = series.tolist()[i:-(n - i)] df['y'] = series.tolist()[n:] if index: df.index = series.index[n:] return df def readData(column='收盘价', n=30, all_too=True, index=False, train_end=-300): df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8') df.fillna(0, inplace=True) df.replace(to_replace="None", value=0) del df["股票代码"] del df["名称"] df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index)) df_column = df[column].copy() df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:] df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index) print(df_generate_from_df_column_train) if all_too: return df_generate_from_df_column_train, df_column, df.index.tolist() return df_generate_from_df_column_train

构建模型

模型见RNN类,采用GRU+全连接。隐藏层设置64,GRU的输出是64维的,所以设置全连接也是64。

TrainSet是数据读取类,将输出数据的最后一列做标签,前面的列做输入,类的写法是Pytorch的固定模式。

class RNN(nn.Module): def __init__(self, input_size): super(RNN, self).__init__() self.rnn = nn.GRU( input_size=input_size, hidden_size=64, num_layers=1, batch_first=True, ) self.out = nn.Sequential( nn.Linear(64, 1), ) self.hidden = None def forward(self, x): r_out, self.hidden = self.rnn(x) # None 表示 hidden state 会用全0的 state out = self.out(r_out) return out class TrainSet(Dataset): def __init__(self, data): # 定义好 image 的路径 self.data, self.label = data[:, :-1].float(), data[:, -1].float() def __getitem__(self, index): return self.data[index], self.label[index] def __len__(self): return len(self.data)

训练与测试

n为模型中的n

LR是模型的学习率

EPOCH是多次循环

train_end这个在之前的数据集中有提到。(注意是负数)

n = 30

LR = 0.0001

EPOCH = 100

train_end = -500

loss选用mse

预测的数据选择“收盘价”

n = 10 LR = 0.0001 EPOCH = 100 train_end = -500 # 数据集建立 df, df_all, df_index = readData('收盘价', n=n, train_end=train_end) df_all = np.array(df_all.tolist()) plt.plot(df_index, df_all, label='real-data') df_numpy = np.array(df) df_numpy_mean = np.mean(df_numpy) df_numpy_std = np.std(df_numpy) df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std df_tensor = torch.Tensor(df_numpy) trainset = TrainSet(df_tensor) trainloader = DataLoader(trainset, batch_size=16, shuffle=True) rnn = RNN(n) optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters loss_func = nn.MSELoss() for step in range(EPOCH): for tx, ty in trainloader: output = rnn(torch.unsqueeze(tx, dim=0)) loss = loss_func(torch.squeeze(output), ty) optimizer.zero_grad() # clear gradients for this training step loss.backward() # back propagation, compute gradients optimizer.step() print(step, loss) if step % 10: torch.save(rnn, 'rnn.pkl') torch.save(rnn, 'rnn.pkl') generate_data_train = [] generate_data_test = [] test_index = len(df_all) + train_end df_all_normal = (df_all - df_numpy_mean) / df_numpy_std df_all_normal_tensor = torch.Tensor(df_all_normal) for i in range(n, len(df_all)): x = df_all_normal_tensor[i - n:i] x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0) y = rnn(x) if i < test_index: generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) else: generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) plt.plot(df_index[n:train_end], generate_data_train, label='generate_train') plt.plot(df_index[train_end:], generate_data_test, label='generate_test') plt.legend() plt.show() plt.cla() plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data') plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test') plt.legend() plt.show()

总结

本实例数据集使用上证的收盘价,构建时序数据集,使用GRU对数据预测,从结果上来看,走势有一定的滞后性,由于只是用了价格预测价格,还是太简单了,可以考虑加入更多的特征去优化这一算法。

完整代码

import pandas as pd import matplotlib.pyplot as plt import datetime import torch import torch.nn as nn import numpy as np from torch.utils.data import Dataset, DataLoader import os os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' def generate_df_affect_by_n_days(series, n, index=False): if len(series) <= n: raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n)) df = pd.DataFrame() for i in range(n): df['c%d' % i] = series.tolist()[i:-(n - i)] df['y'] = series.tolist()[n:] if index: df.index = series.index[n:] return df def readData(column='收盘价', n=30, all_too=True, index=False, train_end=-300): df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8') df.fillna(0, inplace=True) df.replace(to_replace="None", value=0) del df["股票代码"] del df["名称"] df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index)) df_column = df[column].copy() df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:] df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index) print(df_generate_from_df_column_train) if all_too: return df_generate_from_df_column_train, df_column, df.index.tolist() return df_generate_from_df_column_train class RNN(nn.Module): def __init__(self, input_size): super(RNN, self).__init__() self.rnn = nn.GRU( input_size=input_size, hidden_size=64, num_layers=1, batch_first=True, ) self.out = nn.Sequential( nn.Linear(64, 1), ) self.hidden = None def forward(self, x): r_out, self.hidden = self.rnn(x) # None 表示 hidden state 会用全0的 state out = self.out(r_out) return out class TrainSet(Dataset): def __init__(self, data): # 定义好 image 的路径 self.data, self.label = data[:, :-1].float(), data[:, -1].float() def __getitem__(self, index): return self.data[index], self.label[index] def __len__(self): return len(self.data) n = 10 LR = 0.0001 EPOCH = 100 train_end = -500 # 数据集建立 df, df_all, df_index = readData('收盘价', n=n, train_end=train_end) df_all = np.array(df_all.tolist()) plt.plot(df_index, df_all, label='real-data') df_numpy = np.array(df) df_numpy_mean = np.mean(df_numpy) df_numpy_std = np.std(df_numpy) df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std df_tensor = torch.Tensor(df_numpy) trainset = TrainSet(df_tensor) trainloader = DataLoader(trainset, batch_size=64, shuffle=True) rnn = RNN(n) optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters loss_func = nn.MSELoss() for step in range(EPOCH): for tx, ty in trainloader: output = rnn(torch.unsqueeze(tx, dim=0)) loss = loss_func(torch.squeeze(output), ty) optimizer.zero_grad() # clear gradients for this training step loss.backward() # back propagation, compute gradients optimizer.step() print(step, loss) if step % 10: torch.save(rnn, 'rnn.pkl') torch.save(rnn, 'rnn.pkl') generate_data_train = [] generate_data_test = [] test_index = len(df_all) + train_end df_all_normal = (df_all - df_numpy_mean) / df_numpy_std df_all_normal_tensor = torch.Tensor(df_all_normal) for i in range(n, len(df_all)): x = df_all_normal_tensor[i - n:i] x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0) y = rnn(x) if i < test_index: generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) else: generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) plt.plot(df_index[n:train_end], generate_data_train, label='generate_train') plt.plot(df_index[train_end:], generate_data_test, label='generate_test') plt.legend() plt.show() plt.cla() plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data') plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test') plt.legend() plt.show()

AI 神经网络

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SCOM管理包对象详解
下一篇:Cult3D基础教程——2.操作界面认识
相关文章