-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathutils.py
215 lines (164 loc) · 7.18 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import numpy as np
import logging
import numbers
import torch
import math
import json
import sys
import os
from torch.optim.lr_scheduler import LambdaLR
from torchvision.transforms.functional import pad
from tqdm import tqdm
class AverageMeter(object):
    """Track the most recent value and the running weighted average of a metric.

    Instance fields:
      val   -- the value passed to the latest ``update`` call
      sum   -- accumulated ``val * n`` over all ``update`` calls
      count -- accumulated ``n`` over all ``update`` calls
      avg   -- ``sum / count`` (0 while ``count`` is 0)
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset every statistic to zero."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the average.

        Args:
            val: The new measurement.
            n: Weight of the measurement (e.g. a batch size). Defaults to 1.
        """
        self.val = val
        self.sum += val * n
        self.count += n
        # A zero total weight (e.g. update(x, n=0) on a fresh meter) would
        # otherwise divide by zero; report 0 until something has been counted.
        self.avg = self.sum / self.count if self.count else 0
class ConstantLRSchedule(LambdaLR):
    """Schedule whose learning-rate multiplier is 1.0 at every step.

    Args:
        optimizer: Optimizer handed to ``LambdaLR``.
        last_epoch: Forwarded unchanged to ``LambdaLR`` (default -1).
    """

    def __init__(self, optimizer, last_epoch=-1):
        def _unit_factor(_step):
            # The multiplier never depends on the step index.
            return 1.0

        super().__init__(optimizer, _unit_factor, last_epoch=last_epoch)
class WarmupConstantSchedule(LambdaLR):
    """Linear warmup followed by a constant multiplier.

    The multiplier grows as ``step / warmup_steps`` while
    ``step < warmup_steps`` and is 1.0 from then on.

    Args:
        optimizer: Optimizer handed to ``LambdaLR``.
        warmup_steps: Number of steps over which the multiplier ramps up.
        last_epoch: Forwarded unchanged to ``LambdaLR`` (default -1).
    """

    def __init__(self, optimizer, warmup_steps, last_epoch=-1):
        # Must be stored before the base initialiser runs, because
        # lr_lambda reads it.
        self.warmup_steps = warmup_steps
        super().__init__(optimizer, self.lr_lambda, last_epoch=last_epoch)

    def lr_lambda(self, step):
        """Return the learning-rate multiplier for ``step``."""
        warming_up = step < self.warmup_steps
        if not warming_up:
            return 1.
        # max(1.0, ...) keeps the denominator non-zero.
        return float(step) / float(max(1.0, self.warmup_steps))
class WarmupLinearSchedule(LambdaLR):
    """Linear warmup followed by linear decay.

    The multiplier grows as ``step / warmup_steps`` while
    ``step < warmup_steps``; afterwards it is
    ``(t_total - step) / (t_total - warmup_steps)``, clamped at 0.0 from below.

    Args:
        optimizer: Optimizer handed to ``LambdaLR``.
        warmup_steps: Number of steps over which the multiplier ramps up.
        t_total: Step at which the decayed multiplier reaches 0.
        last_epoch: Forwarded unchanged to ``LambdaLR`` (default -1).
    """

    def __init__(self, optimizer, warmup_steps, t_total, last_epoch=-1):
        # Both fields are read by lr_lambda, so set them before the base
        # initialiser runs.
        self.warmup_steps = warmup_steps
        self.t_total = t_total
        super().__init__(optimizer, self.lr_lambda, last_epoch=last_epoch)

    def lr_lambda(self, step):
        """Return the learning-rate multiplier for ``step``."""
        if step < self.warmup_steps:
            return float(step) / float(max(1, self.warmup_steps))
        steps_left = float(self.t_total - step)
        decay_span = float(max(1.0, self.t_total - self.warmup_steps))
        # Past t_total the ratio turns negative; clamp it to zero.
        return max(0.0, steps_left / decay_span)
class WarmupCosineSchedule(LambdaLR):
    """Linear warmup followed by a cosine-shaped multiplier.

    The multiplier grows as ``step / warmup_steps`` while
    ``step < warmup_steps``. Afterwards it is
    ``0.5 * (1 + cos(pi * cycles * 2 * progress))`` clamped at 0.0 from below,
    where ``progress`` is the fraction of the ``t_total - warmup_steps``
    post-warmup steps already taken. With the default ``cycles=0.5`` this
    goes from 1 at the end of warmup to 0 at ``t_total``.

    Args:
        optimizer: Optimizer handed to ``LambdaLR``.
        warmup_steps: Number of steps over which the multiplier ramps up.
        t_total: Step at which ``progress`` reaches 1.
        cycles: Number of cosine periods covered as ``progress`` goes 0 -> 1.
        last_epoch: Forwarded unchanged to ``LambdaLR`` (default -1).
    """

    def __init__(self, optimizer, warmup_steps, t_total, cycles=.5, last_epoch=-1):
        # All three fields are read by lr_lambda, so set them before the base
        # initialiser runs.
        self.warmup_steps = warmup_steps
        self.t_total = t_total
        self.cycles = cycles
        super().__init__(optimizer, self.lr_lambda, last_epoch=last_epoch)

    def lr_lambda(self, step):
        """Return the learning-rate multiplier for ``step``."""
        warmup = self.warmup_steps
        if step < warmup:
            return float(step) / float(max(1.0, warmup))
        # Fraction of the post-warmup phase completed so far.
        progress = float(step - warmup) / float(max(1, self.t_total - warmup))
        angle = math.pi * float(self.cycles) * 2.0 * progress
        return max(0.0, 0.5 * (1. + math.cos(angle)))
def get_padding(image):
    """Compute per-side padding that makes ``image`` square.

    Args:
        image: Object exposing ``size`` as a ``(width, height)`` pair
            (presumably a PIL image -- confirm against callers).

    Returns:
        Tuple ``(left, top, right, bottom)`` of ints. Each axis is padded up
        to the longer side; when the gap is odd, the left/top side receives
        the extra pixel.
    """
    width, height = image.size
    side = np.max([width, height])

    def _split(extent):
        # Half of the missing pixels goes on each side of this axis.
        half = (side - extent) / 2
        if half % 1 == 0:
            return int(half), int(half)
        # Odd gap: round the leading side up and the trailing side down.
        return int(half + 0.5), int(half - 0.5)

    left, right = _split(width)
    top, bottom = _split(height)
    return (left, top, right, bottom)
def classify_by_database(data_base, embedding):
    """Rank database rows by Euclidean distance of their entries to ``embedding``.

    The squared difference is summed over dim 2, so ``data_base`` is indexed
    as ``data_base[row][entry]`` with the feature axis last
    (presumably ``(rows, entries, features)`` -- confirm against callers).

    For every row, the 25 closest entries are inspected. Entries at distance
    exactly 0.0 are skipped, as are entries whose feature sum exceeds 10000
    (NOTE(review): this looks like a filter for placeholder/padding entries --
    confirm).

    Returns:
        List of ``[row_index, distance]`` pairs sorted by ascending distance;
        ``distance`` is the 0-dim tensor taken from the sorted distances.
    """
    distances = torch.sqrt(torch.sum((data_base - embedding) ** 2, dim=2))
    sorted_dist, sorted_idx = torch.sort(distances)

    candidates = []
    for row, (row_dist, row_idx) in enumerate(zip(sorted_dist, sorted_idx)):
        for dist, entry in zip(row_dist[:25], row_idx[:25]):
            if dist == 0.0 or data_base[row][entry].sum() > 10000:
                continue
            candidates.append([row, dist])

    candidates.sort(key=lambda pair: pair[1])
    return candidates
class NewPad(object):
    """Transform that pads an image to a square using ``get_padding``.

    The padding amounts are computed per image by ``get_padding`` and applied
    with ``pad`` (imported from ``torchvision.transforms.functional``).

    Args:
        fill: Fill value forwarded to ``pad``; must be a number, str or tuple.
        padding_mode: One of 'constant', 'edge', 'reflect', 'symmetric'.
    """

    def __init__(self, fill=0, padding_mode='constant'):
        assert isinstance(fill, (numbers.Number, str, tuple))
        assert padding_mode in ['constant', 'edge', 'reflect', 'symmetric']

        self.fill = fill
        self.padding_mode = padding_mode

    def __call__(self, img):
        """
        Args:
            img (PIL Image): Image to be padded.

        Returns:
            PIL Image: Padded image.
        """
        return pad(img, get_padding(img), self.fill, self.padding_mode)

    def __repr__(self):
        # The previous format string had three placeholders but only two
        # arguments, so repr() always raised IndexError. Padding is derived
        # per image, so only the stored configuration is reported.
        return '{0}(fill={1}, padding_mode={2})'.format(
            self.__class__.__name__, self.fill, self.padding_mode)
def get_data_config(dataset):
    """Group the annotation records of ``dataset`` by polyline label.

    Each sample is read as a mapping with the keys 'filepath', 'width',
    'height', 'polyline', 'annotation_id' and 'image_id'; the polyline in turn
    provides 'label' and 'points'. Only ``polyline['points'][0]`` is used, and
    its coordinates are scaled by the sample width/height and truncated to int
    (presumably they are normalised to [0, 1] -- confirm against the dataset).

    Returns:
        Dict mapping each label to a list of record dicts with the keys
        'id', 'name', 'base_name', 'image_id', 'poly' and 'file_name'.
    """
    labels_dict = {}
    for sample in tqdm(dataset):
        base_name = os.path.basename(sample['filepath'])
        width = sample['width']
        height = sample['height']
        polyline = sample['polyline']

        # Create the bucket on first sight of a label, then reuse it.
        records = labels_dict.setdefault(polyline['label'], [])

        poly = []
        for point in polyline['points'][0]:
            poly.append([int(point[0] * width), int(point[1] * height)])

        record = {
            'id': sample['annotation_id'],
            'name': polyline['label'],
            'base_name': base_name,
            'image_id': sample['image_id'],
            'poly': poly,
            'file_name': sample['filepath'],
        }
        records.append(record)
    return labels_dict
def bounding_box(points):
    """Return the axis-aligned bounding box of ``points``.

    Args:
        points: Iterable of ``(x, y)`` pairs.

    Returns:
        List ``[x_min, y_min, width, height]``.
    """
    xs, ys = zip(*points)
    left, top = min(xs), min(ys)
    return [left, top, max(xs) - left, max(ys) - top]
def find_device():
    """Return ``torch.device('cuda:0')`` when CUDA is available, else the CPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda:0")
    return torch.device("cpu")
def read_json(data):
    """Load and return the JSON document stored at path ``data``.

    Args:
        data: Path of the JSON file to read.

    Returns:
        The decoded Python object.
    """
    # Read as UTF-8 explicitly (matching save_json) rather than relying on
    # the platform's default encoding.
    with open(data, encoding='utf-8') as f:
        return json.load(f)
def save_json(data, path):
    """Serialise ``data`` as JSON into the file at ``path``.

    The file is opened for writing as UTF-8, replacing any existing content.

    Args:
        data: JSON-serialisable object.
        path: Destination file path.
    """
    with open(path, mode='w', encoding='utf-8') as out_file:
        json.dump(data, out_file)
def setup_logger():
    """Return the 'train' logger, configured at INFO level to write to stdout.

    A stream handler with the format ``'%(asctime)s | %(message)s'`` is
    attached only when the logger has no handlers yet, so repeated calls do
    not add duplicates.
    """
    logger = logging.getLogger('train')
    logger.setLevel(logging.INFO)
    if logger.handlers:
        # Already has a handler; do not attach another one.
        return logger

    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(asctime)s | %(message)s'))
    logger.addHandler(stdout_handler)
    return logger
def adjust_learning_rate(optimizer, epoch, lr):
    """Set every parameter group's learning rate to a step-decayed value.

    The value written is ``lr`` multiplied by 0.1 once for every full 30
    epochs contained in ``epoch``.

    Args:
        optimizer: Object whose ``param_groups`` is an iterable of dicts.
        epoch: Current epoch index.
        lr: Initial (undecayed) learning rate.
    """
    decayed_lr = lr * (0.1 ** (epoch // 30))
    for group in optimizer.param_groups:
        group['lr'] = decayed_lr
def save_checkpoint(model, path):
    """Write ``model``'s state dict to ``path`` with ``torch.save``.

    Note that ``model.eval()`` is called first, so the model is left in
    evaluation mode after this function returns.

    Args:
        model: Object providing ``eval()`` and ``state_dict()``.
        path: Destination passed to ``torch.save``.
    """
    model.eval()
    weights = model.state_dict()
    torch.save(weights, path)
def reverse_norm_image(image):
    """Undo per-channel mean/std normalisation and return a NumPy image.

    Computes ``image * std + mean`` using the hard-coded three-channel
    constants below (they match the widely used ImageNet statistics), then
    moves the channel axis last.

    Args:
        image: Channel-first tensor of shape ``(3, H, W)``; may live on any
            device and may require grad.

    Returns:
        NumPy array of shape ``(H, W, 3)``.
    """
    # Build the constants on the image's device: CPU constants would raise a
    # device-mismatch error for CUDA inputs before .cpu() is ever reached.
    mean = torch.tensor([0.485, 0.456, 0.406], device=image.device)
    std = torch.tensor([0.229, 0.224, 0.225], device=image.device)
    # [:, None, None] broadcasts the per-channel constants over H and W.
    restored = image * std[:, None, None] + mean[:, None, None]
    # detach() so that tensors tracking gradients can be converted to NumPy.
    return restored.detach().permute(1, 2, 0).cpu().numpy()