Skip to content

Commit

Permalink
Remove extract and remove support for partitioned arrow files
Browse files Browse the repository at this point in the history
  • Loading branch information
ddobie committed Jan 17, 2025
1 parent 30990f1 commit 6cbe3d5
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions vasttools/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ def load_two_epoch_metrics(self) -> None:
measurement_pairs_df.pair_epoch_key, agg='count'
)

pair_counts = pair_counts.extract().to_pandas_df().rename(
pair_counts = pair_counts.to_pandas_df().rename(
columns={'count': 'total_pairs'}
).set_index('pair_epoch_key')
else:
Expand Down Expand Up @@ -978,7 +978,7 @@ def _filter_meas_pairs_df(
)

if not self._vaex_meas_pairs:
new_measurement_pairs = new_measurement_pairs.extract().to_pandas_df()
new_measurement_pairs = new_measurement_pairs.to_pandas_df()

return new_measurement_pairs

Expand Down Expand Up @@ -1027,7 +1027,7 @@ def recalc_measurement_pairs_df(

# convert a vaex measurements df to panads so an index can be set
if isinstance(measurements_df, vaex.dataframe.DataFrame):
measurements_df = measurements_df[flux_cols].extract().to_pandas_df()
measurements_df = measurements_df[flux_cols].to_pandas_df()
else:
measurements_df = measurements_df.loc[:, flux_cols].copy()

Expand Down Expand Up @@ -1213,7 +1213,7 @@ def recalc_sources_df(
)

# Switch to pandas at this point to perform join
sources_df = sources_df.extract().to_pandas_df().set_index('source')
sources_df = sources_df.to_pandas_df().set_index('source')

sources_df = sources_df.join(sources_df_fluxes)

Expand Down Expand Up @@ -1849,7 +1849,7 @@ def run_two_epoch_analysis(
(pairs_df[vs_label] > vs) & (pairs_df[m_abs_label] > m)
]

candidate_pairs = candidate_pairs.extract().to_pandas_df()
candidate_pairs = candidate_pairs.to_pandas_df()

else:
candidate_pairs = pairs_df.loc[
Expand Down Expand Up @@ -2677,19 +2677,11 @@ def load_run(

arrow_path = run_dir / 'measurements.arrow'

if arrow_path.is_file():
if arrow_path.exists():
vaex_meas = True
measurements = vaex.open(arrow_path)
warnings.warn(
"Measurements have been loaded with vaex from a single file.")

elif arrow_path.is_dir():
vaex_meas = True
measurements = vaex.open(arrow_path / '*.parquet')
warnings.warn(
"Measurements have been loaded with vaex "
"from a partitioned file.")

else:
m_files = images['measurements_path'].tolist()
m_files += sorted(run_dir.glob("forced_measurements*.parquet"))
Expand Down

0 comments on commit 6cbe3d5

Please sign in to comment.