-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbenchmark-ray-image-loading.py
111 lines (87 loc) · 3.07 KB
/
benchmark-ray-image-loading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import argparse
import logging
import time
import core # Can be replaced by alluxiofs
import fsspec
from alluxio import AlluxioFileSystem
# Configure logging for the script: INFO threshold, with each record showing
# timestamp, logger name, level and message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Logger shared by the whole script; its name matches the runner class.
_logger = logging.getLogger("BenchmarkImageLoadingRunner")
def get_args(argv=None):
    """Parse the benchmark's command-line options.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads the process command line, which is
            what a call with no arguments has always done.

    Returns:
        argparse.Namespace with the attributes ``path``, ``load``, ``etcd``
        and ``port``. ``port`` is kept as a string, exactly as supplied or
        defaulted, so the values handed on to the runner are unchanged.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark Ray image dataset loading"
    )
    parser.add_argument(
        "-p",
        "--path",
        help="Path of the images",
        default="/mnt/alluxio/imagenet-mini",
    )
    parser.add_argument(
        "-l",
        "--load",
        help="Whether necessary to load the dataset into Alluxio",
        action="store_true",
    )
    parser.add_argument(
        "--etcd",
        help="The etcd cluster hosts for Alluxio",
        default="localhost",
    )
    parser.add_argument(
        "--port", help="The port of the etcd cluster", default="2379"
    )
    # Forwarding argv lets callers and tests supply arguments explicitly;
    # parse_args(None) is equivalent to parse_args().
    return parser.parse_args(argv)
class BenchmarkImageLoadingRunner:
    """Time loading an image dataset into Alluxio and reading it with Ray.

    Construction builds two handles: a direct ``AlluxioFileSystem`` client,
    used by ``load_dataset``, and an fsspec filesystem created for the
    ``"alluxio"`` protocol, passed to Ray by ``benchmark_data_loading``.
    """

    def __init__(self, dataset_path, etcd_hosts, etcd_port):
        """Create the Alluxio client and the fsspec filesystem.

        Args:
            dataset_path: Path of the image dataset to benchmark.
            etcd_hosts: etcd hosts, forwarded to both filesystem objects.
            etcd_port: etcd port, forwarded to both filesystem objects.
        """
        # Both constructors receive the same etcd connection settings.
        etcd_settings = {"etcd_hosts": etcd_hosts, "etcd_port": etcd_port}
        # Property overrides handed to the fsspec filesystem as ``options``.
        fsspec_options = {
            "alluxio.worker.page.store.page.size": "20MB",
            "alluxio.user.consistent.hash.virtual.node.count.per.worker": "5",
        }

        self.dataset_path = dataset_path
        self.alluxio_fs = AlluxioFileSystem(**etcd_settings)

        # clobber=True overwrites any implementation already registered
        # under the "alluxio" protocol.
        fsspec.register_implementation(
            "alluxio", core.AlluxioFileSystem, clobber=True
        )
        self.alluxio_fsspec = fsspec.filesystem(
            "alluxio", options=fsspec_options, **etcd_settings
        )

    def load_dataset(self):
        """Call ``load`` on the Alluxio client for the dataset path.

        Logs the elapsed wall-clock time and the status value returned by
        the client at INFO level.
        """
        began = time.perf_counter()
        source = self.dataset_path
        _logger.debug(f"Loading dataset into Alluxio from {source}")
        status = self.alluxio_fs.load(source)
        _logger.debug(f"Loading dataset into Alluxio from {source} completes")
        elapsed = time.perf_counter() - began

        _logger.info(f"Data loading into Alluxio in {elapsed:0.4f} seconds")
        _logger.info(f"Dataset loading status: {status}")

    def benchmark_data_loading(self):
        """Time ``ray.data.read_images`` on the dataset via the fsspec filesystem.

        Only the ``read_images`` call (and the surrounding debug logging)
        falls inside the timed region; the elapsed time is logged at INFO
        level.
        """
        # Ray is imported only when the benchmark actually runs.
        import ray

        began = time.perf_counter()
        uri = "alluxio:" + self.dataset_path
        _logger.debug(f"Loading dataset into Ray from {uri}...")
        dataset = ray.data.read_images(uri, filesystem=self.alluxio_fsspec)
        _logger.debug(f"Loading dataset into Ray from {uri} completed")
        elapsed = time.perf_counter() - began

        _logger.info(f"Data loading into Ray in {elapsed:0.4f} seconds")
        # NOTE: the f-string is built before debug() checks the log level,
        # so dataset.schema() is called even when DEBUG output is disabled.
        _logger.debug(f"Dataset schema: {dataset.schema()}")
if __name__ == "__main__":
    # Script entry point: parse the CLI, build the runner, optionally
    # preload the dataset into Alluxio, then time the Ray read.
    cli = get_args()
    runner = BenchmarkImageLoadingRunner(
        dataset_path=cli.path,
        etcd_hosts=cli.etcd,
        etcd_port=cli.port,
    )

    # The preload step runs only when -l/--load was given.
    if cli.load:
        runner.load_dataset()

    runner.benchmark_data_loading()