-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathmodel_All_Compare.py
259 lines (215 loc) · 8.86 KB
/
model_All_Compare.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# coding: utf-8
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import numpy as np
import pandas as pd
from PIL import Image
import os
import matplotlib.pyplot as plt
from tqdm import tqdm
# Training hyper-parameters.
BATCH_SIZE = 128
LR = 0.01
EPOCH = 60
DEVICE = torch.device('cpu')

# The data is expected to be split into training and validation folders already.
path_train = '你选定模型的训练集'  # the "train" folder
path_vaild = '你选定模型的验证集'  # the "valid" folder

# Training pipeline: single-channel conversion plus light augmentation.
transforms_train = transforms.Compose(
    [
        # ImageFolder expands images to three channels by default; convert back to one.
        transforms.Grayscale(num_output_channels=1),
        # Random horizontal flip.
        transforms.RandomHorizontalFlip(),
        # Random brightness and contrast adjustment.
        transforms.ColorJitter(brightness=0.5, contrast=0.5),
        transforms.ToTensor(),
    ]
)

# Validation pipeline: single-channel conversion only, no augmentation.
transforms_vaild = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
    ]
)

# One sub-folder per class under each root directory.
data_train = torchvision.datasets.ImageFolder(path_train, transform=transforms_train)
data_vaild = torchvision.datasets.ImageFolder(path_vaild, transform=transforms_vaild)

# Only the training loader shuffles its samples.
train_set = torch.utils.data.DataLoader(
    data_train,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
vaild_set = torch.utils.data.DataLoader(
    data_vaild,
    batch_size=BATCH_SIZE,
    shuffle=False,
)
class Reshape(nn.Module):
    """Flatten every dimension after the first into a single dimension.

    Positional constructor arguments are accepted and ignored.
    """

    def __init__(self, *args):
        super().__init__()

    def forward(self, x):
        """Return ``x`` with shape ``(x.shape[0], -1)``.

        ``reshape`` is used instead of ``view`` so that inputs whose memory
        layout is not view-compatible (e.g. transposed tensors) are flattened
        rather than raising; for contiguous inputs the result is the same.
        """
        return x.reshape(x.shape[0], -1)
class GlobalAvgPool2d(nn.Module):
    """Global average pooling.

    Implemented by average-pooling with a window equal to the input's
    trailing (height, width) dimensions.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Average ``x`` over everything from dimension 2 onwards."""
        window = x.shape[2:]
        return F.avg_pool2d(x, kernel_size=window)
# Plain convolutional baseline: two conv/ReLU/max-pool stages followed by a
# three-layer fully connected classifier with 7 outputs.
# NOTE(review): the 256*7*7 input size of the first linear layer presumably
# corresponds to 48x48 single-channel images (48 -> 46 -> 23 -> 21 -> 7) —
# confirm against the dataset.
CNN = nn.Sequential(
    nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3),
    nn.ReLU(inplace=True),
    nn.MaxPool2d(kernel_size=2, stride=2),
    nn.Conv2d(in_channels=64, out_channels=256, kernel_size=3),
    nn.ReLU(inplace=True),
    nn.MaxPool2d(kernel_size=3, stride=3),
    Reshape(),
    nn.Linear(in_features=256 * 7 * 7, out_features=4096),
    nn.ReLU(inplace=True),
    nn.Linear(in_features=4096, out_features=1024),
    nn.ReLU(inplace=True),
    nn.Linear(in_features=1024, out_features=7),
)
def vgg_block(num_convs, in_channels, out_channels):
    """Build one VGG-style block.

    The block holds ``num_convs`` pairs of (3x3 convolution with padding 1,
    ReLU) followed by a single 2x2 max-pool with stride 2.

    Args:
        num_convs: number of convolution/ReLU pairs.
        in_channels: input channels of the first convolution.
        out_channels: output channels of every convolution in the block.

    Returns:
        An ``nn.Sequential`` containing the layers in order.
    """
    layers = []
    channels = in_channels
    for _ in range(num_convs):
        layers.append(nn.Conv2d(channels, out_channels, kernel_size=3, padding=1))
        layers.append(nn.ReLU())
        # Every convolution after the first maps out_channels -> out_channels.
        channels = out_channels
    # The pooling layer halves the width and height.
    layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
    return nn.Sequential(*layers)
# One (num_convs, in_channels, out_channels) entry per VGG block.
conv_arch = (
    (2, 1, 32),
    (3, 32, 64),
    (3, 64, 128),
)
# Each of the three blocks halves width and height, i.e. a factor of 8 overall.
# NOTE(review): the 6x6 spatial size presumably corresponds to 48x48 input
# images (48 / 8 = 6) — confirm against the dataset.
fc_features = conv_arch[-1][2] * 6 * 6  # c * w * h
fc_hidden_units = 4096  # arbitrary choice
def vgg(conv_arch, fc_features, fc_hidden_units):
    """Assemble a VGG-style network with a 7-way output layer.

    Args:
        conv_arch: iterable of ``(num_convs, in_channels, out_channels)``
            triples, one per convolutional block.
        fc_features: number of features fed to the first linear layer after
            flattening.
        fc_hidden_units: width of the two hidden linear layers.

    Returns:
        An ``nn.Sequential`` whose children are named ``vgg_block_1``,
        ``vgg_block_2``, ... followed by ``fc``.
    """
    net = nn.Sequential()

    # Convolutional part: each block halves the width and height.
    for index, spec in enumerate(conv_arch, start=1):
        num_convs, in_channels, out_channels = spec
        block = vgg_block(num_convs, in_channels, out_channels)
        net.add_module("vgg_block_" + str(index), block)

    # Fully connected part.
    classifier = nn.Sequential(
        Reshape(),
        nn.Linear(fc_features, fc_hidden_units),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(fc_hidden_units, fc_hidden_units),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(fc_hidden_units, 7),
    )
    net.add_module("fc", classifier)
    return net
# Residual network building block
class Residual(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers with a skip connection.

    Args:
        in_channels: channels of the input tensor.
        out_channels: channels produced by both convolutions.
        use_1x1conv: when true, the skip path goes through a 1x1 convolution
            (with the same ``stride``) so that it matches the main path's
            channel count and spatial size.
        stride: stride of the first convolution (and of the 1x1 convolution).
    """

    def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        if use_1x1conv:
            self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, X):
        """Return ``relu(main_path(X) + skip(X))``."""
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        # Compare against None explicitly instead of relying on the
        # truthiness of a module object.
        if self.conv3 is not None:
            X = self.conv3(X)
        return F.relu(Y + X)
def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
    """Stack ``num_residuals`` residual units into one stage.

    Unless ``first_block`` is set, the first unit uses a 1x1 convolution on
    the skip path and stride 2; every other unit maps ``out_channels`` to
    ``out_channels`` with the default stride.

    Args:
        in_channels: channels entering the stage.
        out_channels: channels leaving the stage.
        num_residuals: number of residual units in the stage.
        first_block: marks the first stage, which must keep the channel count.

    Returns:
        An ``nn.Sequential`` of ``Residual`` modules.
    """
    if first_block:
        # The first stage keeps the channel count equal to its input's.
        assert in_channels == out_channels
    downsample = not first_block
    units = [
        Residual(in_channels, out_channels, use_1x1conv=True, stride=2)
        if (downsample and position == 0)
        else Residual(out_channels, out_channels)
        for position in range(num_residuals)
    ]
    return nn.Sequential(*units)
# Residual network: a 7x7 convolution stem with batch-norm, ReLU and max-pool,
# four stages of two residual units each, global average pooling and a
# 7-way linear layer.
resnet = nn.Sequential(
    nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(num_features=64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)
resnet.add_module(
    "resnet_block1",
    resnet_block(in_channels=64, out_channels=64, num_residuals=2, first_block=True),
)
resnet.add_module(
    "resnet_block2",
    resnet_block(in_channels=64, out_channels=128, num_residuals=2),
)
resnet.add_module(
    "resnet_block3",
    resnet_block(in_channels=128, out_channels=256, num_residuals=2),
)
resnet.add_module(
    "resnet_block4",
    resnet_block(in_channels=256, out_channels=512, num_residuals=2),
)
# Output of GlobalAvgPool2d: (Batch, 512, 1, 1)
resnet.add_module("global_avg_pool", GlobalAvgPool2d())
resnet.add_module("fc", nn.Sequential(Reshape(), nn.Linear(in_features=512, out_features=7)))
# To use a different model, switch which of the following assignments is commented out.
model = CNN
#model = resnet
#model = vgg(conv_arch, fc_features, fc_hidden_units)
model.to(DEVICE)
# SGD with momentum; the commented line below is an Adam alternative.
optimizer = optim.SGD(model.parameters(),lr=LR,momentum=0.9)
#optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
print(model)
# Per-epoch history lists, appended to by train() and vaild().
train_loss = []
train_ac = []
vaild_loss = []
vaild_ac = []
# Predicted class indices collected by vaild().
y_pred = []
def train(model,device,dataset,optimizer,epoch):
    """Train ``model`` for one pass over ``dataset`` and record the metrics.

    Args:
        model: network to optimise; switched to training mode.
        device: device the input and label batches are moved to.
        dataset: iterable of ``(inputs, labels)`` batches (a DataLoader).
        optimizer: optimiser stepped once per batch.
        epoch: epoch number, used only in the printed summary.

    Side effects:
        Appends the epoch accuracy to the module-level ``train_ac`` and the
        epoch loss to ``train_loss``, and prints a one-line summary.

    The recorded loss is the sample-weighted mean over all batches rather
    than the loss of the last batch only, and accuracy is computed over the
    samples actually seen rather than a global dataset length.
    """
    model.train()
    correct = 0
    seen = 0
    loss_sum = 0.0
    # Iterating the loader directly lets tqdm pick up its length when available.
    for x, y in tqdm(dataset):
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()

        batch_size = y.size(0)
        seen += batch_size
        # Weight by batch size so a smaller final batch does not skew the mean
        # (assumes criterion returns the batch mean, as nn.CrossEntropyLoss
        # does by default).
        loss_sum += loss.item() * batch_size
        correct += (output.argmax(dim=1) == y).sum().item()

    # Guard against an empty loader instead of failing on an undefined loss.
    mean_loss = loss_sum / seen if seen else float('nan')
    accuracy = correct / seen if seen else 0.0
    train_ac.append(accuracy)
    train_loss.append(mean_loss)
    print("Epoch {} Loss {:.4f} Accuracy {}/{} ({:.0f}%)".format(epoch,mean_loss,correct,seen,100*accuracy))
def vaild(model,device,dataset):
    """Evaluate ``model`` on ``dataset`` without gradient tracking.

    Args:
        model: network to evaluate; switched to evaluation mode.
        device: device the input and label batches are moved to.
        dataset: iterable of ``(inputs, labels)`` batches (a DataLoader).

    Side effects:
        Appends the accuracy to the module-level ``vaild_ac`` and the loss to
        ``vaild_loss``, prints a one-line summary, and rebinds the
        module-level ``y_pred`` to the predictions of THIS pass only.
        Previously predictions from every epoch were concatenated, so after
        training the list no longer lined up with the validation labels.

    The recorded loss is the sample-weighted mean over all batches rather
    than the loss of the last batch only.
    """
    global y_pred
    model.eval()
    correct = 0
    seen = 0
    loss_sum = 0.0
    predictions = []
    with torch.no_grad():
        for x, y in tqdm(dataset):
            x, y = x.to(device), y.to(device)
            output = model(x)
            loss = criterion(output, y)
            pred = output.argmax(dim=1)

            batch_size = y.size(0)
            seen += batch_size
            # Weight by batch size (assumes criterion returns the batch mean).
            loss_sum += loss.item() * batch_size
            predictions.extend(pred.cpu().tolist())
            correct += (pred == y).sum().item()

    # Replace, rather than extend, the stored predictions.
    y_pred = predictions
    # Guard against an empty loader instead of failing on an undefined loss.
    mean_loss = loss_sum / seen if seen else float('nan')
    accuracy = correct / seen if seen else 0.0
    vaild_ac.append(accuracy)
    vaild_loss.append(mean_loss)
    print("Test Loss {:.4f} Accuracy {}/{} ({:.0f}%)".format(mean_loss,correct,seen,100.*accuracy))
# Run the training schedule: for each epoch from 1 to EPOCH, call train() on
# train_set and vaild() on vaild_set, and save the whole model to 'm0.pth'.
# NOTE(review): indentation was lost in this copy of the file, so it is
# unclear whether torch.save runs after every epoch or once after the loop —
# confirm against the original source.
def RUN():
for epoch in range(1,EPOCH+1):
'''if epoch==15 :
LR = 0.1
optimizer=optimizer = optim.SGD(model.parameters(),lr=LR,momentum=0.9)
if(epoch>30 and epoch%15==0):
LR*=0.1
optimizer=optimizer = optim.SGD(model.parameters(),lr=LR,momentum=0.9)
'''
# Experiment with a dynamic learning rate (the schedule in the string literal above is disabled).
train(model,device=DEVICE,dataset=train_set,optimizer=optimizer,epoch=epoch)
vaild(model,device=DEVICE,dataset=vaild_set)
torch.save(model,'m0.pth')
# Start training as soon as the script is executed.
RUN()
#vaild(model,device=DEVICE,dataset=vaild_set)
def print_plot(train_plot,vaild_plot,train_text,vaild_text,ac,name):
    """Plot a training and a validation history curve and save the figure.

    Each series is drawn against 1-based positions; its last point is marked
    and annotated with its value.

    Args:
        train_plot: sequence of per-epoch training values.
        vaild_plot: sequence of per-epoch validation values.
        train_text: legend label for the training curve.
        vaild_text: legend label for the validation curve.
        ac: when true, annotate the last value as a percentage
            (``value * 100`` with two decimals); otherwise with four decimals.
        name: output path passed to ``plt.savefig``.

    A fresh figure is created for every call and closed afterwards, so
    consecutive calls (e.g. loss, then accuracy) no longer draw on top of
    each other. Empty series are plotted without a marker or annotation
    instead of raising ``IndexError``.
    """
    fig = plt.figure()
    try:
        for values, label in ((train_plot, train_text), (vaild_plot, vaild_text)):
            epochs = list(range(1, len(values) + 1))
            plt.plot(epochs, values, label=label)
            if len(values) == 0:
                continue
            last_x, last_y = epochs[-1], values[-1]
            plt.plot(last_x, last_y, marker='o')
            text = "%.2f%%" % (last_y * 100) if ac else "%.4f" % (last_y)
            plt.annotate(text, xy=(last_x, last_y))
        plt.legend()
        plt.savefig(name)
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)
#print_plot(train_loss,vaild_loss,"train_loss","vaild_loss",False,"loss.jpg")
#print_plot(train_ac,vaild_ac,"train_ac","vaild_ac",True,"ac.jpg")
import seaborn as sns
from sklearn.metrics import confusion_matrix
# Class names indexed by class id.
# NOTE(review): this assumes the ImageFolder class indices (assigned from the
# sorted sub-folder names) follow this order — verify against
# data_vaild.class_to_idx.
emotion = ["angry","disgust","fear","happy","sad","surprised","neutral"]
sns.set()
f,ax=plt.subplots()
# Read the labels from ImageFolder's target list instead of iterating the
# dataset, which would load and transform every image just to get its label.
y_true = [emotion[i] for i in data_vaild.targets]
# y_pred may hold predictions from several validation passes; keep only the
# most recent pass so it lines up one-to-one with y_true (the validation
# loader does not shuffle).
y_pred = [emotion[i] for i in y_pred[len(y_pred) - len(y_true):]]
C2 = confusion_matrix(y_true, y_pred, labels=emotion)
#print(C2)  # print it to inspect
# Heat map, with rows and columns labelled by class name.
sns.heatmap(C2,annot=True ,fmt='.20g',ax=ax,xticklabels=emotion,yticklabels=emotion)
ax.set_title('confusion matrix')  # title
ax.set_xlabel('predict')  # x axis
ax.set_ylabel('true')  # y axis
plt.savefig('matrix.jpg')