Skip to content

Commit

Permalink
Add Alluxio fsspec individual operation benchmark script (#27)
Browse files Browse the repository at this point in the history
Add Alluxio fsspec individual operation benchmark script
  • Loading branch information
LuQQiu authored Apr 1, 2024
1 parent c17b0ce commit 53f1cba
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 113 deletions.
31 changes: 21 additions & 10 deletions alluxio-py-bench.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
#!/bin/python3
import os, time
import argparse
import os
import time
from enum import Enum
from tests.benchmark import AbstractBench, AlluxioFSSpecBench, AlluxioRESTBench, RayBench

from tests.benchmark import AbstractBench
from tests.benchmark import AlluxioFSSpecBench
from tests.benchmark import AlluxioRESTBench
from tests.benchmark import RayBench


class TestSuite(Enum):
REST = "REST"
FSSPEC = "ALLUXIOFSSPEC"
FSSPEC = "FSSPEC"
RAY = "ALLUXIORAY"
PYTORCH = "PYTORCH"


def init_main_parser():
parser = argparse.ArgumentParser(
description="Main parser"
)
parser = argparse.ArgumentParser(description="Main parser")
parser.add_argument(
"--numjobs",
type=int,
Expand Down Expand Up @@ -73,16 +77,23 @@ def main(main_args, test_suite=AbstractBench):
# end_time = time.time()
# print(f"total time:{end_time-start_time}")


if __name__ == "__main__":
main_parser = init_main_parser()
main_args, remaining_args = main_parser.parse_known_args()
if main_args.testsuite == TestSuite.REST.name:
suite_parser = AlluxioRESTBench.AlluxioRESTArgumentParser(main_parser)
testsuite = AlluxioRESTBench.AlluxioRESTBench(suite_parser.parse_args())
testsuite = AlluxioRESTBench.AlluxioRESTBench(
suite_parser.parse_args()
)
elif main_args.testsuite == TestSuite.FSSPEC.name:
suite_parser = AlluxioFSSpecBench.AlluxioRESTArgumentParser()
testsuite = AlluxioFSSpecBench(suite_parser.parse_args())
suite_parser = AlluxioFSSpecBench.AlluxioFSSpecArgumentParser(
main_parser
)
testsuite = AlluxioFSSpecBench.AlluxioFSSpecBench(
suite_parser.parse_args()
)
elif main_args.testsuite == TestSuite.RAY.name:
suite_parser = RayBench.RayArgumentParser()
testsuite = RayBench(suite_parser.parse_args())
main(main_args, testsuite)
main(main_args, testsuite)
7 changes: 5 additions & 2 deletions tests/benchmark/AbstractBench.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from abc import ABC, abstractmethod
from abc import ABC
from abc import abstractmethod


class AbstractBench(ABC):
def __init__(self, *args, **kwargs):
Expand All @@ -14,7 +16,8 @@ def execute(self):
def init(self):
pass


class AbstractArgumentParser(ABC):
@abstractmethod
def parse_args(self, args=None, namespace=None):
pass
pass
197 changes: 134 additions & 63 deletions tests/benchmark/AlluxioFSSpecBench.py
Original file line number Diff line number Diff line change
@@ -1,90 +1,161 @@
import random
import re
import time
from enum import Enum

from alluxiofs import AlluxioClient
from tests.benchmark.AbstractBench import AbstractBench, AbstractArgumentParser
import argparse
from alluxiofs import AlluxioFileSystem
from tests.benchmark.AbstractBench import AbstractArgumentParser
from tests.benchmark.AbstractBench import AbstractBench


class Op(Enum):
ls = "ls"
info = "info"
open = "open"
cat_file = "cat_file"
open_seq_read = "open_seq_read"
open_random_read = "open_random_read"


class AlluxioFSSpecArgumentParser(AbstractArgumentParser):
def __init__(self):
self.parser = argparse.ArgumentParser(description='AlluxioFSSpec Argument Parser')
def __init__(self, main_parser):
self.parser = main_parser
self.parser.add_argument(
"--path", type=str, required=True, help="Path for the operation."
)
self.parser.add_argument(
'--op',
default=Op.ls,
required=False,
help='REST Op to bench against')
# cat_file args
"--op",
choices=[op.value for op in Op],
default=Op.cat_file.value,
help="Operation to perform.",
)
self.parser.add_argument(
'--filename',
type=str,
required=False,
help='filename to read')
"--bs",
type=int,
default=256 * 1024,
help="Buffer size for read operations.",
)
self.parser.add_argument(
'--irange',
type=str,
required=False,
help='read range in bytes, <str>-<end> (e.g. 1-1024, end inclusive)')
"--iteration", type=int, default=10, help="Iterations."
)

def parse_args(self, args=None, namespace=None):
args = self.parser.parse_args(args, namespace)
print("args:{}", args)
return args


class AlluxioFSSpecBench(AbstractBench):
def __init__(self, *args, **kwargs):
TOTAL_OPS = "total_ops"
TOTAL_BYTES = "total_bytes"

def __init__(self, args, **kwargs):
self.args = args

def init(self):
self.validate_args()

def execute(self):
print("Executing AlluxioFSSpecBench!")
if self.args.op == Op.cat_file:
self.test_cat_file()
pass

def validate_args(self):
if self.args.op == Op.cat_file.name:
required_args_absence = any(arg is None for arg in [
self.args.filename
])
if required_args_absence:
raise Exception(f"Missing args for {self.args.op}")

if self.args.irange is not None:
match = re.match(r"\d+-\d+", self.args.irange)
if match:
nums = [int(x) for x in match.group().split('-')]
if nums[0] >= nums[1]:
raise Exception(f"Invalid irange")
else:
raise Exception(f"Incorrect irange param passed.")
elif self.args.op == Op.ls.name:
pass
elif self.args.op == Op.info.name:
pass
elif self.args.op == Op.open.name:
pass

def test_cat_file(self):
print(f"{self.args.etcd_hosts}, {self.args.worker_hosts}")
alluxio_client = AlluxioClient(
self.alluxio_fs = AlluxioFileSystem(
etcd_hosts=self.args.etcd_hosts,
worker_hosts=self.args.worker_hosts
worker_hosts=self.args.worker_hosts,
)
file_path = self.args.filename
if self.args.irange is not None:
range_str = re.match(r"\d+-\d+", self.args.irange).group()
irange = [int(x) for x in range_str.split('-')]
off = random.randint(irange[0],irange[1])
len = random.randint(1, irange[1]-off+1)
alluxio_client.read_range(file_path, off, len)
self.directories = []
self.files = {}
self.traverse(self.args.path, self.directories, self.files)

def execute(self):
start_time = time.time()
result_metrics = {}
for _ in range(self.args.iteration):
if self.args.op == Op.ls.value:
result_metrics = self.bench_ls()
elif self.args.op == Op.info.value:
result_metrics = self.bench_info()
elif self.args.op == Op.cat_file.value:
result_metrics = self.bench_cat_file()
elif self.args.op == Op.open_seq_read.value:
result_metrics = self.bench_open_seq_read()
elif self.args.op == Op.open_random_read.value:
result_metrics = self.bench_open_random_read()
else:
raise Exception(
f"Unknown Op:{self.args.op} for {self.__class__.__name__}"
)
duration = time.time() - start_time

if result_metrics.get(self.TOTAL_OPS):
print(
f"Benchmark against {self.args.op}: total ops: {result_metrics[self.TOTAL_OPS]} ops/second: {result_metrics[self.TOTAL_OPS] / duration}"
)
if result_metrics.get(self.TOTAL_BYTES):
print(
f"Benchmark against {self.args.op}: total bytes: {result_metrics[self.TOTAL_BYTES]} bytes/second: {result_metrics[self.TOTAL_BYTES] / duration}"
)

if not result_metrics:
print(
f"Benchmark against {self.args.op}: iteration: {self.args.iteration} total time: {duration} seconds"
)

def traverse(self, path, directories, files):
entry = self.alluxio_fs.info(path)
entry_path = entry["name"]
if entry["type"] == "directory":
directories.append(entry_path)
for subpath in self.alluxio_fs.ls(path, detail=False):
self.traverse(subpath, directories, files)
else:
alluxio_client.read(file_path)
files[entry_path] = entry["size"]

def bench_ls(self):
for directory in self.directories:
self.alluxio_fs.ls(directory)
return {self.TOTAL_OPS: len(self.directories)}

def bench_info(self):
for file in self.files.keys():
self.alluxio_fs.info(file)
return {self.TOTAL_OPS: len(self.files.keys())}

def bench_cat_file(self):
total_bytes = 0
for file_path, file_size in self.files.items():
file_read = 0
while file_read < file_size:
read_bytes = min(self.args.bs, file_size - total_bytes)
self.alluxio_fs.cat_file(file_path, 0, read_bytes)
file_read += read_bytes
total_bytes += file_size
return {
self.TOTAL_OPS: len(self.files.keys()),
self.TOTAL_BYTES: total_bytes,
}

def bench_open_seq_read(self):
total_bytes = 0
for file_path, file_size in self.files.items():
with self.alluxio_fs.open(file_path, "rb") as f:
while True:
data = f.read(self.args.bs)
if not data:
break
total_bytes += file_size
return {
self.TOTAL_OPS: len(self.files.keys()),
self.TOTAL_BYTES: total_bytes,
}

def bench_open_random_read(self):
total_bytes = 0
total_ops = 0
for file_path, file_size in self.files.items():
bytes_read = 0
with self.alluxio_fs.open(file_path, "rb") as f:
bytes_to_read = min(file_size, 10 * self.args.bs)
while bytes_read < bytes_to_read:
offset = random.nextInt(file_size)
read_bytes = min(self.args.bs, file_size - offset)
f.seek(offset)
data = f.read(read_bytes)
bytes_read += len(data)
total_ops += 1
total_bytes += bytes_read

return {self.TOTAL_OPS: total_ops, self.TOTAL_BYTES: total_bytes}
Loading

0 comments on commit 53f1cba

Please sign in to comment.