让Keras更酷一些:中间变量、权重滑动和安全生成器
继续“让Keras更酷一些”之旅。
今天我们会用 Keras 实现灵活地 输出任意中间变量 ,还有无缝地进行 权重 滑动平均,最后顺便介绍一下 生成器的进程安全写法 。
首先是 输出中间变量 。在自定义层时,我们可能希望查看中间变量,这些需求有些是比较容易实现的,比如查看中间某个层的输出,只需要将截止到这个层的部分模型保存为一个新模型即可,但有些需求是比较困难的,比如在使用 Attention 层时我们可能希望查看那个 Attention 矩阵的值,如果用构建新模型的方法则会非常麻烦。而本文则给出一种简单的方法,彻底满足这个需求。
接着是 权重 滑动平均。权重滑动平均是稳定、加速模型训练甚至提升模型效果的一种有效方法,很多大型模型(尤其是 GAN)几乎都用到了权重滑动平均。一般来说权重滑动平均是作为优化器的一部分,所以一般需要重写优化器才能实现它。本文介绍一个权重滑动平均的实现,它可以无缝插入到任意 Keras 模型中,不需要自定义优化器。
至于 生成器的进程安全写法 ,则是因为 Keras 读取生成器的时候,用到了多进程,如果生成器本身也包含了一些多进程操作,那么可能就会导致异常,所以需要解决这个这个问题。
输出中间变量
这一节以基本模型为例,逐步深入地介绍如何获取 Keras 的中间变量。
x_in = Input(shape=(784,))
x = x_in
x = Dense(512, activation='relu')(x)
x = Dropout(0.2)(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.2)(x)
x = Dense(num_classes, activation='softmax')(x)
model = Model(x_in, x)
作为一个新模型
假如模型训练完成后,我想要获取 x = Dense(256, activation='relu')(x) 对应的输出,那可以在定义模型的时候,先把对应的变量存起来,然后重新定义一个模型:
x_in = Input(shape=(784,))
x = x_in
x = Dense(512, activation='relu')(x)
x = Dropout(0.2)(x)
x = Dense(256, activation='relu')(x)
y = x
x = Dropout(0.2)(x)
x = Dense(num_classes, activation='softmax')(x)
model = Model(x_in, x)
model2 = Model(x_in, y)
将 model 训练完成后,直接用 model2.predict 就可以查看对应的 256 维的输出了。这样做的前提是 y **必须是某个层的输出,不能是随意一个 张量。 **
K.function!
有时候我们自定义了一个比较复杂的层,比较典型的就是 Attention 层,我们希望查看层的一些中间变量,比如对应的 Attention 矩阵,这时候就比较麻烦了,如果想要用前面的方式,那么就要把原来的 Attention 层分开为两个层定义才行。
因为前面已经说了, 新定义一个 Keras 模型时输入输出都必须是 Keras 层的输入输出,不能是随意一个 张量。这样一来,如果想要分别查看层的多个中间变量,那就要将层不断地拆开为多个层来定义,显然是不够友好的。
其实 Keras 提供了一个终极的解决方案: K.function !
介绍 K.function 之前,我们先写一个简单示例:
class Normal(Layer):
def __init__(self, **kwargs):
super(Normal, self).__init__(**kwargs)
def build(self, input_shape):
self.kernel = self.add_weight(name='kernel',
shape=(1,),
initializer='zeros',
trainable=True)
self.built = True
def call(self, x):
self.x_normalized = K.l2_normalize(x, -1)
return self.x_normalized * self.kernel
x_in = Input(shape=(784,))
x = x_in
x = Dense(512, activation='relu')(x)
x = Dropout(0.2)(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.2)(x)
normal = Normal()
x = normal(x)
x = Dense(num_classes, activation='softmax')(x)
model = Model(x_in, x)
在上面的例子中, Normal 定义了一个层,层的输出是 self.xnormalized * self.kernel ,不过我想在训练完成后获取 self.xnormalized 的值,而它是跟输入有关,并且不是一个层的输出。这样一来前面的方法就没法用了,但用 K.function 就只是一行代码:
fn = K.function([x_in], [normal.x_normalized])
K.function 的用法跟定义一个新模型类似,要把所有跟 normal.x_normalized 相关的输入张量都传进去,但是不要求输出是一个层的输出,允许是任意张量!返回的 fn 是一个具有函数功能的对象,所以只需要:
就可以获取到 xtest 对应的 xnormalized 了!比定义一个新模型简单通用多了。
事实上 K.function 就是 Keras 底层的基础函数之一,它直接封装好了后端的输入输出操作,换句话说,你用 Tensorflow 为后端时, fn([x_test]) 就相当于:
sess.run(normal.x_normalized, feed_dict={x_in: x_test})
所以 K.function 的输出允许是任意张量,因为它本来就在直接操作后端了。
权重滑动平均
权重滑动平均是提供训练稳定性的有效方法,通过滑动平均可以几乎零额外成本地提高解的性能。权重滑动平均一般就是指“Exponential Moving Average”,简称 EMA,这是因为一般滑动平均时会使用指数衰减作为权重的比例。它已经被主流模型所接受,尤其是 GAN,在很多 GAN 论文中我们通常会看到类似的描述:
we use an exponential moving average with decay 0.999 over the weight ...
这就意味着 GAN 模型使用了 EMA。此外,普通模型也会使用,比如 QANet: Combining Local Convolution with Global Self-Attention for Reading Comprehension 就在训练过程中用了 EMA,衰减率是 0.9999。
滑动平均的格式
滑动平均的格式其实非常简单,假设每次优化器的更新为:
这里的 Δθn 就是优化器带来的更新,优化器可以是 SGD、Adam 等任意一种。而滑动平均则是维护一组新的新的变量 Θ:
其中 α 是一个 接近于 1 的正常数 ,称为“衰减率(decay rate)”。
注意,尽管在形式上有点相似,但它跟动量加速不一样:EMA 不改变原来优化器的轨迹,即原来优化器怎么走,现在依然是同样的走法,只不过它维护一组新变量,来平均原来优化器的轨迹;而动量加速则是改变了原来优化器的轨迹。
再次强调, **权重 滑动平均不改变优化器的走向,只不过它降优化器的优化轨迹上的点做了平均后,作为最终的模型权重。 **
关于权重滑动平均的原理和效果,可以进一步参考《从动力学角度看优化算法(四):GAN 的第三个阶段》一文。
巧妙的注入实现
实现 EMA 的要点是如何在原来优化器的基础上引入一组新的平均变量,并且在每次参数更新后执行平均变量的更新。这需要对 Keras 的源码及其实现逻辑有一定的了解。
在此给出的参考实现如下:
class ExponentialMovingAverage:
"""对模型权重进行指数滑动平均。
用法:在model.compile之后、第一次训练之前使用;
先初始化对象,然后执行inject方法。
"""
def __init__(self, model, momentum=0.9999):
self.momentum = momentum
self.model = model
self.ema_weights = [K.zeros(K.shape(w)) for w in model.weights]
def inject(self):
"""添加更新算子到model.metrics_updates。
"""
self.initialize()
for w1, w2 in zip(self.ema_weights, self.model.weights):
op = K.moving_average_update(w1, w2, self.momentum)
self.model.metrics_updates.append(op)
def initialize(self):
"""ema_weights初始化跟原模型初始化一致。
"""
self.old_weights = K.batch_get_value(self.model.weights)
K.batch_set_value(zip(self.ema_weights, self.old_weights))
def apply_ema_weights(self):
"""备份原模型权重,然后将平均权重应用到模型上去。
"""
self.old_weights = K.batch_get_value(self.model.weights)
ema_weights = K.batch_get_value(self.ema_weights)
K.batch_set_value(zip(self.model.weights, ema_weights))
def reset_old_weights(self):
"""恢复模型到旧权重。
"""
K.batch_set_value(zip(self.model.weights, self.old_weights))
使用方法很简单:
EMAer = ExponentialMovingAverage(model) # 在模型compile之后执行
EMAer.inject() # 在模型compile之后执行
model.fit(x_train, y_train) # 训练模型
训练完成后:
EMAer.apply_ema_weights() # 将EMA的权重应用到模型中
model.predict(x_test) # 进行预测、验证、保存等操作
EMAer.reset_old_weights() # 继续训练之前,要恢复模型旧权重。还是那句话,EMA不影响模型的优化轨迹。
model.fit(x_train, y_train) # 继续训练
现在翻看实现过程,可以发现主要的一点是引入了 K.movingaverageupdate 操作,并且插入到 model.metricsupdates 中,在训练过程中,模型会读取并执行 model.metricsupdates 的所有算子,从而完成了滑动平均。
进程安全生成器
一般来说,当训练数据无法全部载入内存,或者需要动态生成训练数据时,就会用到 generator。一般来说,Keras 模型的 generator 的写法是:
def data_generator():
while True:
x_train = something
y_train = otherthing
yield x_train, y_train
但如果 someting 或 otherthing 里边包含了多进程操作,就可能出问题。这时候有两种解决方法,一是 fitgenerator 时将设置参数usemultiprocessing=False, worker=0 ;另一种方法就是通过继承 keras.utils.Sequence 类来写生成器。
官方参考例子
官方对 keras.utils.Sequence 类的介绍如下:
https://keras.io/utils/#sequence
官方强调:
Sequence are a safer way to do multiprocessing. This structure guarantees that the network will only train once on each sample per epoch which is not the case with generators.
总之,就是对于多进程来说它是安全的,可以放心用。官方提供的例子如下:
from skimage.io import imread
from skimage.transform import resize
import numpy as np
# Here, `x_set` is list of path to the images
# and `y_set` are the associated classes.
class CIFAR10Sequence(Sequence):
def __init__(self, x_set, y_set, batch_size):
self.x, self.y = x_set, y_set
self.batch_size = batch_size
def __len__(self):
return int(np.ceil(len(self.x) / float(self.batch_size)))
def __getitem__(self, idx):
batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
return np.array([
resize(imread(file_name), (200, 200))
for file_name in batch_x]), np.array(batch_y)
就是按格式定义好 len 和 getitem 方法就行了, getitem 方法直接返回一个 batch 的数据。
bert as service例子
我第一次发现 Sequence 的必要性,是在试验 bert as service 的时候。bert as service 是肖涵大佬搞的一个快速获取 bert 编码向量的服务组件,我曾经想用它获取字向量,然后传入到 Keras 中训练,但发现总会训练着训练着就卡住了。
经过搜索,确认是 Keras 的 fit_generator 所带的多进程,和 bert-as-service 自带的多进程冲突问题,具体怎么冲突我也比较模糊,就不深究了。而这里提供了一个参考的解决方案,用的就是继承 Sequence 类来写生成器。
https://github.com/hanxiao/bert-as-service/issues/29#issuecomment-442362241
PS:就调用 bert as service 而言,后面肖涵大佬提供了协程版的 ConcurrentBertClient ,可以取代原来的 BertClient ,这样哪怕在原始生成器也不会有问题了。
清流般的Keras
在我眼里,Keras 就是深度学习框架中的一股清流,就好比 Python 是所有编程语言中的一股清流一样。用 Keras 实现所需要做的事情,就好比一次次惬意的享受。
作者暂无likerid, 赞赏暂由本网站代持,当作者有likerid后会全部转账给作者(我们会尽力而为)。Tips: Until now, everytime you want to store your article, we will help you store it in Filecoin network. In the future, you can store it in Filecoin network using your own filecoin.
Support author:
Author's Filecoin address:
Or you can use Likecoin to support author: