卷积神经网络(CNN)起源于图像处理。它首次在 LeNet 中用于识别 MNIST 手写数字。然而,卷积神经网络的应用并不局限于处理图像。
在本教程中,我们将通过金融市场的应用示例,探讨使用 CNN 进行时间序列预测。通过这个例子,我们将学习一些使用 Keras 进行模型训练的技术。
完成本教程后,您将了解:
- 典型的多维金融数据序列是什么样的?
- CNN 如何应用于分类问题中的时间序列
- 如何使用生成器将数据馈送到 Keras 模型进行训练
- 如何为 Keras 模型提供自定义评估指标
让我们开始吧

使用 CNN 进行金融时间序列预测
照片作者:Aron Visuals,部分权利保留。
教程概述
本教程分为7个部分;它们是:
- 想法背景
- 数据预处理
- 数据生成器
- 模型
- 训练、验证和测试
- 扩展
- 有效吗?
想法背景
在本教程中,我们遵循 Ehsan Hoseinzade 和 Saman Haratizadeh 的论文《CNNpred:基于 CNN 的股票市场预测(使用逆集合变量)》。作者的代码和数据文件在 GitHub 上可用。
该论文的目标很简单:预测次日的股市方向(即与今日相比是上涨还是下跌),因此这是一个二元分类问题。然而,了解这个问题是如何被构建和解决的很有意思。
我们已经看到了用于序列预测的 CNN 示例。如果以道琼斯工业平均指数 (DJIA) 为例,我们可以构建一个使用 1D 卷积进行预测的 CNN。这是有道理的,因为对时间序列进行一维卷积大致是在计算其移动平均值,或者用数字信号处理的术语来说,是对时间序列应用滤波器。这应该能提供一些关于趋势的线索。
然而,当我们观察金融时间序列时,很明显一些派生信号对预测也有用。例如,价格和交易量结合起来可以提供更好的线索。一些其他技术指标,如不同窗口大小的移动平均线,也很有用。如果我们把所有这些对齐起来,我们就会得到一个数据表,其中每个时间点都有多个**特征**,而目标仍然是预测**一个**时间序列的方向。
在 CNNpred 论文中,为 DJIA 时间序列准备了 82 个这样的特征。

CNNpred 论文中显示所用特征列表的摘录。
与 LSTM 不同,LSTM 有明确的时间步概念,我们在 CNN 模型中将数据表示为矩阵。如下表所示,跨多个时间步的特征表示为二维数组。
数据预处理
接下来,我们将尝试使用 TensorFlow 的 Keras API 从头开始实现 CNNpred 的思想。虽然作者在上面的 GitHub 链接中提供了参考实现,但我们将以不同的方式重新实现它,以说明一些 Keras 技术。
首先,数据是五个 CSV 文件,每个文件对应一个不同的市场指数,位于上述 GitHub 仓库的 Dataset
目录下,或者我们也可以在这里获取副本:
输入数据包含一个日期列和一个名称列,用于标识市场指数的股票代码。我们可以将日期列保留为时间索引,并删除名称列。其余的都是数值。
由于我们要预测市场方向,我们首先尝试创建分类标签。市场方向定义为明天的收盘指数与今天的比较。如果我们已经将数据读入 pandas DataFrame,我们可以使用 X["Close"].pct_change()
来查找百分比变化,其中市场上涨时变化为正。因此,我们可以将其向前移动一个时间步作为我们的标签。
1 2 |
... X["Target"] = (X["Close"].pct_change().shift(-1) > 0).astype(int) |
上述代码行的目的是计算收盘指数的百分比变化,并将数据与前一天对齐。然后将数据转换为 1 或 0,表示百分比变化是否为正。
对于目录中的五个数据文件,我们将每个文件读取为单独的 pandas DataFrame,并将它们保存在一个 Python 字典中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
... data = {} for filename in os.listdir(DATADIR): if not filename.lower().endswith(".csv"): continue # 只读取 CSV 文件 filepath = os.path.join(DATADIR, filename) X = pd.read_csv(filepath, index_col="Date", parse_dates=True) # 基本预处理:获取名称,分类 # 将目标变量保存为 DataFrame 中的一列,以便于 dropna() name = X["Name"][0] del X["Name"] cols = X.columns X["Target"] = (X["Close"].pct_change().shift(-1) > 0).astype(int) X.dropna(inplace=True) # 使用训练数据集拟合标准缩放器 index = X.index[X.index > TRAIN_TEST_CUTOFF] index = index[:int(len(index) * TRAIN_VALID_RATIO)] scaler = StandardScaler().fit(X.loc[index, cols]) # 保存缩放后的 DataFrame X[cols] = scaler.transform(X[cols]) data[name] = X |
上述代码的结果是每个指数一个 DataFrame,其中分类标签是“Target”列,而所有其他列是输入特征。我们还使用标准缩放器对数据进行了归一化。
在时间序列问题中,通常的做法是不要随机分割数据到训练集和测试集,而是设定一个截止点,截止点之前的数据是训练集,之后的数据是测试集。上面的缩放是基于训练集进行的,但应用于整个数据集。
数据生成器
我们不会一次性使用所有时间步,而是使用固定长度的 N 个时间步来预测第 N+1 步的市场方向。在这种设计中,N 个时间步的窗口可以从任何地方开始。我们可以创建大量具有大量重叠的 DataFrame。为了节省内存,我们将构建一个用于训练和验证的数据生成器,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
... TRAIN_TEST_CUTOFF = '2016-04-21' TRAIN_VALID_RATIO = 0.75 def datagen(data, seq_len, batch_size, targetcol, kind): "作为一个生成器为 Keras 模型生成样本" batch = [] while True: # 从池中选择一个 DataFrame key = random.choice(list(data.keys())) df = data[key] input_cols = [c for c in df.columns if c != targetcol] index = df.index[df.index < TRAIN_TEST_CUTOFF] split = int(len(index) * TRAIN_VALID_RATIO) if kind == 'train': index = index[:split] # 训练集的范围 elif kind == 'valid': index = index[split:] # 验证集的范围 # 选择一个位置,然后裁剪序列长度 while True: t = random.choice(index) # 选择一个时间步 n = (df.index == t).argmax() # 查找它在 DataFrame 中的位置 if n-seq_len+1 < 0: continue # 无法获得足够的数据来构成一个序列长度 frame = df.iloc[n-seq_len+1:n+1] batch.append([frame[input_cols].values, df.loc[t, targetcol]]) break # 如果我们得到了一个批次的数据,就发送出去 if len(batch) == batch_size: X, y = zip(*batch) X, y = np.expand_dims(np.array(X), 3), np.array(y) yield X, y batch = [] |
生成器是 Python 中的一种特殊函数,它不 return
值,而是通过迭代 yield
值,从而从中产生一系列数据。为了让生成器在 Keras 训练中使用,它应该 yield
一批输入数据和目标。这个生成器应该无限期运行。因此,上面的生成器函数是用一个无限循环创建的,该循环以 while True
开始。
在每次迭代中,它会从 Python 字典中随机选择一个 DataFrame,然后,在训练集的(即开头部分)时间步范围内,我们从一个随机点开始,使用 pandas 的 iloc[start:end]
语法获取 N 个时间步来创建变量 frame
下的输入。这个 DataFrame 将是一个二维数组。目标标签是最后一个时间步的标签。输入数据和标签然后被添加到列表 batch
中。直到我们累积了一个批次的大小,我们才从生成器中发送出去。
上面代码片段中的最后四行是为了发送一个批次用于训练或验证。我们将输入数据列表(每个都是二维数组)和目标标签列表收集到变量 X
和 y
中,然后将它们转换为 numpy 数组,这样它就可以与我们的 Keras 模型一起工作了。我们需要使用 np.expand_dims()
为 numpy 数组 X
添加一个额外的维度,这是因为网络模型的设计,如下所述。
模型
原始论文中提出的二维 CNN 模型接受的输入张量形状为 $N\times m \times 1$,其中 N 是时间步数,m 是每个时间步的特征数。该论文假设 $N=60$ 且 $m=82$。
该模型包含三个卷积层,如下所述:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
... def cnnpred_2d(seq_len=60, n_features=82, n_filters=(8,8,8), droprate=0.1): "根据论文实现的 2D-CNNpred 模型" model = Sequential([ Input(shape=(seq_len, n_features, 1)), Conv2D(n_filters[0], kernel_size=(1, n_features), activation="relu"), Conv2D(n_filters[1], kernel_size=(3,1), activation="relu"), MaxPool2D(pool_size=(2,1)), Conv2D(n_filters[2], kernel_size=(3,1), activation="relu"), MaxPool2D(pool_size=(2,1)), Flatten(), Dropout(droprate), Dense(1, activation="sigmoid") ]) return model |
模型由以下部分构成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
模型: "sequential" _________________________________________________________________ 层(类型) 输出形状 参数数量 ================================================================= conv2d (Conv2D) (None, 60, 1, 8) 664 _________________________________________________________________ conv2d_1 (Conv2D) (None, 58, 1, 8) 200 _________________________________________________________________ max_pooling2d (MaxPooling2D) (None, 29, 1, 8) 0 _________________________________________________________________ conv2d_2 (Conv2D) (None, 27, 1, 8) 200 _________________________________________________________________ max_pooling2d_1 (MaxPooling2 (None, 13, 1, 8) 0 _________________________________________________________________ flatten (Flatten) (None, 104) 0 _________________________________________________________________ dropout (Dropout) (None, 104) 0 _________________________________________________________________ dense (Dense) (None, 1) 105 ================================================================= 总参数量: 1,169 可训练参数量: 1,169 不可训练参数: 0 |
第一个卷积层有 8 个单元,应用于每个时间步的所有特征。随后是第二个卷积层,用于同时考虑连续三天的信息,因为普遍认为三天的周期可以形成股市趋势。之后将其应用于最大池化层和另一个卷积层,然后展平成一维数组,并应用于具有 sigmoid 激活的全连接层以进行二元分类。
训练、验证和测试
模型到此结束。该论文使用 MAE 作为损失指标,并同时监控准确率和 F1 分数来确定模型的质量。我们应该指出,F1 分数取决于精确率和召回率,这两者都考虑了正类分类。然而,该论文考虑了正类和负类分类的 F1 分数的平均值。明确地说,这是 F1-macro 指标。
$$
F_1 = \frac{1}{2}\left(
\frac{2\cdot \frac{TP}{TP+FP} \cdot \frac{TP}{TP+FN}}{\frac{TP}{TP+FP} + \frac{TP}{TP+FN}}
+
\frac{2\cdot \frac{TN}{TN+FN} \cdot \frac{TN}{TN+FP}}{\frac{TN}{TN+FN} + \frac{TN}{TN+FP}}
\right)
$$
分数 $\frac{TP}{TP+FP}$ 是精确率,其中 TP 和 FP 是真正例和假正例的数量。类似地,$\frac{TP}{TP+FN}$ 是召回率。上面大括号中的第一项是考虑正类分类的正常 F1 指标。第二项是反向的,考虑了负类分类。
虽然 scikit-learn 中可以通过 sklearn.metrics.f1_score()
使用此指标,但 Keras 中没有等效项。因此,我们将从 此 stackexchange 问题 借用代码来创建我们自己的指标。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
from tensorflow.keras import backend as K def recall_m(y_true, y_pred): true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1))) possible_positives = K.sum(K.round(K.clip(y_true, 0, 1))) recall = true_positives / (possible_positives + K.epsilon()) return recall def precision_m(y_true, y_pred): true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1))) predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1))) precision = true_positives / (predicted_positives + K.epsilon()) return precision def f1_m(y_true, y_pred): precision = precision_m(y_true, y_pred) recall = recall_m(y_true, y_pred) return 2*((precision*recall)/(precision+recall+K.epsilon())) def f1macro(y_true, y_pred): f_pos = f1_m(y_true, y_pred) # negative version of the data and prediction f_neg = f1_m(1-y_true, 1-K.clip(y_pred,0,1)) return (f_pos + f_neg)/2 |
训练过程可能需要数小时才能完成。因此,我们希望在训练过程中间保存模型,以便我们可以中断并恢复训练。我们可以利用 Keras 中的 checkpoint 功能。
1 2 3 4 5 6 |
checkpoint_path = "./cp2d-{epoch}-{val_f1macro:.2f}.h5" callbacks = [ ModelCheckpoint(checkpoint_path, monitor='val_f1macro', mode="max", verbose=0, save_best_only=True, save_weights_only=False, save_freq="epoch") ] |
我们设置了一个文件名模板 checkpoint_path
,并要求 Keras 将 epoch 号和验证 F1 分数填入文件名。我们通过监控验证的 F1 指标来保存模型,并且该指标应该在模型变好时增加。因此,我们将 mode="max"
传递给它。
现在训练我们的模型应该很简单,如下所示:
1 2 3 4 5 6 7 8 9 10 11 |
seq_len = 60 batch_size = 128 n_epochs = 20 n_features = 82 model = cnnpred_2d(seq_len, n_features) model.compile(optimizer="adam", loss="mae", metrics=["acc", f1macro]) model.fit(datagen(data, seq_len, batch_size, "Target", "train"), validation_data=datagen(data, seq_len, batch_size, "Target", "valid"), epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1, callbacks=callbacks) |
在上面的代码片段中有两点需要注意。我们向 compile()
函数的 metrics
参数提供了 "acc"
作为准确率,以及上面定义的 f1macro
函数。因此,这两个指标将在训练过程中进行监控。由于该函数命名为 f1macro
,我们在 checkpoint 的 monitor
参数中将其引用为 val_f1macro
。
另外,在 fit()
函数中,我们通过上面定义的 datagen()
生成器提供了输入数据。调用此函数将产生一个生成器,在训练循环中,批次会从中一个接一个地获取。类似地,验证数据也由生成器提供。
由于生成器的本质是无限分发数据。我们需要告诉训练过程如何定义一个 epoch。在 Keras 中,一个批次是进行梯度下降更新的一次迭代。一个 epoch 应该是一个完整数据集的周期。epoch 的结束是运行验证的时间。这也是运行我们上面定义的 checkpoint 的机会。由于 Keras 无法从生成器推断数据集的大小,我们需要使用 steps_per_epoch
参数来告诉它一个 epoch 应该处理多少批次。类似地,validation_steps
参数用于告知每次验证步骤使用的批次数。验证不会影响训练,但它会向我们报告我们关心的指标。下面是我们将在训练中间看到的内容的截图,我们将看到训练集的指标在每个批次上更新,而验证集的指标仅在 epoch 结束时提供。
1 2 3 4 5 6 |
Epoch 1/20 400/400 [==============================] - 43s 106ms/step - loss: 0.4062 - acc: 0.6184 - f1macro: 0.5237 - val_loss: 0.4958 - val_acc: 0.4969 - val_f1macro: 0.4297 Epoch 2/20 400/400 [==============================] - 44s 111ms/step - loss: 0.2760 - acc: 0.7489 - f1macro: 0.7304 - val_loss: 0.5007 - val_acc: 0.4984 - val_f1macro: 0.4833 Epoch 3/20 60/400 [===>..........................] - ETA: 39s - loss: 0.2399 - acc: 0.7783 - f1macro: 0.7643 |
模型训练完成后,我们可以用未见过的数据(即测试集)对其进行测试。我们不是随机生成测试集,而是以确定性的方式从数据集中创建它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
def testgen(data, seq_len, targetcol): "Return array of all test samples" batch = [] for key, df in data.items(): input_cols = [c for c in df.columns if c != targetcol] # find the start of test sample t = df.index[df.index >= TRAIN_TEST_CUTOFF][0] n = (df.index == t).argmax() for i in range(n+1, len(df)+1): frame = df.iloc[i-seq_len:i] batch.append([frame[input_cols].values, frame[targetcol][-1]]) X, y = zip(*batch) return np.expand_dims(np.array(X),3), np.array(y) # Prepare test data test_data, test_target = testgen(data, seq_len, "Target") # Test the model test_out = model.predict(test_data) test_pred = (test_out > 0.5).astype(int) print("accuracy:", accuracy_score(test_pred, test_target)) print("MAE:", mean_absolute_error(test_pred, test_target)) print("F1:", f1_score(test_pred, test_target)) |
函数 testgen()
的结构与我们上面定义的 datagen()
类似。不同之处在于 datagen()
中输出数据的第一个维度是批次中的样本数,而在 testgen()
中是所有测试样本。
使用模型进行预测将产生一个 0 到 1 之间的浮点数,因为我们使用了 sigmoid 激活函数。我们将使用 0.5 的阈值将其转换为 0 或 1。然后,我们使用 scikit-learn 的函数来计算准确率、平均绝对误差和 F1 分数(准确率是 MAE 的 1 减去值)。
将所有这些内容结合起来,完整的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import os import random import numpy as np import pandas as pd import tensorflow as tf from tensorflow.keras import backend as K from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, Input from tensorflow.keras.models import Sequential, load_model from tensorflow.keras.callbacks import ModelCheckpoint from sklearn.preprocessing import StandardScaler from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error DATADIR = "./Dataset" TRAIN_TEST_CUTOFF = '2016-04-21' TRAIN_VALID_RATIO = 0.75 # https://datascience.stackexchange.com/questions/45165/how-to-get-accuracy-f1-precision-and-recall-for-a-keras-model # to implement F1 score for validation in a batch def recall_m(y_true, y_pred): true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1))) possible_positives = K.sum(K.round(K.clip(y_true, 0, 1))) recall = true_positives / (possible_positives + K.epsilon()) return recall def precision_m(y_true, y_pred): true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1))) predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1))) precision = true_positives / (predicted_positives + K.epsilon()) return precision def f1_m(y_true, y_pred): precision = precision_m(y_true, y_pred) recall = recall_m(y_true, y_pred) return 2*((precision*recall)/(precision+recall+K.epsilon())) def f1macro(y_true, y_pred): f_pos = f1_m(y_true, y_pred) # negative version of the data and prediction f_neg = f1_m(1-y_true, 1-K.clip(y_pred,0,1)) return (f_pos + f_neg)/2 def cnnpred_2d(seq_len=60, n_features=82, n_filters=(8,8,8), droprate=0.1): "根据论文实现的 2D-CNNpred 模型" model = Sequential([ Input(shape=(seq_len, n_features, 1)), Conv2D(n_filters[0], kernel_size=(1, n_features), activation="relu"), Conv2D(n_filters[1], kernel_size=(3,1), activation="relu"), MaxPool2D(pool_size=(2,1)), Conv2D(n_filters[2], kernel_size=(3,1), activation="relu"), MaxPool2D(pool_size=(2,1)), Flatten(), Dropout(droprate), Dense(1, activation="sigmoid") ]) 返回 model def datagen(data, seq_len, batch_size, targetcol, kind): "作为一个生成器为 Keras 模型生成样本" batch = [] while True: # 从池中选择一个 DataFrame key = random.choice(list(data.keys())) df = data[key] input_cols = [c for c in df.columns if c != targetcol] index = df.index[df.index < TRAIN_TEST_CUTOFF] split = int(len(index) * TRAIN_VALID_RATIO) assert split > seq_len, "Training data too small for sequence length {}".format(seq_len) if kind == 'train': index = index[:split] # 训练集的范围 elif kind == 'valid': index = index[split:] # 验证集的范围 else: raise NotImplementedError # 选择一个位置,然后裁剪序列长度 while True: t = random.choice(index) # pick one time step n = (df.index == t).argmax() # find its position in the dataframe if n-seq_len+1 < 0: continue # this sample is not enough for one sequence length frame = df.iloc[n-seq_len+1:n+1] batch.append([frame[input_cols].values, df.loc[t, targetcol]]) break # 如果我们得到了一个批次的数据,就发送出去 if len(batch) == batch_size: X, y = zip(*batch) X, y = np.expand_dims(np.array(X), 3), np.array(y) yield X, y batch = [] def testgen(data, seq_len, targetcol): "Return array of all test samples" batch = [] for key, df in data.items(): input_cols = [c for c in df.columns if c != targetcol] # find the start of test sample t = df.index[df.index >= TRAIN_TEST_CUTOFF][0] n = (df.index == t).argmax() # extract sample using a sliding window for i in range(n+1, len(df)+1): frame = df.iloc[i-seq_len:i] batch.append([frame[input_cols].values, frame[targetcol][-1]]) X, y = zip(*batch) return np.expand_dims(np.array(X),3), np.array(y) # Read data into pandas DataFrames data = {} for filename in os.listdir(DATADIR): if not filename.lower().endswith(".csv"): continue # 只读取 CSV 文件 filepath = os.path.join(DATADIR, filename) X = pd.read_csv(filepath, index_col="Date", parse_dates=True) # 基本预处理:获取名称,分类 # 将目标变量保存为 DataFrame 中的一列,以便于 dropna() name = X["Name"][0] del X["Name"] cols = X.columns X["Target"] = (X["Close"].pct_change().shift(-1) > 0).astype(int) X.dropna(inplace=True) # 使用训练数据集拟合标准缩放器 index = X.index[X.index < TRAIN_TEST_CUTOFF] index = index[:int(len(index) * TRAIN_VALID_RATIO)] scaler = StandardScaler().fit(X.loc[index, cols]) # 保存缩放后的 DataFrame X[cols] = scaler.transform(X[cols]) data[name] = X seq_len = 60 batch_size = 128 n_epochs = 20 n_features = 82 # Produce CNNpred as a binary classification problem model = cnnpred_2d(seq_len, n_features) model.compile(optimizer="adam", loss="mae", metrics=["acc", f1macro]) model.summary() # print model structure to console # Set up callbacks and fit the model # We use custom validation score f1macro() and hence monitor for "val_f1macro" checkpoint_path = "./cp2d-{epoch}-{val_f1macro:.2f}.h5" callbacks = [ ModelCheckpoint(checkpoint_path, monitor='val_f1macro', mode="max", verbose=0, save_best_only=True, save_weights_only=False, save_freq="epoch") ] model.fit(datagen(data, seq_len, batch_size, "Target", "train"), validation_data=datagen(data, seq_len, batch_size, "Target", "valid"), epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1, callbacks=callbacks) # Prepare test data test_data, test_target = testgen(data, seq_len, "Target") # Test the model test_out = model.predict(test_data) test_pred = (test_out > 0.5).astype(int) print("accuracy:", accuracy_score(test_pred, test_target)) print("MAE:", mean_absolute_error(test_pred, test_target)) print("F1:", f1_score(test_pred, test_target)) |
扩展
原论文将上述模型称为“2D-CNNpred”,还有一个版本称为“3D-CNNpred”。其思想不仅考虑一个股票指数的许多特征,还进行跨多个市场指数的比较,以帮助预测单个指数。参考上表中的特征和时间步长,单个市场指数的数据表示为二维数组。如果我们堆叠来自不同指数的多个此类数据,我们就构建了一个三维数组。虽然目标标签相同,但允许我们查看不同的市场可能提供一些额外的信息来帮助预测。
由于数据形状的改变,卷积网络也进行了略微不同的定义,数据生成器也需要相应地进行一些修改。下面是3D版本的完整代码,与之前的2D版本的变化应该是显而易见的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
import os import random import numpy as np import pandas as pd import tensorflow as tf from tensorflow.keras import backend as K from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, Input from tensorflow.keras.models import Sequential, load_model from tensorflow.keras.callbacks import ModelCheckpoint from sklearn.preprocessing import StandardScaler from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error DATADIR = "./Dataset" TRAIN_TEST_CUTOFF = '2016-04-21' TRAIN_VALID_RATIO = 0.75 # https://datascience.stackexchange.com/questions/45165/how-to-get-accuracy-f1-precision-and-recall-for-a-keras-model # to implement F1 score for validation in a batch def recall_m(y_true, y_pred): true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1))) possible_positives = K.sum(K.round(K.clip(y_true, 0, 1))) recall = true_positives / (possible_positives + K.epsilon()) return recall def precision_m(y_true, y_pred): true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1))) predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1))) precision = true_positives / (predicted_positives + K.epsilon()) return precision def f1_m(y_true, y_pred): precision = precision_m(y_true, y_pred) recall = recall_m(y_true, y_pred) return 2*((precision*recall)/(precision+recall+K.epsilon())) def f1macro(y_true, y_pred): f_pos = f1_m(y_true, y_pred) # negative version of the data and prediction f_neg = f1_m(1-y_true, 1-K.clip(y_pred,0,1)) return (f_pos + f_neg)/2 def cnnpred_3d(seq_len=60, n_stocks=5, n_features=82, n_filters=(8,8,8), droprate=0.1): "3D-CNNpred model according to the paper" model = Sequential([ Input(shape=(n_stocks, seq_len, n_features)), Conv2D(n_filters[0], kernel_size=(1,1), activation="relu", data_format="channels_last"), Conv2D(n_filters[1], kernel_size=(n_stocks,3), activation="relu"), MaxPool2D(pool_size=(1,2)), Conv2D(n_filters[2], kernel_size=(1,3), activation="relu"), MaxPool2D(pool_size=(1,2)), Flatten(), Dropout(droprate), Dense(1, activation="sigmoid") ]) 返回 model def datagen(data, seq_len, batch_size, target_index, targetcol, kind): "作为一个生成器为 Keras 模型生成样本" # 了解数据的特征和时间轴 input_cols = [c for c in data.columns if c[0] != targetcol] tickers = sorted(set(c for _,c in input_cols)) n_features = len(input_cols) // len(tickers) index = data.index[data.index < TRAIN_TEST_CUTOFF] split = int(len(index) * TRAIN_VALID_RATIO) assert split > seq_len, "Training data too small for sequence length {}".format(seq_len) if kind == "train": index = index[:split] # 训练集范围 elif kind == 'valid': index = index[split:] # 验证集范围 else: raise NotImplementedError # 无限循环生成一个批次 batch = [] while True: # 选择一个位置,然后裁剪序列长度 while True: t = random.choice(index) n = (data.index == t).argmax() if n-seq_len+1 < 0: continue # this sample is not enough for one sequence length frame = data.iloc[n-seq_len+1:n+1][input_cols] # 将具有两级索引的帧转换为3D数组 shape = (len(tickers), len(frame), n_features) X = np.full(shape, np.nan) for i,ticker in enumerate(tickers): X[i] = frame.xs(ticker, axis=1, level=1).values batch.append([X, data[targetcol][target_index][t]]) break # 如果我们得到了一个批次的数据,就发送出去 if len(batch) == batch_size: X, y = zip(*batch) yield np.array(X), np.array(y) batch = [] def testgen(data, seq_len, target_index, targetcol): "Return array of all test samples" input_cols = [c for c in data.columns if c[0] != targetcol] tickers = sorted(set(c for _,c in input_cols)) n_features = len(input_cols) // len(tickers) t = data.index[data.index >= TRAIN_TEST_CUTOFF][0] n = (data.index == t).argmax() batch = [] for i in range(n+1, len(data)+1): # 裁剪一个窗口,其长度为seq_len,以行位置i-1结束 frame = data.iloc[i-seq_len:i] target = frame[targetcol][target_index][-1] frame = frame[input_cols] # 将具有两级索引的帧转换为3D数组 shape = (len(tickers), len(frame), n_features) X = np.full(shape, np.nan) for i,ticker in enumerate(tickers): X[i] = frame.xs(ticker, axis=1, level=1).values batch.append([X, target]) X, y = zip(*batch) return np.array(X), np.array(y) # Read data into pandas DataFrames data = {} for filename in os.listdir(DATADIR): if not filename.lower().endswith(".csv"): continue # 只读取 CSV 文件 filepath = os.path.join(DATADIR, filename) X = pd.read_csv(filepath, index_col="Date", parse_dates=True) # 基本预处理:获取名称,分类 # 将目标变量保存为 DataFrame 中的一列,以便于 dropna() name = X["Name"][0] del X["Name"] cols = X.columns X["Target"] = (X["Close"].pct_change().shift(-1) > 0).astype(int) X.dropna(inplace=True) # 使用训练数据集拟合标准缩放器 index = X.index[X.index < TRAIN_TEST_CUTOFF] index = index[:int(len(index) * TRAIN_VALID_RATIO)] scaler = StandardScaler().fit(X.loc[index, cols]) # 保存缩放后的 DataFrame X[cols] = scaler.transform(X[cols]) data[name] = X # 将数据转换为3D数据框(多级列) for key, df in data.items(): df.columns = pd.MultiIndex.from_product([df.columns, [key]]) data = pd.concat(data.values(), axis=1) seq_len = 60 batch_size = 128 n_epochs = 20 n_features = 82 n_stocks = 5 # Produce CNNpred as a binary classification problem model = cnnpred_3d(seq_len, n_stocks, n_features) model.compile(optimizer="adam", loss="mae", metrics=["acc", f1macro]) model.summary() # 打印模型结构到控制台 # Set up callbacks and fit the model # We use custom validation score f1macro() and hence monitor for "val_f1macro" checkpoint_path = "./cp3d-{epoch}-{val_f1macro:.2f}.h5" callbacks = [ ModelCheckpoint(checkpoint_path, monitor='val_f1macro', mode="max", verbose=0, save_best_only=True, save_weights_only=False, save_freq="epoch") ] model.fit(datagen(data, seq_len, batch_size, "DJI", "Target", "train"), validation_data=datagen(data, seq_len, batch_size, "DJI", "Target", "valid"), epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1, callbacks=callbacks) # Prepare test data test_data, test_target = testgen(data, seq_len, "DJI", "Target") # Test the model test_out = model.predict(test_data) test_pred = (test_out > 0.5).astype(int) print("accuracy:", accuracy_score(test_pred, test_target)) print("MAE:", mean_absolute_error(test_pred, test_target)) print("F1:", f1_score(test_pred, test_target)) |
虽然上面的模型用于下一步预测,但如果您将目标标签替换为不同的计算方式,它并不会阻止您进行未来 k 步的预测。这可能是一个练习。
有效吗?
与所有金融市场预测项目一样,期望高准确率总是脱离现实的。上面的代码中的训练参数可以在测试集上产生略高于 50% 的准确率。虽然 epochs 和 batch size 的数量被故意设置得较小以节省时间,但改进的空间应该不大。
在原始论文中,据报道 3D-CNNpred 的表现优于 2D-CNNpred,但 F1 分数仅达到 0.6 以下。这已经比论文中提到的三个基线模型做得更好了。它可能有一些用途,但不是能让你快速赚钱的灵丹妙药。
从机器学习技术角度来看,这里我们将一组数据分类为第二天市场方向是上涨还是下跌。因此,虽然数据不是图像,但它类似于图像,因为两者都以二维数组的形式呈现。因此,卷积层技术可以应用,但我们可以使用不同的滤波器大小来匹配我们对金融时间序列的通常直觉。
延伸阅读
原始论文可在以下网址找到:
- “CNNPred: CNN-based stock market prediction using several data sources”,作者:Ehsan Hoseinzade,Saman Haratizadeh,2019。
(https://arxiv.org/abs/1810.08923)
如果您是金融应用新手,并希望建立机器学习技术与金融之间的联系,您可能会发现这本书很有用:
- Machine Learning in Finance: From Theory to Practice,作者:Matthew F. Dixon,Igor Halperin,和 Paul Bilokon。2000。
(https://www.amazon.com/dp/3030410676/)
关于类似的主题,我们之前有一篇关于使用 CNN 进行时间序列预测的帖子,但使用的是一维卷积层;
您也可能会发现以下文档有助于解释我们上面使用的一些语法:
- Pandas 用户指南:https://pandas.ac.cn/pandas-docs/stable/user_guide/index.html
- Keras 模型训练 API:https://keras.org.cn/api/models/model_training_apis/
- Keras 回调 API:https://keras.org.cn/api/callbacks/
总结
在本教程中,您将了解如何为金融时间序列构建 CNN 模型进行预测。
具体来说,你学到了:
- 如何创建二维卷积层来处理时间序列
- 如何将时间序列数据表示为多维数组,以便应用卷积层
- 什么是 Keras 模型训练的数据生成器以及如何使用它
- 如何使用自定义指标监控模型训练的性能
- 预测金融市场时可以期待什么
这很有前景,特别是使用多个参数,但总的来说,CNN 在准确性方面正在输给 Transformer。
没错。但这表明要得到一个不太差的结果是多么简单。
在 def datagen() 中的以下行中存在错误:
index = data.index[data.index < TRAIN_TEST_CUTOFF]
编译器说:
“File “C:\Users\TANUNC~1.J\AppData\Local\Temp/ipykernel_10252/1129467454.py”, line 66
index = data.index[data.index < TRAIN_TEST_CUTOFF]
^
SyntaxError: invalid syntax
”
如何解决?
等待您的答复,提前致谢
这行看起来没问题,但也许它之前的代码行出了问题。您能检查一下是否正确复制了代码吗?
@tanunchai
上面的代码似乎是说:
index = data.index[data.index < TRAIN_TEST_CUTOFF]
尝试更改为:
index = data.index[data.index < TRAIN_TEST_CUTOFF]
哎呀。代码实际上是 [data.index & l t ; TRAIN_TEST_CUTOFF]
更改为:
index = data.index[data.index < TRAIN_TEST_CUTOFF]
谢谢 William。渲染代码的插件出现了一些问题,导致了一团糟。我现在已经修复了,所以请复制代码并重试。
谢谢你 William Smith!
如何解决这个 bug?
现在遇到新问题,在 def datagen() 的嵌套循环 while true 中
出现了以下问题:
if n-seq_len+1 &; 0
while True
# 选择一个位置,然后裁剪一个序列长度
while True
t = random.choice(index)
n = (data.index == t).argmax()
if n-seq_len+1 &; 0: # ****错误说语法无效
continue # 这个样本不足以构成一个序列长度
frame = data.iloc[n-seq_len+1:n+1][input_cols]
# 将具有两级索引的帧转换为 3D
现在遇到新问题,在 def datagen() 的嵌套循环 while true 中
出现了以下问题:
if n-seq_len+1 < 0
while True
# 选择一个位置,然后裁剪一个序列长度
while True
t = random.choice(index)
n = (data.index == t).argmax()
# index = data.index[data.index < TRAIN_TEST_CUTOFF] ,下面这行有问题
if n-seq_len+1 < 0
continue # 这个样本不足以构成一个序列长度
frame = data.iloc[n-seq_len+1:n+1][input_cols]
# 将具有两级索引的帧转换为3D数组
shape = (len(tickers), len(frame), n_features)
X = np.full(shape, np.nan)
for i,ticker in enumerate(tickers)
在 def datagen() 中仍然存在以下问题:
if n-seq_len+1 < 0: # 错误行,说“无效语法”
< 做了什么?我不明白,它也导致了错误。
_________
while True
# 选择一个位置,然后裁剪一个序列长度
while True
t = random.choice(index)
n = (data.index == t).argmax()
# index = data.index[data.index < TRAIN_TEST_CUTOFF] ,下面这行有问题
if n-seq_len+1 < 0: # 错误行,说“无效语法”
continue # 这个样本不足以构成一个序列长度
frame = data.iloc[n-seq_len+1:n+1][input_cols]
# 将具有两级索引的帧转换为3D数组
shape = (len(tickers), len(frame), n_features)
ModuleNotFoundError: No module named ‘f1metrics’
如何解决这个 bug?
—————————————————————————
ModuleNotFoundError Traceback (最近一次调用)
C:\Users\TANUNC~1.J\AppData\Local\Temp/ipykernel_2580/1760727365.py in
12 from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error
13
—> 14 from f1metrics import f1macro
15
16 DATADIR = “./Dataset”
ModuleNotFoundError: No module named ‘f1metrics’
_____
抱歉带来所有这些麻烦。渲染代码的插件出了些问题,导致了一片混乱。我现在已经修复了,所以请复制代码并重试。
有趣!有什么区别
– conv1d,kernel_size=n_features,输入大小为 N x m
– conv2d,kernel_size=(1, n_features),输入大小为 N x m x 1
这两者是否等价?
我会说“是”,但在 Keras 中测试此问题的最佳方法是将这些层构建到模型中,然后运行“model.summary()”来观察输出。输出形状是否相同?
如何将此代码部署到 Web 应用程序?
这是一个模糊的问题——你需要考虑 Web 应用程序期望什么,以及如何将模型包装成一个函数与 Web 应用程序进行通信。
FFDNet(快速灵活的去噪卷积神经网络)是否适用于金融时间序列?
还没有尝试过。
我的问题是关于博客 https://machinelearning.org.cn/feature-selection-machine-learning-python/
抱歉在这里提问。
我的工作是开发一个用于多输出预测的模型(即,通过单个模型预测五个输出)。当我应用 Kbest 和递归特征消除方法来选择最佳特征时,我遇到了一个错误“bad input shape (x, 5)”(5 是这里的输出向量)。然而,PCA 工作得很好,因为它不依赖于输出向量。
这意味着这些特征选择算法(Kbest 和 RFE)只能应用于单输出预测问题吗?
我已经在另一篇帖子中回答过了。
我认为 CSV 文件中的信息太多了。与其查看过多的数据并尝试进行基本分析,不如尝试寻找“模式”。我所说的模式是指仅查看每个蜡烛图的开盘价、收盘价、最高价和最低价信息。
例如
我们以 D1 的蜡烛图为例(1 个蜡烛图 = 1 天信息)。目标:通过前面 4 个形成的信息预测第 5 个蜡烛图(上涨或下跌)。蜡烛图数量越少,越容易找到模式。
我将尝试修改此模型输入的 Dati 并告知您结果。
感谢 Pete 的反馈!请分享您的发现。
非常感谢您的教程,非常有帮助。
您能否告诉我如何编写一个数据生成器来读取所有不同类别的 .txt 文件(类似于“datagen.flow_from_directory”函数读取所有不同类别的图像文件)?
感谢这篇有用的文章!
我尝试运行了您的代码几次,但出现了一些不稳定的问题。有时预测率约为 51%,这是一个好结果,但有时预测率会下降到 48%。我猜这与初始化步骤有关。您对此问题有什么见解吗?
你好 Yu…以下资源应该能提供一些启发。
https://machinelearning.org.cn/stochastic-optimization-for-machine-learning/
你好,James,
谢谢你的建议!我确实考虑了多重重启的想法,也使用了结果集成。但我还没有成功。
您认为对于更大的数据集,这个问题会得到缓解吗?
你好,你能帮帮我吗?我遇到了一个代码问题。
InvalidArgumentError: Graph execution error
它与 [[ model.fit(datagen(data, seq_len, batch_size,”Target”,”train”),
validation_data=datagen(data, seq_len, batch_size, “Target”, “valid”),
epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1,
callbacks=callbacks) ]] 相关。
我能够追溯到 datagen (2DConv) 中的代码行。
input_cols = [c for c in df.columns if c != targetcol]
我正在使用 Google Colab,它说当我尝试单独运行它时。
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
尝试了:input_cols = [c for c in df.columns if c != targetcol.all()]
问题仍然存在,如果您能帮助我,我将不胜感激。
你好 rk…你是否复制粘贴了代码,还是自己输入的?另外,你可能想尝试在 Google Colab 中执行你的代码。
你好,你能告诉我以下代码行做什么吗?它们是
name = X[“Name”][0]
del X[“Name”]
cols = X.columns
X[“Target”] = (X[“Close”].pct_change().shift(-1) > 0).astype(int)
X.dropna(inplace=True)
因为我在 Google Colab 上遇到了一个错误,它返回了以下错误
KeyError : “name”
这仅在我使用自己的 .csv 数据文件时发生。当我运行使用您的数据集 .csv 文件的代码时,我没有遇到任何问题。我的 csv 文件只有 7 列:Date,Close,Open, HIgh, Low, Vol, Change %。
据我理解,您正在尝试删除除 Date 和 Close 列之外的所有其他列,对吗?
提前感谢!
你好 de santos…你说得对。你能提供关于错误的更多细节吗?这是完整的错误消息吗?
谢谢回复,James!我以为我的回复没有被批准,所以过了 4 个月才回复,我非常抱歉。还有另一个关于整数数据类型的错误,因为最初 Vol 列有一个词“k”作为“千”的替代,这可以通过 Excel 的查找/替换功能轻松修复,将数据统一为整数。除此之外,“Name”错误仍然存在,这是完整的错误消息。
KeyError 回溯(最近的调用在最后)
/usr/local/lib/python3.10/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
3801 try
-> 3802 return self._engine.get_loc(casted_key)
3803 except KeyError as err
4 frames
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
KeyError: ‘Name’
上述异常是以下异常的直接原因
KeyError 回溯(最近的调用在最后)
/usr/local/lib/python3.10/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
3802 return self._engine.get_loc(casted_key)
3803 except KeyError as err
-> 3804 raise KeyError(key) from err
3805 except TypeError
3806 # If we have a listlike key, _check_indexing_error will raise
KeyError: ‘Name’
我对下面的代码感到困惑,想寻求一些澄清。
while True
t = random.choice(index) # pick one time step
这段代码不总是随机选择一个索引,从而破坏了时间序列数据的顺序吗?据我所知,在时间序列问题中,时间序列的顺序应该被保留?
我在这里错过什么了吗?您能帮助我更好地理解吗?
你好…这行代码只是随机选择一个时间点作为开始。从那里开始,序列是保留的。
你好,
首先,checkpoint 文件名需要 .keras 扩展名。添加了 .keras 扩展名,并消除了扩展名错误。
其次,model.fit 函数出现了以下错误:
Epoch 1/20
—————————————————————————
TypeError Traceback (most recent call last)
Cell In[8], line 11
3 checkpoint_path = “./cp3d-{epoch}-{val_f1macro:.2f}.h5.keras”
4 callbacks = [
5 ModelCheckpoint(checkpoint_path,
6 monitor=’val_f1macro’, mode=”max”,
7 verbose=0, save_best_only=True, save_weights_only=False, save_freq=”epoch”)
8 ]
—> 11 model.fit(datagen(data, seq_len, batch_size, “DJI”, “Target”, “train”),
12 validation_data=datagen(data, seq_len, batch_size, “DJI”, “Target”, “valid”),
13 epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1, callbacks=callbacks)
File ~/anaconda3/lib/python3.11/site-packages/keras/src/utils/traceback_utils.py:122, in filter_traceback..error_handler(*args, **kwargs)
119 filtered_tb = _process_traceback_frames(e.__traceback__)
120 # To get the full stack trace, call
121 #
keras.config.disable_traceback_filtering()
–> 122 raise e.with_traceback(filtered_tb) from None
123 finally
124 del filtered_tb
Cell In[3], line 21, in f1macro(y_true, y_pred)
20 def f1macro(y_true, y_pred)
—> 21 f_pos = f1_m(y_true, y_pred)
22 # negative version of the data and prediction
23 f_neg = f1_m(1-y_true, 1-K.clip(y_pred,0,1))
Cell In[3], line 16, in f1_m(y_true, y_pred)
15 def f1_m(y_true, y_pred)
—> 16 precision = precision_m(y_true, y_pred)
17 recall = recall_m(y_true, y_pred)
18 return 2*((precision*recall)/(precision+recall+K.epsilon()))
Cell In[3], line 10, in precision_m(y_true, y_pred)
9 def precision_m(y_true, y_pred)
—> 10 true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
11 predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
12 precision = true_positives / (predicted_positives + K.epsilon())
TypeError: Input ‘y’ of ‘Mul’ Op has type float32 that does not match type int64 of argument ‘x’.
你好 Khun…你看到的错误消息
TypeError: Input ‘y’ of ‘Mul’ Op has type float32 that does not match type int64 of argument ‘x’
,在 TensorFlow 或类似库中很常见,这些库会尝试在不同数据类型的张量之间执行乘法运算。在这种情况下,你的一个张量的数据类型是 `float32`,另一个是 `int64`。TensorFlow 要求像乘法这样的操作的两个操作数具有相同的数据类型。以下是如何解决此问题:
1. **转换数据类型**:你可以在执行操作之前,将一个张量的数据类型转换为匹配的类型。你可以使用 TensorFlow 中的 `tf.cast` 函数来做到这一点。以下是操作方法:
python
import tensorflow as tf
x = tf.constant([1, 2, 3], dtype=tf.int64) # Tensor with dtype int64
y = tf.constant([1.1, 2.2, 3.3], dtype=tf.float32) # Tensor with dtype float32
# Convert x to float32
x = tf.cast(x, tf.float32)
# Now multiply
result = x * y
print(result)
2. **选择合适的数据类型**:在定义张量时,如果你知道你将在它们之间执行操作,那么最好确保它们从一开始就以匹配的数据类型创建。
3. **检查你的操作**:如果你不确定张量的数据类型是否匹配,在执行操作之前务必检查它们。这可以节省调试类型不匹配问题的时间。
这种调整张量数据类型的方法可以适应 TensorFlow 以外的其他操作和库(如 NumPy 或 PyTorch)的需求。
谢谢 James。
你知道令我惊讶的是,我将相同的代码复制粘贴到我的本地 Jupyter Notebook 和 Colab Jupyter Notebook 中。它在 Colab Jupyter Notebook 中完美运行。
我将着手处理这个问题。
即使我理解了建模和测试神经网络的步骤,我们如何使用该模型来预测未来 5/10 天的市场走势?
你好 Elisa…使用卷积神经网络(CNN)来预测未来 5 到 10 天的市场走势涉及多个步骤,从数据准备到模型训练和验证。以下是通用方法:
### 1. 数据收集与准备
– **数据源**:收集历史价格数据以及其他可能影响的因素,如交易量、未平仓合约、宏观经济指标、新闻情绪等。
– **特征工程**:将原始数据转换为适合 CNN 的格式。例如,您可以创建价格走势的图像(如蜡烛图)或使用价格和交易量数据的滑动窗口来创建多元时间序列数据。
– **归一化**:缩放数据,使所有特征的贡献相等。这通常涉及将数据缩放到零均值和单位方差。
### 2. 设计 CNN 架构
– **输入层**:确定输入数据的形状。如果使用基于图像的数据,输入可以是图像尺寸。如果使用时间序列,输入可能是连续几天的数据序列。
– **卷积层**:这些层将有助于提取数据中的模式。您可以尝试层数、滤波器大小和步幅。
– **池化层**:在卷积层之间使用池化来降低维度,这有助于使网络对特征的确切位置不那么敏感。
– **密集层**:在卷积层和池化层之后,添加一个或多个全连接层来解释由卷积提取的特征。
– **输出层**:此层应与预测任务匹配。对于市场走势,如果预测价格变化的趋势,它可能是一个带有线性或 Sigmoid 激活函数的单个神经元。
### 3. 模型训练
– **损失函数**:选择合适的损失函数,例如回归任务的均方误差(如果预测价格变动)或分类任务的交叉熵损失(如果预测变动方向)。
– **优化器**:选择一个优化器来最小化损失函数,例如 Adam 或 SGD。
– **验证集**:在训练过程中使用部分数据来验证模型。这有助于在不过度拟合的情况下调整超参数。
### 4. 回测
– **模拟交易**:使用历史数据模拟基于模型预测的交易。这有助于估计模型在真实投资中的表现。
– **评估指标**:使用夏普比率、索提诺比率等指标,或仅仅是预测的准确性来评估表现。
### 5. 迭代
– **模型调优**:根据回测结果,调整模型架构,重新训练并再次测试。
– **特征重新评估**:持续添加、删除或修改特征,并观察模型性能如何变化。
### 6. 部署
– **实时数据流**:确保模型能够持续摄取实时数据并输出预测。
– **风险管理**:实施风险管理策略,以限制模型预测错误时造成的损失。
### 其他注意事项
– **过拟合**:警惕模型对历史数据的过拟合,这会使其在未来未见过的市场条件下失效。
– **市场噪音**:金融市场受多种因素影响,其中许多因素无法通过历史数据预测。因此,对模型的预测准确性保持保守的期望至关重要。
这个框架应该能为您使用 CNN 进行市场走势预测提供一个稳健的起点。根据您正在研究的数据和金融市场的具体特征,可能需要进行调整和改进。
下面是一个示例 Python 脚本,演示了如何使用卷积神经网络(CNN)进行市场走势预测,该脚本使用历史价格数据。本示例使用 TensorFlow 和 Keras 构建模型,并假定您有一个包含历史每日价格数据的 CSV 文件。
### 使用 CNN 进行市场走势预测的示例 Python 脚本
python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dense, Flatten, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
# 加载数据集
data = pd.read_csv('historical_prices.csv')
prices = data['Close'].values # Assuming only using close prices
# Function to create dataset for CNN
def create_dataset(data, time_steps=1)
X, y = [], []
for i in range(len(data) - time_steps - 1)
a = data[i:(i + time_steps)]
X.append(a)
y.append(data[i + time_steps])
return np.array(X), np.array(y)
# 归一化数据
scaler = MinMaxScaler(feature_range=(0, 1))
prices_scaled = scaler.fit_transform(prices.reshape(-1, 1))
# Create input and output sets
time_steps = 10 # Number of days to look back for prediction
X, y = create_dataset(prices_scaled, time_steps)
X = X.reshape(X.shape[0], X.shape[1], 1) # Reshaping for CNN input
# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
# Build the CNN model
模型 = 序列([
Conv1D(filters=64, kernel_size=2, activation='relu', input_shape=(time_steps, 1)),
MaxPooling1D(pool_size=2),
Flatten(),
Dense(50, activation='relu'),
Dense(1)
])
model.compile(optimizer='adam', loss='mean_squared_error')
# 训练模型
history = model.fit(X_train, y_train, epochs=50, batch_size=32, verbose=1, validation_data=(X_test, y_test))
# 评估模型
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
plt.show()
# 进行预测
predicted_prices = model.predict(X_test)
predicted_prices = scaler.inverse_transform(predicted_prices) # Invert scaling
# Plot predictions
plt.figure(figsize=(10, 6))
plt.plot(scaler.inverse_transform(y_test), label='Actual Price')
plt.plot(predicted_prices, label='Predicted Price')
plt.title('Price Prediction')
plt.xlabel('Time')
plt.ylabel('Price')
plt.legend()
plt.show()
### 脚本的关键组成部分
– **数据预处理**:归一化价格数据并将其重塑为适合 CNN 的格式。
– **模型架构**:使用适用于时间序列数据的 1D CNN。模型架构包括卷积层和池化层,然后是用于预测的密集层。
– **训练与评估**:在数据集上训练模型,并绘制训练和验证损失图,以帮助理解过拟合。
– **预测可视化**:可视化实际价格与预测价格,以评估模型的预测准确性。
### 后续步骤
– 您可以通过整合更多特征(如交易量、开盘价、最高/最低价)来增强模型。
– 尝试不同的架构、超参数和特征集以提高性能。
– 实施回测,以在交易模拟中评估模型的有效性。
我有一种感觉,如果您更改列的顺序(技术指标等),模型将产生不同的结果。我认为 CNN 寻找的是空间模式——值是相邻还是相隔很远很重要,但列的顺序是任意的。
你提出这个问题是完全正确的——你的直觉非常准确。
关键见解
是的,更改列的顺序(技术指标)会影响 CNN 模型在应用于金融时间序列数据时的性能。这是因为 CNN 依赖于局部空间依赖性,而在这种情况下,你的特征(列)的顺序成为该空间结构的一部分。
—
为什么会这样
在图像处理(CNN 最常用于此)中,每个像素的位置都有意义——模型学习边缘和形状等空间模式。在表格或时间序列数据中,特征(列)的顺序通常不重要,除非你特意设计了空间或时间结构。
所以,当你将数据输入 CNN 时,如下所示
[ SMA | RSI | MACD | OBV | ... ] at time t
[ SMA | RSI | MACD | OBV | ... ] at time t+1
并且跨特征维度应用卷积,你就告诉模型“SMA 紧邻 RSI”是有意义的——这可能不正确。如果你重新排列了列,CNN 就会学习不同的模式,尽管基础指标没有改变。这就是为什么特征顺序会影响模型行为的原因。
—
何时 CNN 对金融时间序列有意义
在某些条件下,CNN 对金融数据可能有效:
* 当你使用时间卷积时——针对每个指标在时间上滑动滤波器。这效果很好,因为时间是自然有序的。
* 当你使用领域知识将数据重塑为 2D 格式,其中空间关系确实有意义时——例如,按相关性对指标进行聚类。
—
替代或改进方案
1. 以有意义的方式对特征进行排序,例如按分组相似指标,以引入结构。
2. 使用 LSTM 或 Transformer 类模型,它们可以在不假定空间局部性的情况下处理序列。
3. 考虑混合模型——例如,使用 CNN 为每个指标在一段时间内提取短期模式,然后将这些特征传递给 LSTM 或全连接层。
4. 尝试置换测试——对特征顺序进行洗牌,并观察性能变化程度。这有助于你衡量你的 CNN 对输入布局的敏感程度。