Skip to content

Commit

Permalink
Fix non-parallel progress bars
Browse files — browse the repository at this point in the history
  • Loading branch information
dbernaciak committed Dec 22, 2023
1 parent 0586c3a commit bd7d382
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 19 deletions.
73 changes: 56 additions & 17 deletions fusion/fs_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
upload_files,
validate_file_names,
is_dataset_raw,
tqdm_joblib
tqdm_joblib,
)

logger = logging.getLogger(__name__)
VERBOSE_LVL = 25
DEFAULT_CHUNK_SIZE = 2 ** 16
DEFAULT_CHUNK_SIZE = 2**16


def _url_to_path(x):
Expand Down Expand Up @@ -65,11 +65,18 @@ def _download_files(row):
delayed(_download_files)(row) for index, row in df.iterrows()
)
else:
res = Parallel(n_jobs=n_par)(delayed(_download_files)(row) for index, row in df.iterrows())
res = Parallel(n_jobs=n_par)(
delayed(_download_files)(row) for index, row in df.iterrows()
)
elif len(df) > 0:
if tqdm_joblib:
with tqdm_joblib(tqdm(total=len(df))) as _:
res = [_download_files(row) for index, row in df.iterrows()]
if show_progress:
res = [None] * len(df)
with tqdm(total=len(df)) as p:
for i, row in df.iterrows():
r = _download_files(row)
res[i] = r
if r[0] is True:
p.update(1)
else:
res = [_download_files(row) for index, row in df]
else:
Expand All @@ -83,13 +90,19 @@ def _upload(fs_fusion, fs_local, df, n_par, show_progress=True, local_path=""):
df["path"] = local_path + df["path"]
parallel = True if len(df) > 1 else False
res = upload_files(
fs_fusion, fs_local, df, parallel=parallel, n_par=n_par, multipart=True, show_progress=show_progress
fs_fusion,
fs_local,
df,
parallel=parallel,
n_par=n_par,
multipart=True,
show_progress=show_progress,
)

return res


def _generate_sha256_token(path, fs, chunk_size=5 * 2 ** 20):
def _generate_sha256_token(path, fs, chunk_size=5 * 2**20):
hash_sha256 = hashlib.sha256()
chunk_count = 0
with fs.open(path, "rb") as file:
Expand Down Expand Up @@ -143,12 +156,20 @@ def _get_fusion_df(


def _get_local_state(
fs_local, fs_fusion, datasets, catalog, dataset_format=None, local_state=None, local_path=""
fs_local,
fs_fusion,
datasets,
catalog,
dataset_format=None,
local_state=None,
local_path="",
):
local_files = []
local_files_rel = []
local_dirs = (
[f"{local_path}{catalog}/{i}" for i in datasets] if len(datasets) > 0 else [local_path + catalog]
[f"{local_path}{catalog}/{i}" for i in datasets]
if len(datasets) > 0
else [local_path + catalog]
)

for local_dir in local_dirs:
Expand All @@ -162,7 +183,9 @@ def _get_local_state(
f for flag, f in zip(local_file_validation, local_files) if flag
]
local_files_rel += [
os.path.join(local_dir, relpath(i, local_dir)).replace("\\", "/").replace(local_path, "")
os.path.join(local_dir, relpath(i, local_dir))
.replace("\\", "/")
.replace(local_path, "")
for i in local_files
]

Expand Down Expand Up @@ -201,7 +224,7 @@ def _synchronize(
direction: str = "upload",
n_par: int = None,
show_progress: bool = True,
local_path = ""
local_path="",
):
"""Synchronize two filesystems."""

Expand All @@ -218,7 +241,12 @@ def _synchronize(
)
join_df = join_df[join_df["sha256_local"] != join_df["sha256_fusion"]]
res = _upload(
fs_fusion, fs_local, join_df, n_par, show_progress=show_progress, local_path=local_path
fs_fusion,
fs_local,
join_df,
n_par,
show_progress=show_progress,
local_path=local_path,
)
elif direction == "download":
if len(df_fusion) == 0:
Expand All @@ -234,7 +262,12 @@ def _synchronize(
)
join_df = join_df[join_df["sha256_local"] != join_df["sha256_fusion"]]
res = _download(
fs_fusion, fs_local, join_df, n_par, show_progress=show_progress, local_path=local_path
fs_fusion,
fs_local,
join_df,
n_par,
show_progress=show_progress,
local_path=local_path,
)
else:
raise ValueError("Unknown direction of operation.")
Expand All @@ -252,7 +285,7 @@ def fsync(
dataset_format=None,
n_par=None,
show_progress=True,
local_path = "",
local_path="",
log_level=logging.ERROR,
log_path: str = ".",
):
Expand Down Expand Up @@ -319,7 +352,13 @@ def fsync(
while True:
try:
local_state_temp = _get_local_state(
fs_local, fs_fusion, datasets, catalog, dataset_format, local_state, local_path
fs_local,
fs_fusion,
datasets,
catalog,
dataset_format,
local_state,
local_path,
)
fusion_state_temp = _get_fusion_df(
fs_fusion, datasets, catalog, flatten, dataset_format
Expand All @@ -335,7 +374,7 @@ def fsync(
direction,
n_par,
show_progress,
local_path
local_path,
)
if len(res) == 0 or all((i[0] for i in res)):
local_state = local_state_temp
Expand Down
9 changes: 7 additions & 2 deletions fusion/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,9 +911,14 @@ def _upload(row):
delayed(_upload)(row) for index, row in loop.iterrows()
)
else:
res = [None] * len(loop)
if show_progress:
with tqdm_joblib(tqdm(total=len(loop))) as _:
res = [_upload(row) for index, row in loop.iterrows()]
with tqdm(total=len(loop)) as p:
for i, row in loop.iterrows():
r = _upload(row)
res[i] = r
if r[0] is True:
p.update(1)
else:
res = [_upload(row) for index, row in loop.iterrows()]

Expand Down

0 comments on commit bd7d382

Please sign in to comment.