pytorch基础:FashionMNIST时装分类

环境搭建

安装指定pytorch版本:old pytorch version

1
2
conda
torch==2.0.1+cu117, torchvision==0.15.2

数据集

img img

我们这里的任务是对10个类别的“时装”图像进行分类,使用FashionMNIST数据集。 上图给出了FashionMNIST中数据的若干样例图,其中每个小图对应一个样本。
FashionMNIST数据集中包含已经预先划分好的训练集和测试集,其中训练集共60,000张图像,测试集共10,000张图像。每张图像均为单通道黑白图像,大小为28*28pixel,分属10个类别。

加载数据

加载数据主要通过构造Dataset结构,该结构体继承自torch.utils.data.Dataset,其中提供了一个函数__getitem__,用于根据index获取数据集中的元素。另外还需实现__len__用来返回数据集元素个数

对于这个时装分类,可以用pytorch封装好的Dataset,该对象会自动去下载数据集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from typing import Any
import torch.nn.functional as F

device = ( "cuda" if torch.cuda.is_available() else "cpu" )
print(f"Using {device} device")
# 1. 生成DataSet对象数据
# Download training/test data from open datasets.
training_data = datasets.FashionMNIST( root="data", train=True, download=True, transform=ToTensor())
test_data = datasets.FashionMNIST( root="data", train=False, download=True, transform=ToTensor())

也可以自己简易的封装一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import torch.utils.data as data
from typing import Any, Callable, Dict, List, Optional, Tuple
import torch
import codecs
import sys
import numpy as np
import os
from PIL import Image
from torchvision.transforms import ToTensor

class Dataset(data.Dataset):
resources = [
("train-images-idx3-ubyte.gz", "8d4fb7e6c68d591d4c3dfef9ec88bf0d"),
("train-labels-idx1-ubyte.gz", "25c81989df183df01b3e8a0aad5dffbe"),
("t10k-images-idx3-ubyte.gz", "bef4ecab320f06d8554ea6380940ec79"),
("t10k-labels-idx1-ubyte.gz", "bb300cfdad3c16e7a12a480ee83cd310"),
]
classes = ["T-shirt/top", "Trouser", "Pullover", "Dress", "Coat", "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

def __init__(
self,
root:str,
train:bool = True,
) -> None:
super().__init__()
self.root = root
# 是否是训练
self.train = train
# 加载训练数据
self.data, self.targets = self._load_data()

def __getitem__(self, index) -> Tuple[Any,Any]:
"""继承Dataset接口,根据index获取某个元素,返回图片数据和标签"""
img, target = self.data[index], int(self.targets[index])
# 解析图片数据,并将其转换成tensor
img = Image.fromarray(img.numpy(), mode="L")
transform = ToTensor()
img = transform(img)
return img,target

def __len__(self) -> int:
return len(self.data)

@property
def raw_folder(self) -> str:
return os.path.join(self.root, "FashionMNIST", "raw")


def _load_data(self):
"""加载数据集数据"""
image_file = f"{'train' if self.train else 't10k'}-images-idx3-ubyte"
label_file = f"{'train' if self.train else 't10k'}-labels-idx1-ubyte"
# 解析数据
data = read_image_file(os.path.join(self.raw_folder, image_file))
targets = read_label_file(os.path.join(self.raw_folder, label_file))

return data, targets



def get_int(b: bytes) -> int:
return int(codecs.encode(b, "hex"), 16)

def _flip_byte_order(t: torch.Tensor) -> torch.Tensor:
return (
t.contiguous().view(torch.uint8).view(*t.shape, t.element_size()).flip(-1).view(*t.shape[:-1], -1).view(t.dtype)
)

SN3_PASCALVINCENT_TYPEMAP = {
8: torch.uint8,
9: torch.int8,
11: torch.int16,
12: torch.int32,
13: torch.float32,
14: torch.float64,
}

def read_sn3_pascalvincent_tensor(path: str, strict: bool = True) -> torch.Tensor:
"""Read a SN3 file in "Pascal Vincent" format (Lush file 'libidx/idx-io.lsh').
Argument may be a filename, compressed filename, or file object.
"""
# read
with open(path, "rb") as f:
data = f.read()
# parse
magic = get_int(data[0:4])
nd = magic % 256
ty = magic // 256
assert 1 <= nd <= 3
assert 8 <= ty <= 14
torch_type = SN3_PASCALVINCENT_TYPEMAP[ty]
s = [get_int(data[4 * (i + 1) : 4 * (i + 2)]) for i in range(nd)]

parsed = torch.frombuffer(bytearray(data), dtype=torch_type, offset=(4 * (nd + 1)))

# The MNIST format uses the big endian byte order, while `torch.frombuffer` uses whatever the system uses. In case
# that is little endian and the dtype has more than one byte, we need to flip them.
if sys.byteorder == "little" and parsed.element_size() > 1:
parsed = _flip_byte_order(parsed)

assert parsed.shape[0] == np.prod(s) or not strict
return parsed.view(*s)


def read_label_file(path: str) -> torch.Tensor:
x = read_sn3_pascalvincent_tensor(path, strict=False)
if x.dtype != torch.uint8:
raise TypeError(f"x should be of dtype torch.uint8 instead of {x.dtype}")
if x.ndimension() != 1:
raise ValueError(f"x should have 1 dimension instead of {x.ndimension()}")
return x.long()


def read_image_file(path: str) -> torch.Tensor:
x = read_sn3_pascalvincent_tensor(path, strict=False)
if x.dtype != torch.uint8:
raise TypeError(f"x should be of dtype torch.uint8 instead of {x.dtype}")
if x.ndimension() != 3:
raise ValueError(f"x should have 3 dimension instead of {x.ndimension()}")
return x



# if __name__ == "__main__":
# test_data = Dataset( root="data", train=False)
# print('label=',test_data.classes)
# print(f"data type={type(test_data.data.data)},data.shape={test_data.data.shape}")
# print(f"data type={type(test_data.targets.data)},data.shape={test_data.targets.shape}")

Dataset构造好之后,可以直接用torch.utils.data.DataLoader来包装一个dataloader进行数据加载

1
2
3
4
5
# 2. 加载数据到DataLoader对象中
# Create data loaders.
batch_size = 64
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

构造模型

通过继承nn.Module封装一个自己的模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 3. 构造模型
class NeuralNetwork(nn.Module):
"""
当创建一个自定义的神经网络模型并继承自nn.Module类时
需要实现__init__()方法和forward()方法。
__init__()方法用于初始化模型的各个层,
forward()方法定义了数据在模型中前向传播的过程。
"""
def __init__(self) -> None:
super().__init__()
# 将张量按某一维展开,减少维度,默认保留第一维度,剩下的维度合并成一维
self.Flatten = nn.Flatten()
# 简单的序列模型,直接用nn.Sequential包装一下各层,即按顺序调用
self.linear_relu_stack = nn.Sequential(
nn.Linear(28*28, 512), # 输入28*28,输出512
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)

def forward(self,x):
"""模型推理过程"""
x = self.Flatten(x)
logits = self.linear_relu_stack(x)
return logits

上面是一个全连接的模型,可以自己封装一个cnn模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class MyAlexNet(nn.Module):
# 子类继承中重新定义Module类的__init__()和forward()函数
# init():进行初始化,申明模型中各层的定义
def __init__(self):
# super:引入父类的初始化方法给子类进行初始化
super(MyAlexNet, self).__init__()
# 使用ReLU作为激活函数
self.ReLU = nn.ReLU()
# 卷积层,输入大小为28*28,输出大小为28*28,输入通道为3,输出为48,卷积核为5,步长为1
self.c1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, stride=1, padding=2)
# 最大池化层,输入大小为28*28,输出大小为13*13,输入通道为48,输出为48,池化核为3,步长为2
self.s1 = nn.MaxPool2d(kernel_size=3, stride=2)
# 卷积层,输入大小为13*13,输出大小为15*15,输入通道为48,输出为96,卷积核为3,扩充边缘为2,步长为1
self.c2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=2)
# 卷积层,输入大小为15*15,输出大小为15*15,输入通道为96,输出为48,卷积核为3,扩充边缘为1,步长为1
self.c3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
# 最大池化层,输入大小为15*15,输出大小为7*7,输入通道为48,输出为48,池化核为3,步长为2
self.s3 = nn.MaxPool2d(kernel_size=3, stride=2)
# Flatten():将张量(多维数组)平坦化处理,神经网络中第0维表示的是batch_size,所以Flatten()默认从第二维开始平坦化
self.flatten = nn.Flatten()
# 全连接层
# Linear(in_features,out_features)
# in_features指的是[batch_size, size]中的size,即样本的大小
# out_features指的是[batch_size,output_size]中的output_size,样本输出的维度大小,也代表了该全连接层的神经元个数
self.f6 = nn.Linear(7*7*64, 10)
# self.f7 = nn.Linear(512, 10)

# forward():定义前向传播过程,描述了各层之间的连接关系
def forward(self, x):
# print(f"input img size={x.size()}")
x = self.ReLU(self.c1(x))
# print(f"after conv1, size={x.size()}")
x = self.s1(x)
# print(f"after maxpool1, size={x.size()}")
x = self.ReLU(self.c2(x))
# print(f"after conv2, size={x.size()}")
x = self.ReLU(self.c3(x))
# print(f"after conv3, size={x.size()}")
x = self.s3(x)
# print(f"after maxpool3, size={x.size()}")
x = self.flatten(x)
# print(f"after flatten, size={x.size()}")

x = self.f6(x)
# Dropout:随机地将输入中50%的神经元激活设为0,即去掉了一些神经节点,防止过拟合
# “失活的”神经元不再进行前向传播并且不参与反向传播,这个技术减少了复杂的神经元之间的相互影响
# x = F.dropout(x, p=0.5)
# x = self.f7(x)
return x

模型训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def train(dataloader:DataLoader, 
model:nn.Module,
loss_fn:Any,
optimizer:torch.optim.Optimizer):
size = len(dataloader.dataset)
model.train()
for batch, (X, y) in enumerate(dataloader):
X, y = X.to(device), y.to(device)
# 推理并计算损失
pred = model(X)
loss = loss_fn(pred, y)
# 反向传播
loss.backward()
optimizer.step()
optimizer.zero_grad()
if batch % 100 == 0:
loss, current = loss.item(), (batch + 1) * len(X)
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")

模型测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def test(
dataloader:DataLoader,
model:nn.Module,
loss_fn:nn.CrossEntropyLoss,
):
size = len(dataloader.dataset)
num_batches = len(dataloader)
test_loss, correct = 0,0
with torch.no_grad():
for X, y in dataloader:
X, y = X.to(device), y.to(device)
pred:torch.Tensor = model(X)
# tensor.item()将单个元素的张量变为python的标量
test_loss += loss_fn(pred,y).item()
correct += (pred.argmax(1)==y).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

启动训练和测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if __name__ == "__main__":
# model = NeuralNetwork().to(device) # 全连接网络
model = MyAlexNet().to(device) # cnn网络
model.load_state_dict(torch.load("model.pth")) # 可以加载之前已经训练的模型继续训练
print(model)
loss_fn = nn.CrossEntropyLoss()
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) # 该优化器最终准确率在74%左右
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 该优化器最终准确率在90%左右
# 分批次训练
epochs = 5
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------------------")
train(train_dataloader, model, loss_fn, optimizer)
test(test_dataloader, model, loss_fn)
# 将模型保存起来
torch.save(model.state_dict(), "model.pth")
print("Done!")

全部代码可见:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from typing import Any

import torch.nn.functional as F

device = ( "cuda" if torch.cuda.is_available() else "cpu" )
print(f"Using {device} device")

# 1. 生成DataSet对象数据
# Download training/test data from open datasets.
training_data = datasets.FashionMNIST( root="data", train=True, download=True, transform=ToTensor())
test_data = datasets.FashionMNIST( root="data", train=False, download=True, transform=ToTensor())
# from dataset import Dataset
# training_data = Dataset(root="data", train=True)
# test_data = Dataset(root="data", train=False)

# 2. 加载数据到DataLoader对象中
# Create data loaders.
batch_size = 64
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size)
for X, y in test_dataloader:
print(f"Shape of X [N, C, H, W]: {X.shape}")
# Shape of X [N, C, H, W]: torch.Size([64, 1, 28, 28]) 64个单通道28*28的图片
print(f"Shape of y: {y.shape} {y.dtype}")
# Shape of y: torch.Size([64]) torch.int64 图片的标签0-9
# y=tensor([9, 2, 1, 1, 6, 0, ... , 8, 5])
# break仅打印首个批次,test_dataloader中实现了__iter__(),可以通过for遍历读取
break

# 3. 构造模型

class NeuralNetwork(nn.Module):
"""
当创建一个自定义的神经网络模型并继承自nn.Module类时
需要实现__init__()方法和forward()方法。
__init__()方法用于初始化模型的各个层,
forward()方法定义了数据在模型中前向传播的过程。
"""
def __init__(self) -> None:
super().__init__()
# 将张量按某一维展开,减少维度,默认保留第一维度,剩下的维度合并成一维
self.Flatten = nn.Flatten()
# 简单的序列模型,直接用nn.Sequential包装一下各层,即按顺序调用
self.linear_relu_stack = nn.Sequential(
nn.Linear(28*28, 512), # 输入28*28,输出512
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)

def forward(self,x):
"""模型推理过程"""
x = self.Flatten(x)
logits = self.linear_relu_stack(x)
return logits


# 定义AlexNet网络模型
# MyLeNet5(子类)继承nn.Module(父类)
class MyAlexNet(nn.Module):
# 子类继承中重新定义Module类的__init__()和forward()函数
# init():进行初始化,申明模型中各层的定义
def __init__(self):
# super:引入父类的初始化方法给子类进行初始化
super(MyAlexNet, self).__init__()
# 使用ReLU作为激活函数
self.ReLU = nn.ReLU()
# 卷积层,输入大小为28*28,输出大小为28*28,输入通道为3,输出为48,卷积核为5,步长为1
self.c1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, stride=1, padding=2)
# 最大池化层,输入大小为28*28,输出大小为13*13,输入通道为48,输出为48,池化核为3,步长为2
self.s1 = nn.MaxPool2d(kernel_size=3, stride=2)
# 卷积层,输入大小为13*13,输出大小为15*15,输入通道为48,输出为96,卷积核为3,扩充边缘为2,步长为1
self.c2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=2)
# 卷积层,输入大小为15*15,输出大小为15*15,输入通道为96,输出为48,卷积核为3,扩充边缘为1,步长为1
self.c3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
# 最大池化层,输入大小为15*15,输出大小为7*7,输入通道为48,输出为48,池化核为3,步长为2
self.s3 = nn.MaxPool2d(kernel_size=3, stride=2)
# Flatten():将张量(多维数组)平坦化处理,神经网络中第0维表示的是batch_size,所以Flatten()默认从第二维开始平坦化
self.flatten = nn.Flatten()
# 全连接层
# Linear(in_features,out_features)
# in_features指的是[batch_size, size]中的size,即样本的大小
# out_features指的是[batch_size,output_size]中的output_size,样本输出的维度大小,也代表了该全连接层的神经元个数
self.f6 = nn.Linear(7*7*64, 10)
# self.f7 = nn.Linear(512, 10)

# forward():定义前向传播过程,描述了各层之间的连接关系
def forward(self, x):
# print(f"input img size={x.size()}")
x = self.ReLU(self.c1(x))
# print(f"after conv1, size={x.size()}")
x = self.s1(x)
# print(f"after maxpool1, size={x.size()}")
x = self.ReLU(self.c2(x))
# print(f"after conv2, size={x.size()}")
x = self.ReLU(self.c3(x))
# print(f"after conv3, size={x.size()}")
x = self.s3(x)
# print(f"after maxpool3, size={x.size()}")
x = self.flatten(x)
# print(f"after flatten, size={x.size()}")

x = self.f6(x)
# Dropout:随机地将输入中50%的神经元激活设为0,即去掉了一些神经节点,防止过拟合
# “失活的”神经元不再进行前向传播并且不参与反向传播,这个技术减少了复杂的神经元之间的相互影响
# x = F.dropout(x, p=0.5)
# x = self.f7(x)
return x

def train(dataloader:DataLoader,
model:nn.Module,
loss_fn:Any,
optimizer:torch.optim.Optimizer):
size = len(dataloader.dataset)
model.train()
for batch, (X, y) in enumerate(dataloader):
X, y = X.to(device), y.to(device)
# 推理并计算损失
pred = model(X)
loss = loss_fn(pred, y)
# 反向传播
loss.backward()
optimizer.step()
optimizer.zero_grad()
if batch % 100 == 0:
loss, current = loss.item(), (batch + 1) * len(X)
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")

def test(
dataloader:DataLoader,
model:nn.Module,
loss_fn:nn.CrossEntropyLoss,
):
size = len(dataloader.dataset)
num_batches = len(dataloader)
test_loss, correct = 0,0
with torch.no_grad():
for X, y in dataloader:
X, y = X.to(device), y.to(device)
pred:torch.Tensor = model(X)
# tensor.item()将单个元素的张量变为python的标量
test_loss += loss_fn(pred,y).item()
correct += (pred.argmax(1)==y).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


if __name__ == "__main__":
# model = NeuralNetwork().to(device) # 全连接网络
model = MyAlexNet().to(device) # cnn网络
model.load_state_dict(torch.load("model.pth")) # 可以加载之前已经训练的模型继续训练
print(model)
loss_fn = nn.CrossEntropyLoss()
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) # 该优化器最终准确率在74%左右
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 该优化器最终准确率在90%左右
# 分批次训练
epochs = 5
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------------------")
train(train_dataloader, model, loss_fn, optimizer)
test(test_dataloader, model, loss_fn)
# 将模型保存起来
torch.save(model.state_dict(), "model.pth")

print("Done!")

参考

  1. pytorch Quickstart: https://pytorch.org/tutorials/beginner/basics/quickstart_tutorial.html