DeepFM: fix excessively high memory usage in data processing #141

Status: Open. Wants to merge 2 commits into base: main.
2 changes: 2 additions & 0 deletions requirements.txt
@@ -15,4 +15,6 @@ opencv-python==4.1.2.30
flask_cors>=3.0.10
pycocotools>=2.0.0 # for st test
wget==3.2
tqdm==4.62.3
psutil==5.8.0
scikit-learn==1.0.1
1 change: 1 addition & 0 deletions setup.py
@@ -58,6 +58,7 @@ def _write_version(file):
'wget==3.2',
'scikit-learn==1.0.1',
'tqdm==4.62.3',
'psutil==5.8.0',
]

test_required_package = [
3 changes: 2 additions & 1 deletion tests/st/deepfm.py
@@ -71,13 +71,14 @@ def create_dataset(data_path, batch_size=16000):
batch_size = args_opt.batch_size
dataset_path = args_opt.dataset_path
dataset_sink_mode = not args_opt.device_target == "CPU"
convert_dtype = not args_opt.device_target == "CPU"
checkpoint_dir = args_opt.checkpoint_dir if args_opt.checkpoint_dir is not None else "."

# create train and eval dataset
train_ds, eval_ds = create_dataset(data_path=dataset_path, batch_size=batch_size)
# build base network
data_size = train_ds.get_dataset_size()
net = DeepFM(field_size=39, vocab_size=184965, embed_size=80, convert_dtype=True)
net = DeepFM(field_size=39, vocab_size=184965, embed_size=80, convert_dtype=convert_dtype)
# build train network
train_net = DeepFMTrainModel(DeepFMWithLoss(net))
# build eval network
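The test script change above derives `convert_dtype` from the device target instead of hard-coding `True`. A minimal sketch of the resulting flag logic (the literal value of `device_target` below is illustrative; in the script it comes from the argument parser):

```python
# Illustrative only: mirrors the flag derivation in tests/st/deepfm.py after this change.
device_target = "CPU"                       # in the script this is args_opt.device_target
dataset_sink_mode = device_target != "CPU"  # dataset sink mode stays disabled on CPU
convert_dtype = device_target != "CPU"      # dtype conversion is now disabled on CPU as well
# convert_dtype is then forwarded to
# DeepFM(field_size=39, vocab_size=184965, embed_size=80, convert_dtype=convert_dtype)
```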
149 changes: 90 additions & 59 deletions tinyms/data/loader.py
@@ -28,6 +28,7 @@
import codecs
import collections
import pickle
import psutil

from itertools import chain
from tqdm import tqdm
@@ -351,8 +352,7 @@ def get_imdb_data(features, labels):

class KaggleDisplayAdvertisingDataset:
"""
parse aclImdb data to features and labels.
sentence->tokenized->encoded->padding->features
Convert KaggleDisplayAdvertisingDataset to MindRecord format.

Args:
data_dir (str): The path where the uncompressed dataset stored.
@@ -376,8 +376,9 @@ def __init__(self, data_dir, num_parallel_workers=None, shuffle=True):
self.skip_id_convert = False
self.train_line_count = 45840617
self.test_size = 0.1
self.seed = 20191005
self.line_per_sample = 1000
self.seed = 2020
self.line_per_sample = 100
self.write_per_sample = 1000
self.epochs = 1
self.num_parallel_workers = num_parallel_workers
self._check_num_parallel_workers()
@@ -525,6 +526,20 @@ def __map_cat2id(self, values, cats):
weight_list.append(1.0)
return id_list, weight_list

def __set_write_per_sample(self):
mem = psutil.virtual_memory()
mem_free = float(mem.free) / 1024 / 1024 / 1024
if mem_free > 32:
self.write_per_sample = 40000
elif mem_free > 16:
self.write_per_sample = 20000
elif mem_free > 8:
self.write_per_sample = 10000
elif mem_free > 4:
self.write_per_sample = 1000
else:
self.write_per_sample = 100
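The new `__set_write_per_sample` helper above sizes the write buffer from free RAM. Here is a self-contained restatement of that heuristic with the resulting flush size spelled out (the function name and return value are illustrative, not part of the PR):

```python
import psutil

def pick_write_per_sample(line_per_sample=100):
    """Mirror of the tiering heuristic above: choose how many packed samples to
    buffer before each write_raw_data() call, based on free RAM in GiB."""
    free_gib = psutil.virtual_memory().free / 1024 / 1024 / 1024
    if free_gib > 32:
        write_per_sample = 40000
    elif free_gib > 16:
        write_per_sample = 20000
    elif free_gib > 8:
        write_per_sample = 10000
    elif free_gib > 4:
        write_per_sample = 1000
    else:
        write_per_sample = 100
    # Each packed sample holds line_per_sample raw Criteo lines, so one flush
    # covers write_per_sample * line_per_sample lines.
    return write_per_sample, write_per_sample * line_per_sample

# Example: with roughly 12 GiB free and line_per_sample=100, the converter
# buffers 10000 packed samples (1,000,000 raw lines) between flushes.
```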

def stats_data(self):
"""
stats data
@@ -543,11 +558,7 @@ def stats_data(self):
items = line.split("\t")
if len(items) != num_splits:
error_stat_lines_num.append(num_line)
# print("Found line length: {}, suppose to be {}, the line is {}".format(
# len(items), num_splits, line))
continue
# if num_line % 1000000 == 0:
# print("Have handled {}w lines.".format(num_line // 10000))
values = items[1: self.dense_dim + 1]
cats = items[self.dense_dim + 1:]
assert len(values) == self.dense_dim, "values.size: {}".format(len(values))
Expand All @@ -556,34 +567,33 @@ def stats_data(self):
self.__stats_cats(cats)
self.__save_stats_dict()

print("\n************** Size of error_stat_lines_num: {} **************".format(len(error_stat_lines_num)), flush=True)
error_stat_path = os.path.join(self.data_dir, "error_stat_lines_num.npy")
np.save(error_stat_path, error_stat_lines_num)

def convert_to_mindrecord(self):
# set write_per_sample based on free memory.
self.__set_write_per_sample()
print("************** Flush mindrecord data every {} samples **************".
format(self.write_per_sample * self.line_per_sample), flush=True)

test_size = int(self.train_line_count * self.test_size)
all_indices = [i for i in range(self.train_line_count)]
np.random.seed(self.seed)
np.random.shuffle(all_indices)
test_indices_set = set(all_indices[:test_size])

train_data_list = []
test_data_list = []
ids_list = []
wts_list = []
label_list = []

schema = {
"label": {"type": "float32", "shape": [-1]},
"feat_ids": {"type": "int32", "shape": [-1]},
"feat_vals": {"type": "float32", "shape": [-1]},
"feat_ids": {"type": "int32", "shape": [-1]}
"label": {"type": "float32", "shape": [-1]}
}

train_writer = FileWriter(os.path.join(self.mindrecord_dir, "train_input_part.mindrecord"), 21)
test_writer = FileWriter(os.path.join(self.mindrecord_dir, "test_input_part.mindrecord"), 3)
train_writer.add_schema(schema, "CRITEO_TRAIN")
test_writer.add_schema(schema, "CRITEO_TEST")

part_rows = 2000000
num_splits = self.dense_dim + self.slot_dim + 1
error_conv_lines_num = []
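For context, the converter writes through MindSpore's MindRecord FileWriter with the three-column schema shown above. The following is a minimal, hedged sketch of that writing pattern; the import path, file name, and dummy arrays are assumptions for illustration, while the schema layout and the 21 train / 3 test shard counts are taken from the diff:

```python
import numpy as np
from mindspore.mindrecord import FileWriter  # import path assumed; not shown in this diff

# One packed row stores line_per_sample lines: 39 * line_per_sample ids/vals
# and line_per_sample labels, all as variable-length 1-D arrays.
schema = {
    "feat_ids": {"type": "int32", "shape": [-1]},
    "feat_vals": {"type": "float32", "shape": [-1]},
    "label": {"type": "float32", "shape": [-1]},
}

writer = FileWriter("demo_input_part.mindrecord", 1)  # the PR uses 21 train / 3 test shards
writer.add_schema(schema, "CRITEO_DEMO")
writer.write_raw_data([{
    "feat_ids": np.zeros(39 * 100, dtype=np.int32),
    "feat_vals": np.zeros(39 * 100, dtype=np.float32),
    "label": np.zeros(100, dtype=np.float32),
}])
writer.commit()
```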

@@ -592,12 +602,18 @@ def convert_to_mindrecord(self):
t_f = tqdm(f, total=self.train_line_count)
t_f.set_description("Processing Convert2MR")
num_line = 0
train_part_number = 0
test_part_number = 0
train_samples = []
test_samples = []

extend_train_ids = []
extend_train_wts = []
extend_train_labels = []

extend_test_ids = []
extend_test_wts = []
extend_test_labels = []
for line in t_f:
num_line += 1
# if num_line % 1000000 == 0:
# print("Converting to MindRecord. Have handle {}w lines.".format(num_line // 10000), flush=True)
line = line.strip("\n")
items = line.split("\t")
if len(items) != num_splits:
Expand All @@ -611,54 +627,69 @@ def convert_to_mindrecord(self):
assert len(cats) == self.slot_dim, "cats.size: {}".format(len(cats))

ids, wts = self.__map_cat2id(values, cats)
ids_list.extend(ids)
wts_list.extend(wts)
label_list.append(label)

if num_line % self.line_per_sample == 0:
if num_line not in test_indices_set:
train_data_list.append({"feat_ids": np.array(ids_list, dtype=np.int32),
"feat_vals": np.array(wts_list, dtype=np.float32),
"label": np.array(label_list, dtype=np.float32)
})
else:
test_data_list.append({"feat_ids": np.array(ids_list, dtype=np.int32),
"feat_vals": np.array(wts_list, dtype=np.float32),
"label": np.array(label_list, dtype=np.float32)
})
if train_data_list and len(train_data_list) % part_rows == 0:
train_writer.write_raw_data(train_data_list)
train_data_list.clear()
train_part_number += 1

if test_data_list and len(test_data_list) % part_rows == 0:
test_writer.write_raw_data(test_data_list)
test_data_list.clear()
test_part_number += 1

ids_list.clear()
wts_list.clear()
label_list.clear()
if train_data_list:
train_writer.write_raw_data(train_data_list)
if test_data_list:
test_writer.write_raw_data(test_data_list)

if num_line not in test_indices_set:
extend_train_ids.extend(ids)
extend_train_wts.extend(wts)
extend_train_labels.append(label)
if len(extend_train_labels) % self.line_per_sample == 0:
sample = {
"feat_ids": np.array(extend_train_ids, dtype=np.int32),
"feat_vals": np.array(extend_train_wts, dtype=np.float32),
"label": np.array(extend_train_labels, dtype=np.float32)
}
extend_train_ids.clear()
extend_train_wts.clear()
extend_train_labels.clear()
train_samples.append(sample)
if len(train_samples) % self.write_per_sample == 0:
train_writer.write_raw_data(train_samples)
train_samples.clear()
else:
extend_test_ids.extend(ids)
extend_test_wts.extend(wts)
extend_test_labels.append(label)
if len(extend_test_labels) % self.line_per_sample == 0:
sample = {
"feat_ids": np.array(extend_test_ids, dtype=np.int32),
"feat_vals": np.array(extend_test_wts, dtype=np.float32),
"label": np.array(extend_test_labels, dtype=np.float32)
}
extend_test_ids.clear()
extend_test_wts.clear()
extend_test_labels.clear()
test_samples.append(sample)
if len(test_samples) % self.write_per_sample == 0:
test_writer.write_raw_data(test_samples)
test_samples.clear()
# Maybe not enough batch size
if train_samples:
train_writer.write_raw_data(train_samples)
if test_samples:
test_writer.write_raw_data(test_samples)
train_writer.commit()
test_writer.commit()

print("\n************** Size of error_conv_lines_num: {} **************".
format(len(error_conv_lines_num)), flush=True)
error_stat_path = os.path.join(self.data_dir, "error_conv_lines_num.npy")
np.save(error_stat_path, error_conv_lines_num)
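The core of the memory fix is visible in the loop above: instead of accumulating every converted row before writing, rows are packed `line_per_sample` at a time and flushed to the writer every `write_per_sample` packed samples, so only one flush window is held in RAM. A condensed, self-contained sketch of that buffering pattern (the function name and the generic `writer` argument are illustrative):

```python
import numpy as np

def buffered_write(mapped_rows, writer, line_per_sample=100, write_per_sample=1000):
    """Illustrative restatement of the pack-and-flush loop above.

    mapped_rows yields (ids, wts, label) triples produced by the feature
    mapping step; writer only needs a write_raw_data(list_of_dicts) method.
    """
    ids_buf, wts_buf, label_buf, samples = [], [], [], []
    for ids, wts, label in mapped_rows:
        ids_buf.extend(ids)
        wts_buf.extend(wts)
        label_buf.append(label)
        if len(label_buf) == line_per_sample:      # one packed sample is complete
            samples.append({
                "feat_ids": np.array(ids_buf, dtype=np.int32),
                "feat_vals": np.array(wts_buf, dtype=np.float32),
                "label": np.array(label_buf, dtype=np.float32),
            })
            ids_buf, wts_buf, label_buf = [], [], []
            if len(samples) == write_per_sample:   # buffer hit the flush tier
                writer.write_raw_data(samples)
                samples = []
    if samples:                                    # flush the remaining tail
        writer.write_raw_data(samples)
```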

def load_mindreocrd_dataset(self, usage='train', batch_size=1000):
def load_mindreocrd_dataset(self, usage='train', batch_size=16000):
"""
load mindrecord dataset.
Args:
usage (str): Dataset mode. Default: 'train'.
batch_size (int): batch size. Default: 1000.
batch_size (int): batch size. Default: 16000.

Returns:
MindDataset
"""
real_batch_size = int(batch_size / self.line_per_sample)
if real_batch_size < 1:
real_batch_size = 1
num_samples = real_batch_size * self.line_per_sample

if usage == 'train':
train_mode = True
else:
Expand All @@ -673,10 +704,10 @@ def load_mindreocrd_dataset(self, usage='train', batch_size=1000):
num_parallel_workers=self.num_parallel_workers,
shuffle=self.shuffle, num_shards=None, shard_id=None)

dataset = dataset.batch(int(batch_size / self.line_per_sample), drop_remainder=True)
dataset = dataset.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(batch_size, 39),
np.array(y).flatten().reshape(batch_size, 39),
np.array(z).flatten().reshape(batch_size, 1))),
dataset = dataset.batch(real_batch_size, drop_remainder=True)
dataset = dataset.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(num_samples, 39),
np.array(y).flatten().reshape(num_samples, 39),
np.array(z).flatten().reshape(num_samples, 1))),
input_columns=['feat_ids', 'feat_vals', 'label'],
column_order=['feat_ids', 'feat_vals', 'label'],
num_parallel_workers=self.num_parallel_workers)
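Because each MindRecord row now packs `line_per_sample` raw lines, `load_mindreocrd_dataset()` converts the requested line-level batch size into a row-level batch size before calling `dataset.batch()`, then reshapes back to per-line tensors. A worked example using the defaults from this diff:

```python
# Values taken from the defaults in this diff; the arithmetic mirrors
# load_mindreocrd_dataset() above.
batch_size = 16000        # requested batch size in raw Criteo lines
line_per_sample = 100     # lines packed into one MindRecord row

real_batch_size = max(int(batch_size / line_per_sample), 1)  # 160 packed rows per batch
num_samples = real_batch_size * line_per_sample              # 16000 lines per batch

# dataset.batch(real_batch_size, drop_remainder=True) groups packed rows; the
# subsequent map() flattens and reshapes each column to (num_samples, 39) for
# feat_ids / feat_vals and (num_samples, 1) for label.
assert (real_batch_size, num_samples) == (160, 16000)
```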
29 changes: 20 additions & 9 deletions tinyms/data/utils.py
@@ -121,13 +121,13 @@ def _fetch_and_unzip_by_wget(url, file_name):
file_name: str, local path of downloaded file
"""
# function to show download progress
def bar_progress(current, total, width=80):
def _bar_progress(current, total, width=80):
progress_message = "Downloading: %d%% [%d / %d] bytes" % (current / total * 100, current, total)
# Don't use print() as it will print in new line every time.
sys.stdout.write("\r" + progress_message)
sys.stdout.flush()
# using wget is faster than fetch.
wget.download(url, out=file_name, bar=bar_progress)
wget.download(url, out=file_name, bar=_bar_progress)
print("\n============== {} is ready ==============".format(file_name))
_unzip(file_name)
os.remove(file_name)
@@ -230,18 +230,29 @@ def _download_kaggle_display_advertising(local_path):
if not os.path.exists(dataset_path):
os.makedirs(dataset_path)

print("************** Downloading the Kaggle Display Advertising Challenge dataset **************")
remote_url = "http://go.criteo.net/criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz"
file_name = os.path.join(dataset_path, remote_url.split('/')[-1])
# already exist criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz

# existing uncompressed kaggle-display-advertising data
if _check_uncompressed_kaggle_display_advertising_files(dataset_path):
print("************** Uncompressed kaggle-display-advertising data already exists **************",
flush=True)
return os.path.join(dataset_path)

# existing criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz
if os.path.exists(file_name):
if os.path.getsize(file_name) == 4576820670:
print("************** Uncompress already exists tar format data **************", flush=True)
print("************** Uncompress existing tar.gz format file **************", flush=True)
_unzip(file_name)
if not _check_uncompressed_kaggle_display_advertising_files(dataset_path):
_fetch_and_unzip_by_wget(remote_url, file_name)
else:
print("{} already have uncompressed kaggle display advertising dataset.".format(dataset_path), flush=True)
# check uncompressed files
if _check_uncompressed_kaggle_display_advertising_files(dataset_path):
return os.path.join(dataset_path)
else:
print("************** {} **************".
format("Uncompress existing tar.gz format file failed, need to download again"), flush=True)

print("************** Downloading the Kaggle Display Advertising Challenge dataset **************", flush=True)
_fetch_and_unzip_by_wget(remote_url, file_name)

return os.path.join(dataset_path)

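Putting the pieces together, here is a hedged end-to-end usage sketch based only on the class and methods visible in this diff; the import path, the data_dir value, the worker count, and the 'test' usage string are assumptions, not a verified recipe:

```python
# Sketch only: exercises the API surface shown in this PR.
from tinyms.data.loader import KaggleDisplayAdvertisingDataset  # module path inferred from the diff

ds = KaggleDisplayAdvertisingDataset(data_dir="./criteo", num_parallel_workers=8)
ds.stats_data()              # first pass: build per-column statistics and category dictionaries
ds.convert_to_mindrecord()   # second pass: pack rows and flush them at the memory-based tier
train_ds = ds.load_mindreocrd_dataset(usage='train', batch_size=16000)
test_ds = ds.load_mindreocrd_dataset(usage='test', batch_size=16000)  # any non-'train' usage loads the test split
```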