# data.py
import torch
from datasets import load_dataset
from torch.utils.data import Dataset
from torchvision.transforms.functional import InterpolationMode
import torchvision.transforms as T

from logutil import get_logger

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)


def build_transform(input_size):
    """Build a transform that converts to RGB, resizes to input_size x input_size, and normalizes with ImageNet statistics."""
    MEAN, STD = IMAGENET_MEAN, IMAGENET_STD
    transform = T.Compose([
        T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
        T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(mean=MEAN, std=STD)
    ])
    return transform


def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    """Pick the (cols, rows) grid whose aspect ratio is closest to that of the original image.

    Ties are broken in favor of the candidate with more tiles, provided the original image
    covers more than half of that candidate grid's area.
    """
    best_ratio_diff = float('inf')
    best_ratio = (1, 1)
    area = width * height
    for ratio in target_ratios:
        target_aspect_ratio = ratio[0] / ratio[1]
        ratio_diff = abs(aspect_ratio - target_aspect_ratio)
        if ratio_diff < best_ratio_diff:
            best_ratio_diff = ratio_diff
            best_ratio = ratio
        elif ratio_diff == best_ratio_diff:
            # on a tie, prefer the grid with more tiles if the image is large enough to fill it
            if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                best_ratio = ratio
    return best_ratio


def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
    """Tile `image` into between `min_num` and `max_num` square crops of side `image_size`."""
    orig_width, orig_height = image.size
    aspect_ratio = orig_width / orig_height

    # enumerate all candidate (cols, rows) grids whose tile count lies within [min_num, max_num]
    target_ratios = set(
        (i, j)
        for n in range(min_num, max_num + 1)
        for i in range(1, n + 1)
        for j in range(1, n + 1)
        if min_num <= i * j <= max_num
    )
    target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

    # find the grid whose aspect ratio is closest to the original image's
    target_aspect_ratio = find_closest_aspect_ratio(
        aspect_ratio, target_ratios, orig_width, orig_height, image_size)

    # calculate the target width and height
    target_width = image_size * target_aspect_ratio[0]
    target_height = image_size * target_aspect_ratio[1]
    blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

    # resize the image to the grid size, then crop it into `blocks` square tiles
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = (
            (i % (target_width // image_size)) * image_size,
            (i // (target_width // image_size)) * image_size,
            ((i % (target_width // image_size)) + 1) * image_size,
            ((i // (target_width // image_size)) + 1) * image_size
        )
        split_img = resized_img.crop(box)
        processed_images.append(split_img)
    assert len(processed_images) == blocks

    # optionally append a thumbnail of the full image so the model also sees a global view
    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)
    return processed_images
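
# A worked example of the tiling logic above (illustrative numbers, not from the
# original source): for an 800x1200 image with image_size=448 and max_num=12, the
# aspect ratio is ~0.67, the closest grid is (2, 3), the image is resized to
# 896x1344 and cut into 6 tiles of 448x448; with use_thumbnail=True a seventh
# 448x448 thumbnail of the whole image is appended.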


def process_image(image, input_size=448, max_num=12):
    """Convert a PIL image into a stacked tensor of normalized tiles with shape (num_tiles, 3, input_size, input_size)."""
    transform = build_transform(input_size=input_size)
    images = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
    pixel_values = [transform(tile) for tile in images]
    pixel_values = torch.stack(pixel_values)
    return pixel_values


class BaseDataset(Dataset):
    def __init__(self, split):
        self._split = split
        self.data = []
        self.task_prompt = ""

    def __len__(self):
        return len(self.data)

    def correct_casing_finqa(self, text, is_question=False):
        # normalize casing and make sure answers end with "." and questions with "?"
        if text and text[0].islower():
            text = text.capitalize()
        if not text.endswith(".") and not is_question:
            text += "."
        if not text.endswith("?") and is_question:
            text += "?"
        return text


class DocVQADataset(BaseDataset):
    """Wraps the zhangfaen/DocumentVQA dataset and prepares (question, answer, pixel_values) samples.

    To take a quick look at a specific sample, e.g. question id 53582 in the validation set:
    https://huggingface.co/datasets/zhangfaen/DocumentVQA/viewer/default/validation?f[questionId][min]=53582&f[questionId][imax]=53582
    """

    def __init__(self, split):
        super().__init__(split)
        get_logger().info(f"Loading the {split} split of zhangfaen/DocumentVQA")
        self.data = load_dataset("zhangfaen/DocumentVQA", split=split)
        get_logger().info(f"Loaded {len(self.data)} samples from the {split} split of zhangfaen/DocumentVQA")

    def __getitem__(self, idx):
        example = self.data[idx]
        question = self.correct_casing_finqa(example["question"], True)
        first_answer = example["answers"][0]
        answer = self.correct_casing_finqa(first_answer)
        image = example["image"]  # already a PIL Image object
        if image.mode != "RGB":
            image = image.convert("RGB")
        # prepend the '<image>' placeholder to the question text
        assert '<image>' not in question
        question = '<image>\n' + question
        pixel_values = process_image(image)
        return {
            'question': question,
            'answer': answer,
            'pixel_values': pixel_values,
            'questionId': example['questionId'],
            'image': image
        }


if __name__ == "__main__":
    dataset = DocVQADataset("train")
    print(dataset[0])
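
# A minimal sketch of how this dataset might be wired into a torch DataLoader.
# Because each sample's `pixel_values` can hold a different number of tiles, the
# default collate function cannot stack them; the `collate_fn` below simply keeps
# the fields in lists. The name `collate_fn` and the batch layout are illustrative
# assumptions, not part of this file's original API.
#
# from torch.utils.data import DataLoader
#
# def collate_fn(batch):
#     return {
#         'questions': [b['question'] for b in batch],
#         'answers': [b['answer'] for b in batch],
#         'pixel_values': [b['pixel_values'] for b in batch],  # list of (num_tiles, 3, 448, 448) tensors
#     }
#
# loader = DataLoader(DocVQADataset("train"), batch_size=4, shuffle=True, collate_fn=collate_fn)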