Skip to content

Commit

Permalink
parallelise no_remote_merging.py
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshCu authored Nov 4, 2024
1 parent 4018ebf commit 9f3a314
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions utilities/data_conversion/no_remote_merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import os
import polars as pl
from collections import defaultdict

from functools import partial
import multiprocessing as mp

def sum_csv_files(file_pattern: str):
# Scan CSV files and assign column names
Expand All @@ -16,6 +17,17 @@ def sum_csv_files(file_pattern: str):

return df.sort("index")

def process_pair(output_path: Path, tup):
    """Merge the per-rank CSV files of one nexus into ``{nexus}_output.csv``.

    Takes the directory first and the work item second so that it can be
    bound with ``functools.partial`` and mapped over ``(nexus, count)`` pairs.

    Args:
        output_path: Directory containing the ``{nexus}_rank_*.csv`` files;
            the merged ``{nexus}_output.csv`` is written to the same place.
        tup: ``(nexus, count)`` pair, where ``count`` is the number of rank
            files found for ``nexus``.

    Returns:
        None. The work is done through file-system side effects.
    """
    # Function-scope import keeps this block self-contained.
    import subprocess

    nexus, count = tup
    if count <= 1:
        # NOTE(review): assumes the sed post-processing only applies to files
        # rewritten here (count > 1); the scraped source lost its indentation,
        # so confirm against the repository.
        return

    output_file = output_path / f"{nexus}_output.csv"

    # Combine every rank file for this nexus and write the result headerless.
    df = sum_csv_files(output_path / f"{nexus}_rank_*.csv")
    df.collect().write_csv(output_file, include_header=False)

    # The per-rank inputs are no longer needed once the merged file exists.
    for file in output_path.glob(f"{nexus}_rank_*.csv"):
        file.unlink()

    # Use sed to add a space after each comma in the merged csv file.
    # An argv list (no shell) keeps paths with spaces or shell metacharacters
    # intact. check=False preserves the original best-effort behaviour, where
    # a non-zero sed exit status was ignored.
    subprocess.run(
        ["sed", "-i", "s/,/, /g", str(output_file.absolute())],
        check=False,
    )


def merge_outputs(output_path: Path) -> None:
# get all the file names in the folder
Expand All @@ -25,27 +37,21 @@ def merge_outputs(output_path: Path) -> None:
total_files = len(output_files)
# sort the files

nexuse_counts = defaultdict(int)
nexus_counts = defaultdict(int)

for file in output_files:
nexus_id = file.stem.split("_")[0]
nexuse_counts[nexus_id] += 1
nexus_counts[nexus_id] += 1

for file in output_files:
nexus_id = file.stem.split("_")[0]
if nexuse_counts[nexus_id] == 1:
if nexus_counts[nexus_id] == 1:
os.rename(file, output_path / f"{nexus_id}_output.csv")

for nexus, count in nexuse_counts.items():
if count > 1:
df = sum_csv_files(f"{output_path}/{nexus}_rank_*.csv")
df.collect().write_csv(f"{output_path}/{nexus}_output.csv", include_header=False)
for file in output_path.glob(f"{nexus}_rank_*.csv"):
file.unlink()
# use sed to add the spaces in the csv files
os.system(f"sed -i 's/,/, /g' {output_path}/{nexus}_output.csv")
partial_process = partial(process_pair, output_path)
with mp.Pool() as p:
p.map(partial_process,nexus_counts.items())

# delete the files that were merged


if __name__ == "__main__":
Expand Down

0 comments on commit 9f3a314

Please sign in to comment.