# concat_parquet.py
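"""
Concatenate per-realization summary tables into one combined table.

Read one Parquet (or Feather) file per realization, tag each table with a
REAL column holding the realization index parsed from the filename, stack
the tables with pa.concat_tables(), and write the result out as both
Parquet and Feather, logging timings along the way.
"""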
from typing import List
import re
import glob
import os
from pathlib import Path
from dataclasses import dataclass
import logging
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np
import pyarrow as pa
import pyarrow.dataset
import pyarrow.feather
import pyarrow.parquet

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

# Input files: one summary table per realization, with the realization index
# embedded in the filename (e.g. summary_r42.parquet)
paths = ["./output/summary_r*.parquet"]
# paths = ["./output/summary_r*.arrow"]

output_dir = Path("./output")
output_dir.mkdir(exist_ok=True)

realidxregexp = re.compile(r"summary_r(\d+)")

# Expand the glob patterns, then deduplicate and sort the matches
globbedpaths = [glob.glob(path) for path in paths]
globbedpaths = sorted({item for sublist in globbedpaths for item in sublist})

@dataclass
class FileEntry:
    real: int
    filename: str


# Build the list of files to process, parsing the realization index out of
# the first path component (from the right) that matches the regex
files_to_process: List[FileEntry] = []
for path in globbedpaths:
    for path_comp in reversed(path.split(os.path.sep)):
        realmatch = re.match(realidxregexp, path_comp)
        if realmatch:
            real = int(realmatch.group(1))
            files_to_process.append(FileEntry(real=real, filename=path))
            break

files_to_process = sorted(files_to_process, key=lambda e: e.real)

# Limit number of files to process
# files_to_process = files_to_process[1:10]

LOGGER.info("Doing CONCATENATION")
start_s = time.perf_counter()

def read_and_build_table_for_one_real(entry: FileEntry) -> pa.Table:
    """Read one realization's table from disk and tag it with a REAL column."""
    LOGGER.info(f"real={entry.real}: {entry.filename}")
    start_read_s = time.perf_counter()
    if Path(entry.filename).suffix == ".parquet":
        table = pa.parquet.read_table(entry.filename)
        # table = pa.parquet.read_table(entry.filename, memory_map=True)
        # table = pa.dataset.dataset(entry.filename, format="parquet").to_table()
    else:
        table = pa.feather.read_table(entry.filename)
    et_read_s = time.perf_counter() - start_read_s

    # table = table.select(["DATE", "FOPR", "TCPU", "BWIP:28,20,13"])

    # Insert the realization index as a constant column at position 1,
    # i.e. right after the table's first column
    table = table.add_column(1, "REAL", pa.array(np.full(table.num_rows, entry.real)))
    LOGGER.info(f"read time: {et_read_s:.2f}s  table shape: {table.shape}")
    return table

lap_s = time.perf_counter()
table_list = []

# Read all input files sequentially
for entry in files_to_process:
    table = read_and_build_table_for_one_real(entry)
    table_list.append(table)

# Alternative: read the input files concurrently
"""
# with ProcessPoolExecutor() as executor:
with ThreadPoolExecutor() as executor:
    futures = executor.map(read_and_build_table_for_one_real, files_to_process)
    for f in futures:
        table_list.append(f)
"""
LOGGER.info(f"All input files read into memory in {(time.perf_counter() - lap_s):.2f}s")
unique_column_names = set()
for table in table_list:
unique_column_names.update(table.schema.names)
LOGGER.info(f"number of unique column names: {len(unique_column_names)}")
LOGGER.info(f"number of tables to concatenate: {len(table_list)}")
lap_s = time.perf_counter()
# Need to investigate this further
# The default promote=False requires all schemas to be the same
# What should really happen if the realizations have different columns?
#combined_table = pa.concat_tables(table_list)
combined_table = pa.concat_tables(table_list, promote=True)
LOGGER.info(f"combined table shape: {combined_table.shape}")
LOGGER.info(f"In-memory concatenation took {(time.perf_counter() - lap_s):.2f}s")
lap_s = time.perf_counter()
output_parquet_filename = str(output_dir / "concat.parquet")
LOGGER.info(f"Writing parquet output to: {output_parquet_filename}")
pa.parquet.write_table(combined_table, output_parquet_filename)
LOGGER.info(f"Writing parquet took {(time.perf_counter() - lap_s):.2f}s")
lap_s = time.perf_counter()
output_feather_filename = str(output_dir / "concat.arrow")
LOGGER.info(f"Writing feather output to: {output_feather_filename}")
pa.feather.write_feather(combined_table, dest=output_feather_filename)
LOGGER.info(f"Writing feather took {(time.perf_counter() - lap_s):.2f}s")
# combined_table_with_combined_chunks = combined_table.combine_chunks()
# output_feather_filename = str(output_dir / "concat_with_combined_chunks.arrow")
# LOGGER.info(f"Writing feather output to: {output_feather_filename}")
# pa.feather.write_feather(combined_table_with_combined_chunks, dest=output_feather_filename)
# LOGGER.info(f"Writing feather took {(time.perf_counter() - lap_s):.2f}s")

LOGGER.info(f"DONE! Total time was {(time.perf_counter() - start_s):.2f}s")

# df = combined_table.to_pandas(timestamp_as_object=True)
# print(df.head())