序列到序列(seq2seq)模型是处理将一个序列转换为另一个序列的任务的强大架构,例如机器翻译。这些模型采用编码器-解码器架构,其中编码器处理输入序列,解码器根据编码器的输出生成输出序列。注意力机制是为 seq2seq 模型开发的,理解 seq2seq 如何工作有助于阐明注意力机制的原理。在本篇文章中,您将探索如何构建和训练一个简单的 seq2seq 模型,使用 LSTM 进行语言翻译。具体来说:
- 如何使用 PyTorch 中的 LSTM 单元实现编码器-解码器架构
- 如何使用数据集中的句子对来训练模型
- 如何使用 seq2seq 模型生成可变长度序列
让我们开始吧。

构建一个用于语言翻译的普通 Seq2Seq 模型
照片作者:Pourya Gohari。部分权利保留。
概述
本文分为五个部分,它们是:
- 准备训练数据集
- 实现带有 LSTM 的 Seq2Seq 模型
- 训练 Seq2Seq 模型
- 使用 Seq2Seq 模型
- 改进 Seq2Seq 模型
准备训练数据集
在上一篇文章中,您学习了如何构建一个 Transformer 模型来将法语句子翻译成英语。在本篇文章中,您将重用相同的数据集,并为同一任务构建一个 seq2seq 模型。
seq2seq 模型包含两个主要组件:编码器和解码器。编码器处理输入序列(法语句子),并生成一个固定大小的表示,称为上下文向量。然后,解码器使用此上下文向量一次生成一个输出序列(英语句子)。
要训练这样的模型,您需要一个句子对数据集。模型通过数据集中的示例句子对来学习如何翻译。您可以自行搜集数据集。在本篇文章中,您将使用 Anki 数据集,可以从 https://www.manythings.org/anki/ 下载,也可以使用 Google 托管的副本。
1 2 3 4 5 6 7 8 |
import os import requests if not os.path.exists("fra-eng.zip"): url = "http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip" response = requests.get(url) with open("fra-eng.zip", "wb") as f: f.write(response.content) |
这就是您可以使用 requests
库在 Python 中下载文件的方法。这个 zip 文件只包含一个文件,fra.txt
,这是一个纯文本文件。每一行包含一个英语句子,后面跟着一个制表符,然后是一个对应的法语句子。
为了使数据对训练有用,需要对其进行规范化。首先,法语句子是 Unicode 格式的,但某些字符可能有多种表示形式。为了帮助您的模型更好地理解句子,您需要规范化 Unicode 表示形式,例如 NFKC。您还可以将字母转换为小写,以减小词汇量的大小(因为模型会将不同大小写的同一单词视为不同的单词)。您可以读取句子对并执行规范化,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import unicodedata import zipfile def normalize(line): """规范化一行文本并在制表符处分成两部分""" line = unicodedata.normalize("NFKC", line.strip().lower()) eng, fra = line.split("\t") return eng.lower().strip(), fra.lower().strip() text_pairs = [] with zipfile.ZipFile("fra-eng.zip", "r") as zip_ref: for line in zip_ref.read("fra.txt").decode("utf-8").splitlines(): eng, fra = normalize(line) text_pairs.append((eng, fra)) |
您将构建的 seq2seq 模型是使用 LSTM 的。它是一种循环神经网络,可以处理可变长度的序列。它不能直接处理单词序列,而是需要先将它们分词并编码为数字形式。您可以创建一个词典作为分词器,将词汇表中的每个单词映射到一个唯一的整数。您也可以使用更高级的技术,如字节对编码(BPE),通过识别子词单元来更有效地处理未知单词。让我们分别创建英语和法语的独立分词器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import os import tokenizers if os.path.exists("en_tokenizer.json") and os.path.exists("fr_tokenizer.json"): en_tokenizer = tokenizers.Tokenizer.from_file("en_tokenizer.json") fr_tokenizer = tokenizers.Tokenizer.from_file("fr_tokenizer.json") else: en_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE()) fr_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE()) # Configure pre-tokenizer to split on whitespace and punctuation, add space at beginning of the sentence en_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True) fr_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True) # Configure decoder: So that word boundary symbol "Ġ" will be removed en_tokenizer.decoder = tokenizers.decoders.ByteLevel() fr_tokenizer.decoder = tokenizers.decoders.ByteLevel() # Train BPE for English and French using the same trainer VOCAB_SIZE = 8000 trainer = tokenizers.trainers.BpeTrainer( vocab_size=VOCAB_SIZE, special_tokens=["[start]", "[end]", "[pad]"], show_progress=True ) en_tokenizer.train_from_iterator([x[0] for x in text_pairs], trainer=trainer) fr_tokenizer.train_from_iterator([x[1] for x in text_pairs], trainer=trainer) en_tokenizer.enable_padding(pad_id=en_tokenizer.token_to_id("[pad]"), pad_token="[pad]") fr_tokenizer.enable_padding(pad_id=fr_tokenizer.token_to_id("[pad]"), pad_token="[pad]") # Save the trained tokenizers en_tokenizer.save("en_tokenizer.json", pretty=True) fr_tokenizer.save("fr_tokenizer.json", pretty=True) |
这里,BPE 分词器来自 tokenizers
库。训练好的分词器被保存到 en_tokenizer.json
和 fr_tokenizer.json
以供将来使用。要训练 BPE,您需要指定最大词汇量大小。上面的代码将其设置为 8000,这是一个较小的数字(考虑到该数据集大约有 15,000 个英语单词和 30,000 个法语单词)。如果您认为模型翻译效果不佳,可以增加词汇量大小。上面的 BPE 实现中有一些特殊的处理:
- 默认情况下,预分词器会在空格和标点符号处分割文本。但您还在句子开头添加了一个空格,以便所有单词都以空格为前缀。这有助于重用词汇,而与单词在句子中的位置无关。
- 词汇表中添加了三个特殊标记:
[start]
、[end]
和[pad]
。这些标记在分词器训练之前添加。特别是[pad]
标记被设置为填充标记,用于将句子填充到更长的序列长度。
BPE 分词器是从数据集训练的,存储在字符串对列表 text_pairs
中。上面的代码对两种语言使用了相同的训练器,但分词器是分开的。
分词器训练完成后,您可以对一些句子进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# Test the tokenizer print("Sample tokenization:") en_sample, fr_sample = random.choice(text_pairs) encoded = en_tokenizer.encode(en_sample) print(f"Original: {en_sample}") print(f"Tokens: {encoded.tokens}") print(f"IDs: {encoded.ids}") print(f"Decoded: {en_tokenizer.decode(encoded.ids)}") print() encoded = fr_tokenizer.encode("[start] " + fr_sample + " [end]") print(f"Original: {fr_sample}") print(f"Tokens: {encoded.tokens}") print(f"IDs: {encoded.ids}") print(f"Decoded: {fr_tokenizer.decode(encoded.ids)}") print() |
输出将如下所示:
1 2 3 4 5 6 7 8 9 10 |
样本分词 原始:it happens to all of us. Token:['Ġit', 'Ġhappens', 'Ġto', 'Ġall', 'Ġof', 'Ġus', '.'] ID:[124, 1689, 80, 208, 128, 238, 12] 解码:it happens to all of us. 原始:ça nous arrive à tous. Token:['[start]', 'Ġça', 'Ġnous', 'Ġarrive', 'ĠÃł', 'Ġtous', '.', 'Ġ', '[end]'] ID:[0, 220, 159, 1621, 123, 392, 14, 74, 1] 解码:ça nous arrive à tous. |
Seq2Seq 架构与 LSTM
传统上,使用神经网络处理任意长度序列需要循环神经网络(RNN)架构。它是一种神经网络,其中一个模块维护一个隐藏状态,并在处理序列时更新它。
有几种模块可用于实现 RNN。LSTM 是其中之一。构建一个简单的 LSTM 编码器来处理输入序列非常直接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import torch import torch.nn as nn class EncoderLSTM(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1): super().__init__() self.vocab_size = vocab_size self.embedding_dim = embedding_dim self.hidden_dim = hidden_dim self.num_layers = num_layers self.embedding = nn.Embedding(vocab_size, embedding_dim) self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0) def forward(self, input_seq): embedded = self.embedding(input_seq) outputs, (hidden, cell) = self.lstm(embedded) return outputs, hidden, cell |
LSTM 的特殊之处在于它有两个隐藏状态,在上面的代码中分别命名为 hidden
和 cell
。在 PyTorch 中,您无需实现循环结构。nn.LSTM
模块可以很好地处理这个问题。
在上面的实现中,您将 seq2seq 模型的一部分编码器实现为一个派生自 nn.Module
的类。您期望传入一个整数 ID 的二维张量作为输入序列批次。这个输入将被转换为三维张量,将每个 token ID 替换为嵌入向量。在上面的 forward()
函数中,变量 embedded
是一个形状为 (batch_size, seq_len, embedding_dim)
的三维张量。然后,它被 LSTM 模块处理。LSTM 模块的输出是一个形状为 (batch_size, seq_len, hidden_dim)
的三维张量,它对应于 LSTM 在处理输入序列的每个步骤的隐藏状态。最终的隐藏状态和单元状态也会被返回。
请注意,您创建 LSTM 模块时设置了 batch_first=True
,这意味着输入张量的第一个维度是批次大小。这在语言数据中是一种常见的约定。此模块还将 LSTM 的 num_layers
设置为默认值 1。人们认为多层 LSTM 功能更强大;但是,您将构建一个更大的模型,这需要更长的训练时间。
创建 seq2seq 模型的一部分解码器是类似的,除了您还需要产生输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
class DecoderLSTM(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1): super().__init__() self.vocab_size = vocab_size self.embedding_dim = embedding_dim self.hidden_dim = hidden_dim self.num_layers = num_layers self.embedding = nn.Embedding(vocab_size, embedding_dim) self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0) self.out = nn.Linear(embedding_dim, vocab_size) def forward(self, input_seq, hidden, cell): embedded = self.embedding(input_seq) output, (hidden, cell) = self.lstm(embedded, (hidden, cell)) prediction = self.out(output) return prediction, hidden, cell |
解码器 LSTM 与编码器 LSTM 类似。在 forward()
方法中,输入序列是部分目标序列,隐藏和单元状态是编码器 LSTM 模块的最后一个隐藏和单元状态。当调用解码器的 LSTM 模块时,将使用编码器的隐藏和单元状态。如果未提供,隐藏和单元状态将被初始化为零,就像在编码器中一样。
forward 方法的输入是一个二维的 token ID 张量。在 LSTM 模块使用它之前,嵌入层需要将其转换为一个三维张量。LSTM 模块的输出是隐藏状态序列。它们应该通过一个线性层转换为一个 logit 向量,以预测下一个 token。
decoder 模块的设计要求你传递一个形状为 (batch_size, seq_len)
的部分目标序列。forward()
方法返回一个形状为 (batch, seq_len+1, hidden_dim)
的预测序列,这是 LSTM 模块的输出,经过了线性层的转换。你取序列长度维度上的最后一个 token 作为预测的下一个 token。你需要多次调用 decoder 模块来生成整个目标序列。
要构建一个完整的 seq2seq 模型,你需要连接 encoder 和 decoder 模块。以下是如何做到这一点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
class Seq2SeqLSTM(nn.Module): def __init__(self, encoder, decoder): super().__init__() self.encoder = encoder self.decoder = decoder def forward(self, input_seq, target_seq): batch_size, target_len = target_seq.shape device = target_seq.device outputs = [] _enc_out, hidden, cell = self.encoder(input_seq) dec_in = target_seq[:, :1] for t in range(target_len-1): pred, hidden, cell = self.decoder(dec_in, hidden, cell) pred = pred[:, -1:, :] outputs.append(pred) dec_in = torch.cat([dec_in, pred.argmax(dim=2)], dim=1) outputs = torch.cat(outputs, dim=1) return outputs |
此模块仅连接 encoder 和 decoder 模块。forward()
方法的创建是为了帮助训练模型。它以输入序列(英语)和目标序列(法语)作为输入。英语句子将通过 encoder 转换为“上下文向量”。encoder 还输出一个处理过的序列,但它不被使用。
decoder 在其 LSTM 模块中设置了由 encoder 提供的上下文向量。然后处理部分目标序列以产生下一个 token 的预测。最初,decoder 以特殊 token [start]
开始。迭代地,它一次生成一个 token,直到目标序列的长度被填满。
请注意,上面的模型不读取目标序列的内容,而是使用其长度来控制迭代次数。另外,请注意,在对 forward()
方法的单个调用中,会多次调用同一个 decoder。
训练 Seq2Seq 模型
要构建一个完整的 seq2seq 模型,你需要创建一个数据集对象,以便你可以按批次和随机顺序迭代数据集。你已经在上一节收集了数据并将其存储为 text_pairs
。PyTorch 提供了 Dataset
类来帮助你对数据进行洗牌和分批。这是如何创建数据集对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
import torch from torch.utils.data import Dataset, DataLoader class TranslationDataset(Dataset): def __init__(self, text_pairs): self.text_pairs = text_pairs def __len__(self): return len(self.text_pairs) def __getitem__(self, idx): en, fr = self.text_pairs[idx] return eng, "[start] " + fra + " [end]" def collate_fn(batch): en_str, fr_str = zip(*batch) en_enc = en_tokenizer.encode_batch(en_str, add_special_tokens=True) fr_enc = fr_tokenizer.encode_batch(fr_str, add_special_tokens=True) en_ids = [enc.ids for enc in en_enc] fr_ids = [enc.ids for enc in fr_enc] return torch.tensor(en_ids), torch.tensor(fr_ids) BATCH_SIZE = 32 dataset = TranslationDataset(text_pairs) dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn) |
你可以尝试打印数据集中的一个样本
1 2 3 4 |
for en_ids, fr_ids in dataloader: print(f"English: {en_ids}") print(f"French: {fr_ids}") break |
dataloader
对象是一个可迭代对象,它以随机顺序扫描整个数据集。它返回一个包含两个张量的元组,每个张量的形状为 (batch_size, seq_len)
。你会发现打印的两个张量都是整数,因为 token ID 用整数表示。
dataloader 是使用 collate_fn()
函数创建的。PyTorch dataloader 只将数据集对象中的元素收集到一个列表中,这里每个元素是一个包含两个字符串的元组。collate 函数使用 BPE tokenizers 将字符串转换为 token ID,然后创建一个 PyTorch 张量。
下一步是创建一个模型。这很简单:
1 2 3 4 5 6 7 8 9 10 |
... device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') emb_dim = 256 hidden_dim = 256 num_layers = 1 encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device) decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device) model = Seq2SeqLSTM(encoder, decoder).to(device) print(model) |
这将打印:
1 2 3 4 5 6 7 8 9 10 11 |
Seq2SeqLSTM( (encoder): EncoderLSTM( (embedding): Embedding(8000, 256) (lstm): LSTM(256, 256, batch_first=True) ) (decoder): DecoderLSTM( (embedding): Embedding(8000, 256) (lstm): LSTM(256, 256, batch_first=True) (out): Linear(in_features=256, out_features=8000, bias=True) ) ) |
因此,你可以看到模型非常简单。事实上,它只有 700 万个参数,但它足够大,需要相当长的时间来训练。
训练代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
optimizer = optim.Adam(model.parameters(), lr=0.001) loss_fn = nn.CrossEntropyLoss(ignore_index=fr_tokenizer.token_to_id("[pad]")) N_EPOCHS = 30 for epoch in range(N_EPOCHS): model.train() epoch_loss = 0 for en_ids, fr_ids in dataloader: # 将“句子”移到设备上 en_ids = en_ids.to(device) fr_ids = fr_ids.to(device) # 清零梯度,然后进行前向传播 optimizer.zero_grad() outputs = model(en_ids, fr_ids) # 计算损失:比较 3D logits 和 2D 目标 loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1)) loss.backward() optimizer.step() epoch_loss += loss.item() print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}") torch.save(model.state_dict(), f"seq2seq-epoch-{epoch+1}.pth") # Test if (epoch+1) % 5 != 0: continue model.eval() epoch_loss = 0 with torch.no_grad(): for en_ids, fr_ids in dataloader: en_ids = en_ids.to(device) fr_ids = fr_ids.to(device) outputs = model(en_ids, fr_ids) loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1)) epoch_loss += loss.item() print(f"Eval loss: {epoch_loss/len(dataloader)}") |
这是一个简单的训练循环,许多改进的训练技术都没有实现。例如,没有使用数据的训练-测试分割、提前停止和梯度裁剪。它所做的是按批次读取数据集,然后通过前向和后向传播运行模型,然后更新模型参数。
使用的损失函数是交叉熵,因为模型需要预测词汇表中下一个 token。当你创建整个模型时,它会生成与目标序列长度相匹配的整个输出序列。因此,损失函数可以一次性比较整个序列,而不是逐个 token 计算损失。但是,在此应用中,张量是序列的批次,序列将被填充以匹配最长的长度。序列应该以 [end]
token 终止。填充 token 的位置应该包含在总的损失计算中。这就是为什么在创建具有 nn.CrossEntropyLoss()
的损失函数时使用 ignore_index
参数。
如果你有单独的测试集,你可以用它来进行评估。在上面,你在 for 循环的后半部分每 5 个 epoch 重用了训练数据进行评估。请记住在 model.train()
和 model.eval()
之间切换模型以获得正确的训练/推理行为。
使用模型
在上面的代码中,你使用 torch.save()
在每个 epoch 结束时保存了模型。当你有了模型文件,你可以用以下方式将其加载回来:
1 2 3 4 5 6 |
... encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device) decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device) model = Seq2SeqLSTM(encoder, decoder).to(device) model.load_state_dict(torch.load("seq2seq.pth")) |
有了训练好的模型,你可以用它来生成翻译。但是,你不能使用与训练时相同的 forward()
方法。相反,你需要使用一个循环多次调用 decoder,直到目标序列被生成。
以下是如何执行原始数据集中一些随机句子的翻译的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
import random model.eval() N_SAMPLES = 5 MAX_LEN = 60 with torch.no_grad(): start_token = torch.tensor([fr_tokenizer.token_to_id("[start]")]).to(device) for en, true_fr in random.sample(text_pairs, N_SAMPLES): en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device) _output, hidden, cell = model.encoder(en_ids) pred_ids = [start_token] for _ in range(MAX_LEN): decoder_input = torch.tensor(pred_ids).unsqueeze(0).to(device) output, hidden, cell = model.decoder(decoder_input, hidden, cell) output = output[:, -1, :].argmax(dim=1) pred_ids.append(output.item()) # stop if the predicted token is the end token if pred_ids[-1] == fr_tokenizer.token_to_id("[end]"): break # Decode the predicted IDs pred_fr = fr_tokenizer.decode(pred_ids) print(f"English: {en}") print(f"French: {true_fr}") print(f"Predicted: {pred_fr}") print() |
首先,将模型切换到评估模式,并在 torch.no_grad()
上下文中运行。这将节省时间和内存。
你使用 random.sample()
从数据集中选择几个样本。输入句子(英语)被 tokenized 并编码到张量 en_ids
中。它是一个形状为 (1, seq_len)
的二维张量,因为模型总是期望一个序列批次,即使批次大小为 1。
你将英语句子通过模型的 encoder 运行,以提取上下文向量,该向量代表 LSTM 模块的最终状态。然后,你从特殊 token [start]
开始,并在循环中生成法语句子。
这是一个典型的 seq2seq 模型使用循环。你期望模型最终生成 [end]
token;否则,当生成的序列长度达到最大长度时,你将停止生成。在循环的每次迭代中,你为 decoder 创建一个新的输入张量。然后 decoder 将生成一个额外的 token,作为 decoder 输出序列的最后一个 token。这个输出是词汇表大小的 logit 向量。你通过 PyTorch 中的 argmax()
方法选择概率最高的 token 作为下一个 token。
列表 pred_ids
累积了 token ID 列表。循环的每次迭代都基于此列表生成 decoder 的输入张量。当循环终止时,你再次运行 tokenizer 将 token ID 转换为句子字符串。
当你运行上面的代码时,你可能会看到以下输出:
1 2 3 4 5 6 7 |
English: it was his silence that made her angry. French: ce fut son silence qui la mit en colère. Predicted: ce fut son silence qui qui mit colère colère. English: you're the teacher. French: tu es le professeur. Predicted: c'est professeur. |
改进模型
以上概述了如何构建一个用于翻译的朴素 seq2seq LSTM 模型。正如你从上面看到的,输出并不完美。有几种方法可以改进它:
- 改进 tokenizer:使用的词汇量很小,这可能会限制模型理解词义的能力。通过纳入更大的词汇量可以改进模型。但这可能需要更多训练数据。
- 使用更大的模型:上面使用了一个 LSTM 层,使用更多层可能会带来改进。你也可以在 LSTM 模块中添加 dropout 来防止在层数更多时过拟合。
- 改进训练:将数据集分割成训练集和测试集,并使用测试集来评估模型。这样更容易确定哪一个 epoch 产生了最佳模型,从而可以使用它进行推理或提前停止训练。通过监控测试集的损失,你也可以判断模型是否已收敛。
- 尝试不同的解码器模型:上面的解码器以编码器的状态作为初始状态,运行整个目标部分序列。另一种方法是,你只将最后一个 token 传递给解码器来生成下一个 token。两者的区别在于,后者直接使用初始状态生成下一个 token,而前者在扫描先前生成的序列时会改变状态。据认为,当序列很长时,循环神经网络很容易“忘记”初始状态(即上下文向量)。
为了完整起见,以下是您在此帖子中创建的完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
import random import os import re import unicodedata import zipfile import requests import torch import torch.nn as nn import torch.optim as optim import tokenizers import tqdm # # Data preparation # # Download dataset provided by Anki: https://www.manythings.org/anki/ with requests if not os.path.exists("fra-eng.zip"): url = "http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip" response = requests.get(url) with open("fra-eng.zip", "wb") as f: f.write(response.content) # Normalize text # each line of the file is in the format "<english>\t<french>" # We convert text to lowercasee, normalize unicode (UFKC) def normalize(line): """规范化一行文本并在制表符处分成两部分""" line = unicodedata.normalize("NFKC", line.strip().lower()) eng, fra = line.split("\t") return eng.lower().strip(), fra.lower().strip() text_pairs = [] with zipfile.ZipFile("fra-eng.zip", "r") as zip_ref: for line in zip_ref.read("fra.txt").decode("utf-8").splitlines(): eng, fra = normalize(line) text_pairs.append((eng, fra)) # # 使用 BPE 进行分词 # if os.path.exists("en_tokenizer.json") and os.path.exists("fr_tokenizer.json"): en_tokenizer = tokenizers.Tokenizer.from_file("en_tokenizer.json") fr_tokenizer = tokenizers.Tokenizer.from_file("fr_tokenizer.json") else: en_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE()) fr_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE()) # Configure pre-tokenizer to split on whitespace and punctuation, add space at beginning of the sentence en_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True) fr_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True) # Configure decoder: So that word boundary symbol "Ġ" will be removed en_tokenizer.decoder = tokenizers.decoders.ByteLevel() fr_tokenizer.decoder = tokenizers.decoders.ByteLevel() # Train BPE for English and French using the same trainer VOCAB_SIZE = 8000 trainer = tokenizers.trainers.BpeTrainer( vocab_size=VOCAB_SIZE, special_tokens=["[start]", "[end]", "[pad]"], show_progress=True ) en_tokenizer.train_from_iterator([x[0] for x in text_pairs], trainer=trainer) fr_tokenizer.train_from_iterator([x[1] for x in text_pairs], trainer=trainer) en_tokenizer.enable_padding(pad_id=en_tokenizer.token_to_id("[pad]"), pad_token="[pad]") fr_tokenizer.enable_padding(pad_id=fr_tokenizer.token_to_id("[pad]"), pad_token="[pad]") # Save the trained tokenizers en_tokenizer.save("en_tokenizer.json", pretty=True) fr_tokenizer.save("fr_tokenizer.json", pretty=True) # Test the tokenizer print("Sample tokenization:") en_sample, fr_sample = random.choice(text_pairs) encoded = en_tokenizer.encode(en_sample) print(f"Original: {en_sample}") print(f"Tokens: {encoded.tokens}") print(f"IDs: {encoded.ids}") print(f"Decoded: {en_tokenizer.decode(encoded.ids)}") print() encoded = fr_tokenizer.encode("[start] " + fr_sample + " [end]") print(f"Original: {fr_sample}") print(f"Tokens: {encoded.tokens}") print(f"IDs: {encoded.ids}") print(f"Decoded: {fr_tokenizer.decode(encoded.ids)}") print() # # Create PyTorch dataset for the BPE-encoded translation pairs # class TranslationDataset(torch.utils.data.Dataset): def __init__(self, text_pairs): self.text_pairs = text_pairs def __len__(self): return len(self.text_pairs) def __getitem__(self, idx): eng, fra = self.text_pairs[idx] return eng, "[start] " + fra + " [end]" def collate_fn(batch): en_str, fr_str = zip(*batch) en_enc = en_tokenizer.encode_batch(en_str, add_special_tokens=True) fr_enc = fr_tokenizer.encode_batch(fr_str, add_special_tokens=True) en_ids = [enc.ids for enc in en_enc] fr_ids = [enc.ids for enc in fr_enc] return torch.tensor(en_ids), torch.tensor(fr_ids) BATCH_SIZE = 32 dataset = TranslationDataset(text_pairs) dataloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn) # 测试数据集 for en_ids, fr_ids in dataloader: print(f"English: {en_ids}") print(f"French: {fr_ids}") break # # 创建用于翻译的 LSTM 序到序模型 # class EncoderLSTM(nn.Module): """带嵌入层的堆叠 LSTM 编码器""" def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1): """ 使用普通的 LSTM。不使用双向 LSTM。 参数 vocab_size: 输入词汇表的大小 embedding_dim: 嵌入向量的维度 hidden_dim: 隐藏状态的维度 num_layers: 循环层的数量(堆叠 LSTM 的层数) dropout: dropout 率,应用于除最后一层外的所有 LSTM 层 """ super().__init__() self.vocab_size = vocab_size self.embedding_dim = embedding_dim self.hidden_dim = hidden_dim self.num_layers = num_layers self.embedding = nn.Embedding(vocab_size, embedding_dim) self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0) def forward(self, input_seq): # input seq = [batch_size, seq_len] -> embedded = [batch_size, seq_len, embedding_dim] embedded = self.embedding(input_seq) # outputs = [batch_size, seq_len, embedding_dim] # hidden = cell = [n_layers, batch_size, hidden_dim] outputs, (hidden, cell) = self.lstm(embedded) return outputs, hidden, cell class DecoderLSTM(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1): super().__init__() self.vocab_size = vocab_size self.embedding_dim = embedding_dim self.hidden_dim = hidden_dim self.num_layers = num_layers self.embedding = nn.Embedding(vocab_size, embedding_dim) self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0) self.out = nn.Linear(embedding_dim, vocab_size) def forward(self, input_seq, hidden, cell): # input seq = [batch_size, seq_len] -> embedded = [batch_size, seq_len, embedding_dim] # hidden = cell = [n_layers, batch_size, hidden_dim] embedded = self.embedding(input_seq) # output = [batch_size, seq_len, embedding_dim] output, (hidden, cell) = self.lstm(embedded, (hidden, cell)) prediction = self.out(output) return prediction, hidden, cell class Seq2SeqLSTM(nn.Module): def __init__(self, encoder, decoder): super().__init__() self.encoder = encoder self.decoder = decoder def forward(self, input_seq, target_seq): """根据部分目标序列,预测下一个 token""" # input seq = [batch_size, seq_len] # target seq = [batch_size, seq_len] batch_size, target_len = target_seq.shape device = target_seq.device # 存储输出 logits outputs = [] # 编码器前向传播 _enc_out, hidden, cell = self.encoder(input_seq) dec_in = target_seq[:, :1] # 解码器前向传播 for t in range(target_len-1): # 最后一个目标 token 和隐藏状态 -> 下一个 token pred, hidden, cell = self.decoder(dec_in, hidden, cell) # 存储预测 pred = pred[:, -1:, :] outputs.append(pred) # 使用预测的 token 作为下一个输入 dec_in = torch.cat([dec_in, pred.argmax(dim=2)], dim=1) outputs = torch.cat(outputs, dim=1) return outputs # 初始化模型参数 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') enc_vocab = len(en_tokenizer.get_vocab()) dec_vocab = len(fr_tokenizer.get_vocab()) emb_dim = 256 hidden_dim = 256 num_layers = 2 dropout = 0.1 # 创建模型 encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device) decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device) model = Seq2SeqLSTM(encoder, decoder).to(device) print(model) print("Model created with:") print(f" 输入词汇表大小: {enc_vocab}") print(f" 输出词汇表大小: {dec_vocab}") print(f" 嵌入维度: {emb_dim}") print(f" 隐藏维度: {hidden_dim}") print(f" 层数: {num_layers}") print(f" Dropout: {dropout}") print(f" Total parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}") # 除非 model.pth 存在,否则进行训练 if os.path.exists("seq2seq.pth"): model.load_state_dict(torch.load("seq2seq.pth")) else: optimizer = optim.Adam(model.parameters(), lr=0.001) loss_fn = nn.CrossEntropyLoss(ignore_index=fr_tokenizer.token_to_id("[pad]")) N_EPOCHS = 30 for epoch in range(N_EPOCHS): model.train() epoch_loss = 0 for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Training"): # 将“句子”移到设备 en_ids = en_ids.to(device) fr_ids = fr_ids.to(device) # 梯度清零,然后进行前向传播 optimizer.zero_grad() outputs = model(en_ids, fr_ids) # 计算损失:比较 3D logits 和 2D targets loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1)) loss.backward() optimizer.step() epoch_loss += loss.item() print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}") torch.save(model.state_dict(), f"seq2seq-epoch-{epoch+1}.pth") # 测试 if (epoch+1) % 5 != 0: continue model.eval() epoch_loss = 0 with torch.no_grad(): for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Evaluating"): en_ids = en_ids.to(device) fr_ids = fr_ids.to(device) outputs = model(en_ids, fr_ids) loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1)) epoch_loss += loss.item() print(f"Eval loss: {epoch_loss/len(dataloader)}") # 保存最终模型 torch.save(model.state_dict(), "seq2seq.pth") # 测试几个样本 model.eval() N_SAMPLES = 5 MAX_LEN = 60 with torch.no_grad(): start_token = torch.tensor([fr_tokenizer.token_to_id("[start]")]).to(device) for en, true_fr in random.sample(text_pairs, N_SAMPLES): en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device) _output, hidden, cell = model.encoder(en_ids) pred_ids = [start_token] for _ in range(MAX_LEN): decoder_input = torch.tensor(pred_ids).unsqueeze(0).to(device) output, hidden, cell = model.decoder(decoder_input, hidden, cell) output = output[:, -1, :].argmax(dim=1) pred_ids.append(output.item()) # 如果预测的 token 是结束 token,则提前停止 if pred_ids[-1] == fr_tokenizer.token_to_id("[end]"): break # Decode the predicted IDs pred_fr = fr_tokenizer.decode(pred_ids) print(f"English: {en}") print(f"French: {true_fr}") print(f"Predicted: {pred_fr}") print() |
进一步阅读
以下是一些您可能会觉得有用的资源:
总结
在这篇文章中,您学习了如何使用 LSTM 构建和训练一个用于英法翻译的序到序模型。具体来说,您学习了:
- 编码器-解码器架构如何与 LSTM 单元协同工作
- 如何准备数据集以训练序到序模型
- 如何在 PyTorch 中实现和训练完整的翻译模型
实现很简单,但它概述了序到序模型的一般机制。
暂无评论。