The attention mechanism, introduced by Bahdanau et al. in 2014, significantly improved sequence-to-sequence (seq2seq) models. In this post, you will learn how to build and train a seq2seq model with attention for language translation, focusing on:
- Why the attention mechanism matters
- How to implement attention in a seq2seq model
Let's get started.

Building a Seq2Seq Model with Attention for Language Translation
Photo by Esther T. Some rights reserved.
Overview
This post is divided into four parts; they are:
- Why Attention Matters: Limitations of the Basic Seq2Seq Model
- Implementing a Seq2Seq Model with Attention
- Training and Evaluating the Model
- Using the Model
Why Attention Matters: Limitations of the Basic Seq2Seq Model
A traditional seq2seq model uses an encoder-decoder architecture in which the encoder compresses the input sequence into a single context vector, and the decoder then uses that vector to generate the output sequence. This approach has a key limitation: the decoder must rely on this single context vector regardless of how long the output sequence is.
This becomes problematic for long sequences, because the model struggles to retain important details from the early parts of the sequence. Take English-to-French translation as an example: the decoder uses the context vector as its initial state to generate the first token, then feeds each previous output back in as the input for subsequent tokens. As the hidden state is updated, the decoder gradually loses the information carried by the original context vector.
The attention mechanism addresses this problem by:
- Giving the decoder access to all of the encoder's hidden states during generation
- Allowing the model to focus on the relevant parts of the input for each output token
- Removing the dependence on a single context vector
Implementing a Seq2Seq Model with Attention
Let's implement a seq2seq model with attention, following Bahdanau et al. (2014). You will use GRU (gated recurrent unit) modules rather than LSTMs because they are simpler and faster to train while offering comparable performance.
Using the same training dataset as before, and closely mirroring the encoder of the plain seq2seq model from the previous post, the encoder is implemented as follows:
```python
class EncoderRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_seq):
        embedded = self.dropout(self.embedding(input_seq))
        outputs, hidden = self.rnn(embedded)
        return outputs, hidden
```
The dropout module is applied to the output of the embedding layer to help prevent overfitting. The RNN is an `nn.GRU` with `batch_first=True`, so it accepts input of shape (batch_size, seq_len, embedding_dim). The encoder's `forward()` method returns:
- a 3D tensor of shape (batch_size, seq_len, hidden_dim) containing the RNN outputs
- a tensor of shape (1, batch_size, hidden_dim) containing the final hidden state
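As a quick sanity check, you can run a dummy batch through the encoder and inspect the returned shapes. This is a minimal sketch with hypothetical toy sizes, assuming the `EncoderRNN` class defined above:

```python
import torch

# instantiate a small encoder (toy sizes for illustration only)
enc = EncoderRNN(vocab_size=100, embedding_dim=32, hidden_dim=64)
dummy = torch.randint(0, 100, (8, 15))  # (batch_size=8, seq_len=15)

outputs, hidden = enc(dummy)
print(outputs.shape)  # torch.Size([8, 15, 64])
print(hidden.shape)   # torch.Size([1, 8, 64])
```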
The Bahdanau attention mechanism differs from modern Transformer attention. Here is its implementation:
```python
class BahdanauAttention(nn.Module):
    def __init__(self, hidden_size):
        super(BahdanauAttention, self).__init__()
        self.Wa = nn.Linear(hidden_size, hidden_size)
        self.Ua = nn.Linear(hidden_size, hidden_size)
        self.Va = nn.Linear(hidden_size, 1)

    def forward(self, query, keys):
        scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))
        scores = scores.transpose(1, 2)  # scores shape = [B, 1, S]
        weights = F.softmax(scores, dim=-1)
        context = torch.bmm(weights, keys)
        return context, weights
```
Mathematically, the attention mechanism is defined as:
$$
y = \textrm{softmax}\big(W^V \tanh(W^Q Q + W^K K)\big) K
$$
Unlike scaled dot-product attention, it scores each key by summing learned projections of the query and the keys, an additive rather than multiplicative formulation.
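For comparison, scaled dot-product attention, as used in Transformers, scores the query against the keys with a dot product instead of a learned additive projection:

$$
\textrm{Attention}(Q, K, V) = \textrm{softmax}\!\left(\frac{QK^\top}{\sqrt{d_k}}\right) V
$$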
With the Bahdanau attention module in place, the decoder is implemented as follows:
```python
class DecoderRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.dropout = nn.Dropout(dropout)
        self.attention = BahdanauAttention(hidden_dim)
        self.gru = nn.GRU(embedding_dim + hidden_dim, hidden_dim, batch_first=True)
        self.out_proj = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_seq, hidden, enc_out):
        """One token in, one token out"""
        embedded = self.dropout(self.embedding(input_seq))
        context, attn_weights = self.attention(hidden.transpose(0, 1), enc_out)
        rnn_input = torch.cat([embedded, context], dim=-1)
        rnn_output, hidden = self.gru(rnn_input, hidden)
        output = self.out_proj(rnn_output)
        return output, hidden
```
The decoder's `forward()` method takes three inputs: a single-token input sequence, the latest RNN hidden state, and the encoder's full output sequence. It uses the attention mechanism to align the input token with the encoder's output sequence and produce the decoder's context vector. This context vector, together with the input token, is then passed through the GRU module to generate the next token. Finally, the output is projected into a logit vector of vocabulary size.
The seq2seq model is then built by connecting the encoder and decoder modules, as follows:
```python
class Seq2SeqRNN(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        """Given a partial target sequence, predict the next token"""
        batch_size, target_len = target_seq.shape
        device = target_seq.device

        # list to store the output logits
        outputs = []

        # encoder forward pass
        enc_out, hidden = self.encoder(input_seq)
        dec_hidden = hidden

        # decoder forward pass
        for t in range(target_len - 1):
            dec_in = target_seq[:, t].unsqueeze(1)
            dec_out, dec_hidden = self.decoder(dec_in, dec_hidden, enc_out)
            outputs.append(dec_out)
        outputs = torch.cat(outputs, dim=1)
        return outputs
```
The seq2seq model uses teacher forcing during training: the ground-truth target tokens, rather than the decoder's previous outputs, are fed as inputs to speed up learning. In this implementation, the encoder is called only once, while the decoder is called repeatedly to generate the output sequence.
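As a usage sketch with hypothetical toy sizes, the full model takes a batch of source and target token IDs and returns one set of logits for every target position except the last:

```python
import torch

# toy sizes for illustration only; the post itself uses 256-dimensional embeddings and hidden states
encoder = EncoderRNN(vocab_size=100, embedding_dim=32, hidden_dim=64)
decoder = DecoderRNN(vocab_size=120, embedding_dim=32, hidden_dim=64)
seq2seq = Seq2SeqRNN(encoder, decoder)

src = torch.randint(0, 100, (4, 10))  # English token IDs
tgt = torch.randint(0, 120, (4, 12))  # French token IDs (in practice starting with [start])

logits = seq2seq(src, tgt)
print(logits.shape)  # torch.Size([4, 11, 120]), i.e. target_len - 1 steps
```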
Training and Evaluating the Model
Using the modules you created in the previous section, you can initialize a seq2seq model:
```python
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
enc_vocab = len(en_tokenizer.get_vocab())
dec_vocab = len(fr_tokenizer.get_vocab())
emb_dim = 256
hidden_dim = 256
dropout = 0.1

# create the model
encoder = EncoderRNN(enc_vocab, emb_dim, hidden_dim, dropout).to(device)
decoder = DecoderRNN(dec_vocab, emb_dim, hidden_dim, dropout).to(device)
model = Seq2SeqRNN(encoder, decoder).to(device)
```
The training loop is very similar to the one in the previous post.
```python
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.CrossEntropyLoss()
N_EPOCHS = 50

for epoch in range(N_EPOCHS):
    model.train()
    epoch_loss = 0
    for en_ids, fr_ids in dataloader:
        # move the "sentences" to the device
        en_ids = en_ids.to(device)
        fr_ids = fr_ids.to(device)
        # zero the gradients, then run the forward pass
        optimizer.zero_grad()
        outputs = model(en_ids, fr_ids)
        # compute the loss: compare 3D logits with 2D targets
        loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}")
    torch.save(model.state_dict(), f"seq2seq_attn-epoch-{epoch+1}.pth")

    # evaluation
    if (epoch+1) % 5 != 0:
        continue
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for en_ids, fr_ids in dataloader:
            en_ids = en_ids.to(device)
            fr_ids = fr_ids.to(device)
            outputs = model(en_ids, fr_ids)
            loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
            epoch_loss += loss.item()
    print(f"Eval loss: {epoch_loss/len(dataloader)}")
```
The training procedure uses cross-entropy loss to compare the output logits against the ground-truth French translation. The decoder starts from the [start] token and predicts one token at a time. Because the training data contains padding and special tokens, the output is compared against `fr_ids[:, 1:]` so that the two stay aligned. Note that the [pad] token is included in the loss computation, but you can skip it by specifying the `ignore_index` argument when creating the loss function.
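For example, a padding-aware loss function could be created as follows. This is a minimal sketch that assumes the `nn` import and the `fr_tokenizer` defined earlier:

```python
# exclude [pad] positions in the target from the loss (optional variant)
pad_id = fr_tokenizer.token_to_id("[pad]")
loss_fn = nn.CrossEntropyLoss(ignore_index=pad_id)
```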
The model is trained for 50 epochs, with an evaluation every five epochs. Since you do not have a separate test set, the training data is reused for evaluation. You should switch the model to evaluation mode and run it under `torch.no_grad()` to avoid computing gradients.
Using the Model
A well-trained model typically reaches an average cross-entropy loss of around 0.1. The training loop in the previous section already outlines how the model is called, but because the `forward()` method of the `Seq2SeqRNN` class was written for training, you should use the encoder and decoder separately at inference time. Here is how to translate with the trained model:
```python
import random

model.eval()
N_SAMPLES = 5
MAX_LEN = 60
with torch.no_grad():
    start_token = torch.tensor([fr_tokenizer.token_to_id("[start]")]).to(device)
    for en, true_fr in random.sample(text_pairs, N_SAMPLES):
        en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device)
        enc_out, hidden = model.encoder(en_ids)
        pred_ids = []
        prev_token = start_token.unsqueeze(0)
        for _ in range(MAX_LEN):
            output, hidden = model.decoder(prev_token, hidden, enc_out)
            output = output.argmax(dim=2)
            pred_ids.append(output.item())
            prev_token = output
            # early stop if the predicted token is the end token
            if pred_ids[-1] == fr_tokenizer.token_to_id("[end]"):
                break
        # Decode the predicted IDs
        pred_fr = fr_tokenizer.decode(pred_ids)
        print(f"English: {en}")
        print(f"French: {true_fr}")
        print(f"Predicted: {pred_fr}")
        print()
```
During inference, you pass a tensor of sequence length 1 and batch size 1 to the decoder at each step. The decoder returns a logit vector of sequence length 1 and batch size 1, from which you obtain the output token ID with `argmax()`. This output token then becomes the input for the next iteration of the loop, until the [end] token is generated or the maximum length is reached.
The sample output below shows how the model performs:
```
English: we'll all die sooner or later.
French: nous mourrons tous tôt ou tard.
Predicted: nous mourronsrons tôt ou tard.

English: tom made room for mary on the bench.
French: tom fit de la place pour marie sur le banc.
Predicted: tom fit fait sa pour pour sur le banc banc.

English: keep quiet!
French: restez tranquille !
Predicted: ailles tranquille
```
To further improve the model's performance, you can:
- Increase the tokenizer's vocabulary size
- Modify the model architecture, for example by using a larger embedding dimension, a larger hidden state dimension, or more GRU layers
- Improve the training process, for example by tuning the learning rate or the number of epochs, trying a different optimizer, or holding out a separate test set for evaluation (see the sketch after this list)
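For instance, a simple held-out split of the sentence pairs might look like the sketch below. It reuses the `TranslationDataset`, `collate_fn`, and `BATCH_SIZE` from the full code listing that follows; the post itself evaluates on the training data:

```python
import random
import torch

# hold out 10% of the sentence pairs for evaluation (hypothetical split)
random.shuffle(text_pairs)
n_test = int(0.1 * len(text_pairs))
test_pairs, train_pairs = text_pairs[:n_test], text_pairs[n_test:]

train_loader = torch.utils.data.DataLoader(TranslationDataset(train_pairs), batch_size=BATCH_SIZE,
                                           shuffle=True, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(TranslationDataset(test_pairs), batch_size=BATCH_SIZE,
                                          shuffle=False, collate_fn=collate_fn)
```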
For completeness, here is the full code you created in this post:
```python
import random
import os
import re
import unicodedata
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tokenizers
import tqdm

#
# Data preparation
#

# Download dataset provided by Anki: https://www.manythings.org/anki/ with requests
if not os.path.exists("fra-eng.zip"):
    url = "http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip"
    response = requests.get(url)
    with open("fra-eng.zip", "wb") as f:
        f.write(response.content)

# Normalize text
# each line of the file is in the format "<english>\t<french>"
# We convert text to lowercase, normalize unicode (NFKC)
def normalize(line):
    """Normalize a line of text and split it into two parts at the tab character"""
    line = unicodedata.normalize("NFKC", line.strip().lower())
    eng, fra = line.split("\t")
    return eng.lower().strip(), fra.lower().strip()

text_pairs = []
with zipfile.ZipFile("fra-eng.zip", "r") as zip_ref:
    for line in zip_ref.read("fra.txt").decode("utf-8").splitlines():
        eng, fra = normalize(line)
        text_pairs.append((eng, fra))

#
# Tokenization with BPE
#

if os.path.exists("en_tokenizer.json") and os.path.exists("fr_tokenizer.json"):
    en_tokenizer = tokenizers.Tokenizer.from_file("en_tokenizer.json")
    fr_tokenizer = tokenizers.Tokenizer.from_file("fr_tokenizer.json")
else:
    en_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())
    fr_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())
    # Configure pre-tokenizer to split on whitespace and punctuation, add space at beginning of the sentence
    en_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)
    fr_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)
    # Configure decoder: So that word boundary symbol "Ġ" will be removed
    en_tokenizer.decoder = tokenizers.decoders.ByteLevel()
    fr_tokenizer.decoder = tokenizers.decoders.ByteLevel()
    # Train BPE for English and French using the same trainer
    VOCAB_SIZE = 8000
    trainer = tokenizers.trainers.BpeTrainer(
        vocab_size=VOCAB_SIZE,
        special_tokens=["[start]", "[end]", "[pad]"],
        show_progress=True
    )
    en_tokenizer.train_from_iterator([x[0] for x in text_pairs], trainer=trainer)
    fr_tokenizer.train_from_iterator([x[1] for x in text_pairs], trainer=trainer)
    en_tokenizer.enable_padding(pad_id=en_tokenizer.token_to_id("[pad]"), pad_token="[pad]")
    fr_tokenizer.enable_padding(pad_id=fr_tokenizer.token_to_id("[pad]"), pad_token="[pad]")
    # Save the trained tokenizers
    en_tokenizer.save("en_tokenizer.json", pretty=True)
    fr_tokenizer.save("fr_tokenizer.json", pretty=True)

# Test the tokenizer
print("Sample tokenization:")
en_sample, fr_sample = random.choice(text_pairs)
encoded = en_tokenizer.encode(en_sample)
print(f"Original: {en_sample}")
print(f"Tokens: {encoded.tokens}")
print(f"IDs: {encoded.ids}")
print(f"Decoded: {en_tokenizer.decode(encoded.ids)}")
print()
encoded = fr_tokenizer.encode("[start] " + fr_sample + " [end]")
print(f"Original: {fr_sample}")
print(f"Tokens: {encoded.tokens}")
print(f"IDs: {encoded.ids}")
print(f"Decoded: {fr_tokenizer.decode(encoded.ids)}")
print()

#
# Create PyTorch dataset for the BPE-encoded translation pairs
#

class TranslationDataset(torch.utils.data.Dataset):
    def __init__(self, text_pairs):
        self.text_pairs = text_pairs

    def __len__(self):
        return len(self.text_pairs)

    def __getitem__(self, idx):
        eng, fra = self.text_pairs[idx]
        return eng, "[start] " + fra + " [end]"

def collate_fn(batch):
    en_str, fr_str = zip(*batch)
    en_enc = en_tokenizer.encode_batch(en_str, add_special_tokens=True)
    fr_enc = fr_tokenizer.encode_batch(fr_str, add_special_tokens=True)
    en_ids = [enc.ids for enc in en_enc]
    fr_ids = [enc.ids for enc in fr_enc]
    return torch.tensor(en_ids), torch.tensor(fr_ids)

BATCH_SIZE = 32
dataset = TranslationDataset(text_pairs)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

#
# Create seq2seq model with attention for translation
#

class EncoderRNN(nn.Module):
    """A RNN encoder with an embedding layer"""
    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.1):
        """
        Args:
            vocab_size: The size of the input vocabulary
            embedding_dim: The dimension of the embedding vector
            hidden_dim: The dimension of the hidden state
            dropout: The dropout rate
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_seq):
        # input seq = [batch_size, seq_len] -> embedded = [batch_size, seq_len, embedding_dim]
        embedded = self.dropout(self.embedding(input_seq))
        # outputs = [batch_size, seq_len, hidden_dim]
        # hidden = [1, batch_size, hidden_dim]
        outputs, hidden = self.rnn(embedded)
        return outputs, hidden

class BahdanauAttention(nn.Module):
    """Bahdanau Attention

    https://arxiv.org/pdf/1409.0473.pdf

    The forward function takes query and keys only, and they should be the same shape (B,S,H)
    """
    def __init__(self, hidden_size):
        super(BahdanauAttention, self).__init__()
        self.Wa = nn.Linear(hidden_size, hidden_size)
        self.Ua = nn.Linear(hidden_size, hidden_size)
        self.Va = nn.Linear(hidden_size, 1)

    def forward(self, query, keys):
        """Bahdanau Attention

        Args:
            query: [B, 1, H]
            keys: [B, S, H]

        Returns:
            context: [B, 1, H]
            weights: [B, 1, S]
        """
        B, S, H = keys.shape
        assert query.shape == (B, 1, H)
        scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))
        scores = scores.transpose(1, 2)  # scores = [B, 1, S]
        weights = F.softmax(scores, dim=-1)
        context = torch.bmm(weights, keys)
        return context, weights

class DecoderRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.dropout = nn.Dropout(dropout)
        self.attention = BahdanauAttention(hidden_dim)
        self.gru = nn.GRU(embedding_dim + hidden_dim, hidden_dim, batch_first=True)
        self.out_proj = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_seq, hidden, enc_out):
        """One token in, one token out"""
        # input seq = [batch_size, 1] -> embedded = [batch_size, 1, embedding_dim]
        embedded = self.dropout(self.embedding(input_seq))
        # hidden = [1, batch_size, hidden_dim]
        # context = [batch_size, 1, hidden_dim]
        context, attn_weights = self.attention(hidden.transpose(0, 1), enc_out)
        # rnn_input = [batch_size, 1, embedding_dim + hidden_dim]
        rnn_input = torch.cat([embedded, context], dim=-1)
        # rnn_output = [batch_size, 1, hidden_dim]
        rnn_output, hidden = self.gru(rnn_input, hidden)
        output = self.out_proj(rnn_output)
        return output, hidden

class Seq2SeqRNN(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        """Given a partial target sequence, predict the next token"""
        # input seq = [batch_size, seq_len]
        # target seq = [batch_size, seq_len]
        batch_size, target_len = target_seq.shape
        device = target_seq.device

        # list to store the output logits
        outputs = []

        # encoder forward pass
        enc_out, hidden = self.encoder(input_seq)
        dec_hidden = hidden

        # decoder forward pass
        for t in range(target_len - 1):
            # during training, use the ground truth token as the input (teacher forcing)
            dec_in = target_seq[:, t].unsqueeze(1)
            # last target token and hidden states -> next token
            dec_out, dec_hidden = self.decoder(dec_in, dec_hidden, enc_out)
            # store the prediction
            outputs.append(dec_out)
        outputs = torch.cat(outputs, dim=1)
        return outputs

# Initialize the model parameters
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
enc_vocab = len(en_tokenizer.get_vocab())
dec_vocab = len(fr_tokenizer.get_vocab())
emb_dim = 256
hidden_dim = 256
dropout = 0.1

# Create the model
encoder = EncoderRNN(enc_vocab, emb_dim, hidden_dim, dropout).to(device)
decoder = DecoderRNN(dec_vocab, emb_dim, hidden_dim, dropout).to(device)
model = Seq2SeqRNN(encoder, decoder).to(device)
print(model)
print("Model created with:")
print(f"  Input vocabulary size: {enc_vocab}")
print(f"  Output vocabulary size: {dec_vocab}")
print(f"  Embedding dimension: {emb_dim}")
print(f"  Hidden dimension: {hidden_dim}")
print(f"  Dropout: {dropout}")
print(f"  Total parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

# Initialize model parameters with uniform distribution [-0.08, 0.08]
#for name, param in model.named_parameters():
#    if param.dim() > 1:
#        nn.init.normal_(param.data, mean=0, std=0.01)

# Train the model unless a saved checkpoint exists
if os.path.exists("seq2seq_attn.pth"):
    model.load_state_dict(torch.load("seq2seq_attn.pth"))
else:
    optimizer = optim.Adam(model.parameters(), lr=0.0005)
    loss_fn = nn.CrossEntropyLoss() #ignore_index=fr_tokenizer.token_to_id("[pad]"))
    N_EPOCHS = 100

    for epoch in range(N_EPOCHS):
        model.train()
        epoch_loss = 0
        for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Training"):
            # Move the "sentences" to the device
            en_ids = en_ids.to(device)
            fr_ids = fr_ids.to(device)
            # Zero the gradients, then run the forward pass
            optimizer.zero_grad()
            outputs = model(en_ids, fr_ids)
            # Compute the loss: compare 3D logits with 2D targets
            loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}")
        torch.save(model.state_dict(), f"seq2seq_attn-epoch-{epoch+1}.pth")

        # Evaluation
        if (epoch+1) % 5 != 0:
            continue
        model.eval()
        epoch_loss = 0
        with torch.no_grad():
            for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Evaluating"):
                en_ids = en_ids.to(device)
                fr_ids = fr_ids.to(device)
                outputs = model(en_ids, fr_ids)
                loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
                epoch_loss += loss.item()
        print(f"Eval loss: {epoch_loss/len(dataloader)}")
    torch.save(model.state_dict(), "seq2seq_attn.pth")

# Test with a few samples
model.eval()
N_SAMPLES = 5
MAX_LEN = 60
with torch.no_grad():
    start_token = torch.tensor([fr_tokenizer.token_to_id("[start]")]).to(device)
    for en, true_fr in random.sample(text_pairs, N_SAMPLES):
        en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device)
        enc_out, hidden = model.encoder(en_ids)
        pred_ids = []
        prev_token = start_token.unsqueeze(0)
        for _ in range(MAX_LEN):
            output, hidden = model.decoder(prev_token, hidden, enc_out)
            output = output.argmax(dim=2)
            pred_ids.append(output.item())
            prev_token = output
            # early stop if the predicted token is the end token
            if pred_ids[-1] == fr_tokenizer.token_to_id("[end]"):
                break
        # Decode the predicted IDs
        pred_fr = fr_tokenizer.decode(pred_ids)
        print(f"English: {en}")
        print(f"French: {true_fr}")
        print(f"Predicted: {pred_fr}")
        print()
```
Note that the code above uses GRU as the RNN module in both the encoder and the decoder. You can use other RNN modules instead, such as LSTM or a bidirectional RNN, by swapping out the `nn.GRU` modules in the encoder and decoder. Below is an encoder and decoder implemented with LSTM and scaled dot-product attention (via `nn.MultiheadAttention` with a single head). You can substitute them into the implementation above; because the LSTM also carries a cell state, the `Seq2SeqRNN` wrapper and the inference loop need a small matching adjustment to pass the cell state along (see the sketch after the code).
```python
...

class EncoderRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)

    def forward(self, input_seq):
        embedded = self.embedding(input_seq)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

class DecoderRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.attention = nn.MultiheadAttention(hidden_dim, num_heads=1, dropout=dropout, batch_first=True)
        self.lstm = nn.LSTM(embedding_dim + hidden_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)
        self.out_proj = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_seq, hidden, cell, enc_out):
        embedded = self.embedding(input_seq)
        context = self.attention(hidden.transpose(0, 1), enc_out, enc_out)[0]
        rnn_input = torch.cat([embedded, context], dim=-1)
        output, (hidden, cell) = self.lstm(rnn_input, (hidden, cell))
        output = self.out_proj(output)
        return output, hidden, cell
```
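Because the LSTM modules above also return a cell state, the wrapper class needs a matching adjustment. Below is a minimal sketch of such a wrapper; the name `Seq2SeqLSTM` is hypothetical, and the teacher-forcing loop mirrors the `Seq2SeqRNN` implementation:

```python
import torch
import torch.nn as nn

class Seq2SeqLSTM(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        """Given a partial target sequence, predict the next token"""
        batch_size, target_len = target_seq.shape
        outputs = []
        # the LSTM encoder returns hidden and cell states
        enc_out, hidden, cell = self.encoder(input_seq)
        for t in range(target_len - 1):
            dec_in = target_seq[:, t].unsqueeze(1)  # teacher forcing
            dec_out, hidden, cell = self.decoder(dec_in, hidden, cell, enc_out)
            outputs.append(dec_out)
        return torch.cat(outputs, dim=1)
```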
Further Reading
Below are some resources that you may find useful:
- Neural Machine Translation by Jointly Learning to Align and Translate (Bahdanau et al., 2014)
- Sequence to Sequence Learning with Neural Networks
- Learning Phrase Representations using RNN Encoder-Decoder for Statistical Machine Translation
- PyTorch sequence-to-sequence translation tutorial
Summary
In this post, you learned how to build and train an attention-based sequence-to-sequence model for English-to-French translation. Specifically, you learned:
- How to build an encoder-decoder architecture with GRUs
- How to implement an attention mechanism that helps the model focus on the relevant input
- How to build a complete translation model in PyTorch
- How to train it effectively with teacher forcing
The attention mechanism significantly improves translation quality by allowing the model to focus dynamically on the relevant parts of the input during generation.