基於Keras的擴展性使用

Keras是一個用於在python上搭神經網絡模型的框架,語法和torch比較相似。我個人認為Keras最大的特點是包裝很好,一些在訓練過程中要輸出的方法和常用的優化函數、目標函數都已經內置瞭,非常適合用來寫大作業。Keras和python的哲學有些相似,那就是盡量不自己造輪子。

但是最近逛知乎,看到有答案說,Keras隻能用來搭一些世面上已經普及的網絡,和其它框架相比比較小白。換句話說,就是Keras的擴展性不好。作為一個試用過theano、tensorflow、torch、caffe等框架,最後定居在Keras的人,我對此不太同意。事實上,Keras擁有不錯的擴展性,這一方面是因為設計時就留好的接口,另一方面是因為清晰的代碼結構,讓你可以有很多自定義的空間。所以下面用幾個例子介紹在Keras中如何自定義層和各種方法。

0、backend

如果想在Keras中自定義各種層和函數,一定會用到的就是backend。一般導入的方法是

from keras import backend as K

這是因為Keras可以有兩種後臺,即theano和tensorflow,所以一些操作張量的函數可能是隨後臺的不同而不同的,

通過引入這個backend,就可以讓Keras來處理兼容性。

比如求x的平均,就是K.mean(x)。backend文件本身在keras/backend文件夾下,可以通過閱讀代碼來瞭解backend都支持哪些操作。backend裡面函數很多,一般都夠用瞭。

1、Lambda 層

如果你隻是想對流經該層的數據做個變換,而這個變換本身沒有什麼需要學習的參數,那麼直接用Lambda Layer是最合適的瞭。

導入的方法是

from keras.layers.core import Lambda

Lambda函數接受兩個參數,第一個是輸入張量對輸出張量的映射函數,第二個是輸入的shape對輸出的shape的映射函數。比如想構建這樣一個層,流經該層的數據會被減去平均值,那麼可以這樣定義:

def sub_mean(x):
    x -= K.mean(x,axis=1,keepdims=True)
    return x
model.add( Lambda(sub_mean,output_shape=lambda input_shape:input_shape ))

因為輸出的shape和輸入的shape是一樣的,第二個參數就直接用瞭恒等映射。

把模型完整地建立出來:

def get_submean_model():
    model = Sequential()
    model.add(Dense(5,input_dim=7))
    def sub_mean(x):
        x -= K.mean(x,axis=1,keepdims=True)
        return x
    model.add( Lambda(sub_mean,output_shape=lambda input_shape:input_shape))
    model.compile(optimizer='rmsprop',loss='mse')
    return model
model = get_submean_model()
res=model.predict(np.random.random((3,7)))

得到地res的平均值是[ 5.96046448e-08 -5.96046448e-08 0.00000000e+00],可見確實實現瞭減去均值的作用。

2、自定義非遞歸層

如果自己想定義的層中有需要學習的變量,那麼就不能用lambda層瞭,需要自己寫一個出來。

比如說我想定義一個層,它的效果是對張量乘一個正對角陣(換句話說,輸入向量與一個要學習的向量逐元素相乘),那麼可以這樣寫:

首先要導入基類

from keras.engine.topology import Layer

然後對MyLaber定義如下:

class MyLayer(Layer):
    def __init__(self,output_dim,**kw):
        self.output_dim = output_dim
        super(MyLayer,self).__init__(**kw)
    def build(self,input_shape):
        input_dim = input_shape[1]
        assert(input_dim == self.output_dim)
        inital_SCALER = np.ones((input_dim,))*1000
        self.SCALER = K.variable(inital_SCALER)
        self.trainable_weights = [self.SCALER]
        super(MyLayer,self).build(input_shape)
    def call(self,x,mask=None):
        #return x - K.mean(x,axis=1,keepdims=True)
        x *= self.SCALER
        return x
    def get_output_shape_for(self,input_shape):
        return input_shape

主要參照Keras內置的層的寫法,比如Dense在keras/layers/core.py中,要把能學習的參數放在self.trainable_weights中。這裡把初始值設成瞭1000是為瞭讓該層的效果更顯著。然後把模型寫全來測試一下

def get_mylayer_model():
    model = Sequential()
    model.add(Dense(5,input_dim=7))
    model.add(MyLayer(5))
    model.compile(optimizer='rmsprop',loss='mse')
    return model
model = get_mylayer_model()
res=model.predict(np.random.random((3,7)))
print res

res如下:

[[ 271.2746582 -1053.31506348 147.17185974 -1120.33740234 609.54876709]

[ -263.69671631 -390.41921997 291.17721558 -594.58721924 615.97369385]

[ -46.58752823 -733.11328125 -21.9815979 -570.79351807 649.44158936]]

都是很大的數,而不加MyLayer時每個值一般也不超過+-2,這個層確實起瞭作用。

在fit之前調用model.get_weights(),看到該層的權重都是1000,隨便隨機出來個測試集,fit幾千個epoch隻後,loss變得很小,MyLayer的權重變成瞭997左右,而前面一層Dense的權重都成10^-4量級,說明MyLayer中的參數也確實是可學習的。

3、自定義損失函數

Keras內置的損失函數都在keras/objectives.py中,比如mse的定義是:

def mean_squared_error(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

按照相同的格式,可以定義自己的損失函數。比如我們想要差值的4次方的平均作為損失函數:

def my_object(y_true,y_pred):
    return K.mean(K.square(K.square(y_pred-y_true)),axis=-1)

把模型寫全:

def get_myobj_model():
    model = Sequential()
    model.add(Dense(5,input_dim=7))
    model.add(Dense(3))
    def my_object(y_true,y_pred):
        return K.mean(K.square(K.square(y_pred-y_true)),axis=-1)
    model.compile(optimizer='sgd',loss=my_object)
    return model
model = get_myobj_model()

能自定義損失函數是非常重要一環,它極大的擴展瞭網絡的應用。例如希望用cnn訓練出來一個前後景分割的濾波器,它的輸出的像素在對應前景的位置是1,在對應後景的位置是0。不但希望網絡輸出的值的mse小,而且希望0和1分別都連在一起,不要出來雪花狀的輸出。那麼自定義損失函數就能做到瞭,實際是把兩個損失函數放到瞭一個損失函數中。

另外一些很有用的損失函數如warp-ctc,就可以在這裡集成進模型。

4、自定義遞歸層

遞歸層的定義方法和非遞歸層不太一樣。根據Keras內LSTM的寫法,它還有一個reset_states函數和step函數,這是由遞歸的性質決定的。例子都在keras/layers/recurrent.py中。

之前看學長用lasagne寫的LSTM的變體,看得我想哭,還不如在Keras中把LSTM得代碼復制過來修修改改。不過LSTM也不能直接復制過來,還需要import幾個依賴:

rom keras.layers.recurrent import LSTM,Recurrent,time_distributed_dense
from keras import initializations,regularizers,activations
from keras.engine import InputSpec

5、自定義優化函數

Keras的代碼確實好,耦合度很低。Keras內置的優化函數在keras/optimizers.py中,基類Optimizer也在這個文件裡。例如把它內置的SGD算法拷貝到自己的文件中,隻要先from keras.optimizers import Optimizer就能編譯通過。

有時候要得到state-of-the-art的結果,需要用sgd加動量法充分收斂。比如學習率0.01學習上100epoch,再把學習率減半,再學100epoch,依次類推。如果不自定義優化函數的話,就要分階段調用fit函數,修改學習率,可能還要重新compile。這就不是很優美瞭。其它一些奇葩的學習策略,也可以通過自定義優化函數來得到。

6、後記

Keras確實非常強大,不但能用來寫大作業,做一些研究也夠用瞭。Yeah

補充:keras的擴展性:自定義keras

1. 自定義keras

keras是一種深度學習的API,能夠快速實現你的實驗。keras也集成瞭很多預訓練的模型,可以實現很多常規的任務,如圖像分類。TensorFlow 2.0之後tensorflow本身也變的很keras化。

另一方面,keras表現出高度的模塊化和封裝性,所以有的人會覺得keras不易於擴展, 比如實現一種新的Loss,新的網絡層結構;其實可以通過keras的基礎模塊進行快速的擴展,實現更新的算法。

本文就keras的擴展性,總結瞭對layer,model和loss的自定義。

2. 自定義keras layers

layers是keras中重要的組成部分,網絡結構中每一個組成都要以layers來表現。keras提供瞭很多常規的layer,如Convolution layers,pooling layers, activation layers, dense layers等, 我們可以通過繼承基礎layers來擴展自定義的layers。

2.1 base layer

layer實瞭輸入tensor和輸出tensor的操作類,以下為base layer的5個方法,自定義layer隻要重寫這些方法就可以瞭。

init(): 定義自定義layer的一些屬性

build(self, input_shape):定義layer需要的權重weights

call(self, *args, **kwargs):layer具體的操作,會在調用自定義layer自動執行

get_config(self):layer初始化的配置,是一個字典dictionary。

compute_output_shape(self,input_shape):計算輸出tensor的shape

2.2 例子

# 標準化層
class InstanceNormalize(Layer):
    def __init__(self, **kwargs):
        super(InstanceNormalize, self).__init__(**kwargs)
        self.epsilon = 1e-3
            
    def call(self, x, mask=None):
        mean, var = tf.nn.moments(x, [1, 2], keep_dims=True)
        return tf.div(tf.subtract(x, mean), tf.sqrt(tf.add(var, self.epsilon)))
                                                 
    def compute_output_shape(self,input_shape):
        return input_shape
# 調用
inputs = keras.Input(shape=(None, None, 3))
x = InstanceNormalize()(inputs)
# 可以通過add_weight() 創建權重
class SimpleDense(Layer):
  def __init__(self, units=32):
      super(SimpleDense, self).__init__()
      self.units = units
  def build(self, input_shape):
      self.w = self.add_weight(shape=(input_shape[-1], self.units),
                               initializer='random_normal',
                               trainable=True)
      self.b = self.add_weight(shape=(self.units,),
                               initializer='random_normal',
                               trainable=True)
  def call(self, inputs):
      return tf.matmul(inputs, self.w) + self.b
# 調用
inputs = keras.Input(shape=(None, None, 3))
x = SimpleDense(units=64)(inputs)

3. 自定義keras model

我們在定義完網絡結構時,會把整個工作流放在 keras.Model, 進行 compile(), 然後通過 fit() 進行訓練過程。執行 fit() 的時候,執行每個 batch size data 的時候,都會調用 Model 中train_step(self, data)

from keras.models import Sequential
from keras.layers import Dense, Activation
model = Sequential()
model.add(Dense(units=64, input_dim=100))
model.add(Activation("relu"))
model.add(Dense(units=10))
model.add(Activation("softmax"))
model.compile(loss='categorical_crossentropy', optimizer='sgd', metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5, batch_size=32)

當你需要自己控制訓練過程的時候,可以重寫Model的train_step(self, data)方法

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)

4. 自定義keras loss

keras實現瞭交叉熵等常見的loss,自定義loss對於使用keras來說是比較常見,實現各種魔改loss,如focal loss。

我們來看看keras源碼中對loss實現

def categorical_crossentropy(y_true, y_pred):
    return K.categorical_crossentropy(y_true, y_pred)
def mean_squared_error(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

可以看出輸入是groud true y_true和預測值y_pred, 返回為計算loss的函數。自定義loss可以參照如此模式即可。

def focal_loss(weights=None, alpha=0.25, gamma=2):
    r"""Compute focal loss for predictions.
        Multi-labels Focal loss formula:
            FL = -alpha * (z-p)^gamma * log(p) -(1-alpha) * p^gamma * log(1-p)
                 ,which alpha = 0.25, gamma = 2, p = sigmoid(x), z = target_tensor.
    # https://github.com/ailias/Focal-Loss-implement-on-Tensorflow/blob/master/focal_loss.py
    Args:
     prediction_tensor: A float tensor of shape [batch_size, num_anchors,
        num_classes] representing the predicted logits for each class
     target_tensor: A float tensor of shape [batch_size, num_anchors,
        num_classes] representing one-hot encoded classification targets
     weights: A float tensor of shape [batch_size, num_anchors]
     alpha: A scalar tensor for focal loss alpha hyper-parameter
     gamma: A scalar tensor for focal loss gamma hyper-parameter
    Returns:
        loss: A (scalar) tensor representing the value of the loss function
    """
    def _custom_loss(y_true, y_pred):
        sigmoid_p = tf.nn.sigmoid(y_pred)
        zeros = array_ops.zeros_like(sigmoid_p, dtype=sigmoid_p.dtype)
        # For poitive prediction, only need consider front part loss, back part is 0;
        # target_tensor > zeros <=> z=1, so poitive coefficient = z - p.
        pos_p_sub = array_ops.where(y_true > zeros, y_true - sigmoid_p, zeros)
        # For negative prediction, only need consider back part loss, front part is 0;
        # target_tensor > zeros <=> z=1, so negative coefficient = 0.
        neg_p_sub = array_ops.where(y_true > zeros, zeros, sigmoid_p)
        per_entry_cross_ent = - alpha * (pos_p_sub ** gamma) * tf.log(tf.clip_by_value(sigmoid_p, 1e-8, 1.0)) \
                              - (1 - alpha) * (neg_p_sub ** gamma) * tf.log(
            tf.clip_by_value(1.0 - sigmoid_p, 1e-8, 1.0))
        return tf.reduce_sum(per_entry_cross_ent)
    return _custom_loss

5. 總結

本文分享瞭keras的擴展功能,擴展功能其實也是實現Keras模塊化的一種繼承實現。

總結如下:

繼承Layer實現自定義layer, 記住bulid() call()

繼續Model實現train_step定義訓練過程,記住梯度計算tape.gradient(loss, trainable_vars) ,權重更新optimizer.apply_gradients, 計算evaluate compiled_metrics.update_state(y, y_pred)

魔改loss,記住groud true y_true和預測值y_pred輸入,返回loss function

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: