Python 是一种动态脚本语言。它不仅拥有动态类型系统,允许变量首先分配给一种类型,然后稍后更改,而且它的对象模型也是动态的。这允许我们在运行时修改其行为。其后果就是可能出现“monkey patching”(方法劫持)。这是一种无需修改更高层代码即可修改程序底层的方法。想象一下,你可以使用 print()
函数将内容打印到屏幕上,而我们可以修改此函数的定义,将其打印到文件,而无需修改你代码的任何一行。
之所以成为可能,是因为 Python 是一种解释型语言,因此我们可以在程序运行时进行更改。我们可以利用 Python 的这一特性来修改类或模块的接口。这在我们处理遗留代码或来自他人的代码时很有用,因为我们不想对其进行大量修改,但仍希望使其与不同版本的库或环境兼容。在本教程中,我们将看到如何将此技术应用于一些 Keras 和 TensorFlow 代码。
完成本教程后,你将学到:
- 什么是 Monkey Patching
- 如何在运行时更改 Python 中的对象或模块
通过我的新书 Python for Machine Learning 快速启动你的项目,其中包括分步教程和所有示例的Python 源代码文件。
让我们开始吧。

Monkey Patching Python 代码。照片由 Juan Rumimpunu 拍摄。部分权利保留。
教程概述
本教程分为三个部分;它们是
- 一个模型,两个接口
- 使用 Monkey Patching 扩展对象
- Monkey Patching 以复兴遗留代码
一个模型,两个接口
TensorFlow 是一个庞大的库。它提供了一个高级 Keras API 来用层来描述深度学习模型。它还附带了许多用于训练的函数,例如不同的优化器和数据生成器。仅仅因为我们需要运行我们的已训练模型就需要安装 TensorFlow,这可能会让人不知所措。因此,TensorFlow 为我们提供了一个名为TensorFlow Lite的对应物,它的大小要小得多,适合在移动设备或嵌入式设备等小型设备上运行。
我们想展示原始 TensorFlow Keras 模型和 TensorFlow Lite 模型是如何使用的。所以,我们来创建一个中等大小的模型,例如 LeNet-5 模型。以下是我们如何加载 MNIST 数据集并训练一个分类模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
import numpy as np import tensorflow as tf from tensorflow.keras.datasets import mnist from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten from tensorflow.keras.callbacks import EarlyStopping # 加载 MNIST 数据 (X_train, y_train), (X_test, y_test) = mnist.load_data() # 将数据重塑为 (n_sample, height, width, n_channel) 的形状 X_train = np.expand_dims(X_train, axis=3).astype('float32') X_test = np.expand_dims(X_test, axis=3).astype('float32') # LeNet5 模型:可以使用 ReLU 代替 tanh model = Sequential([ Conv2D(6, (5,5), input_shape=(28,28,1), padding="same", activation="tanh"), AveragePooling2D((2,2), strides=2), Conv2D(16, (5,5), activation="tanh"), AveragePooling2D((2,2), strides=2), Conv2D(120, (5,5), activation="tanh"), Flatten(), Dense(84, activation="tanh"), Dense(10, activation="softmax") ]) # 训练 model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["sparse_categorical_accuracy"]) earlystopping = EarlyStopping(monitor="val_loss", patience=4, restore_best_weights=True) model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping]) |
运行上面的代码将使用 TensorFlow 的数据集 API 下载 MNIST 数据集并训练模型。之后,我们可以保存模型。
1 |
model.save("lenet5-mnist.h5") |
或者我们可以用我们的测试集评估模型。
1 2 |
print(np.argmax(model.predict(X_test), axis=1)) print(y_test) |
然后你应该会看到
1 2 |
[7 2 1 ... 4 5 6] [7 2 1 ... 4 5 6] |
但是,如果我们打算将其与 TensorFlow Lite 结合使用,我们需要将其转换为 TensorFlow Lite 格式,如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# 使用动态范围优化进行 tflite 转换 import tensorflow as tf converter = tf.lite.TFLiteConverter.from_keras_model(model) converter.optimizations = [tf.lite.Optimize.DEFAULT] tflite_model = converter.convert() # 可选:保存数据以供测试 import numpy as np np.savez('mnist-test.npz', X=X_test, y=y_test) # 保存模型。 with open('lenet5-mnist.tflite', 'wb') as f f.write(tflite_model) |
我们可以向转换器添加更多选项,例如将模型量化为 16 位浮点数。但无论如何,转换的输出都是一个二进制字符串。转换不仅可以将模型减小到比 Keras 保存的 HDF5 文件小得多的尺寸,而且还允许我们使用轻量级库来使用它。有适用于 Android 和 iOS 移动设备的库。如果你使用的是嵌入式 Linux,你可能会在 PyPI 存储库中找到 tflite-runtime
模块(或者你可以从 TensorFlow 源代码编译一个)。以下是我们如何使用 tflite-runtime
来运行已转换的模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import numpy as np import tflite_runtime.interpreter as tflite loaded = np.load('mnist-test.npz') X_test = loaded["X"] y_test = loaded["y"] interpreter = tflite.Interpreter(model_path="lenet5-mnist.tflite") interpreter.allocate_tensors() input_details = interpreter.get_input_details() output_details = interpreter.get_output_details() print(input_details[0]['shape']) rows = [] for n in range(len(X_test)): # 此模型有一个输入和一个输出 interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1]) interpreter.invoke() row = interpreter.get_tensor(output_details[0]['index']) rows.append(row) rows = np.vstack(rows) accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test) print(accuracy) |
事实上,更大的 TensorFlow 库也可以用非常相似的语法运行已转换的模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import numpy as np import tensorflow as tf interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite") interpreter.allocate_tensors() input_details = interpreter.get_input_details() output_details = interpreter.get_output_details() rows = [] for n in range(len(X_test)): # 此模型有一个输入和一个输出 interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1]) interpreter.invoke() row = interpreter.get_tensor(output_details[0]['index']) rows.append(row) rows = np.vstack(rows) accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test) print(accuracy) |
请注意使用模型的不同方式:在 Keras 模型中,我们有一个 predict()
函数,它接受一个批次作为输入并返回一个结果。然而,在 TensorFlow Lite 模型中,我们必须一次将一个输入张量注入“解释器”并调用它,然后检索结果。
总而言之,下面的代码展示了如何构建一个 Keras 模型,对其进行训练,将其转换为 TensorFlow Lite 格式,然后使用已转换的模型进行测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
import numpy as np import tensorflow as tf from tensorflow.keras.datasets import mnist from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten from tensorflow.keras.callbacks import EarlyStopping # 加载 MNIST 数据 (X_train, y_train), (X_test, y_test) = mnist.load_data() # 将数据重塑为 (n_sample, height, width, n_channel) 的形状 X_train = np.expand_dims(X_train, axis=3).astype('float32') X_test = np.expand_dims(X_test, axis=3).astype('float32') # LeNet5 模型:可以使用 ReLU 代替 tanh model = Sequential([ Conv2D(6, (5,5), input_shape=(28,28,1), padding="same", activation="tanh"), AveragePooling2D((2,2), strides=2), Conv2D(16, (5,5), activation="tanh"), AveragePooling2D((2,2), strides=2), Conv2D(120, (5,5), activation="tanh"), Flatten(), Dense(84, activation="tanh"), Dense(10, activation="softmax") ]) # 训练 model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["sparse_categorical_accuracy"]) earlystopping = EarlyStopping(monitor="val_loss", patience=4, restore_best_weights=True) model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping]) # 保存模型 model.save("lenet5-mnist.h5") # 比较预测值与测试数据 print(np.argmax(model.predict(X_test), axis=1)) print(y_test) # 使用动态范围优化进行 tflite 转换 import tensorflow as tf converter = tf.lite.TFLiteConverter.from_keras_model(model) converter.optimizations = [tf.lite.Optimize.DEFAULT] tflite_model = converter.convert() # 可选:保存数据以供测试 import numpy as np np.savez('mnist-test.npz', X=X_test, y=y_test) # 保存 tflite 模型。 with open('lenet5-mnist.tflite', 'wb') as f: f.write(tflite_model) # 加载 tflite 模型并运行测试 interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite") interpreter.allocate_tensors() input_details = interpreter.get_input_details() output_details = interpreter.get_output_details() rows = [] for n in range(len(X_test)): # 此模型有一个输入和一个输出 interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1]) interpreter.invoke() row = interpreter.get_tensor(output_details[0]['index']) rows.append(row) rows = np.vstack(rows) accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test) print(accuracy) |
想开始学习机器学习 Python 吗?
立即参加我为期7天的免费电子邮件速成课程(附示例代码)。
点击注册,同时获得该课程的免费PDF电子书版本。
使用 Monkey Patching 扩展对象
我们可以在 TensorFlow Lite 解释器中使用 predict()
吗?
解释器对象没有这样的函数。但是,由于我们正在使用 Python,因此我们可以使用Monkey Patching技术来添加它。为了理解我们在做什么,首先,我们必须注意到我们在之前的代码中定义的 interpreter
对象可能包含许多属性和函数。当我们像函数一样调用 interpreter.predict()
时,Python 将在对象中查找具有该名称的函数,然后执行它。如果找不到该名称,Python 将引发 AttributeError
异常。
1 2 |
... interpreter.predict() |
这会给出
1 2 3 4 |
回溯(最近一次调用) File "/Users/MLM/pred_error.py", line 13, in <module> interpreter.predict() AttributeError: 'Interpreter' object has no attribute 'predict' |
为了使此功能正常工作,我们需要向 interpreter
对象添加一个名为 predict
的函数,并且在调用时它应该像一个函数一样运行。为了简单起见,我们注意到我们的模型是一个顺序模型,它接受一个数组作为输入并返回一个 softmax 结果数组作为输出。因此,我们可以编写一个 predict()
函数,它的行为方式类似于 Keras 模型中的函数,但使用 TensorFlow Lite 解释器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
... # Monkey Patching tflite 模型 def predict(self, input_batch): batch_size = len(input_batch) output = [] input_details = self.get_input_details() output_details = self.get_output_details() # 运行批次中的每个样本 for sample in range(batch_size): self.set_tensor(input_details[0]["index"], input_batch[sample:sample+1]) self.invoke() sample_output = self.get_tensor(output_details[0]["index"]) output.append(sample_output) # 将每个样本的输出堆叠起来 return np.vstack(output) interpreter.predict = predict.__get__(interpreter) |
上面最后一行是将我们创建的函数分配给 interpreter
对象,名称为 predict
。__get__(interpreter)
部分是必需的,以使我们定义的函数成为 interpreter
对象成员函数。
有了这些,我们现在就可以运行一个批次了。
1 2 3 4 5 6 7 |
... out_proba = interpreter.predict(X_test) out = np.argmax(out_proba, axis=1) print(out) accuracy = np.sum(out == y_test) / len(y_test) print(accuracy) |
1 2 |
[7 2 1 ... 4 5 6] 0.9879 |
这之所以成为可能,是因为 Python 拥有动态对象模型。我们可以在运行时修改对象的属性或成员函数。事实上,这不应该让我们感到惊讶。Keras 模型需要在运行 model.fit()
之前运行 model.compile()
。编译函数的一个作用是向模型添加 loss
属性来保存损失函数。这是在运行时完成的。
通过将 predict()
函数添加到 interpreter
对象,我们可以像训练好的 Keras 模型一样传递 interpreter
对象进行预测。虽然它们在后台是不同的,但它们共享相同的接口,因此其他函数可以在不修改任何代码行的情况下使用它。
以下是加载我们保存的 TensorFlow Lite 模型,然后对其进行 Monkey Patching 以使其看起来像 Keras 模型的完整代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
import numpy as np import tensorflow as tf from tensorflow.keras.datasets import mnist # 加载 MNIST 数据并重塑 (X_train, y_train), (X_test, y_test) = mnist.load_data() X_train = np.expand_dims(X_train, axis=3).astype('float32') X_test = np.expand_dims(X_test, axis=3).astype('float32') # Monkey Patching tflite 模型 def predict(self, input_batch): batch_size = len(input_batch) output = [] input_details = self.get_input_details() output_details = self.get_output_details() # 运行批次中的每个样本 for sample in range(batch_size): self.set_tensor(input_details[0]["index"], input_batch[sample:sample+1]) self.invoke() sample_output = self.get_tensor(output_details[0]["index"]) output.append(sample_output) # 将每个样本的输出堆叠起来 return np.vstack(output) # 加载并 Monkey Patch interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite") interpreter.predict = predict.__get__(interpreter) interpreter.allocate_tensors() # 测试输出 out_proba = interpreter.predict(X_test) out = np.argmax(out_proba, axis=1) print(out) accuracy = np.sum(out == y_test) / len(y_test) print(accuracy) |
Monkey Patching 以复兴遗留代码
我们可以再举一个 Python 中 Monkey Patching 的例子。考虑以下代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# https://machinelearning.org.cn/dropout-regularization-deep-learning-models-keras/ # Dropout 在 Sonar 数据集上的示例:隐藏层 from pandas import read_csv from keras.models import Sequential from keras.layers import Dense 从 keras.layers 导入 Dropout from keras.wrappers.scikit_learn import KerasClassifier from keras.constraints import maxnorm from keras.optimizers import SGD from sklearn.model_selection import cross_val_score from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import StratifiedKFold from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline # 加载数据集 dataframe = read_csv("sonar.csv", header=None) dataset = dataframe.values # 分割为输入 (X) 和输出 (Y) 变量 X = dataset[:,0:60].astype(float) Y = dataset[:,60] # 将类别值编码为整数 编码器 = LabelEncoder() encoder.fit(Y) encoded_Y = encoder.transform(Y) # 隐藏层中的 dropout 及权重约束 def create_model(): # 创建模型 model = Sequential() model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3))) model.add(Dropout(0.2)) model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3))) model.add(Dropout(0.2)) model.add(Dense(1, activation='sigmoid')) # 编译模型 sgd = SGD(lr=0.1, momentum=0.9) model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy']) return model estimators = [] estimators.append(('standardize', StandardScaler())) estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0))) pipeline = Pipeline(estimators) kfold = StratifiedKFold(n_splits=10, shuffle=True) results = cross_val_score(pipeline, X, encoded_Y, cv=kfold) print("隐藏层:%.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100)) |
这段代码是几年前编写的,并且假设使用的是旧版本的 Keras 和 TensorFlow 1.x。数据文件 sonar.csv
可以在 另一篇文章 中找到。如果我们在 TensorFlow 2.5 中运行此代码,我们会遇到 SGD
行的 ImportError
问题。为了使其正常运行,我们至少需要对上述代码进行两处更改:
- 函数和类应该从
tensorflow.keras
导入,而不是从keras
导入。 - 约束类
maxnorm
应该使用驼峰命名法,即MaxNorm
。
以下是更新后的代码,其中我们仅修改了导入语句:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# Dropout 在 Sonar 数据集上的示例:隐藏层 from pandas import read_csv from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Dense from tensorflow.keras.layers import Dropout from tensorflow.keras.wrappers.scikit_learn import KerasClassifier from tensorflow.keras.constraints import MaxNorm as maxnorm from tensorflow.keras.optimizers import SGD from sklearn.model_selection import cross_val_score from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import StratifiedKFold from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline # 加载数据集 dataframe = read_csv("sonar.csv", header=None) dataset = dataframe.values # 分割为输入 (X) 和输出 (Y) 变量 X = dataset[:,0:60].astype(float) Y = dataset[:,60] # 将类别值编码为整数 编码器 = LabelEncoder() encoder.fit(Y) encoded_Y = encoder.transform(Y) # 隐藏层中的 dropout 及权重约束 def create_model(): # 创建模型 model = Sequential() model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3))) model.add(Dropout(0.2)) model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3))) model.add(Dropout(0.2)) model.add(Dense(1, activation='sigmoid')) # 编译模型 sgd = SGD(lr=0.1, momentum=0.9) model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy']) return model estimators = [] estimators.append(('standardize', StandardScaler())) estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0))) pipeline = Pipeline(estimators) kfold = StratifiedKFold(n_splits=10, shuffle=True) results = cross_val_score(pipeline, X, encoded_Y, cv=kfold) print("隐藏层:%.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100)) |
如果一个项目有很多脚本,逐行修改导入语句会很繁琐。但 Python 的模块系统只是 sys.modules
中的一个字典。因此,我们可以通过“猴子补丁”的方式,让旧代码兼容新库。下面是如何操作的。这适用于 TensorFlow 2.5 的安装(Keras 代码的这种向后兼容性问题已在 TensorFlow 2.9 中修复;因此,在最新版本的库中您不需要进行此修补)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# 猴子补丁 import sys import tensorflow.keras tensorflow.keras.constraints.maxnorm = tensorflow.keras.constraints.MaxNorm for x in sys.modules.keys(): if x.startswith("tensorflow.keras"): sys.modules[x[len("tensorflow."):]] = sys.modules[x] # 旧代码如下 # Dropout 在 Sonar 数据集上的示例:隐藏层 from pandas import read_csv from keras.models import Sequential from keras.layers import Dense 从 keras.layers 导入 Dropout from keras.wrappers.scikit_learn import KerasClassifier from keras.constraints import maxnorm from keras.optimizers import SGD from sklearn.model_selection import cross_val_score from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import StratifiedKFold from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline # 加载数据集 dataframe = read_csv("sonar.csv", header=None) dataset = dataframe.values # 分割为输入 (X) 和输出 (Y) 变量 X = dataset[:,0:60].astype(float) Y = dataset[:,60] # 将类别值编码为整数 编码器 = LabelEncoder() encoder.fit(Y) encoded_Y = encoder.transform(Y) # 隐藏层中的 dropout 及权重约束 def create_model(): # 创建模型 model = Sequential() model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3))) model.add(Dropout(0.2)) model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3))) model.add(Dropout(0.2)) model.add(Dense(1, activation='sigmoid')) # 编译模型 sgd = SGD(lr=0.1, momentum=0.9) model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy']) return model estimators = [] estimators.append(('standardize', StandardScaler())) estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0))) pipeline = Pipeline(estimators) kfold = StratifiedKFold(n_splits=10, shuffle=True) results = cross_val_score(pipeline, X, encoded_Y, cv=kfold) print("隐藏层:%.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100)) |
这绝对不是一种干净整洁的代码,并且会给未来的维护带来问题。因此,“猴子补丁”在生产代码中是不受欢迎的。然而,这是一种利用 Python 语言内部机制来轻松实现功能的快速技巧。
进一步阅读
如果您想深入了解,本节提供了更多关于该主题的资源。
文章
- StackOverflow 问题 “什么是猴子补丁?“
- Python 快速入门,TensorFlow Lite 指南
- 导入系统,Python 语言参考
总结
在本教程中,我们学习了什么是猴子补丁以及如何进行。具体来说:
- 我们学习了如何为一个现有对象添加成员函数。
- 如何修改
sys.modules
中的 Python 模块缓存来欺骗import
语句。
暂无评论。