昇思25天学习打卡营第12天|ResNet50迁移学习
前言
在实际应用场景中,由于训练数据集不足,所以很少有人会从头开始训练整个网络。普遍的做法是,在一个非常大的基础数据集上训练得到一个预训练模型,然后使用该模型来初始化网络的权重参数或作为固定特征提取器应用于特定的任务中。本章将使用迁移学习的方法对ImageNet数据集中的狼和狗图像进行分类。
`
一、数据准备
下载案例所用到的狗与狼分类数据集,数据集中的图像来自于ImageNet,每个分类有大约120张训练图像与30张验证图像。使用download接口下载数据集,并将下载后的数据集自动解压到当前目录下。
from download import download dataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/intermediate/Canidae_data.zip" download(dataset_url, "./datasets-Canidae", kind="zip", replace=True)
二、加载数据集
狼狗数据集提取自ImageNet分类数据集,使用mindspore.dataset.ImageFolderDataset接口来加载数据集,并进行相关图像增强操作。
首先执行过程定义一些输入:
batch_size = 18 # 批量大小 image_size = 224 # 训练图像空间大小 num_epochs = 5 # 训练周期数 lr = 0.001 # 学习率 momentum = 0.9 # 动量 workers = 4 # 并行线程个数
import mindspore as ms import mindspore.dataset as ds import mindspore.dataset.vision as vision # 数据集目录路径 data_path_train = "./datasets-Canidae/data/Canidae/train/" data_path_val = "./datasets-Canidae/data/Canidae/val/" # 创建训练数据集 def create_dataset_canidae(dataset_path, usage): """数据加载""" data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=workers, shuffle=True,) # 数据增强操作 mean = [0.485 * 255, 0.456 * 255, 0.406 * 255] std = [0.229 * 255, 0.224 * 255, 0.225 * 255] scale = 32 if usage == "train": # Define map operations for training dataset trans = [ vision.RandomCropDecodeResize(size=image_size, scale=(0.08, 1.0), ratio=(0.75, 1.333)), vision.RandomHorizontalFlip(prob=0.5), vision.Normalize(mean=mean, std=std), vision.HWC2CHW() ] else: # Define map operations for inference dataset trans = [ vision.Decode(), vision.Resize(image_size + scale), vision.CenterCrop(image_size), vision.Normalize(mean=mean, std=std), vision.HWC2CHW() ] # 数据映射操作 data_set = data_set.map( operations=trans, input_columns='image', num_parallel_workers=workers) # 批量操作 data_set = data_set.batch(batch_size) return data_set dataset_train = create_dataset_canidae(data_path_train, "train") step_size_train = dataset_train.get_dataset_size() dataset_val = create_dataset_canidae(data_path_val, "val") step_size_val = dataset_val.get_dataset_size()
数据集可视化
从mindspore.dataset.ImageFolderDataset接口中加载的训练数据集返回值为字典,用户可通过 create_dict_iterator 接口创建数据迭代器,使用 next 迭代访问数据集。本章中 batch_size 设为18,所以使用 next 一次可获取18个图像及标签数据。
data = next(dataset_train.create_dict_iterator()) images = data["image"] labels = data["label"] print("Tensor of image", images.shape) print("Labels:", labels)
输出为:
Tensor of image (18, 3, 224, 224) Labels: [1 1 0 1 1 0 0 1 1 0 0 1 0 1 0 0 1 0]
import matplotlib.pyplot as plt import numpy as np # class_name对应label,按文件夹字符串从小到大的顺序标记label class_name = {0: "dogs", 1: "wolves"} plt.figure(figsize=(5, 5)) for i in range(4): # 获取图像及其对应的label data_image = images[i].asnumpy() data_label = labels[i] # 处理图像供展示使用 data_image = np.transpose(data_image, (1, 2, 0)) mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) data_image = std * data_image + mean data_image = np.clip(data_image, 0, 1) # 显示图像 plt.subplot(2, 2, i+1) plt.imshow(data_image) plt.title(class_name[int(labels[i].asnumpy())]) plt.axis("off") plt.show()
三、训练模型
这节课使用ResNet50模型进行训练。搭建好模型框架后,通过将pretrained参数设置为True来下载ResNet50的预训练模型并将权重参数加载到网络中。
构建Resnet50网络
from typing import Type, Union, List, Optional from mindspore import nn, train from mindspore.common.initializer import Normal weight_init = Normal(mean=0, sigma=0.02) gamma_init = Normal(mean=1, sigma=0.02) class ResidualBlockBase(nn.Cell): # 残差块 expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, norm: Optional[nn.Cell] = None, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlockBase, self).__init__() if not norm: self.norm = nn.BatchNorm2d(out_channel) else: self.norm = norm self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.conv2 = nn.Conv2d(in_channel, out_channel, kernel_size=3, weight_init=weight_init) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): """ResidualBlockBase construct.""" identity = x # shortcuts分支 out = self.conv1(x) # 主分支第一层:3*3卷积层 out = self.norm(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 out = self.relu(out) return out
class ResidualBlock(nn.Cell): expansion = 4 # 最后一个卷积核的数量是第一个卷积核数量的4倍 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlock, self).__init__() self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=1, weight_init=weight_init) self.norm1 = nn.BatchNorm2d(out_channel) self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.norm2 = nn.BatchNorm2d(out_channel) self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion, kernel_size=1, weight_init=weight_init) self.norm3 = nn.BatchNorm2d(out_channel * self.expansion) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): identity = x # shortscuts分支 out = self.conv1(x) # 主分支第一层:1*1卷积层 out = self.norm1(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm2(out) out = self.relu(out) out = self.conv3(out) # 主分支第三层:1*1卷积层 out = self.norm3(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 out = self.relu(out) return out
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]], channel: int, block_nums: int, stride: int = 1): down_sample = None # shortcuts分支 if stride != 1 or last_out_channel != channel * block.expansion: down_sample = nn.SequentialCell([ nn.Conv2d(last_out_channel, channel * block.expansion, kernel_size=1, stride=stride, weight_init=weight_init), nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init) ]) layers = [] layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample)) in_channel = channel * block.expansion # 堆叠残差网络 for _ in range(1, block_nums): layers.append(block(in_channel, channel)) return nn.SequentialCell(layers)
from mindspore import load_checkpoint, load_param_into_net class ResNet(nn.Cell): def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]], layer_nums: List[int], num_classes: int, input_channel: int) -> None: super(ResNet, self).__init__() self.relu = nn.ReLU() # 第一个卷积层,输入channel为3(彩色图像),输出channel为64 self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init) self.norm = nn.BatchNorm2d(64) # 最大池化层,缩小图片的尺寸 self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same') # 各个残差网络结构块定义, self.layer1 = make_layer(64, block, 64, layer_nums[0]) self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2) self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2) self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2) # 平均池化层 self.avg_pool = nn.AvgPool2d() # flattern层 self.flatten = nn.Flatten() # 全连接层 self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes) def construct(self, x): x = self.conv1(x) x = self.norm(x) x = self.relu(x) x = self.max_pool(x) x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.layer4(x) x = self.avg_pool(x) x = self.flatten(x) x = self.fc(x) return x def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]], layers: List[int], num_classes: int, pretrained: bool, pretrianed_ckpt: str, input_channel: int): model = ResNet(block, layers, num_classes, input_channel) if pretrained: # 加载预训练模型 download(url=model_url, path=pretrianed_ckpt, replace=True) param_dict = load_checkpoint(pretrianed_ckpt) load_param_into_net(model, param_dict) return model def resnet50(num_classes: int = 1000, pretrained: bool = False): "ResNet50模型" resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt" resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt" return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes, pretrained, resnet50_ckpt, 2048)
固定特征进行训练
使用固定特征进行训练的时候,需要冻结除最后一层之外的所有网络层。通过设置 requires_grad == False 冻结参数,以便不在反向传播中计算梯度。
import mindspore as ms import matplotlib.pyplot as plt import os import time net_work = resnet50(pretrained=True) # 全连接层输入层的大小 in_channels = net_work.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net_work.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net_work.avg_pool = avg_pool # 冻结除最后一层外的所有参数 for param in net_work.get_parameters(): if param.name not in ["fc.weight", "fc.bias"]: param.requires_grad = False # 定义优化器和损失函数 opt = nn.Momentum(params=net_work.trainable_params(), learning_rate=lr, momentum=0.5) loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') def forward_fn(inputs, targets): logits = net_work(inputs) loss = loss_fn(logits, targets) return loss grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters) def train_step(inputs, targets): loss, grads = grad_fn(inputs, targets) opt(grads) return loss # 实例化模型 model1 = train.Model(net_work, loss_fn, opt, metrics={"Accuracy": train.Accuracy()})
训练和评估
import mindspore as ms import matplotlib.pyplot as plt import os import time dataset_train = create_dataset_canidae(data_path_train, "train") step_size_train = dataset_train.get_dataset_size() dataset_val = create_dataset_canidae(data_path_val, "val") step_size_val = dataset_val.get_dataset_size() num_epochs = 5 # 创建迭代器 data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs) data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs) best_ckpt_dir = "./BestCheckpoint" best_ckpt_path = "./BestCheckpoint/resnet50-best-freezing-param.ckpt"
import mindspore as ms import matplotlib.pyplot as plt import os import time # 开始循环训练 print("Start Training Loop ...") best_acc = 0 for epoch in range(num_epochs): losses = [] net_work.set_train() epoch_start = time.time() # 为每轮训练读入数据 for i, (images, labels) in enumerate(data_loader_train): labels = labels.astype(ms.int32) loss = train_step(images, labels) losses.append(loss) # 每个epoch结束后,验证准确率 acc = model1.eval(dataset_val)['Accuracy'] epoch_end = time.time() epoch_seconds = (epoch_end - epoch_start) * 1000 step_seconds = epoch_seconds/step_size_train print("-" * 20) print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % ( epoch+1, num_epochs, sum(losses)/len(losses), acc )) print("epoch time: %5.3f ms, per step time: %5.3f ms" % ( epoch_seconds, step_seconds )) if acc > best_acc: best_acc = acc if not os.path.exists(best_ckpt_dir): os.mkdir(best_ckpt_dir) ms.save_checkpoint(net_work, best_ckpt_path) print("=" * 80) print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, " f"save the best ckpt file in {best_ckpt_path}", flush=True)
Start Training Loop ... -------------------- Epoch: [ 1/ 5], Average Train Loss: [0.641], Accuracy: [0.950] epoch time: 139729.208 ms, per step time: 9980.658 ms -------------------- Epoch: [ 2/ 5], Average Train Loss: [0.544], Accuracy: [0.783] epoch time: 935.736 ms, per step time: 66.838 ms -------------------- Epoch: [ 3/ 5], Average Train Loss: [0.496], Accuracy: [0.950] epoch time: 788.549 ms, per step time: 56.325 ms -------------------- Epoch: [ 4/ 5], Average Train Loss: [0.417], Accuracy: [0.967] epoch time: 752.244 ms, per step time: 53.732 ms -------------------- Epoch: [ 5/ 5], Average Train Loss: [0.389], Accuracy: [0.967] epoch time: 789.178 ms, per step time: 56.370 ms ================================================================================ End of validation the best Accuracy is: 0.967, save the best ckpt file in ./BestCheckpoint/resnet50-best-freezing-param.ckpt
可视化模型预测
使用固定特征得到的best.ckpt文件对对验证集的狼和狗图像数据进行预测。若预测字体为蓝色即为预测正确,若预测字体为红色则预测错误。
import matplotlib.pyplot as plt import mindspore as ms def visualize_model(best_ckpt_path, val_ds): net = resnet50() # 全连接层输入层的大小 in_channels = net.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net.avg_pool = avg_pool # 加载模型参数 param_dict = ms.load_checkpoint(best_ckpt_path) ms.load_param_into_net(net, param_dict) model = train.Model(net) # 加载验证集的数据进行验证 data = next(val_ds.create_dict_iterator()) images = data["image"].asnumpy() labels = data["label"].asnumpy() class_name = {0: "dogs", 1: "wolves"} # 预测图像类别 output = model.predict(ms.Tensor(data['image'])) pred = np.argmax(output.asnumpy(), axis=1) # 显示图像及图像的预测值 plt.figure(figsize=(5, 5)) for i in range(4): plt.subplot(2, 2, i + 1) # 若预测正确,显示为蓝色;若预测错误,显示为红色 color = 'blue' if pred[i] == labels[i] else 'red' plt.title('predict:{}'.format(class_name[pred[i]]), color=color) picture_show = np.transpose(images[i], (1, 2, 0)) mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) picture_show = std * picture_show + mean picture_show = np.clip(picture_show, 0, 1) plt.imshow(picture_show) plt.axis('off') plt.show()
visualize_model(best_ckpt_path, dataset_val)
总结与感想
今天学习了迁移学习的相关知识。我觉得迁移学习是机器学习中一种利用已经训练好的模型(通常是在大规模数据集上预训练的模型)来解决新的但相关的问题的技术。其核心思想是利用从一个任务中学到的知识来加速另一个相关任务的学习过程。除此之外,我还了解了关于ResNet50的相关知识。它是ResNet系列中的一员,其中的“50”表示网络中包含50个主要的卷积层。这几天学习的内容比较复杂,还需要留更多的时间来复习。