csv_analyzer.py
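"""Load multiple CSV files, group those that share a given column, and export
the grouped ("matched") and ungrouped ("unmatched") results back to CSV.

All of the functionality lives in the single class below, CSVAnalyzerGrouping;
see its method docstrings for the individual operations.
"""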
import os
from pathlib import Path
from typing import Dict, List, Optional, Set, Union

import pandas as pd

class CSVAnalyzerGrouping:
    """
    A class to load, analyze, and group CSV files based on column presence
    and perform grouped operations on the data.
    """

    def __init__(self, directory: Optional[str] = None):
        """Initialize the CSVAnalyzerGrouping with empty data storage."""
        self.dataframes: Dict[str, pd.DataFrame] = {}
        self.all_columns: Set[str] = set()
        if directory:
            self.load_from_directory(directory)
    def load_from_directory(self, path: str) -> None:
        """
        Load all CSV files from the specified directory into pandas DataFrames.

        Args:
            path (str): Directory path containing CSV files.

        Raises:
            FileNotFoundError: If the directory doesn't exist.
        """
        directory = Path(path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {path}")
        csv_files = list(directory.glob("*.csv"))
        self.load_from_files([str(f) for f in csv_files])
    def use_dataframes(self, dataframes: Dict[str, pd.DataFrame]) -> None:
        """
        Use existing pandas DataFrames instead of loading from files.

        Args:
            dataframes (Dict[str, pd.DataFrame]): Dictionary mapping names to DataFrames.
        """
        self.dataframes.clear()
        self.all_columns.clear()
        for name, df in dataframes.items():
            try:
                if not isinstance(df, pd.DataFrame):
                    raise ValueError(f"Value for {name} is not a pandas DataFrame")
                self.dataframes[name] = df
                self.all_columns.update(df.columns)
                print(f"Successfully loaded DataFrame: {name}")
            except Exception as e:
                print(f"Error loading DataFrame {name}: {str(e)}")
    def load_from_files(self, files: List[str]) -> None:
        """
        Load the specified CSV files into pandas DataFrames.

        Args:
            files (List[str]): List of file paths to CSV files.
        """
        self.dataframes.clear()
        self.all_columns.clear()
        for file_path in files:
            try:
                df = pd.read_csv(file_path)
                self.dataframes[file_path] = df
                self.all_columns.update(df.columns)
                print(f"Successfully loaded: {file_path}")
            except Exception as e:
                print(f"Error loading {file_path}: {str(e)}")
    def _create_agg_functions(self, df: pd.DataFrame, column_name: str) -> Dict[str, str]:
        """
        Create aggregation functions for each column, using 'first' to maintain original values.

        Args:
            df (pd.DataFrame): DataFrame to analyze.
            column_name (str): Name of the grouping column to exclude.

        Returns:
            Dict[str, str]: Dictionary of column names and their aggregation functions.
        """
        agg_funcs = {}
        for col in df.columns:
            if col != column_name and not df[col].isna().all():  # Only include non-NaN columns
                agg_funcs[col] = 'first'
        return agg_funcs
    def grouped_data_by_column(self, column_name: str) -> Dict[str, Dict[str, pd.DataFrame]]:
        """
        Group data by the specified column for files that contain it.
        Only includes non-NaN columns from the original CSV being grouped.

        Args:
            column_name (str): Column name to group by.

        Returns:
            Dict containing:
                'matched': Dict[str, pd.DataFrame] - Grouped data from files with the column.
                'unmatched': Dict[str, pd.DataFrame] - Data from files without the column.
        """
        matched_dfs = {}
        unmatched_dfs = {}
        # Split into matched and unmatched
        for file_path, df in self.dataframes.items():
            if column_name in df.columns and not df[column_name].isna().all():
                # Remove columns that are all NaN
                non_nan_cols = [col for col in df.columns if not df[col].isna().all()]
                matched_dfs[file_path] = df[non_nan_cols]
            else:
                unmatched_dfs[file_path] = df
        # Group matched data
        grouped_matched_dfs = {}
        for file_path, df in matched_dfs.items():
            try:
                # Create appropriate aggregation functions
                agg_funcs = self._create_agg_functions(df, column_name)
                # Perform grouping with simple aggregation to maintain original column names
                grouped_df = df.groupby(column_name).agg(agg_funcs)
                # Remove the multi-level column index, if present, to keep original column names
                if isinstance(grouped_df.columns, pd.MultiIndex):
                    grouped_df.columns = grouped_df.columns.get_level_values(0)
                grouped_matched_dfs[file_path] = grouped_df
                print(f"Successfully grouped data from: {file_path}")
            except Exception as e:
                print(f"Error grouping data from {file_path}: {str(e)}")
                unmatched_dfs[file_path] = df  # Move to unmatched if grouping fails
        return {
            "matched": grouped_matched_dfs,
            "unmatched": unmatched_dfs
        }
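    # Illustrative sketch of the grouping flow (not part of the original file;
    # the file names and the "customer_id" column are hypothetical placeholders):
    #
    #   analyzer = CSVAnalyzerGrouping()
    #   analyzer.load_from_files(["orders.csv", "refunds.csv"])
    #   result = analyzer.grouped_data_by_column("customer_id")
    #   result["matched"]    # files containing "customer_id", grouped by it
    #   result["unmatched"]  # files without the column, returned unchanged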
    def export_matched_data(self, output_dir: str, dataset: Dict[str, Dict[str, pd.DataFrame]],
                            output_prefix: str = "grouped") -> None:
        """
        Export matched (grouped) data to a single combined CSV file.
        Only includes non-NaN columns from the original CSV files being grouped.

        Args:
            output_dir (str): Directory to save the exported file.
            dataset (Dict): Dataset containing matched and unmatched data.
            output_prefix (str): Prefix for the output filename.
        """
        matched_data = dataset.get("matched", {})
        if not matched_data:
            print("No matched data to export")
            return
        try:
            # Create the output directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)
            # Collect the cleaned DataFrames before concatenating them
            all_dfs = []
            # Process each grouped DataFrame
            for file_path, df in matched_data.items():
                # Remove columns that are all NaN; copy so the original DataFrame is not modified
                non_nan_cols = [col for col in df.columns if not df[col].isna().all()]
                df_cleaned = df[non_nan_cols].copy()
                if isinstance(df_cleaned.index, pd.MultiIndex) or df_cleaned.index.name is not None:
                    df_cleaned = df_cleaned.reset_index()
                # Add source file information
                df_cleaned['source_file'] = os.path.basename(file_path)
                all_dfs.append(df_cleaned)
            # Combine all DataFrames
            combined_df = pd.concat(all_dfs, axis=0, ignore_index=True)
            # Reorder columns to ensure source_file is last
            cols = [col for col in combined_df.columns if col != 'source_file'] + ['source_file']
            combined_df = combined_df[cols]
            # Export the combined DataFrame
            output_path = os.path.join(output_dir, f"{output_prefix}_combined.csv")
            combined_df.to_csv(output_path, index=False)
            print(f"Successfully exported combined data to: {output_path}")
        except Exception as e:
            print(f"Error exporting combined data: {str(e)}")
    def export_unmatched_data(self, output_dir: str, dataset: Dict[str, Dict[str, pd.DataFrame]],
                              output_prefix: str = "grouped") -> None:
        """
        Export unmatched data to CSV files.

        Args:
            output_dir (str): Directory to save the exported files.
            dataset (Dict): Dataset containing matched and unmatched data.
            output_prefix (str): Prefix for output filenames.
        """
        self._export_data(output_dir, dataset.get("unmatched", {}), output_prefix)
    def _export_data(self, output_dir: str, data: Dict[str, pd.DataFrame],
                     output_prefix: str) -> None:
        """
        Helper method to export DataFrames to CSV files.

        Args:
            output_dir (str): Directory to save the exported files.
            data (Dict[str, pd.DataFrame]): Dictionary of DataFrames to export.
            output_prefix (str): Prefix for output filenames.
        """
        os.makedirs(output_dir, exist_ok=True)
        for file_path, df in data.items():
            try:
                # Extract the original filename without its extension
                base_name = os.path.splitext(os.path.basename(file_path))[0]
                output_path = os.path.join(output_dir, f"{output_prefix}_{base_name}.csv")
                # Reset the index if it's a grouped DataFrame
                if isinstance(df.index, pd.MultiIndex) or df.index.name is not None:
                    df = df.reset_index()
                df.to_csv(output_path, index=False)
                print(f"Successfully exported: {output_path}")
            except Exception as e:
                print(f"Error exporting {file_path}: {str(e)}")
    def list_all_matched_columns(self) -> Dict[str, List[str]]:
        """
        List the columns present in each loaded CSV file.

        Returns:
            Dict[str, List[str]]: Dictionary with filenames as keys and their columns as values.
        """
        return {os.path.basename(file_path): list(df.columns)
                for file_path, df in self.dataframes.items()}
    def list_all_unmatched_columns(self) -> Dict[str, List[str]]:
        """
        List, for each CSV file, the columns it is missing relative to the
        union of columns across all loaded files.

        Returns:
            Dict[str, List[str]]: Dictionary with filenames as keys and their missing columns as values.
        """
        # Get all columns from all files
        all_columns = set()
        for df in self.dataframes.values():
            all_columns.update(df.columns)
        # Find the missing columns for each file
        unmatched_columns = {}
        for file_path, df in self.dataframes.items():
            missing_cols = all_columns - set(df.columns)
            if missing_cols:
                unmatched_columns[os.path.basename(file_path)] = list(missing_cols)
        return unmatched_columns
    def list_all_filenames(self) -> List[str]:
        """
        List all loaded CSV filenames.

        Returns:
            List[str]: List of loaded CSV filenames.
        """
        return [os.path.basename(file_path) for file_path in self.dataframes.keys()]
    def get_column_data(self, column_name: str) -> Dict[str, pd.Series]:
        """
        Get all non-NaN data from a specific column across all CSVs that contain it.

        Args:
            column_name (str): Name of the column to retrieve.

        Returns:
            Dict[str, pd.Series]: Dictionary with filenames as keys and column data as values.
        """
        column_data = {}
        for file_path, df in self.dataframes.items():
            if column_name in df.columns and not df[column_name].isna().all():
                # Only include non-NaN values
                non_nan_data = df[column_name].dropna()
                if not non_nan_data.empty:
                    column_data[os.path.basename(file_path)] = non_nan_data
        return column_data
    def search_column_value(self, value: Optional[Union[str, int, float]] = None,
                            **kwargs) -> pd.DataFrame:
        """
        Search for rows containing a specific value across all non-NaN columns,
        or for rows matching specific column values.

        Args:
            value: Value to search for across all columns (optional).
            **kwargs: Column-value pairs to search for (e.g., name='John').

        Returns:
            pd.DataFrame: DataFrame containing all matching rows with a source_file column.

        Example:
            # Search for a value across all columns
            df = analyzer.search_column_value("John")
            # Search for a specific column value
            df = analyzer.search_column_value(name="John")
            # Search with multiple conditions
            df = analyzer.search_column_value(name="John", age=30)
        """
        matching_dfs = []
        for file_path, df in self.dataframes.items():
            # Remove columns that are all NaN
            non_nan_cols = [col for col in df.columns if not df[col].isna().all()]
            df_cleaned = df[non_nan_cols].copy()
            df_cleaned['source_file'] = os.path.basename(file_path)
            if value is not None:
                # Search for the value across all non-NaN columns
                mask = df_cleaned.astype(str).apply(
                    lambda x: x.str.contains(str(value), case=False, na=False)
                ).any(axis=1)
                matching_dfs.append(df_cleaned[mask])
            elif kwargs:
                # Search for specific column-value pairs in non-NaN columns
                mask = pd.Series(True, index=df_cleaned.index)
                for col, val in kwargs.items():
                    if col in df_cleaned.columns:
                        mask &= df_cleaned[col].astype(str).str.contains(
                            str(val), case=False, na=False
                        )
                matching_dfs.append(df_cleaned[mask])
        if not matching_dfs:
            return pd.DataFrame()
        # Combine all matching rows and ensure source_file is the last column
        result = pd.concat(matching_dfs, axis=0, ignore_index=True)
        cols = [col for col in result.columns if col != 'source_file'] + ['source_file']
        return result[cols]
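
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original class). The directory,
# output folder, and "customer_id" column below are hypothetical placeholders;
# substitute whatever your CSV files actually contain.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Load every CSV in a directory (or pass explicit paths to load_from_files)
    analyzer = CSVAnalyzerGrouping(directory="data")           # hypothetical directory

    # Inspect what was loaded
    print(analyzer.list_all_filenames())
    print(analyzer.list_all_matched_columns())
    print(analyzer.list_all_unmatched_columns())

    # Group every file that has a "customer_id" column by that column
    dataset = analyzer.grouped_data_by_column("customer_id")   # hypothetical column

    # Export the grouped files as one combined CSV, and the rest individually
    analyzer.export_matched_data("output", dataset, output_prefix="grouped")
    analyzer.export_unmatched_data("output", dataset, output_prefix="grouped")

    # Pull a single column, or search for a value across all loaded files
    ids = analyzer.get_column_data("customer_id")
    hits = analyzer.search_column_value("John")
    print(hits.head() if not hits.empty else "No matches found")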