Compared with the earlier, more complicated version, this PyTorch implementation turns out to be quite a bit simpler, although that is partly because the logic of the earlier code was overly convoluted.
1. PyTorch Implementation
# Author : hellcat
# Time   : 18-3-2

"""
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"

import numpy as np
np.set_printoptions(threshold=np.inf)

import tensorflow as tf
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)
"""

import torch as t
import torch.nn as nn
from torch.nn import functional as F


class ResidualBlock(nn.Module):
    def __init__(self, inchannel, outchannel, stride=1, shortcut=None):
        super(ResidualBlock, self).__init__()
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, 3, 1, 1, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        self.right = shortcut

    def forward(self, x):
        out = self.left(x)
        residual = x if self.right is None else self.right(x)
        out += residual
        return F.relu(out)


class ResNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(ResNet, self).__init__()
        self.pre = nn.Sequential(
            nn.Conv2d(3, 64, 7, 2, 3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.layer1 = self._make_layer(inchannel=64, outchannel=128, block_num=3)
        self.layer2 = self._make_layer(inchannel=128, outchannel=256, block_num=4, stride=2)
        self.layer3 = self._make_layer(inchannel=256, outchannel=512, block_num=6, stride=2)
        self.layer4 = self._make_layer(inchannel=512, outchannel=512, block_num=3, stride=2)
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, inchannel, outchannel, block_num, stride=1):
        shortcut = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, 1, stride, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        layers = []
        layers.append(ResidualBlock(inchannel, outchannel, stride, shortcut))
        for i in range(1, block_num):
            layers.append(ResidualBlock(outchannel, outchannel))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.pre(x)     # [1, 64, 56, 56]
        x = self.layer1(x)  # [1, 128, 56, 56]
        x = self.layer2(x)  # [1, 256, 28, 28]
        x = self.layer3(x)  # [1, 512, 14, 14]
        x = self.layer4(x)  # [1, 512, 7, 7]
        x = F.avg_pool2d(x, 7)
        x = x.view(x.size(0), -1)
        return self.fc(x)


def hook(module, inputdata, output):
    '''print the size of this layer's output'''
    print("hook output:", output.data.size())


module = ResNet()
img = t.autograd.Variable(t.randn(1, 3, 224, 224))
handle = module.pre[0].register_forward_hook(hook)
out = module(img)
handle.remove()
print(out)
In the code above we register a forward hook to take a look at an intermediate output. Note that nn.Conv2d does not pad by default; here the padding is set to kernel_size // 2 (3 for the 7×7 kernel), which reproduces TensorFlow's SAME behavior, so the output spatial size is simply input / stride (a quick check of this arithmetic follows the outputs below). The hook prints the size of the first convolution's output, followed by the final output of the network:
torch.Size([1, 64, 112, 112])
Variable containing:
0.6336 -0.5863 0.6472 … -0.4694 0.1808 0.2837
[torch.FloatTensor of size 1x1000]
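To sanity-check the input / stride claim, here is a minimal, self-contained sketch (the names conv, x and out_size are illustrative, not part of the model above) that applies the same 7×7, stride-2, padding-3 convolution to a 224×224 input and compares the result with the standard output-size formula:

import torch
import torch.nn as nn

# Same configuration as the first layer of `pre`: 7x7 kernel, stride 2, padding 3.
conv = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
x = torch.randn(1, 3, 224, 224)

# General formula: out = floor((in + 2*pad - k) / stride) + 1
out_size = (224 + 2 * 3 - 7) // 2 + 1  # = 112, i.e. input / stride

print(conv(x).shape)  # torch.Size([1, 64, 112, 112]), matching the hook output above
print(out_size)       # 112

With padding = k // 2 and an odd kernel size, the formula reduces to ceil(in / stride), which is exactly what TensorFlow's SAME padding produces.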
2. TensorFlow Implementation
Following the same logic, the TensorFlow implementation of ResNet34 is given below. It uses the ops helper module introduced in an earlier post, with one small modification to the convolution wrapper so that conv2d can drop the bias (i.e., no bias term is added after the convolution).
# Author : hellcat
# Time   : 18-3-7

"""
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"

import numpy as np
np.set_printoptions(threshold=np.inf)
"""

import ops
import tensorflow as tf

config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)


def ResidualBlock(x, outchannel, stride=1, shortcut=None, train=True, name="ResidualBlock"):
    with tf.variable_scope(name):
        conv1 = ops.conv2d(x, outchannel, k_h=3, k_w=3, s_h=stride, s_w=stride, scope="conv1")
        bn1 = tf.nn.relu(ops.batch_normal(conv1, train=train, scope="bn1"))
        conv2 = ops.conv2d(bn1, outchannel, k_h=3, k_w=3, s_h=1, s_w=1, with_bias=False, scope="conv2")
        left = ops.batch_normal(conv2, train=train, scope="bn2")
        right = x if shortcut is None else shortcut(x)
        return tf.nn.relu(left + right)


class ResNet():
    def __init__(self):
        with tf.variable_scope("input"):
            x = tf.placeholder(dtype=tf.float32, shape=[1, 224, 224, 3])
        with tf.variable_scope("pre"):
            conv = ops.conv2d(x, output_dim=64, k_h=7, k_w=7, s_h=2, s_w=2, with_bias=False)
            bn = tf.nn.relu(ops.batch_normal(conv))
            pool = tf.nn.max_pool(bn, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME')
        with tf.variable_scope("layer1"):
            layer1 = self._make_layer(pool, outchannel=128, block_num=3)
        with tf.variable_scope("layer2"):
            layer2 = self._make_layer(layer1, outchannel=256, block_num=4, stride=2)
        with tf.variable_scope("layer3"):
            layer3 = self._make_layer(layer2, outchannel=512, block_num=6, stride=2)
        with tf.variable_scope("layer4"):
            layer4 = self._make_layer(layer3, outchannel=512, block_num=3, stride=2)
        # Tensor("layer1/ResidualBlock2/Relu_1:0", shape=(1, 56, 56, 128), dtype=float32)
        # Tensor("layer2/ResidualBlock3/Relu_1:0", shape=(1, 28, 28, 256), dtype=float32)
        # Tensor("layer3/ResidualBlock5/Relu_1:0", shape=(1, 14, 14, 512), dtype=float32)
        # Tensor("layer4/ResidualBlock2/Relu_1:0", shape=(1, 7, 7, 512), dtype=float32)
        pool = tf.nn.avg_pool(layer4, ksize=[1, 7, 7, 1], strides=[1, 7, 7, 1], padding='SAME')
        reshape = tf.reshape(pool, [layer4.get_shape()[0], -1])
        self.fc = ops.linear(reshape, 1000)

    def __call__(self, *args, **kwargs):
        return self.fc

    def _make_layer(self, x, outchannel, block_num, stride=1):
        def shortcut(input_):
            with tf.variable_scope("shortcut"):
                conv = ops.conv2d(input_, output_dim=outchannel, k_w=1, k_h=1,
                                  s_w=stride, s_h=stride, with_bias=False)
                return ops.batch_normal(conv)

        x = ResidualBlock(x, outchannel, stride, shortcut, name="ResidualBlock0")
        for i in range(1, block_num):
            x = ResidualBlock(x, outchannel, name="ResidualBlock{}".format(i))
        return x


if __name__ == "__main__":
    resnet = ResNet()
    print(resnet())
The modified convolution wrapper in ops.py is shown below; a short check of the with_bias switch follows it.
def conv2d(input_, output_dim,
           k_h=5, k_w=5, s_h=2, s_w=2,
           stddev=0.02, scope="conv2d",
           with_w=False, with_bias=True):
    """
    Convolution layer wrapper
    :param input_:
    :param output_dim: number of output feature maps
    :param k_h:
    :param k_w:
    :param s_h:
    :param s_w:
    :param stddev:
    :param scope:
    :param with_w:
    :param with_bias: whether to add a bias term
    :return:
    """
    with tf.variable_scope(scope):
        w = tf.get_variable('w', [k_h, k_w, input_.get_shape()[-1], output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, s_h, s_w, 1], padding='SAME')
        if with_bias:
            biases = tf.get_variable('biases', [output_dim],
                                     initializer=tf.constant_initializer(0.0))
            conv = tf.reshape(tf.nn.bias_add(conv, biases), conv.get_shape())
        else:
            biases = None
        if with_w:
            return conv, w, biases
        else:
            return conv
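As a quick, hedged check of the with_bias switch (the scope names conv_bias / conv_nobias and the placeholder below are made up for illustration, and ops.py is assumed to be importable), one can build two layers and list the trainable variables; only the first scope should contain a biases variable:

import tensorflow as tf
import ops

x = tf.placeholder(tf.float32, shape=[1, 32, 32, 3])

_ = ops.conv2d(x, output_dim=8, scope="conv_bias")                     # default: with_bias=True
_ = ops.conv2d(x, output_dim=8, with_bias=False, scope="conv_nobias")  # bias dropped

for v in tf.trainable_variables():
    print(v.name)
# conv_bias/w:0
# conv_bias/biases:0
# conv_nobias/w:0   <- no biases variable created in this scope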
The output is as follows:
Tensor("Linear/add:0", shape=(1, 1000), dtype=float32)
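This is only the symbolic graph tensor; to get actual values the variables still have to be initialized and the graph run. A minimal sketch, assuming the ResNet class above is in scope and that the placeholder built inside the "input" scope ends up named input/Placeholder:0 in a freshly constructed graph (the class does not keep a handle to it):

import numpy as np
import tensorflow as tf

resnet = ResNet()
logits = resnet()  # Tensor("Linear/add:0", shape=(1, 1000), dtype=float32)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

# Fetch the placeholder by name, since ResNet.__init__ does not store it.
x = tf.get_default_graph().get_tensor_by_name("input/Placeholder:0")

img = np.random.randn(1, 224, 224, 3).astype(np.float32)
print(sess.run(logits, feed_dict={x: img}).shape)  # (1, 1000)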
Appendix:
For reference, here is ops.py in its latest state as of this post:
# Author : hellcat
# Time   : 18-1-21
# Usage  : network layer wrappers

"""
conv2d
deconv2d
lrelu
linear
"""

import tensorflow as tf


# def batch_normal(x, train=True, epsilon=1e-5, decay=0.9, scope="batch_norm"):
#     return tf.contrib.layers.batch_norm(x,
#                                         decay=decay,
#                                         updates_collections=None,
#                                         epsilon=epsilon,
#                                         scale=True,
#                                         is_training=train,
#                                         scope=scope)


def batch_normal(x, epsilon=1e-5, momentum=0.9, train=True, scope='batch_norm'):
    with tf.variable_scope(scope):
        return tf.contrib.layers.batch_norm(x,
                                            decay=momentum,
                                            updates_collections=None,
                                            epsilon=epsilon,
                                            scale=True,
                                            is_training=train)


'''
Note: when training, the moving_mean and moving_variance need to be updated.
By default the update ops are placed in `tf.GraphKeys.UPDATE_OPS`, so they
need to be added as a dependency to the `train_op`. For example:

```python
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
    train_op = optimizer.minimize(loss)
```

One can set updates_collections=None to force the updates in place, but that
can have a speed penalty, especially in distributed settings.
'''


# class batch_norm(object):
#     def __init__(self, epsilon=1e-5, decay=0.9, scope="batch_norm"):
#         with tf.variable_scope(scope):
#             self.epsilon = epsilon
#             self.decay = decay
#             # self.scope = scope
#
#     def __call__(self, x, scope, train=True):
#         return tf.contrib.layers.batch_norm(x,
#                                             decay=self.decay,
#                                             updates_collections=None,
#                                             epsilon=self.epsilon,
#                                             scale=True,
#                                             is_training=train,
#                                             scope=scope)


def concat(tensor_a, tensor_b):
    """
    Concatenate two tensors; note that tensor_a's height/width should be
    greater than or equal to tensor_b's
    :param tensor_a: the first tensor
    :param tensor_b: the second tensor
    :return:
    """
    if tensor_a.get_shape().as_list()[1] > tensor_b.get_shape().as_list()[1]:
        return tf.concat([tf.slice(tensor_a,
                                   begin=[0,
                                          (int(tensor_a.shape[1]) - int(tensor_b.shape[1])) // 2,
                                          (int(tensor_a.shape[1]) - int(tensor_b.shape[1])) // 2,
                                          0],
                                   size=[int(tensor_b.shape[0]),
                                         int(tensor_b.shape[1]),
                                         int(tensor_b.shape[2]),
                                         int(tensor_a.shape[3])],
                                   name='slice'),
                          tensor_b],
                         axis=3, name='concat')
    elif tensor_a.get_shape().as_list()[1] < tensor_b.get_shape().as_list()[1]:
        return tf.concat([tensor_a,
                          tf.slice(tensor_b,
                                   begin=[0,
                                          (int(tensor_b.shape[1]) - int(tensor_a.shape[1])) // 2,
                                          (int(tensor_b.shape[1]) - int(tensor_a.shape[1])) // 2,
                                          0],
                                   size=[int(tensor_a.shape[0]),
                                         int(tensor_a.shape[1]),
                                         int(tensor_a.shape[2]),
                                         int(tensor_b.shape[3])],
                                   name='slice')],
                         axis=3, name='concat')
    else:
        return tf.concat([tensor_a, tensor_b], axis=3)


def conv_cond_concat(x, y):
    """
    Broadcast and concatenate, used by ac_gan to append labels to feature maps
    :param x: features, e.g. shape [n, 16, 16, 128]
    :param y: labels with expanded dimensions, e.g. shape [n, 1, 1, 10]
    :return: concatenated features, e.g. [n, 16, 16, 138]
    """
    x_shapes = x.get_shape()
    y_shapes = y.get_shape()
    return tf.concat([x, y * tf.ones([x_shapes[0], x_shapes[1], x_shapes[2], y_shapes[3]])], axis=3)


def conv2d(input_, output_dim,
           k_h=5, k_w=5, s_h=2, s_w=2,
           stddev=0.02, scope="conv2d",
           with_w=False, with_bias=True):
    """
    Convolution layer wrapper
    :param input_:
    :param output_dim: number of output feature maps
    :param k_h:
    :param k_w:
    :param s_h:
    :param s_w:
    :param stddev:
    :param scope:
    :param with_w:
    :param with_bias: whether to add a bias term
    :return:
    """
    with tf.variable_scope(scope):
        w = tf.get_variable('w', [k_h, k_w, input_.get_shape()[-1], output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, s_h, s_w, 1], padding='SAME')
        if with_bias:
            biases = tf.get_variable('biases', [output_dim],
                                     initializer=tf.constant_initializer(0.0))
            conv = tf.reshape(tf.nn.bias_add(conv, biases), conv.get_shape())
        else:
            biases = None
        if with_w:
            return conv, w, biases
        else:
            return conv


def deconv2d(input_, output_shape,
             k_h=5, k_w=5, s_h=2, s_w=2,
             stddev=0.02, scope="deconv2d", with_w=False):
    """
    Transposed convolution wrapper
    :param input_:
    :param output_shape: output shape
    :param k_h:
    :param k_w:
    :param s_h:
    :param s_w:
    :param stddev:
    :param scope:
    :param with_w:
    :return:
    """
    with tf.variable_scope(scope):
        # filter : [height, width, output_channels, in_channels]
        w = tf.get_variable('w', [k_h, k_w, output_shape[-1], input_.get_shape()[-1]],
                            initializer=tf.random_normal_initializer(stddev=stddev))
        try:
            deconv = tf.nn.conv2d_transpose(input_, w, output_shape=output_shape,
                                            strides=[1, s_h, s_w, 1])
        # Support for versions of TensorFlow before 0.7.0
        except AttributeError:
            deconv = tf.nn.deconv2d(input_, w, output_shape=output_shape,
                                    strides=[1, s_h, s_w, 1])
        biases = tf.get_variable('biases', [output_shape[-1]],
                                 initializer=tf.constant_initializer(0.0))
        deconv = tf.reshape(tf.nn.bias_add(deconv, biases), deconv.get_shape())
        if with_w:
            return deconv, w, biases
        else:
            return deconv


def lrelu(x, leak=0.2):
    """
    Leaky ReLU wrapper
    :param x:
    :param leak:
    :return:
    """
    return tf.maximum(x, leak*x)


def linear(input_, output_size, stddev=0.02, bias_start=0.0, scope=None, with_w=False):
    """
    Fully connected layer wrapper
    :param input_:
    :param output_size: number of output units
    :param scope:
    :param stddev:
    :param bias_start: constant value used to initialize the bias
    :param with_w: whether to also return the parameter Variables
    :return:
    """
    shape = input_.get_shape().as_list()

    with tf.variable_scope(scope or "Linear"):
        matrix = tf.get_variable("Matrix", [shape[1], output_size], tf.float32,
                                 tf.random_normal_initializer(stddev=stddev))
        bias = tf.get_variable("bias", [output_size],
                               initializer=tf.constant_initializer(bias_start))
        if with_w:
            return tf.matmul(input_, matrix) + bias, matrix, bias
        else:
            return tf.matmul(input_, matrix) + bias
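One usage note on the concat helper above, since its behavior is easy to misread: when the two feature maps differ in spatial size, the larger one is center-cropped to the smaller one's size before the channel-wise concatenation. A small illustrative sketch (the tensors a, b, c are made up; ops.py is assumed importable):

import tensorflow as tf
import ops

a = tf.zeros([1, 64, 64, 32])   # larger spatial size
b = tf.zeros([1, 56, 56, 16])   # smaller spatial size

c = ops.concat(a, b)
print(c.get_shape().as_list())  # [1, 56, 56, 48]: a is center-cropped to 56x56, channels add up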