Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sspectrum #5

Merged
merged 15 commits into from
Sep 14, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9"]
python-version: ["3.8", "3.9", "3.10"]

steps:
- name: Set up Python ${{ matrix.python-version }}
Expand Down
87 changes: 6 additions & 81 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
Expand All @@ -22,81 +13,15 @@ sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# dotenv
.env

# virtualenv
.venv
venv/
ENV/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site
# Unit test / coverage reports
.pytest_cache/

# mypy
.mypy_cache/
# Compiled bytecode
surfvis/*.pyc
surfvis/__pycache__/
tests/__pycache__/
Binary file removed surfvis/__pycache__/__init__.cpython-37.pyc
Binary file not shown.
Binary file removed surfvis/__pycache__/surfvis.cpython-37.pyc
Binary file not shown.
70 changes: 45 additions & 25 deletions surfvis/flagchi2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,35 @@
import dask.array as da
from dask.diagnostics import ProgressBar
from surfvis.utils import flagchisq
from daskms import xds_from_ms, xds_from_table, xds_to_table
from daskms import xds_from_storage_ms as xds_from_ms
from daskms import xds_from_storage_table as xds_from_table
from daskms import xds_to_storage_table as xds_to_table


def create_parser():
parser = OptionParser(usage='%prog [options] msname')
parser.add_option('--rcol', default='RESIDUAL',
help='Residual column (default = RESIDUAL)')
parser.add_option('--scol',
help='sigma column to use. Overrides wcol if provided.')
parser.add_option('--wcol', default='WEIGHT_SPECTRUM',
help='Weight column (default = WEIGHT_SPECTRUM)')
help='Weight column (default = WEIGHT_SPECTRUM). '
'The special value SIGMA_SPECTRUM can be passed to '
'initialise the weights as 1/sigma**2')
parser.add_option('--fcol', default='FLAG',
help='Flag column (default = FLAG)')
parser.add_option('--sigma', default=3, type=float,
help='chisq threshold (default = 3)')
parser.add_option('--flag-above', default=3, type=float,
help='flag data with chisq above this value (default = 3)')
parser.add_option('--unflag-below', default=1.15, type=float,
help='unflag data with chisq below this value (default = 1.15)')
parser.add_option('--nthreads', default=4, type=int,
help='Number of dask threads to use')
parser.add_option('--nrows', default=10000, type=int,
parser.add_option('--nrows', default=250000, type=int,
help='Number of rows in each chunk (default=250000)')
parser.add_option('--nfreqs', default=128, type=int,
parser.add_option('--nfreqs', default=512, type=int,
help='Number of frequencies in a chunk (default=512)')
parser.add_option("--use-corrs", type=str,
help='Comma separated list of correlations to use (do not use spaces)')
parser.add_option("--respect-ants", type=str,
help='Comma separated list of antennas to respect (do not use spaces)')
return parser

def main():
Expand All @@ -48,20 +54,17 @@ def main():
else:
msname = args[0].rstrip('/')

from multiprocessing.pool import ThreadPool
dask.config.set(pool=ThreadPool(options.nthreads))

schema = {}
schema[options.rcol] = {'dims': ('chan', 'corr')}
schema[options.wcol] = {'dims': ('chan', 'corr')}
schema[options.fcol] = {'dims': ('chan', 'corr')}
columns = [options.rcol, options.fcol]
if options.scol is not None:
schema[options.scol] = {'dims': ('chan', 'corr')}
columns.append(options.scol)
else:
schema[options.wcol] = {'dims': ('chan', 'corr')}
columns.append(options.wcol)


xds = xds_from_ms(msname,
columns=columns,
columns=[options.rcol, options.wcol, options.fcol,
'ANTENNA1', 'ANTENNA2'],
chunks={'row': options.nrows, 'chan': options.nfreqs},
group_cols=['FIELD_ID', 'DATA_DESC_ID', 'SCAN_NUMBER'],
table_schema=schema)
Expand All @@ -73,25 +76,42 @@ def main():
else:
use_corrs = [0]
else:
use_corrs = list(map(int, options.use_corrs.split(',')))
use_corrs = tuple(map(int, options.use_corrs.split(',')))
print(f"Using correlations {use_corrs}")

if options.respect_ants is not None:
rants = list(map(int, options.respect_ants.split(',')))
else:
rants = []

out_data = []
for i, ds in enumerate(xds):
resid = ds.get(options.rcol).data
if options.scol is not None:
sigma = ds.get(options.scol).data
weight = 1/sigma**2
if options.wcol == 'SIGMA_SPECTRUM':
weight = 1.0/ds.get(options.wcol).data**2
else:
weight = ds.get(options.wcol).data
flag = ds.get(options.fcol).data
ant1 = ds.ANTENNA1.data
ant2 = ds.ANTENNA2.data

uflag = flagchisq(resid, weight, flag, tuple(use_corrs), sigma=options.sigma)
uflag = flagchisq(resid, weight, flag, ant1, ant2,
use_corrs=tuple(use_corrs),
flag_above=options.flag_above,
unflag_below=options.unflag_below,
respect_ants=tuple(rants))

out_ds = ds.assign(**{options.fcol: (("row", "chan", "corr"), uflag)})

# update FLAG_ROW
flag_row = da.all(uflag.rechunk({1:-1, 2:-1}), axis=(1,2))

out_ds = out_ds.assign(**{'FLAG_ROW': (("row",), flag_row)})

out_data.append(out_ds)

writes = xds_to_table(out_data, msname, columns=[options.fcol])
writes = xds_to_table(out_data, msname,
columns=[options.fcol, 'FLAG_ROW'],
rechunk=True)

with ProgressBar():
dask.compute(writes)
dask.compute(writes)
Loading