Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.0 Experiment: Dask Cluster and Processing Improvements (4/4) #708

Draft
wants to merge 52 commits into
base: dev
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
5223133
Updated deps
ajstewart Jul 10, 2023
c4238a1
Reset migrations for version 2
ajstewart Jul 10, 2023
9f213d9
Working image ingest using UUID
ajstewart Jul 10, 2023
ec8dc54
Update images website to handle uuids still WIP
ajstewart Jul 11, 2023
c500ec5
Working basic association, ideal and new source analysis
ajstewart Jul 11, 2023
96e6463
Updated advanced methods
ajstewart Jul 11, 2023
e085b4f
Updated forced extraction
ajstewart Jul 11, 2023
9d7a79a
Working UUID normal run end to end and website
ajstewart Jul 17, 2023
6c39836
Parallel association and epoch mode support
ajstewart Jul 18, 2023
3b20954
Working add mode
ajstewart Jul 18, 2023
4b8a967
Return parallel forced extraction
ajstewart Jul 18, 2023
f8289ba
Fixed some tests
ajstewart Jul 18, 2023
406a04f
Fixed remaining failing tests
ajstewart Jul 19, 2023
17bcf14
Fixed source search by id
ajstewart Jul 19, 2023
a561801
Working image ingest using UUID
ajstewart Jul 10, 2023
8fc80ca
Working basic association, ideal and new source analysis
ajstewart Jul 11, 2023
53f193f
Initial measurements django copy
ajstewart Jul 18, 2023
57c97a9
Working measurements batch copying
ajstewart Jul 19, 2023
c55c2ad
Implement copy sources
ajstewart Jul 19, 2023
e56a5a5
Working related and associations
ajstewart Jul 19, 2023
2f2bb89
Added docstrings
ajstewart Jul 19, 2023
43ed539
Fix set with copy and added logging
ajstewart Jul 19, 2023
baf3c15
Fix upload when pair_metrics is False
ajstewart Jul 19, 2023
e5e0cd3
Write fix better
ajstewart Jul 19, 2023
21cd907
Working basic association, ideal and new source analysis
ajstewart Jul 11, 2023
ecab961
Added dask[complete] to deps
ajstewart Jul 20, 2023
6a0c7e2
Added distributed to deps
ajstewart Jul 20, 2023
1a35efb
ignore dask worker folder
srggrs Sep 15, 2020
1b6c440
add Dask scheduler settings
srggrs Sep 15, 2020
9fca15d
add daskmanager
srggrs Sep 15, 2020
b8f436a
rename daskmanager, add worker init and dask config
srggrs Sep 17, 2020
3d2b898
moved daskmanager to vast_pipeline
srggrs Sep 23, 2020
0511c5c
change default Dask host and port
srggrs Sep 28, 2020
22bb67c
add run_local_dask_cluster command
srggrs Sep 28, 2020
35b610b
try connect first then start local cluster
srggrs Sep 28, 2020
db23f80
Fix not saved conflict resolve
ajstewart Jul 20, 2023
97d3489
suppress Astropy warning on Dask Cluster workers
srggrs Sep 23, 2020
30a2a13
Added warning log of no cluster found
ajstewart Jul 20, 2023
7ec0f1f
Working association
ajstewart Jul 22, 2023
dd2070e
scatter sources_df to Dask cluster
srggrs Sep 17, 2020
2e58a37
Working up to new sources
ajstewart Jul 22, 2023
4141865
Removed double dm declaration
ajstewart Jul 22, 2023
fe570de
Working new sources
ajstewart Jul 23, 2023
b989bfa
Added option to skip attempting to connect to dask cluster
ajstewart Jul 23, 2023
1a5cb59
Working forced
ajstewart Jul 23, 2023
1d808ba
Working end to end
ajstewart Aug 5, 2023
212af0d
Handled backup when parquet is directory
ajstewart Aug 5, 2023
cdcfe28
Fixed testing errors with add image
ajstewart Aug 5, 2023
fe7048e
Fixed epoch test issue
ajstewart Aug 6, 2023
f27b243
Remove commented function
ajstewart Aug 6, 2023
9d438b9
Fixed remaining tests
ajstewart Aug 6, 2023
a104f71
Remove ipdb lines
ajstewart Aug 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement copy sources
ajstewart committed Feb 15, 2024
commit c55c2ad4eb4477326c297dc8a27f9c0408d1c9ee
9 changes: 6 additions & 3 deletions vast_pipeline/pipeline/finalise.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
make_upload_sources,
make_upload_related_sources,
update_sources,
copy_upload_sources,
)
from vast_pipeline.pipeline.pairs import calculate_measurement_pair_metrics
from vast_pipeline.pipeline.utils import parallel_groupby
@@ -208,14 +209,15 @@ def final_operations(
"Skipping measurement pair metric calculation as specified in the run configuration."
)

# upload sources to DB, column 'id' with DB id is contained in return
# upload sources to DB
if add_mode:
# if add mode is being used some sources need to updated where as some
# need to be newly uploaded.
# upload new ones first
src_done_mask = srcs_df.index.isin(done_source_ids)
srcs_df_upload = srcs_df.loc[~src_done_mask].copy()
make_upload_sources(srcs_df_upload, p_run, add_mode)
# make_upload_sources(srcs_df_upload, p_run, add_mode)
copy_upload_sources(srcs_df_upload, p_run, add_mode)
# And now update
srcs_df_update = srcs_df.loc[src_done_mask].copy()
logger.info(f"Updating {srcs_df_update.shape[0]} sources with new metrics.")
@@ -224,7 +226,8 @@ def final_operations(
if not srcs_df_upload.empty:
srcs_df = pd.concat([srcs_df, srcs_df_upload])
else:
make_upload_sources(srcs_df, p_run, add_mode)
copy_upload_sources(srcs_df, p_run, add_mode)
# make_upload_sources(srcs_df, p_run, add_mode)

# gather the related df, upload to db and save to parquet file
# the df will look like
59 changes: 58 additions & 1 deletion vast_pipeline/pipeline/loading.py
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@
Image,
)
from vast_pipeline.pipeline.utils import get_create_img, get_create_img_band
from vast_pipeline.utils.utils import StopWatch
from vast_pipeline.utils.utils import StopWatch, deg2hms, deg2dms


logger = logging.getLogger(__name__)
@@ -198,6 +198,63 @@ def copy_upload_measurements(
copy_upload_model(measurements_df[columns_to_upload], Measurement, batch_size=batch_size)


def _generate_source_name(row: pd.Series) -> str:
    """
    Build an IAU compliant source name from the weighted average
    coordinates, following https://cdsweb.u-strasbg.fr/Dic/iau-spec.html

    Args:
        row:
            The dataframe row holding the source's 'wavg_ra' and
            'wavg_dec' values (degrees assumed — TODO confirm upstream).

    Returns:
        The generated source name.
    """
    # Truncated (not rounded) sexagesimal pieces, per the IAU spec.
    ra_part = deg2hms(row["wavg_ra"], precision=1, truncate=True)
    dec_part = deg2dms(row["wavg_dec"], precision=0, truncate=True)

    # Colon separators are dropped to produce the compact 'J...' form.
    return f"J{ra_part}{dec_part}".replace(":", "")

def _prepare_sources_df_for_upload(sources_df: pd.DataFrame, run_id: str) -> pd.DataFrame:
    """
    Prepare the sources dataframe for upload to the database.

    Adds the generated IAU-compliant 'name' column, attaches the pipeline
    run id, and promotes the 'source' index to an 'id' column so the
    columns line up with the Source model fields.

    NOTE: the input dataframe is mutated in place before being returned —
    callers should pass a copy if the original must be preserved.

    Args:
        sources_df:
            The sources dataframe, indexed by 'source' and containing
            the 'wavg_ra' and 'wavg_dec' columns.
        run_id:
            The id of the pipeline run, assigned to every source row.

    Returns:
        The prepared sources dataframe.
    """
    # Generate the name from the weighted average coordinates, row by row.
    sources_df["name"] = sources_df[["wavg_ra", "wavg_dec"]].apply(
        _generate_source_name, axis=1
    )

    sources_df["run_id"] = run_id

    # The index holds the source id; expose it as the 'id' column expected
    # by the model upload.
    sources_df = sources_df.reset_index().rename(columns={"source": "id"})

    return sources_df


def copy_upload_sources(sources_df: pd.DataFrame, pipeline_run: Run, add_mode: bool = False, batch_size: int = 10_000) -> None:
    """
    Upload the run's sources to the database via `copy_upload_model`.

    When not running in add mode, any sources (and their related objects)
    left over from a previous execution of the same pipeline run are first
    deleted inside a transaction.

    Args:
        sources_df:
            The sources dataframe to upload, containing the 'wavg_ra' and
            'wavg_dec' columns and indexed by 'source'.
        pipeline_run:
            The pipeline Run object the sources belong to.
        add_mode:
            Whether the pipeline is running in add (image) mode, in which
            case existing sources are kept.
        batch_size:
            The number of rows uploaded per batch.

    Returns:
        None.
    """
    with transaction.atomic():
        if add_mode is False and Source.objects.filter(run=pipeline_run).exists():
            logger.info("Removing objects from previous pipeline run")
            # delete() cascades, so related objects are removed as well.
            n_del, detail_del = Source.objects.filter(run=pipeline_run).delete()
            logger.info(
                (
                    "Deleting all sources and related objects for this run. "
                    "Total objects deleted: %i"
                ),
                n_del,
            )
            logger.debug("(type, #deleted): %s", detail_del)

    # Work on a copy: the preparation step mutates the dataframe in place.
    sources_df_upload = _prepare_sources_df_for_upload(sources_df.copy(), str(pipeline_run.id))

    # Upload only the dataframe columns that map onto concrete model fields
    # (reverse relations have no 'attname' and are filtered out).
    columns_to_upload = [
        fld.attname
        for fld in Source._meta.get_fields()
        if getattr(fld, "attname", None) and fld.attname in sources_df_upload.columns
    ]

    copy_upload_model(sources_df_upload[columns_to_upload], Source, batch_size=batch_size)

    # Free the (potentially large) upload copy promptly.
    del sources_df_upload


def make_upload_sources(
sources_df: pd.DataFrame, pipeline_run: Run, add_mode: bool = False