现实世界的时间序列预测之所以具有挑战性,原因有很多,不仅限于问题特征,例如存在多个输入变量、需要预测多个时间步长以及需要针对多个物理站点执行相同类型的预测。
EMC 数据科学全球黑客马拉松数据集,简称“空气质量预测”数据集,描述了多个地点的天气状况,并需要预测未来三天内的空气质量测量值。
机器学习算法可以应用于时间序列预测问题,并提供处理具有噪声复杂依赖关系的多个输入变量的能力等优势。
在本教程中,您将学习如何开发机器学习模型,用于空气污染的多步时间序列预测。
完成本教程后,您将了解:
- 如何估算缺失值并转换时间序列数据,以便可以通过监督学习算法对其进行建模。
- 如何开发和评估一套用于多步时间序列预测的线性算法。
- 如何开发和评估一套用于多步时间序列预测的非线性算法。
通过我的新书《时间序列预测深度学习》启动您的项目,其中包括分步教程和所有示例的 Python 源代码文件。
让我们开始吧。
- 2019 年 6 月更新:更新了 numpy.load(),设置 allow_pickle=True。

如何开发用于多变量多步空气污染时间序列预测的机器学习模型
图片由 Eric Schmuttenmaer 拍摄,保留部分权利。
教程概述
本教程分为九个部分;它们是
- 问题描述
- 模型评估
- 机器学习建模
- 机器学习数据准备
- 模型评估测试工具
- 评估线性算法
- 评估非线性算法
- 调整滞后大小
问题描述
空气质量预测数据集描述了多个站点的天气状况,需要预测未来三天内的空气质量测量值。
具体来说,提供了多个站点八天内每小时的温度、压力、风速和风向等天气观测数据。目标是预测多个站点未来 3 天的空气质量测量值。预测提前期不连续;相反,必须在 72 小时预测期内预测特定的提前期。它们是
1 |
+1, +2, +3, +4, +5, +10, +17, +24, +48, +72 |
此外,数据集被划分为不相交但连续的数据块,其中包含八天的数据,然后是需要预测的三天。
并非所有站点或数据块都提供所有观测数据,也并非所有站点或数据块都提供所有输出变量。存在大量缺失数据,必须加以解决。
该数据集在 2012 年 Kaggle 网站上作为短期机器学习竞赛(或黑客马拉松)的基础。
竞赛提交作品根据从参与者那里保留的真实观测数据进行评估,并使用平均绝对误差 (MAE) 进行评分。提交作品要求在由于数据缺失而无法进行预测的情况下指定值 -1,000,000。事实上,提供了一个用于插入缺失值的模板,并且要求所有提交作品都必须采用该模板(真麻烦)。
一位获胜者在保留的测试集(私有排行榜)上使用滞后观测值的随机森林实现了 0.21058 的 MAE。该解决方案的详细说明可在以下帖子中找到
在本教程中,我们将探讨如何为该问题开发朴素预测,这些预测可用作基线,以确定模型是否在该问题上具有技能。
时间序列深度学习需要帮助吗?
立即参加我为期7天的免费电子邮件速成课程(附示例代码)。
点击注册,同时获得该课程的免费PDF电子书版本。
模型评估
在评估朴素预测方法之前,我们必须开发一个测试工具。
这至少包括如何准备数据以及如何评估预测。
加载数据集
第一步是下载数据集并将其加载到内存中。
可以从 Kaggle 网站免费下载数据集。您可能需要创建帐户并登录才能下载数据集。
将整个数据集(例如“下载所有”)下载到您的工作站,并在当前工作目录中解压缩存档到名为“AirQualityPrediction”的文件夹中。
我们的重点将放在“TrainingData.csv”文件,该文件包含训练数据集,特别是以数据块形式存在的数据,其中每个数据块是八个连续的观测日和目标变量。
我们可以使用 Pandas 的 read_csv() 函数将数据文件加载到内存中,并指定第 0 行作为标题行。
1 2 |
# 加载数据集 数据集 = read_csv('AirQualityPrediction/TrainingData.csv', header=0) |
我们可以按“chunkID”变量(列索引 1)对数据进行分组。
首先,让我们获取唯一块标识符的列表。
1 |
chunk_ids = unique(values[:, 1]) |
然后,我们可以收集每个块标识符的所有行,并将它们存储在字典中以便于访问。
1 2 3 4 5 |
chunks = dict() # 按块 ID 排序行 for chunk_id in chunk_ids: selection = values[:, chunk_ix] == chunk_id chunks[chunk_id] = values[selection, :] |
下面定义了一个名为 to_chunks() 的函数,该函数接受加载数据的 NumPy 数组,并返回一个从 chunk_id 到块行的字典。
1 2 3 4 5 6 7 8 9 10 |
# 按“chunkID”拆分数据集,返回一个 ID 到行的字典 def to_chunks(values, chunk_ix=1): chunks = dict() # 获取唯一的块 ID chunk_ids = unique(values[:, chunk_ix]) # 按块 ID 分组行 for chunk_id in chunk_ids: selection = values[:, chunk_ix] == chunk_id chunks[chunk_id] = values[selection, :] return chunks |
加载数据集并将其拆分为块的完整示例如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# 加载数据并拆分为块 from numpy import unique from pandas import read_csv # 按“chunkID”拆分数据集,返回一个 ID 到行的字典 def to_chunks(values, chunk_ix=1): chunks = dict() # 获取唯一的块 ID chunk_ids = unique(values[:, chunk_ix]) # 按块 ID 分组行 for chunk_id in chunk_ids: selection = values[:, chunk_ix] == chunk_id chunks[chunk_id] = values[selection, :] return chunks # 加载数据集 数据集 = read_csv('AirQualityPrediction/TrainingData.csv', header=0) # 按块分组数据 values = dataset.values chunks = to_chunks(values) print('总块数: %d' % len(chunks)) |
运行示例会打印数据集中块的数量。
1 |
总块数:208 |
数据准备
现在我们知道如何加载数据并将其拆分为块,我们可以将其分成训练和测试数据集。
每个块包含八天每小时的观测数据,尽管每个块中实际观测的数量可能差异很大。
我们可以将每个块的前五天的观测数据用于训练,后三天用于测试。
每个观测值都有一个名为“position_within_chunk”的行,其值从 1 到 192(8 天 * 24 小时)不等。因此,我们可以将此列中值小于或等于 120 (5 * 24) 的所有行作为训练数据,将大于 120 的任何值作为测试数据。
此外,任何在训练或测试拆分中没有观测值的块都可以被视为不可用而丢弃。
在使用朴素模型时,我们只对目标变量感兴趣,而不是任何输入气象变量。因此,我们可以删除输入数据,使训练和测试数据仅由每个块的 39 个目标变量以及块内位置和观测小时组成。
下面的 split_train_test() 函数实现了此行为;给定一个块字典,它将每个块拆分为训练和测试块数据的列表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# 将每个块拆分为训练/测试集 def split_train_test(chunks, row_in_chunk_ix=2): train, test = list(), list() # 前 5 天的每小时观测数据用于训练 cut_point = 5 * 24 # 枚举块 for k,rows in chunks.items(): # 按“position_within_chunk”拆分块行 train_rows = rows[rows[:,row_in_chunk_ix] <= cut_point, :] test_rows = rows[rows[:,row_in_chunk_ix] > cut_point, :] if len(train_rows) == 0 or len(test_rows) == 0: print('>dropping chunk=%d: train=%s, test=%s' % (k, train_rows.shape, test_rows.shape)) continue # 存储块 ID、块内位置、小时和所有目标 indices = [1,2,5] + [x for x in range(56,train_rows.shape[1])] train.append(train_rows[:, indices]) test.append(test_rows[:, indices]) return train, test |
我们不需要整个测试数据集;相反,我们只需要三天内特定提前期处的观测数据,特别是以下提前期
1 |
+1, +2, +3, +4, +5, +10, +17, +24, +48, +72 |
其中,每个提前期都相对于训练期结束。
首先,我们可以将这些提前期放入一个函数中,以便于引用
1 2 3 |
# 返回相对预测提前期的列表 def get_lead_times(): return [1, 2 ,3, 4, 5, 10, 17, 24, 48, 72] |
接下来,我们可以将测试数据集缩减为仅包含首选提前期的数据。
我们可以通过查看“position_within_chunk”列,并使用提前期作为训练数据集结束的偏移量来实现,例如 120 + 1、120 + 2 等。
如果在测试集中找到匹配的行,则保存该行,否则生成一行 NaN 观测值。
下面的 to_forecasts() 函数实现了此功能,并返回一个 NumPy 数组,其中每个块的每个预测提前期都有一行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# 将测试块中的行转换为预测 def to_forecasts(test_chunks, row_in_chunk_ix=1): # 获取提前期 lead_times = get_lead_times() # 前 5 天的每小时观测数据用于训练 cut_point = 5 * 24 forecasts = list() # 枚举每个块 for rows in test_chunks: chunk_id = rows[0, 0] # 枚举每个提前期 for tau in lead_times: # 确定我们想要的提前期在块中的行 offset = cut_point + tau # 使用块中的行号检索提前期的数据 row_for_tau = rows[rows[:,row_in_chunk_ix]==offset, :] # 检查我们是否有数据 if len(row_for_tau) == 0: # 创建一个模拟行 [chunk, position, hour] + [nan...] row = [chunk_id, offset, nan] + [nan for _ in range(39)] forecasts.append(row) else: # 存储预测行 forecasts.append(row_for_tau[0]) return array(forecasts) |
我们可以将所有这些结合起来,将数据集拆分为训练集和测试集,并将结果保存到新文件中。
完整的代码示例如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# 将数据拆分为训练集和测试集 from numpy import unique from numpy import nan from numpy import array from numpy import savetxt from pandas import read_csv # 按“chunkID”拆分数据集,返回一个 ID 到行的字典 def to_chunks(values, chunk_ix=1): chunks = dict() # 获取唯一的块 ID chunk_ids = unique(values[:, chunk_ix]) # 按块 ID 分组行 for chunk_id in chunk_ids: selection = values[:, chunk_ix] == chunk_id chunks[chunk_id] = values[selection, :] return chunks # 将每个块拆分为训练/测试集 def split_train_test(chunks, row_in_chunk_ix=2): train, test = list(), list() # 前 5 天的每小时观测数据用于训练 cut_point = 5 * 24 # 枚举块 for k,rows in chunks.items(): # 按“position_within_chunk”拆分块行 train_rows = rows[rows[:,row_in_chunk_ix] <= cut_point, :] test_rows = rows[rows[:,row_in_chunk_ix] > cut_point, :] if len(train_rows) == 0 or len(test_rows) == 0: print('>dropping chunk=%d: train=%s, test=%s' % (k, train_rows.shape, test_rows.shape)) continue # 存储块 ID、块内位置、小时和所有目标 indices = [1,2,5] + [x for x in range(56,train_rows.shape[1])] train.append(train_rows[:, indices]) test.append(test_rows[:, indices]) return train, test # 返回相对预测提前期的列表 def get_lead_times(): return [1, 2 ,3, 4, 5, 10, 17, 24, 48, 72] # 将测试块中的行转换为预测 def to_forecasts(test_chunks, row_in_chunk_ix=1): # 获取提前期 lead_times = get_lead_times() # 前 5 天的每小时观测数据用于训练 cut_point = 5 * 24 forecasts = list() # 枚举每个块 for rows in test_chunks: chunk_id = rows[0, 0] # 枚举每个提前期 for tau in lead_times: # 确定我们想要的提前期在块中的行 offset = cut_point + tau # 使用块中的行号检索提前期的数据 row_for_tau = rows[rows[:,row_in_chunk_ix]==offset, :] # 检查我们是否有数据 if len(row_for_tau) == 0: # 创建一个模拟行 [chunk, position, hour] + [nan...] row = [chunk_id, offset, nan] + [nan for _ in range(39)] forecasts.append(row) else: # 存储预测行 forecasts.append(row_for_tau[0]) return array(forecasts) # 加载数据集 数据集 = read_csv('AirQualityPrediction/TrainingData.csv', header=0) # 按块分组数据 values = dataset.values chunks = to_chunks(values) # 分割成训练/测试集 train, test = split_train_test(chunks) # 将训练块平铺成行 train_rows = array([row for rows in train for row in rows]) # print(train_rows.shape) print('训练行数: %s' % str(train_rows.shape)) # 仅将训练数据缩减到预测提前期 test_rows = to_forecasts(test) print('测试行数: %s' % str(test_rows.shape)) # 保存数据集 savetxt('AirQualityPrediction/naive_train.csv', train_rows, delimiter=',') savetxt('AirQualityPrediction/naive_test.csv', test_rows, delimiter=',') |
运行示例首先会提示块 69 因数据不足而被从数据集中删除。
然后我们可以看到,训练集和测试集中各有 42 列,其中一列用于块 ID,一列用于块内位置,一列用于一天中的小时,以及 39 个训练变量。
我们还可以看到,测试数据集的版本明显较小,只包含预测提前期处的行。
新的训练和测试数据集分别保存在“naive_train.csv”和“naive_test.csv”文件中。
1 2 3 |
>正在删除块=69:训练=(0, 95),测试=(28, 95) 训练行数: (23514, 42) 测试行数: (2070, 42) |
预测评估
一旦做出预测,就需要对其进行评估。
评估预测时,采用更简单的格式会很有帮助。例如,我们将使用 [块][变量][时间] 的三维结构,其中变量是目标变量编号(从 0 到 38),时间是提前期索引(从 0 到 9)。
模型预计将以这种格式进行预测。
我们还可以重新构造测试数据集,使其具有此数据集以进行比较。下面的 prepare_test_forecasts() 函数实现了此功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# 将块中的测试数据集转换为 [块][变量][时间] 格式 def prepare_test_forecasts(test_chunks): predictions = list() # 枚举要预测的块 for rows in test_chunks: # 枚举块的目标 chunk_predictions = list() for j in range(3, rows.shape[1]): yhat = rows[:, j] chunk_predictions.append(yhat) chunk_predictions = array(chunk_predictions) predictions.append(chunk_predictions) return array(predictions) |
我们将使用平均绝对误差 (MAE) 来评估模型。这是竞赛中使用的指标,考虑到目标变量的非高斯分布,这是一个明智的选择。
如果某个提前期在测试集中不包含数据(例如 NaN),则不会计算该预测的误差。如果该提前期在测试集中有数据但在预测中没有数据,则观测值的全部大小将被视为误差。最后,如果测试集有观测值并且进行了预测,则绝对差将被记录为误差。
calculate_error() 函数实现了这些规则并返回给定预测的误差。
1 2 3 4 5 6 7 |
# 计算实际值和预测值之间的误差 def calculate_error(actual, predicted): # 如果预测值为 nan,则给出完整的实际值 if isnan(predicted): return abs(actual) # 计算绝对差值 return abs(actual - predicted) |
误差在所有块和所有提前期上求和,然后求平均值。
将计算总体 MAE,但我们还将计算每个预测提前期的 MAE。这通常有助于模型选择,因为某些模型在不同的提前期可能表现不同。
下面的 evaluate_forecasts() 函数实现了此功能,它计算给定预测和预期值(以 [块][变量][时间] 格式)的 MAE 和每个提前期 MAE。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# 评估 [块][变量][时间] 格式的预测 def evaluate_forecasts(predictions, testset): lead_times = get_lead_times() total_mae, times_mae = 0.0, [0.0 for _ in range(len(lead_times))] total_c, times_c = 0, [0 for _ in range(len(lead_times))] # 枚举测试块 for i in range(len(test_chunks)): # 转换为预测 actual = testset[i] predicted = predictions[i] # 枚举目标变量 for j in range(predicted.shape[0]): # 枚举提前期 for k in range(len(lead_times)): # 如果实际值为 nan 则跳过 if isnan(actual[j, k]): continue # 计算误差 error = calculate_error(actual[j, k], predicted[j, k]) # 更新统计数据 total_mae += error times_mae[k] += error total_c += 1 times_c[k] += 1 # 归一化求和的绝对误差 total_mae /= total_c times_mae = [times_mae[i]/times_c[i] for i in range(len(times_mae))] return total_mae, times_mae |
一旦我们评估了模型,我们就可以展示它。
下面的 summarize_error() 函数首先打印模型性能的一行摘要,然后创建每个预测提前期的 MAE 图。
1 2 3 4 5 6 7 8 9 10 |
# 总结得分 def summarize_error(name, total_mae, times_mae): # 打印摘要 lead_times = get_lead_times() formatted = ['+%d %.3f' % (lead_times[i], times_mae[i]) for i in range(len(lead_times))] s_scores = ', '.join(formatted) print('%s: [%.3f MAE] %s' % (name, total_mae, s_scores)) # 绘制摘要 pyplot.plot([str(x) for x in lead_times], times_mae, marker='.') pyplot.show() |
现在我们准备开始探索朴素预测方法的性能。
机器学习建模
该问题可以通过机器学习进行建模。
大多数机器学习模型不直接支持时间上的观测概念。相反,滞后观测必须被视为输入特征才能进行预测。
这是机器学习算法用于时间序列预测的优势。具体来说,它们能够支持大量输入特征。这些可以是单个或多个输入时间序列的滞后观测值。
机器学习算法用于时间序列预测相对于经典方法的其他一般优势包括:
- 能够支持嘈杂的特征和变量之间的关系中的噪声。
- 能够处理不相关的特征。
- 能够支持变量之间复杂的关系。
该数据集的一个挑战是需要进行多步预测。机器学习方法可以用于进行多步预测的两种主要方法;它们是:
- 直接。为预测每个预测提前期开发一个单独的模型。
- 递归。开发一个单一模型进行一步预测,并递归使用该模型,其中先前的预测用作输入以预测后续的提前期。
当预测一个短的连续提前期块时,递归方法可能更合理,而当预测不连续的提前期时,直接方法可能更有意义。考虑到我们有兴趣预测三天内 10 个连续和不连续提前期的混合,直接方法可能更适合空气污染预测问题。
数据集有 39 个目标变量,我们为每个目标变量、每个预测提前期开发一个模型。这意味着我们需要 (39 * 10) 390 个机器学习模型。
机器学习算法用于时间序列预测的关键是输入数据的选择。我们可以考虑三种主要的数据来源,它们可以用作输入并映射到目标变量的每个预测提前期;它们是:
- 单变量数据,例如来自正在预测的目标变量的滞后观测值。
- 多变量数据,例如来自其他变量(天气和目标)的滞后观测值。
- 元数据,例如关于预测日期或时间的数据。
数据可以从所有块中提取,为学习从输入到目标预测提前期的映射提供丰富的数据集。
39 个目标变量实际上由 14 个站点上的 12 个变量组成。
由于数据提供的方式,默认的建模方法是将每个变量-站点视为独立的。有可能按变量折叠数据,并在多个站点对同一变量使用相同的模型。
一些变量被故意错误标记(例如,不同的数据使用具有相同标识符的变量)。尽管如此,也许可以识别这些错误标记的变量并将其排除在多站点模型之外。
机器学习数据准备
在探索该数据集的机器学习模型之前,我们必须以可以拟合模型的方式准备数据。
这需要两个数据准备步骤:
- 处理缺失数据。
- 准备输入-输出模式。
目前,我们将专注于 39 个目标变量,忽略气象和元数据。
处理缺失数据
块由五天或更少的每小时观测数据组成,针对 39 个目标变量。
许多块没有所有五天的数据,也没有任何块包含所有 39 个目标变量的数据。
在某个块缺少目标变量数据的情况下,不需要进行预测。
在某个块确实包含某个目标变量的一些数据,但没有全部五天数据的情况下,序列中将存在间隙。这些间隙可能是几个小时到一天多的观测值,有时甚至更长。
处理这些间隙的三个候选策略如下:
- 忽略间隙。
- 使用无间隙数据。
- 填补间隙。
我们可以忽略这些空白。这样做的问题是,当将数据拆分为输入和输出时,数据将不连续。当训练模型时,输入将不一致,但可能意味着最近 n 小时的数据,或分散在最近 n 天的数据。这种不一致性将使从输入到输出的学习映射非常嘈杂,并且可能比模型所需的时间更困难。
我们只能使用没有间隙的数据。这是一个不错的选择。风险在于我们可能没有足够的数据来拟合模型。
最后,我们可以填补空白。这称为数据插补,有许多策略可以用来填补空白。三种可能表现良好的方法包括:
- 将最后观察到的值向前延续(线性)。
- 使用该块内一天中该小时的中位数。
- 使用跨块一天中该小时的中位数。
在本教程中,我们将使用后一种方法,通过使用跨块的一天中该小时的中位数来填补空白。经过一些测试后,这种方法似乎能带来更多的训练样本和更好的模型性能。
对于给定的变量,可能存在由缺失行定义的缺失观测值。具体来说,每个观测值都有一个“position_within_chunk”。我们期望训练数据集中的每个块都有 120 个观测值,其中“positions_within_chunk”从 1 到 120(包括)。
因此,我们可以为每个变量创建一个包含 120 个 NaN 值的数组,使用“positions_within_chunk”值标记块中的所有观测值,任何剩余的都将标记为 NaN。然后我们可以绘制每个变量并查找间隙。
下面的 variable_to_series() 函数将获取一个块的行和目标变量的给定列索引,并将返回一个包含 120 个时间步长的变量序列,其中所有可用数据都使用块中的值进行标记。
1 2 3 4 5 6 7 8 9 10 11 |
# 布局一个在数据中存在缺失位置的变量 def variable_to_series(chunk_train, col_ix, n_steps=5*24): # 布局整个系列 data = [nan for _ in range(n_steps)] # 标记所有可用数据 for i in range(len(chunk_train)): # 获取块中的位置 position = int(chunk_train[i, 1] - 1) # 存储数据 data[position] = chunk_train[i, col_ix] return data |
我们需要为每个块计算一个并行的日期小时序列,我们可以用它来为块中的每个变量估算特定小时的数据。
给定一个部分填充的日期小时序列,下面的 interpolate_hours() 函数将填充缺失的日期小时。它通过找到第一个标记的小时,然后向前计数,填充日期小时,然后反向执行相同的操作来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# 插值 24 小时制的小时序列(原地) def interpolate_hours(hours): # 找到第一个小时 ix = -1 for i in range(len(hours)): if not isnan(hours[i]): ix = i break # 向前填充 hour = hours[ix] for i in range(ix+1, len(hours)): # 递增小时 hour += 1 # 检查是否需要填充 if isnan(hours[i]): hours = hour % 24 # 向后填充 hour = hours[ix] for i in range(ix-1, -1, -1): # 递减小时 hour -= 1 # 检查是否需要填充 if isnan(hours[i]): hours = hour % 24 |
我们可以调用相同的 variable_to_series() 函数(上方)来创建带有缺失值的小时序列(列索引 2),然后调用 interpolate_hours() 来填充缺失值。
1 2 3 4 |
# 为块准备小时序列 hours = variable_to_series(rows, 2) # 插补小时 interpolate_hours(hours) |
然后我们可以将小时传递给任何可能使用它的插补函数。
现在我们可以尝试使用同一序列中相同小时的值来填充块中的缺失值。具体来说,我们将找到序列中所有具有相同小时的行,并计算中位数。
下面的 impute_missing() 函数接受块中的所有行、为块准备好的日期小时序列、具有缺失值的变量序列以及变量的列索引。
它首先检查序列是否完全缺失数据,如果是,则立即返回,因为无法执行插补。然后,它遍历序列的时间步长,当检测到没有数据的时间步长时,它收集序列中具有相同小时数据的所有行并计算中位数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# 插补缺失数据 def impute_missing(train_chunks, rows, hours, series, col_ix): # 使用所有序列中小时的中位数插补缺失值 imputed = list() for i in range(len(series)): if isnan(series[i]): # 收集所有块中该小时的所有行 all_rows = list() for rows in train_chunks: [all_rows.append(row) for row in rows[rows[:,2]==hours[i]]] # 计算目标的集中趋势 all_rows = array(all_rows) # 用中位数填充 value = nanmedian(all_rows[:, col_ix]) if isnan(value): value = 0.0 imputed.append(value) else: imputed.append(series[i]) return imputed |
监督表示
我们需要将每个目标变量的序列转换为包含输入和输出的行,以便我们可以拟合监督机器学习算法。
具体来说,我们有一个序列,例如:
1 |
[1, 2, 3, 4, 5, 6, 7, 8, 9] |
当使用 2 个滞后变量预测提前期 +1 时,我们将序列拆分为输入 (X) 和输出 (y) 模式,如下所示:
1 2 3 4 5 6 7 8 |
X, y 1, 2, 3 2, 3, 4 3, 4, 5 4, 5, 6 5, 6, 7 6, 7, 8 7, 8, 9 |
这首先要求我们选择作为输入使用的滞后观测数量。没有正确答案;相反,最好测试不同的数量并查看哪种有效。
然后,我们必须将序列拆分为监督学习格式,用于 10 个预测提前期中的每一个。例如,使用 2 个滞后观测值预测 +24 可能如下所示:
1 2 |
X, y 1, 2, 24 |
然后为 39 个目标变量中的每一个重复此过程。
然后可以将为每个提前期、每个目标变量准备的模式跨块聚合,以提供模型的训练数据集。
我们还必须准备一个测试数据集。也就是说,每个目标变量的每个块的输入数据 (X),以便我们可以将其用作输入来预测测试数据集中的提前期。如果我们将滞后设置为 2,那么测试数据集将由每个目标变量的每个块的最后两个观测值组成。非常简单。
我们可以从定义一个函数开始,该函数将为给定的完整(插补)序列创建输入-输出模式。
下面的 supervised_for_lead_time() 函数将接受一个序列,一个用作输入的滞后观测数量,以及一个要预测的预测提前期,然后将返回从序列中提取的输入/输出行列表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# 从序列创建输入/输出模式 def supervised_for_lead_time(series, n_lag, lead_time): samples = list() # 枚举观测值并创建输入/输出模式 for i in range(n_lag, len(series)): end_ix = i + (lead_time - 1) # 检查是否可以创建模式 if end_ix >= len(series): break # 检索输入和输出 start_ix = i - n_lag row = series[start_ix:i] + [series[end_ix]] samples.append(row) return samples |
理解这一部分很重要。
我们可以在小型测试数据集上测试此函数,并探索不同数量的滞后变量和预测提前期。
以下是一个完整的示例,它生成一个包含20个整数的序列,并创建了一个具有两个输入滞后时间的序列,用于预测+6的提前期。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# 测试监督式输入/输出模式 from numpy import array # 从序列创建输入/输出模式 def supervised_for_lead_time(series, n_lag, lead_time): data = list() # 枚举观测值并创建输入/输出模式 for i in range(n_lag, len(series)): end_ix = i + (lead_time - 1) # 检查是否可以创建模式 if end_ix >= len(series): break # 检索输入和输出 start_ix = i - n_lag row = series[start_ix:i] + [series[end_ix]] data.append(row) return array(data) # 定义测试数据集 data = [x for x in range(20)] # 转换为监督式格式 result = supervised_for_lead_time(data, 2, 6) # 显示结果 print(result) |
运行示例将打印出生成的模式,显示滞后观测值及其相关的预测提前期。
请尝试此示例,以熟悉此数据转换,因为它对于使用机器学习算法建模时间序列至关重要。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
[[ 0 1 7] [ 1 2 8] [ 2 3 9] [ 3 4 10] [ 4 5 11] [ 5 6 12] [ 6 7 13] [ 7 8 14] [ 8 9 15] [ 9 10 16] [10 11 17] [11 12 18] [12 13 19]] |
我们现在可以为给定目标变量序列的每个预测提前期调用 supervised_for_lead_time() 函数。
下面的 target_to_supervised() 函数实现了这一点。首先,目标变量被转换为序列,并使用上一节中开发的函数进行插补。然后为每个目标提前期创建训练样本。还为目标变量创建了一个测试样本。
然后返回此目标变量的每个预测提前期的训练数据和测试输入数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# 为此目标变量的每个提前期创建监督学习数据 def target_to_supervised(chunks, rows, hours, col_ix, n_lag): train_lead_times = list() # 获取序列 series = variable_to_series(rows, col_ix) if not has_data(series): return None, [nan for _ in range(n_lag)] # 插补 imputed = impute_missing(chunks, rows, hours, series, col_ix) # 为块变量准备测试样本 test_sample = array(imputed[-n_lag:]) # 枚举提前期 lead_times = get_lead_times() for lead_time in lead_times: # 从序列中制作输入/输出数据 train_samples = supervised_for_lead_time(imputed, n_lag, lead_time) train_lead_times.append(train_samples) return train_lead_times, test_sample |
我们已经有了这些部分;现在我们需要定义一个函数来驱动数据准备过程。
此函数构建训练和测试数据集。
方法是枚举每个目标变量,并从所有块中收集每个提前期的训练数据。同时,我们收集在对测试数据集进行预测时作为输入所需的样本。
结果是一个训练数据集,其维度为 [变量][提前期][样本],其中最后一个维度是目标变量的预测提前期的训练样本行。该函数还返回维度为 [块][变量][样本] 的测试数据集,其中最后一个维度是用于对块的目标变量进行预测的输入数据。
下面的 data_prep() 函数实现了此行为,它以块格式获取数据和指定数量的滞后观测值作为输入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# 准备训练 [变量][提前期][样本] 和测试 [块][变量][样本] def data_prep(chunks, n_lag, n_vars=39): lead_times = get_lead_times() train_data = [[list() for _ in range(len(lead_times))] for _ in range(n_vars)] test_data = [[list() for _ in range(n_vars)] for _ in range(len(chunks))] # 枚举块的目标 for var in range(n_vars): # 将目标数字转换为列号 col_ix = 3 + var # 枚举要预测的块 for c_id in range(len(chunks)): rows = chunks[c_id] # 为此块准备小时序列 hours = variable_to_series(rows, 2) # 插值小时 interpolate_hours(hours) # 检查是否有数据 if not has_data(rows[:, col_ix]): continue # 将序列转换为每个提前期的训练数据 train, test_sample = target_to_supervised(chunks, rows, hours, col_ix, n_lag) # 存储此变量-块的测试样本 test_data[c_id][var] = test_sample if train is not None: # 存储每个提前期的样本 for lead_time in range(len(lead_times)): # 将所有行添加到现有行列表中 train_data[var][lead_time].extend(train[lead_time]) # 将每个变量-提前期的所有行转换为 numpy 数组 for lead_time in range(len(lead_times)): train_data[var][lead_time] = array(train_data[var][lead_time]) return array(train_data), array(test_data) |
完整示例
我们可以将所有内容连接起来,并准备一个具有监督学习格式的训练和测试数据集,用于机器学习算法。
我们将使用前12小时的滞后观测值作为输入来预测每个预测提前期。
然后将生成的训练和测试数据集保存为二进制NumPy数组。
完整的示例如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# 准备数据 从 numpy 导入 loadtxt from numpy import nan from numpy import isnan from numpy import count_nonzero from numpy import unique from numpy import array from numpy import nanmedian from numpy import save # 按“chunkID”分割数据集,返回块列表 def to_chunks(values, chunk_ix=0): chunks = list() # 获取唯一的块 ID chunk_ids = unique(values[:, chunk_ix]) # 按块 ID 分组行 for chunk_id in chunk_ids: selection = values[:, chunk_ix] == chunk_id chunks.append(values[selection, :]) return chunks # 返回相对预测提前期的列表 def get_lead_times(): return [1, 2, 3, 4, 5, 10, 17, 24, 48, 72] # 插值 24 小时制的小时序列(原地) def interpolate_hours(hours): # 找到第一个小时 ix = -1 for i in range(len(hours)): if not isnan(hours[i]): ix = i break # 向前填充 hour = hours[ix] for i in range(ix+1, len(hours)): # 递增小时 hour += 1 # 检查是否需要填充 if isnan(hours[i]): hours = hour % 24 # 向后填充 hour = hours[ix] for i in range(ix-1, -1, -1): # 递减小时 hour -= 1 # 检查是否需要填充 if isnan(hours[i]): hours = hour % 24 # 如果数组包含任何非 nan 值,则返回 true def has_data(data): return count_nonzero(isnan(data)) < len(data) # 插补缺失数据 def impute_missing(train_chunks, rows, hours, series, col_ix): # 使用所有序列中小时的中位数插补缺失值 imputed = list() for i in range(len(series)): if isnan(series[i]): # 收集所有块中该小时的所有行 all_rows = list() for rows in train_chunks: [all_rows.append(row) for row in rows[rows[:,2]==hours[i]]] # 计算目标的集中趋势 all_rows = array(all_rows) # 用中位数填充 value = nanmedian(all_rows[:, col_ix]) if isnan(value): value = 0.0 imputed.append(value) else: imputed.append(series[i]) return imputed # 布局一个在数据中存在缺失位置的变量 def variable_to_series(chunk_train, col_ix, n_steps=5*24): # 布局整个系列 data = [nan for _ in range(n_steps)] # 标记所有可用数据 for i in range(len(chunk_train)): # 获取块中的位置 position = int(chunk_train[i, 1] - 1) # 存储数据 data[position] = chunk_train[i, col_ix] return data # 从序列创建输入/输出模式 def supervised_for_lead_time(series, n_lag, lead_time): samples = list() # 枚举观测值并创建输入/输出模式 for i in range(n_lag, len(series)): end_ix = i + (lead_time - 1) # 检查是否可以创建模式 if end_ix >= len(series): break # 检索输入和输出 start_ix = i - n_lag row = series[start_ix:i] + [series[end_ix]] samples.append(row) return samples # 为此目标变量的每个提前期创建监督学习数据 def target_to_supervised(chunks, rows, hours, col_ix, n_lag): train_lead_times = list() # 获取序列 series = variable_to_series(rows, col_ix) if not has_data(series): return None, [nan for _ in range(n_lag)] # 插补 imputed = impute_missing(chunks, rows, hours, series, col_ix) # 为块变量准备测试样本 test_sample = array(imputed[-n_lag:]) # 枚举提前期 lead_times = get_lead_times() for lead_time in lead_times: # 从序列中制作输入/输出数据 train_samples = supervised_for_lead_time(imputed, n_lag, lead_time) train_lead_times.append(train_samples) return train_lead_times, test_sample # 准备训练 [变量][提前期][样本] 和测试 [块][变量][样本] def data_prep(chunks, n_lag, n_vars=39): lead_times = get_lead_times() train_data = [[list() for _ in range(len(lead_times))] for _ in range(n_vars)] test_data = [[list() for _ in range(n_vars)] for _ in range(len(chunks))] # 枚举块的目标 for var in range(n_vars): # 将目标数字转换为列号 col_ix = 3 + var # 枚举要预测的块 for c_id in range(len(chunks)): rows = chunks[c_id] # 为此块准备小时序列 hours = variable_to_series(rows, 2) # 插值小时 interpolate_hours(hours) # 检查是否有数据 if not has_data(rows[:, col_ix]): continue # 将序列转换为每个提前期的训练数据 train, test_sample = target_to_supervised(chunks, rows, hours, col_ix, n_lag) # 存储此变量-块的测试样本 test_data[c_id][var] = test_sample if train is not None: # 存储每个提前期的样本 for lead_time in range(len(lead_times)): # 将所有行添加到现有行列表中 train_data[var][lead_time].extend(train[lead_time]) # 将每个变量-提前期的所有行转换为 numpy 数组 for lead_time in range(len(lead_times)): train_data[var][lead_time] = array(train_data[var][lead_time]) return array(train_data), array(test_data) # 加载数据集 train = loadtxt('AirQualityPrediction/naive_train.csv', delimiter=',') test = loadtxt('AirQualityPrediction/naive_test.csv', delimiter=',') # 按块分组数据 train_chunks = to_chunks(train) test_chunks = to_chunks(test) # 将训练数据转换为监督学习数据 n_lag = 12 train_data, test_data = data_prep(train_chunks, n_lag) print(train_data.shape, test_data.shape) # 将训练集和测试集保存到文件 save('AirQualityPrediction/supervised_train.npy', train_data) save('AirQualityPrediction/supervised_test.npy', test_data) |
运行示例可能需要一分钟。
结果是两个二进制文件,包含训练和测试数据集,我们可以在以下部分加载它们以训练和评估机器学习算法。
模型评估测试工具
在开始评估算法之前,我们需要测试工具的更多元素。
首先,我们需要能够根据训练数据拟合scikit-learn模型。下面的 fit_model() 函数将克隆模型配置并根据提供的训练数据进行拟合。我们将需要拟合许多(360个)每个配置模型的版本,因此此函数将被大量调用。
1 2 3 4 5 6 7 |
# 拟合单个模型 def fit_model(model, X, y): # 克隆模型配置 local_model = clone(model) # 拟合模型 local_model.fit(X, y) return local_model |
接下来,我们需要为每个变量和预测提前期组合拟合一个模型。
我们可以通过首先按变量,然后按提前期枚举训练数据集来完成此操作。然后,我们可以拟合一个模型并将其存储在一个具有相同结构的列表列表中,具体为: [变量][时间][模型]。
下面的 fit_models() 函数实现了这一点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
# 为每个变量和每个预测提前期拟合一个模型 [var][time][model] def fit_models(model, train): # 准备保存模型的结构 models = [[list() for _ in range(train.shape[1])] for _ in range(train.shape[0])] # 枚举变量 for i in range(train.shape[0]): # 枚举提前期 for j in range(train.shape[1]): # 获取数据 data = train[i, j] X, y = data[:, :-1], data[:, -1] # 拟合模型 local_model = fit_model(model, X, y) models[i][j].append(local_model) 返回 models |
模型拟合是缓慢的部分,可以通过并行化(例如使用Joblib库)来提高效率。这留作扩展。
一旦模型拟合好,它们就可以用来对测试数据集进行预测。
准备好的测试数据集首先按块组织,然后按目标变量组织。进行预测很快,首先需要检查是否可以进行预测(我们有输入数据),如果可以,则使用目标变量的相应模型。然后,变量的每个10个预测提前期将使用这些提前期的每个直接模型进行预测。
下面的 make_predictions() 函数实现了这一点,它将模型列表列表和加载的测试数据集作为参数,并返回一个结构为 [块][变量][时间] 的预测数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# 将预测返回为 [块][变量][时间] def make_predictions(models, test): lead_times = get_lead_times() predictions = list() # 枚举块 for i in range(test.shape[0]): # 枚举变量 chunk_predictions = list() for j in range(test.shape[1]): # 获取此块和目标的输入模式 pattern = test[i,j] # 假设一个nan预测 forecasts = array([nan for _ in range(len(lead_times))]) # 检查我们是否可以进行预测 if has_data(pattern): pattern = pattern.reshape((1, len(pattern))) # 预测每个提前期 forecasts = list() for k in range(len(lead_times)): yhat = models[j][k][0].predict(pattern) forecasts.append(yhat[0]) forecasts = array(forecasts) # 为此变量的每个提前期保存预测 chunk_predictions.append(forecasts) # 为此块保存预测 chunk_predictions = array(chunk_predictions) predictions.append(chunk_predictions) return array(predictions) |
我们需要一个模型列表进行评估。
我们可以定义一个通用的 get_models() 函数,它负责定义一个模型名称映射到配置好的 scikit-learn 模型对象的字典。
1 2 3 4 |
# 准备机器学习模型列表 def get_models(models=dict()): # ... 返回 models |
最后,我们需要一个函数来驱动模型评估过程。
给定模型字典,枚举模型,首先在训练数据上拟合模型矩阵,对测试数据集进行预测,评估预测结果,并总结结果。
下面的 evaluate_models() 函数实现了这一点。
1 2 3 4 5 6 7 8 9 10 11 |
# 评估一组模型 def evaluate_models(models, train, test, actual): for name, model in models.items(): # 拟合模型 fits = fit_models(model, train) # 进行预测 predictions = make_predictions(fits, test) # 评估预测 total_mae, _ = evaluate_forecasts(predictions, actual) # 总结预测 summarize_error(name, total_mae) |
我们现在拥有评估机器学习模型所需的一切。
评估线性算法
在本节中,我们将抽查一组线性机器学习算法。
线性算法假定输出是输入变量的线性函数。这与ARIMA等经典时间序列预测模型的假设非常相似。
抽查意味着评估一系列模型以粗略了解哪些模型有效。我们对任何优于简单自回归模型AR(2)(其MAE误差约为0.487)的模型感兴趣。
我们将测试八种具有默认配置的线性机器学习算法;具体来说:
- 线性回归
- Lasso 线性回归
- 岭回归
- 弹性网络回归
- Huber 回归
- Lasso Lars 线性回归
- Passive Aggressive 回归
- 随机梯度下降回归
我们可以在 get_models() 函数中定义这些模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# 准备机器学习模型列表 def get_models(models=dict()): # 线性模型 models['lr'] = LinearRegression() models['lasso'] = Lasso() models['ridge'] = Ridge() models['en'] = ElasticNet() models['huber'] = HuberRegressor() models['llars'] = LassoLars() models['pa'] = PassiveAggressiveRegressor(max_iter=1000, tol=1e-3) models['sgd'] = SGDRegressor(max_iter=1000, tol=1e-3) print('Defined %d models' % len(models)) 返回 models |
完整的代码示例如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# 评估线性算法 from numpy import load 从 numpy 导入 loadtxt from numpy import nan from numpy import isnan from numpy import count_nonzero from numpy import unique from numpy import array from sklearn.base import clone 来自 sklearn.linear_model 导入 LinearRegression 来自 sklearn.linear_model 导入 Lasso from sklearn.linear_model import Ridge 来自 sklearn.linear_model 导入 ElasticNet from sklearn.linear_model import HuberRegressor from sklearn.linear_model import LassoLars from sklearn.linear_model import PassiveAggressiveRegressor from sklearn.linear_model import SGDRegressor # 按“chunkID”分割数据集,返回块列表 def to_chunks(values, chunk_ix=0): chunks = list() # 获取唯一的块 ID chunk_ids = unique(values[:, chunk_ix]) # 按块 ID 分组行 for chunk_id in chunk_ids: selection = values[:, chunk_ix] == chunk_id chunks.append(values[selection, :]) return chunks # 如果数组包含任何非 nan 值,则返回 true def has_data(data): return count_nonzero(isnan(data)) < len(data) # 返回相对预测提前期的列表 def get_lead_times(): return [1, 2, 3, 4, 5, 10, 17, 24, 48, 72] # 拟合单个模型 def fit_model(model, X, y): # 克隆模型配置 local_model = clone(model) # 拟合模型 local_model.fit(X, y) return local_model # 为每个变量和每个预测提前期拟合一个模型 [var][time][model] def fit_models(model, train): # 准备保存模型的结构 models = [[list() for _ in range(train.shape[1])] for _ in range(train.shape[0])] # 枚举变量 for i in range(train.shape[0]): # 枚举提前期 for j in range(train.shape[1]): # 获取数据 data = train[i, j] X, y = data[:, :-1], data[:, -1] # 拟合模型 local_model = fit_model(model, X, y) models[i][j].append(local_model) 返回 模型 # 将预测返回为 [块][变量][时间] def make_predictions(models, test): lead_times = get_lead_times() predictions = list() # 枚举块 for i in range(test.shape[0]): # 枚举变量 chunk_predictions = list() for j in range(test.shape[1]): # 获取此块和目标的输入模式 pattern = test[i,j] # 假设一个nan预测 forecasts = array([nan for _ in range(len(lead_times))]) # 检查我们是否可以进行预测 if has_data(pattern): pattern = pattern.reshape((1, len(pattern))) # 预测每个提前期 forecasts = list() for k in range(len(lead_times)): yhat = models[j][k][0].predict(pattern) forecasts.append(yhat[0]) forecasts = array(forecasts) # 为此变量的每个提前期保存预测 chunk_predictions.append(forecasts) # 为此块保存预测 chunk_predictions = array(chunk_predictions) predictions.append(chunk_predictions) return array(predictions) # 将块中的测试数据集转换为 [块][变量][时间] 格式 def prepare_test_forecasts(test_chunks): predictions = list() # 枚举要预测的块 for rows in test_chunks: # 枚举块的目标 chunk_predictions = list() for j in range(3, rows.shape[1]): yhat = rows[:, j] chunk_predictions.append(yhat) chunk_predictions = array(chunk_predictions) predictions.append(chunk_predictions) return array(predictions) # 计算实际值和预测值之间的误差 def calculate_error(actual, predicted): # 如果预测值为 nan,则给出完整的实际值 if isnan(predicted): return abs(actual) # 计算绝对差值 return abs(actual - predicted) # 评估 [块][变量][时间] 格式的预测 def evaluate_forecasts(predictions, testset): lead_times = get_lead_times() total_mae, times_mae = 0.0, [0.0 for _ in range(len(lead_times))] total_c, times_c = 0, [0 for _ in range(len(lead_times))] # 枚举测试块 for i in range(len(test_chunks)): # 转换为预测 actual = testset[i] predicted = predictions[i] # 枚举目标变量 for j in range(predicted.shape[0]): # 枚举提前期 for k in range(len(lead_times)): # 如果实际值为 nan 则跳过 if isnan(actual[j, k]): continue # 计算误差 error = calculate_error(actual[j, k], predicted[j, k]) # 更新统计数据 total_mae += error times_mae[k] += error total_c += 1 times_c[k] += 1 # 归一化求和的绝对误差 total_mae /= total_c times_mae = [times_mae[i]/times_c[i] for i in range(len(times_mae))] return total_mae, times_mae # 总结得分 def summarize_error(name, total_mae): print('%s: %.3f MAE' % (name, total_mae)) # 准备机器学习模型列表 def get_models(models=dict()): # 线性模型 models['lr'] = LinearRegression() models['lasso'] = Lasso() models['ridge'] = Ridge() models['en'] = ElasticNet() models['huber'] = HuberRegressor() models['llars'] = LassoLars() models['pa'] = PassiveAggressiveRegressor(max_iter=1000, tol=1e-3) models['sgd'] = SGDRegressor(max_iter=1000, tol=1e-3) print('Defined %d models' % len(models)) 返回 模型 # 评估一组模型 def evaluate_models(models, train, test, actual): for name, model in models.items(): # 拟合模型 fits = fit_models(model, train) # 进行预测 predictions = make_predictions(fits, test) # 评估预测 total_mae, _ = evaluate_forecasts(predictions, actual) # 总结预测 summarize_error(name, total_mae) # 加载监督数据集 train = load('AirQualityPrediction/supervised_train.npy', allow_pickle=True) test = load('AirQualityPrediction/supervised_test.npy', allow_pickle=True) print(train.shape, test.shape) # 加载测试块进行验证 testset = loadtxt('AirQualityPrediction/naive_test.csv', delimiter=',') test_chunks = to_chunks(testset) actual = prepare_test_forecasts(test_chunks) # 准备模型列表 模型 = 获取_模型() # 评估模型 evaluate_models(models, train, test, actual) |
运行示例将打印出每个评估算法的MAE。
注意:由于算法或评估过程的随机性,或数值精度差异,您的结果可能会有所不同。考虑多次运行示例并比较平均结果。
我们可以看到,许多算法与简单的AR模型相比都表现出优势,MAE低于0.487。
Huber回归似乎表现最佳(默认配置下),MAE达到0.434。
这很有趣,因为Huber回归,或使用Huber损失的鲁棒回归,是一种旨在对训练数据集中的异常值具有鲁棒性的方法。这可能表明其他方法在进行一些数据准备(例如标准化和/或异常值去除)后可能会表现更好。
1 2 3 4 5 6 7 8 |
lr: 0.454 MAE lasso: 0.624 MAE ridge: 0.454 MAE en: 0.595 MAE huber: 0.434 MAE llars: 0.631 MAE pa: 0.833 MAE sgd: 0.457 MAE |
非线性算法
我们可以使用相同的框架来评估一组非线性和集成机器学习算法的性能。
具体来说:
非线性算法
- k-近邻
- 分类与回归树
- 额外树(Extra Tree)
- 支持向量回归
集成算法
- Adaboost
- 装袋决策树
- 随机森林
- 额外树
- 梯度提升机
下面的 get_models() 函数定义了这九个模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# 准备机器学习模型列表 def get_models(models=dict()): # 非线性模型 models['knn'] = KNeighborsRegressor(n_neighbors=7) models['cart'] = DecisionTreeRegressor() models['extra'] = ExtraTreeRegressor() models['svmr'] = SVR() # # 集成模型 n_trees = 100 models['ada'] = AdaBoostRegressor(n_estimators=n_trees) models['bag'] = BaggingRegressor(n_estimators=n_trees) models['rf'] = RandomForestRegressor(n_estimators=n_trees) models['et'] = ExtraTreesRegressor(n_estimators=n_trees) models['gbm' = GradientBoostingRegressor(n_estimators=n_trees) print('Defined %d models' % len(models)) 返回 models |
完整的代码列表如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# 抽查非线性算法 from numpy import load 从 numpy 导入 loadtxt from numpy import nan from numpy import isnan from numpy import count_nonzero from numpy import unique from numpy import array from sklearn.base import clone from sklearn.neighbors import KNeighborsRegressor 来自 sklearn.tree 导入 DecisionTreeRegressor from sklearn.tree import ExtraTreeRegressor 来自 sklearn.svm 导入 SVR from sklearn.ensemble import AdaBoostRegressor from sklearn.ensemble import BaggingRegressor from sklearn.ensemble import RandomForestRegressor from sklearn.ensemble import ExtraTreesRegressor from sklearn.ensemble import GradientBoostingRegressor # 按“chunkID”分割数据集,返回块列表 def to_chunks(values, chunk_ix=0): chunks = list() # 获取唯一的块 ID chunk_ids = unique(values[:, chunk_ix]) # 按块 ID 分组行 for chunk_id in chunk_ids: selection = values[:, chunk_ix] == chunk_id chunks.append(values[selection, :]) return chunks # 如果数组包含任何非 nan 值,则返回 true def has_data(data): return count_nonzero(isnan(data)) < len(data) # 返回相对预测提前期的列表 def get_lead_times(): return [1, 2, 3, 4, 5, 10, 17, 24, 48, 72] # 拟合单个模型 def fit_model(model, X, y): # 克隆模型配置 local_model = clone(model) # 拟合模型 local_model.fit(X, y) return local_model # 为每个变量和每个预测提前期拟合一个模型 [var][time][model] def fit_models(model, train): # 准备保存模型的结构 models = [[list() for _ in range(train.shape[1])] for _ in range(train.shape[0])] # 枚举变量 for i in range(train.shape[0]): # 枚举提前期 for j in range(train.shape[1]): # 获取数据 data = train[i, j] X, y = data[:, :-1], data[:, -1] # 拟合模型 local_model = fit_model(model, X, y) models[i][j].append(local_model) 返回 模型 # 将预测返回为 [块][变量][时间] def make_predictions(models, test): lead_times = get_lead_times() predictions = list() # 枚举块 for i in range(test.shape[0]): # 枚举变量 chunk_predictions = list() for j in range(test.shape[1]): # 获取此块和目标的输入模式 pattern = test[i,j] # 假设一个nan预测 forecasts = array([nan for _ in range(len(lead_times))]) # 检查我们是否可以进行预测 if has_data(pattern): pattern = pattern.reshape((1, len(pattern))) # 预测每个提前期 forecasts = list() for k in range(len(lead_times)): yhat = models[j][k][0].predict(pattern) forecasts.append(yhat[0]) forecasts = array(forecasts) # 为此变量的每个提前期保存预测 chunk_predictions.append(forecasts) # 为此块保存预测 chunk_predictions = array(chunk_predictions) predictions.append(chunk_predictions) return array(predictions) # 将块中的测试数据集转换为 [块][变量][时间] 格式 def prepare_test_forecasts(test_chunks): predictions = list() # 枚举要预测的块 for rows in test_chunks: # 枚举块的目标 chunk_predictions = list() for j in range(3, rows.shape[1]): yhat = rows[:, j] chunk_predictions.append(yhat) chunk_predictions = array(chunk_predictions) predictions.append(chunk_predictions) return array(predictions) # 计算实际值和预测值之间的误差 def calculate_error(actual, predicted): # 如果预测值为 nan,则给出完整的实际值 if isnan(predicted): return abs(actual) # 计算绝对差值 return abs(actual - predicted) # 评估 [块][变量][时间] 格式的预测 def evaluate_forecasts(predictions, testset): lead_times = get_lead_times() total_mae, times_mae = 0.0, [0.0 for _ in range(len(lead_times))] total_c, times_c = 0, [0 for _ in range(len(lead_times))] # 枚举测试块 for i in range(len(test_chunks)): # 转换为预测 actual = testset[i] predicted = predictions[i] # 枚举目标变量 for j in range(predicted.shape[0]): # 枚举提前期 for k in range(len(lead_times)): # 如果实际值为 nan 则跳过 if isnan(actual[j, k]): continue # 计算误差 error = calculate_error(actual[j, k], predicted[j, k]) # 更新统计数据 total_mae += error times_mae[k] += error total_c += 1 times_c[k] += 1 # 归一化求和的绝对误差 total_mae /= total_c times_mae = [times_mae[i]/times_c[i] for i in range(len(times_mae))] return total_mae, times_mae # 总结得分 def summarize_error(name, total_mae): print('%s: %.3f MAE' % (name, total_mae)) # 准备机器学习模型列表 def get_models(models=dict()): # 非线性模型 models['knn'] = KNeighborsRegressor(n_neighbors=7) models['cart'] = DecisionTreeRegressor() models['extra'] = ExtraTreeRegressor() models['svmr'] = SVR() # # 集成模型 n_trees = 100 models['ada'] = AdaBoostRegressor(n_estimators=n_trees) models['bag'] = BaggingRegressor(n_estimators=n_trees) models['rf'] = RandomForestRegressor(n_estimators=n_trees) models['et'] = ExtraTreesRegressor(n_estimators=n_trees) models['gbm' = GradientBoostingRegressor(n_estimators=n_trees) print('Defined %d models' % len(models)) 返回 模型 # 评估一组模型 def evaluate_models(models, train, test, actual): for name, model in models.items(): # 拟合模型 fits = fit_models(model, train) # 进行预测 predictions = make_predictions(fits, test) # 评估预测 total_mae, _ = evaluate_forecasts(predictions, actual) # 总结预测 summarize_error(name, total_mae) # 加载监督数据集 train = load('AirQualityPrediction/supervised_train.npy', allow_pickle=True) test = load('AirQualityPrediction/supervised_test.npy', allow_pickle=True) print(train.shape, test.shape) # 加载测试块进行验证 testset = loadtxt('AirQualityPrediction/naive_test.csv', delimiter=',') test_chunks = to_chunks(testset) actual = prepare_test_forecasts(test_chunks) # 准备模型列表 模型 = 获取_模型() # 评估模型 evaluate_models(models, train, test, actual) |
运行示例后,我们可以看到许多算法与自回归算法的基线相比表现良好,尽管没有一个表现得像上一节中的Huber回归那样好。
注意:由于算法或评估过程的随机性,或数值精度差异,您的结果可能会有所不同。考虑多次运行示例并比较平均结果。
支持向量回归和梯度提升机可能值得进一步研究,它们分别达到了0.437和0.450的MAE。
1 2 3 4 5 6 7 8 9 |
knn: 0.484 MAE cart: 0.631 MAE extra: 0.630 MAE svmr: 0.437 MAE ada: 0.717 MAE bag: 0.471 MAE rf: 0.470 MAE et: 0.469 MAE gbm: 0.450 MAE |
调整滞后大小
在之前的抽查实验中,滞后观测值的数量被任意固定为12。
我们可以改变滞后观测值的数量,并评估对MAE的影响。有些算法可能需要更多或更少的前期观测值,但总体趋势可能适用于所有算法。
使用一系列不同数量的滞后观测值准备监督学习数据集,并在每个数据集上拟合和评估HuberRegressor。
我用以下滞后观测值数量进行了实验:
1 |
[1, 3, 6, 12, 24, 36, 48] |
结果如下:
1 2 3 4 5 6 7 |
1: 0.451 3: 0.445 6: 0.441 12: 0.434 24: 0.423 36: 0.422 48: 0.439 |
下面提供了这些结果的线图。

Huber回归的滞后观测值数量与MAE的线图
我们可以看到,随着滞后观测数量的增加,总体MAE呈现下降趋势,至少在某个点之后误差开始再次上升。
结果表明,至少对于HuberRegressor算法而言,36个滞后观测可能是一个不错的配置,实现了0.422的MAE。
扩展
本节列出了一些您可能希望探索的扩展本教程的想法。
- 数据准备。探索简单的数据准备(如标准化或统计异常值去除)是否可以提高模型性能。
- 特征工程。探索特征工程(如预测日内小时的中值)是否可以提高模型性能
- 气象变量。探索在模型中添加滞后气象变量是否可以提高性能。
- 跨站点模型。探索是否通过结合相同类型的目标变量并在不同站点之间重用模型可以提高性能。
- 算法调优。探索调优一些表现较好算法的超参数是否可以带来性能提升。
如果您探索了这些扩展中的任何一个,我很想知道。
进一步阅读
如果您想深入了解,本节提供了更多关于该主题的资源。
总结
在本教程中,您学习了如何开发用于空气污染数据多步时间序列预测的机器学习模型。
具体来说,你学到了:
- 如何估算缺失值并转换时间序列数据,以便可以通过监督学习算法对其进行建模。
- 如何开发和评估一套用于多步时间序列预测的线性算法。
- 如何开发和评估一套用于多步时间序列预测的非线性算法。
你有什么问题吗?
在下面的评论中提出你的问题,我会尽力回答。
哇,这是一个完整的教程!谢谢!
谢谢!
这个例子是否也适用于时间相关的多元制造数据?以便进行预测以及质量下降的根本原因分析?
也许可以尝试一下,然后与其他方法的结果进行比较?
我想将类似的方法应用于被视为具有多个时间步长的彩票
你不能预测彩票
https://machinelearning.org.cn/faq/single-faq/can-i-use-machine-learning-to-predict-the-lottery
很棒的教程,Jason!我又学到了一些有用的技巧。我很想知道你设计、开发和测试这样的代码需要多长时间。这对于计算一些比较指标以预测特定提前期内数据科学家的未来水平可能很有用——这是另一个有趣的时间序列预测问题 😉
这真的取决于开发者的经验。我用不到一天的时间写完了这个教程的代码和文字,但我确实有几十年的经验。
太棒了!这显然是“机器学习大师”级别。
谢谢。
完全正确 i_i
大师级别。
感谢发布如此完整的教程。
非常有用。无法一次性消化所有代码,需要对此进行研究。
肯定会有用。
杰森,你的工作真是太棒了……请提供宝贵的建议,指导我如何在JAVA中实现……
抱歉,我没有Java的例子。
如果我想预测1小时后的空气污染,但我的数据集没有chunkID,我是否可以删除to_chunks和get_lead_times函数,但我不知道get_lead_time函数的作用。谢谢。
这是一个复杂的例子,也许可以从一个更简单的例子开始,比如电力使用预测
https://machinelearning.org.cn/start-here/#deep_learning_time_series
嗨,Jason,
如果我想预测1到8小时后的AQHI,我该怎么做?get_lead_times()是预测小时数的意思,但我的数据集没有块。我应该如何在evaluate_forecasts()上操作?我需要根据块ID分割数据集的行号吗?谢谢。
我建议改用这个教程
https://machinelearning.org.cn/multi-step-time-series-forecasting-with-machine-learning-models-for-household-electricity-consumption/
无法获取空气质量数据集。当我点击接受规则按钮时,页面刷新了……
您必须登录Kaggle,然后点击“下载全部”按钮。
我遇到了和Gary一样的问题。
尽管我已经登录了我的Kaggle账户,但当我点击接受规则按钮时,我仍然停留在同一页面,没有提供数据集。这个数据集是不是不能下载了?如果可以的话,您能提供另一个链接吗?
登录后尝试刷新此网址
https://www.kaggle.com/c/dsg-hackathon/data
请注意“下载全部”按钮。
即使登录了Kaggle账户也无法下载数据。您能提供另一个链接吗
听到这个消息我很难过,具体问题是什么?
您好,杰森博士!您的教程太棒了,我学到了很多。
但是,我遇到了同样的问题,无法下载数据。我现在在中国,无法收到电话验证码。您能提供另一个关于这个的链接吗?
听到这个消息很抱歉。我不认为需要电话验证码,我很惊讶。
要继续,您必须通过手机验证您的
Kaggle 账户。
提交手机号码后,您将收到一条短信或语音电话,其中包含一个代码,您将在下一个屏幕上输入该代码。您只需执行此操作一次。请勿使用公共号码或与他人分享您的号码。
可能收取短信和数据费用。
条款和条件
隐私政策
电话号码(包括国家代码)
+1-555-555-5555
通过短信验证
切换到语音
哇,我从未见过这种情况。这一定是新的!很抱歉听到这个消息。
你好。是的,你确实需要一个电话号码才能用新账号下载 Kaggle 数据🙂
如果不行,就联系 Kaggle。
哇,这太疯狂了!
以前从不是这样,也许这是谷歌收购该公司后的新规定。
嗨,Jason,
感谢您的教程。我买了一些您的书,到目前为止,我读过的书都对我的当前项目非常有帮助。
我目前正在进行的项目之一是多元多步时间序列分类预测问题。
我想知道对于这种问题,您会推荐哪些预测方法,就像您在这篇文章中提到的回归问题一样。
先谢谢您了。
也许可以尝试用直接模型建模每个输出,作为比较的基线。
很高兴能找到另一个关于该数据集的教程!谢谢你,杰森!
LSTM 模型是否适合对该数据集进行预测?
没有“适当”这个概念,只有一些模型表现良好,有些则不然。
显然我们需要解决一个非线性问题。
但有趣的是,Huber 回归或带有 Huber 损失的鲁棒回归,是一种旨在对训练数据集中的异常值具有鲁棒性的方法。
ML(例如 DL)适用于处理复杂和非线性问题,但在此问题中表现不佳,
我注意到结果是不合理的。
如何更深入地讨论这些结果?
抱歉,我不明白你的问题,你说的更深入地讨论是什么意思?
你的意思是如何分析结果吗?如果是,以什么方式?
我从这篇文章中学到了很多!
感谢分享
谢谢,很高兴它有帮助。
嗨,Jason,
早上好,
您能告诉我们何时使用哪种时间序列算法吗?比如何时使用 ARIMA?何时使用 LSTM?
您能否告诉我们如何在 Python 中使用 ARIMA 实现多元时间序列?
谢谢
是的,我推荐这个过程
https://machinelearning.org.cn/how-to-develop-a-skilful-time-series-forecasting-model/
谢谢 Jason,
是否可以使用 Python 中的 ARIMA 实现多元时间序列?如果您有此类教程,能否分享一下?
谢谢
是的,那将是 VAR 或 VARIMA。这可能有所帮助。
https://machinelearning.org.cn/time-series-forecasting-methods-in-python-cheat-sheet/
非常感谢!!!
很高兴它有帮助。
是否有关于随机森林处理时间序列的教程?
我没听说过这种方法,抱歉。
你好,Jason。谢谢你的教程!
我是一名学生,刚开始学习机器学习。
我正在按照这个教程进行练习,并想使用 LSTM 模型。
但是,我不知道如何将“supervised_train.npy”的数据形状转换为 LSTM 的三维形状。
我还需要为每个变量和提前时间组合拟合一个 LSTM 模型吗?
谢谢!!QAQ
好问题,我相信这会有帮助
https://machinelearning.org.cn/faq/single-faq/how-do-i-prepare-my-data-for-an-lstm
嗨,Jason,
我在用 SVR(使用 R)预测股市时,我的数据遇到了一个有趣的问题。我发现有趣的是,当我使用 SVR 或 KNN 时,它能工作,但是图表中的输出(测试和正确)会延迟。问题会是什么呢??这意味着它无法提前预测。有什么帮助吗!!!
股市是不可预测的,这将是主要问题。
https://machinelearning.org.cn/faq/single-faq/can-you-help-me-with-machine-learning-for-finance-or-the-stock-market
你好 Jason,再次感谢您出色的“How to”教程!
我想知道您是否有关于使用 SVR 对时间序列数据进行多步预测的教程,或者这是否可行?
谢谢!
是的,你可以使用直接策略
https://machinelearning.org.cn/multi-step-time-series-forecasting/
我相信这里有一个例子
https://machinelearning.org.cn/multi-step-time-series-forecasting-with-machine-learning-models-for-household-electricity-consumption/
很棒的教程……谢谢
谢谢,很高兴它帮到了你!
你好 Jason,很棒的教程,非常感谢!我有一个问题:当我们想用多元时间序列进行时间序列预测时,几个输入指标是否必须预先单独检查与目标特征的相关性(例如 Pearson 相关系数 >0.5)?提前感谢!
也许吧。如果结果是非线性输入组合,可能会产生误导。
我通常建议使用所有/大部分数据拟合模型,让集成/模型发现哪些特征效果最好。
Jason,非常棒的教程。
我现在关心的是为多个 Y 和多个 X 实现预测模型。
数据集对应于世界多个国家/地区这些变量和 17 年的数据(每年一行)
到目前为止,我已对缺失数据进行了插补,对变量进行了相关性分析,并用 Python 设计了一个随机森林回归器。
我正在努力理解随机森林的解释部分。我将所有 X 和所有 Y 分配给回归器,我不确定如何继续解释结果。
也许您可以使用模型在预测上的误差来解释模型的技能?
这是您的意思吗?
嗨,Jason,
在对我的数据集运行线性算法时,我遇到了一个错误,但我不知道如何修复它。错误提示:
回溯(最近一次调用)
文件“compare.py”,第183行,在
evaluate_models(models, train, test, actual)
文件“compare.py”,第164行,在 evaluate_models 中
fits = fit_models(model, train)
文件“compare.py”,第59行,在 fit_models 中
X, y = data[:, :-1], data[:,-1]
IndexError: 数组索引过多
print train 和 test shape 的结果是
((96, 3), (160, 96, 3))
也许可以确认加载的数据符合代码的预期,先处理一个样本,然后逐行执行。
我遇到了同样的问题
print train 和 test shape 的结果是
((6, 3), (20, 6, 3))
我尝试了很多不同的方法。
我查看了您的数据集并调整了所有内容,但我仍然得到 2D,3D 而不是像您那样的 2D,2D。
很抱歉听到这个消息。我这里有一些建议可能会有帮助
https://machinelearning.org.cn/faq/single-faq/can-you-read-review-or-debug-my-code
嗨,Jason博士,
我正在研究一个类似的问题,因此对该问题的跨站点建模很感兴趣。
假设我们有 10 个目标变量,每个站点一个变量。您认为是否可以将目标变量组合成一个跨所有站点的数据集,然后我们可以为所有站点拟合一个单一模型?这样,我们就可以根据直接策略使用该模型预测每个提前期——根据预测范围,我们可以基于该单一模型有多个模型。
如果这种方法可行,我们是否应该忽略由于组合不同站点的变量而导致的日期时间索引上的任何重复?
提前感谢
是的,我这里有一些建议
https://machinelearning.org.cn/faq/single-faq/how-to-develop-forecast-models-for-multiple-sites
你列出的那些方法非常相关。我认为迁移学习策略将在探索该问题中发挥至关重要的作用,不是吗?
或许可以试试看?
谢谢
使用最新版本的 numpy:1.16.3,以下行正在生成一个已记录的错误
值错误:’当 allow_pickle=False 时,无法加载对象数组’
文档:https://stackoverflow.com/questions/55890813/how-to-fix-object-arrays-cannot-be-loaded-when-allow-pickle-false-for-imdb-loa
和
https://github.com/tensorflow/tensorflow/commit/79a8d5cdad942b9853aa70b59441983b42a8aeb3#diff-b0a029ad68170f59173eb2f6660cd8e0
imdb.py 中的更改对我不起作用(可能是多个环境问题)——但下面的代码更新起作用了
当前
train = load(‘AirQualityPrediction/supervised_train.npy’)
test = load(‘AirQualityPrediction/supervised_test.npy’)
更新为
train = load(‘AirQualityPrediction/supervised_train.npy’, allow_pickle=True)
test = load(‘AirQualityPrediction/supervised_test.npy’, allow_pickle=True)
-GM
感谢您这里所有出色的产品和教程。
谢谢,我已经更新了示例,修复了 NumPy 新版本中的 bug。
您能为电力数据集编写同样的教程吗?https://machinelearning.org.cn/multi-step-time-series-forecasting-with-machine-learning-models-for-household-electricity-consumption/ 中关于电力数据的多步教程本质上是单变量预测。您能使用电力数据的所有变量并结合机器学习模型来改进这个教程吗?
感谢您的建议。
嗨,Jason,
这很棒。我在 R 中做了一些关于贝叶斯局部条件自回归 (LCAR) 预测模型的工作。仍在努力得出结论。我很想得到您在 R 教程中关于贝叶斯 LCAR 以及 Python 中类似教程的帮助。
提前非常感谢。
感谢您的建议!
你好,Jason 博士。
您能制作一个递归 LSTM 示例吗?
我的意思是,如果我们想预测 t+2 或 t+3,我们可以使用 t+1 模型输出。
我想知道如何将输出用作输入。
谢谢。
感谢您的建议。
嗨,Jason,很棒的文章。
一个问题,我需要问。您如何评估您的模型是否过拟合?我看到您计算了测试数据上的 MAE。您如何判断接近于零的 MAE 是过拟合的信号还是您做得非常好?
提前感谢。
好问题,请看本教程
https://machinelearning.org.cn/learning-curves-for-diagnosing-machine-learning-model-performance/
请看这里的解决方案
https://machinelearning.org.cn/introduction-to-regularization-to-reduce-overfitting-and-improve-generalization-error/
尊敬的先生
非常感谢您,请告诉我 RNN-LSTM 代码以预测空气污染(预测)
不客气。
谢谢你。我想问一下在多步多变量情况下如何考虑多个变量的顺序。例如,工厂的生产过程有三台机器,每台机器都有温度(T)和速度(S)的特性,T1、S1 -> T2、S2 -> T3、S3。如何考虑顺序?
输入 LSTM 的特征顺序似乎无关紧要。
“开发一个单一模型进行一步预测,并递归使用该模型,其中先前的预测作为输入用于预测后续的提前期。”
非常感谢。当多元数据不是单变量数据时,我们也可以使用递归方法进行预测吗?
是的,请看这个
https://machinelearning.org.cn/multi-step-time-series-forecasting/
谢谢。但是我的意思是,如果我使用多个变量(多个特征)进行预测,我该如何使用递归方法?
与一个特征相同。
也许我不明白你遇到的问题?
非常感谢。我试图解释我的问题。正如您在这篇文章中提到的 (https://machinelearning.org.cn/multivariate-time-series-forecasting-lstms-keras/),我们可以看到 8 个输入变量(输入序列)和 1 个输出变量。如果像下面这样使用递归多步预测方法来预测未来两天的温度:
变量1(t)= 模型(变量1(t-1),变量2(t-1),变量3(t-1),变量4(t-1),变量5(t-1),变量6(t-1),变量7 (t-1),变量8(t-1))
变量1(t+1)= 模型(变量1(t),变量2(t),变量3(t),变量4(t),变量5(t),变量6(t),变量7(t),变量8(t))
但是我们目前没有变量2(t),变量3(t),变量4(t),变量5(t),变量6(t),变量7(t),变量8(t)这些输入参数。我们需要单独预测它们吗?
啊,我明白了,好问题。
你可以尝试预测它们。
你可以尝试用零填充。
你可以尝试保留最后的值。
你可以尝试使用直接方法代替。
我会尝试的。非常感谢
你好 Jason,是否可以用多个时间序列进行时间序列预测?例如,如果你有许多具有不同时间序列的航班?如果你有一些类似的东西的例子!
谢谢!
是的,请看本教程中一个很好的入门示例
https://machinelearning.org.cn/how-to-develop-lstm-models-for-time-series-forecasting/
HI json 我有像站1 站2 站3 温度这样的数据集,所以我需要分别预测所有三个站,但是这些站不相关,所有三个都是独立的站,所以我的问题是我们可以用一个单一模型预测所有三个站吗?还是我们需要建立独立的模型?
好问题,这会给你一些想法。
https://machinelearning.org.cn/faq/single-faq/how-to-develop-forecast-models-for-multiple-sites
很抱歉提出愚蠢的问题。我不确定我是否做错了什么,但我很难找到输入数据来跟着做。网站显示:发生错误:未找到数据
https://www.kaggle.com/c/dsg-hackathon/data
同意,看起来 Kaggle 目前已损坏。
你好 Jason,你有数据集的副本吗?我在 Kaggle 上收到“数据不可用”错误
我有,但它非常大。
也许向 Kaggle 报告故障。
我报告了,但 Kaggle 支持没有回复:(。您能否将其上传到 Kaggle 数据集或 Google Drive 并与我们分享链接?
我不确定我能做到,抱歉。
这是 Kaggle 团队的回复
应竞赛主办方的要求,我们不再提供该竞赛的数据。请考虑尝试其他竞赛:https://www.kaggle.com/competitions 或开放数据集:https://www.kaggle.com/datasets。或者,在我们的数据集论坛上发布请求:https://www.kaggle.com/data。
哦,天哪。
在 kaggle 上创建了关于缺失数据集的帖子:https://www.kaggle.com/data/154202
太棒了!
您可能需要将下载链接更改为
https://archive.ics.uci.edu/ml/datasets/Air+Quality
抱歉 Jason,那个数据集不是我们想要的,没关系🙁
不客气。
那是同一个数据集吗?
你好,Jason 博士!
除了 LSTM 和 CNN,强化学习在时间序列预测方面如何?
我对用于时间序列预测的强化学习没有经验,抱歉。听起来不太适合这些技术。
你好,医生,
我们没有这个模型的数据集。请设法安排一下。
也许继续学习另一个教程
https://machinelearning.org.cn/start-here/#deep_learning_time_series
嗨,Jason,
我正在使用 sklearn 中的 RandomForestRegressor。我用它进行单变量时间序列,效果很好。现在,我正在为多元变量而苦恼。如果有一个关于它的教程,请问您能帮我一下吗?
我的 train_x.shape: (205, 7, 18) train_y.shape: (205, 1)
我正在为数据形状而苦恼,因为我得到了
ValueError: 找到维度为 3 的数组。估计器预期 <= 2。
请问有什么帮助吗?
提前感谢,
也许这会有帮助。
https://machinelearning.org.cn/multi-output-regression-models-with-python/
这是一个非常详细的教程,帮助很大。
我有一个困惑,如果能得到解答将不胜感激。
假设我们已获得历史数据进行预测。
我们有两个选择:
1- 一个开源的自动化时间序列预测工具(免费)
我们输入数据并获得预测
2- 开发我们自己的机器学习模型来做同样的事情。
我感到困惑,如果选项1可用,那么选项2相对于选项1应该有什么优势,我们才会选择它。
或者自动化工具可能缺乏某些功能,比如特征工程因情况而异,它不是通用的,如果我们想充分利用这些工具,我们无论如何都必须在将数据输入它们之前进行自己的特征工程。
非常感谢您的回复
抱歉,我不知道有自动工具。
我很难从 kaggle 访问数据集。正如其他人建议的,我在手机上验证了我的账户,但没有成功。我甚至安装了 kaggle-api,创建了一个令牌,但是当我尝试从命令行访问它时,我得到了 403 错误。还有其他人有这个问题吗?
看来他们可能已经删除了数据集。这真的很可惜。
尝试联系他们的支持。
感谢您的本教程。我是机器学习的初学者,不明白本教程中的一句话。您说:
“当使用朴素模型时,我们只对目标变量感兴趣,而不是任何输入气象变量。因此,我们可以删除输入数据,并使训练和测试数据只包含每个数据块的 39 个目标变量,以及数据块内的位置和观测小时。”
我不明白这一点,因为在任何时候尝试预测污染时,一些气象数据,如风向等,肯定会在这里相关。如果您能指引我到一个更详细解释此内容的链接,我将不胜感激。
不客气。
这只是教程中问题的简化。你可以随意构建问题,我鼓励你这样做!
你好 Jason,你的书和博客到目前为止帮助很大。非常感谢。关于这个教程,数据集在 Kaggle 上不可用。你是否知道其他地方可能可以获得?
谢谢!
也许联系 Kaggle,看看他们是否会再次发布它?
也许尝试一个替代数据集?
也许尝试一个替代教程?
你好 Jason,你能告诉我应该使用哪个神经网络模型来进行多变量天气预测(例如使用温度、湿度、压力等)吗?
好问题,请看这个
https://machinelearning.org.cn/faq/single-faq/what-algorithm-config-should-i-use
还有这个。
https://machinelearning.org.cn/how-to-develop-a-skilful-time-series-forecasting-model/
非常感谢您的快速回复
不客气。
你好 Jason,
你的书和博客太棒了!它们到目前为止提供了很大的帮助。
我正在尝试寻找一个关于开发多元时间序列分类模型的教程或技巧。
我的数据看起来与这里处理的数据(医疗数据)相似,但我想预测一个二元结果。
例如,有不同患者,每个患者有 50 个变量,每个变量在不同时间测量 -> 每位患者有 50 个时间序列。我想预测患者是否患有(或发展出)特定疾病。
我正在为正确的数据准备而苦恼。输入应该是什么样子,LSTM 会是一个好的选择吗?
非常感谢
开发多元多步时间序列预测模型
谢谢!
听起来像是时间序列分类。我建议比较 MLP、CNN、LSTM、ML 和线性模型,然后找出最适合您数据集的模型。
非常感谢您的建议。
这些模型是否也适用于只有少量时间序列(2-3 个时间戳)的情况?
可能不会。
嗨,Jason,
非常感谢您分享这些以及其他 LSTM 时间序列文章。我的问题与数据归一化有关。我看到您采取了两种方法,第一种是将数据分为训练集和测试集,然后将训练数据集拟合到 scaler 并转换测试集。另一种是首先对整个数据集进行归一化,然后将其分为训练集和测试集。我的问题是哪种方法更好。就上下文而言,我将使用 create_dataset() 函数和一个 30 的 lookback 创建一个转换。
前者通常是适当的方法。后者是偷懒,只在教程中使用。
你好 Jason,
我有一个数据集,其中目标变量的数据如下所示:
E_Codes
0
0
0
40
0
0
0
90
0
0
0
10
0
0
这些实际上是生产线上机器生成的错误代码,目标是预测在接下来的“n”个周期内何时会出现特定的错误代码。我见过很多使用 LSTM 进行预测的例子,但所有这些都预测了连续值,如价格、天气或股票等。
这个问题是一种分类,但需要预测而不是简单的分类
您能否告诉我应该遵循哪个教程?我应该如何措辞我的问题才能在 Google 上搜索与此类数据相关的问题?
您可以在这里看到使用深度学习模型进行时间序列预测的示例
https://machinelearning.org.cn/start-here/#deep_learning_time_series
你好 Jason,很棒的教程。也许是个蠢问题,但为什么 Kaggle 获奖者的 MAE 比你评估的随机森林回归器低那么多?
谢谢。
我没有优化模型,只是演示了如何为数据集和问题框架开发模型。
感谢您的本教程,我有一个关于数据块的问题。它们能提高结果的精度吗?或者,我们可以在不分组的数据集上采用相同的方法吗?
不客气。
是的,这只是这个特定数据集是如何准备和可用的一个产物。你可以随意准备数据,我鼓励你这样做,或许可以从这里开始
https://machinelearning.org.cn/start-here/#deep_learning_time_series
你好,Brownlee 博士,
感谢您的详细教程。如果我的数据有不同的块但只有一个目标变量,教程会如何变化?
我想上面有些人有 3D 数组而不是 2D 数组。我也有同样的问题。我的训练和测试形状是
train_data 形状是 (1, 10),其中 1 是目标变量的数量,10 是提前时间
test_data 形状是 (288, 1, 12),其中 288 是块的数量,1 是目标变量的数量,12 是 n_lag?
我的 test_data 形状是 3D 数组,而 train_data 形状似乎是正确的。
也许这会有帮助。
https://machinelearning.org.cn/faq/single-faq/what-is-the-difference-between-samples-timesteps-and-features-for-lstm-input
嗨,Jason,
这是一篇很棒的文章。但我没有看到任何文章/论文在机器学习方法的背景下讨论传统的时间序列检验,例如平稳性。当我集成不同的时间序列并分配权重以获得最终预测时,我是否需要检查时间序列模型(例如 SARIMA、ARIMA 等)的平稳性?或者,由于机器学习可以自我适应,这是否意味着它会演变以预测非平稳数据,例如优化 P、D、Q,这样我就不必担心它的平稳性?
感谢您的想法。
谢谢你。
这取决于数据和模型。如果您首先使序列平稳,大多数模型会表现更好。
嗨,Jason,
这真是一个很棒的教程。有很多东西需要理解。在我正在开发类似模型的同时,我只是想知道为什么您在这里使用前向填充和后向填充来插值 NaN 小时?难道前向填充不足以完成我们的任务吗?
提前感谢!
假设时间序列 x[0], x[1], …, x[n],您只得到奇数数据。然后前向填充有助于我们设置 x[2]=x[1], x[4]=x[3] 等。但是 x[0] 仍然是 NaN,除非您用 x[1] 进行后向填充
是的,我发现了。感谢澄清!!
chunk_ids = unique(values[:, 1])
您的教程甚至从第一条命令开始就无法工作。我们没有定义任何名为 values 的数据帧
你好 Enes,
您是否使用了教程中的完整代码列表?
此致,
感谢您的非常有用的教程!快速提问:监督机器学习可以使用任何设定的时间点来学习,例如第 1、2、5 天来预测第 6 天。像 ARIMA 等经典模型能否做同样的事情,将不连续的时间序列作为输入,或者它必须将连续序列第 1、2、3、4、5 天作为输入来预测第 6 天?
你好 Le……有些时间序列数据是不连续的。
这意味着观测值之间的时间间隔不一致,可能会有所不同。
您可以在这篇文章中了解更多关于连续与不连续时间序列数据集的信息
时间序列预测问题分类
处理这种形式数据的方法有很多,您必须找到适合或最适合您的特定数据集和所选方法的方法。
最常见的方法是将不连续时间序列视为连续的,并将新观测时间点的观测值视为缺失(例如,具有缺失值的连续时间序列)。
您可能想要探索的一些想法包括
忽略问题的非连续性并按原样建模数据。
重采样数据(例如上采样)以使观测值之间具有一致的间隔。
插补观测值以形成一致的时间间隔。
填充观测值以形成一致的间隔,并使用遮罩层忽略填充值。
你好,Jason。我可能对将线性回归方法应用于此数据集有一些疑问。我是机器学习的新手,您能抽时间帮我一下吗?非常感谢!
首先,应用我在课堂上学到的知识,在这个数据集上,我认为创建一个像“variable1~variable2+variable3+…+variable11+Time”这样的模型很重要。但是我已经阅读了您对代码的解释,您是否只使用单变量滞后来进行预测?例如“variable1(t)~variable1(t-1)+variable1(t-2)”。我不确定我是否理解正确,但在这里我有点困惑为什么线性回归没有考虑这12个变量之间的线性关系。期待您的回复,非常感谢!
你好 YIZE……你为这个数据集建立模型了吗?如果建立了,请告诉我们你的发现。
抱歉,我没有对这个数据集进行建模,我只是在学习你的代码。但我很好奇为什么在建立线性模型时没有考虑12个变量之间的线性关系。模型只考虑了每个单一变量的滞后。
嗨,Jason,
感谢您为撰写这篇有益的帖子所付出的努力。
是否可以在进行预测之前对多变量时间序列中的一个系列进行一些操作,目的是期望修改会在预测结果中传播?
再次感谢!
Jason博士,您好,
感谢您提供如此精彩的内容,我学到了很多。
我有一个问题。如果数据集中的位置都在一列中,是否可以对多个站点的空气污染进行时间序列预测?
另外,在提供的数据集中,位置在哪里?每个特征是否在多列中重复,意味着每个位置一列?但我没有看到数据集中给出位置。
非常感谢您的帮助。
你好 Nourhan……每个位置都会有自己的数据集。每列将代表一个特征,每行将代表时间。这意味着每列将代表一个独立的时间序列。
嗨,Jason,
当我运行代码的第一部分时,data_prep函数产生以下错误:
return array(train_data), array(test_data)
ValueError: 用序列设置数组元素。请求的数组在2个维度后具有不均匀的形状。检测到的形状为(39, 10) + 不均匀部分。
看来train_data和test_data列表的形状不一致。
谢谢
你好 Felix……你是复制粘贴的代码还是手动输入的?另外,以下讨论可能对你有用:
https://stackoverflow.com/questions/4674473/valueerror-setting-an-array-element-with-a-sequence
https://www.geeksforgeeks.org/how-to-fix-valueerror-setting-an-array-element-with-a-sequence/
我完全按照原样复制了代码。
问题出在变量train_chunks上
print(array(train_chunks).shape)
ValueError: 用序列设置数组元素。请求的数组在1个维度后具有不均匀的形状。检测到的形状为(207,) + 不均匀部分。
我复制粘贴的代码出现了错误
“—————————————————————————
ValueError 回溯 (最近一次调用)
Cell In[113], line 163
161 # 将训练数据转换为监督学习数据
162 n_lag = 12
–> 163 train_data, test_data = data_prep(train_chunks, n_lag)
164 print(train_data.shape, test_data.shape)
165 # 将训练集和测试集保存到文件
Cell In[113], line 153, in data_prep(chunks, n_lag, n_vars)
151 for lead_time in range(len(lead_times))
152 train_data[var][lead_time] = array(train_data[var][lead_time])
–> 153 return array(train_data), array(test_data)
ValueError: 用序列设置数组元素。请求的数组在2个维度后具有不均匀的形状。检测到的形状为(39, 10) + 不均匀部分。”