Merge pull request #5 from ratt-ru/sspectrum
Sspectrum
landmanbester authored Sep 14, 2023
2 parents 119d0f4 + c528f14 commit 51c3de3
Showing 8 changed files with 613 additions and 641 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9"]
python-version: ["3.8", "3.9", "3.10"]

steps:
- name: Set up Python ${{ matrix.python-version }}
87 changes: 6 additions & 81 deletions .gitignore
@@ -1,18 +1,9 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
@@ -22,81 +13,15 @@ sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# dotenv
.env

# virtualenv
.venv
venv/
ENV/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site
# Unit test / coverage reports
.pytest_cache/

# mypy
.mypy_cache/
# Compiled bytecode
surfvis/*.pyc
surfvis/__pycache__/
tests/__pycache__/
Binary file removed surfvis/__pycache__/__init__.cpython-37.pyc
Binary file removed surfvis/__pycache__/surfvis.cpython-37.pyc
70 changes: 45 additions & 25 deletions surfvis/flagchi2.py
@@ -13,29 +13,35 @@
import dask.array as da
from dask.diagnostics import ProgressBar
from surfvis.utils import flagchisq
from daskms import xds_from_ms, xds_from_table, xds_to_table
from daskms import xds_from_storage_ms as xds_from_ms
from daskms import xds_from_storage_table as xds_from_table
from daskms import xds_to_storage_table as xds_to_table
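# NOTE: the storage-aware daskms wrappers pick the I/O backend (CASA table,
# zarr or parquet) from the store itself -- presumably the motivation for
# swapping out the plain xds_from_ms/xds_from_table readers here.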


def create_parser():
parser = OptionParser(usage='%prog [options] msname')
parser.add_option('--rcol', default='RESIDUAL',
help='Residual column (default = RESIDUAL)')
parser.add_option('--scol',
help='sigma column to use. Overrides wcol if provided.')
parser.add_option('--wcol', default='WEIGHT_SPECTRUM',
help='Weight column (default = WEIGHT_SPECTRUM)')
help='Weight column (default = WEIGHT_SPECTRUM). '
'The special value SIGMA_SPECTRUM can be passed to '
'initialise the weights as 1/sigma**2')
parser.add_option('--fcol', default='FLAG',
help='Flag column (default = FLAG)')
parser.add_option('--sigma', default=3, type=float,
help='chisq threshold (default = 3)')
parser.add_option('--flag-above', default=3, type=float,
help='flag data with chisq above this value (default = 3)')
parser.add_option('--unflag-below', default=1.15, type=float,
help='unflag data with chisq below this value (default = 1.15)')
parser.add_option('--nthreads', default=4, type=int,
help='Number of dask threads to use')
parser.add_option('--nrows', default=10000, type=int,
parser.add_option('--nrows', default=250000, type=int,
help='Number of rows in each chunk (default=250000)')
parser.add_option('--nfreqs', default=128, type=int,
parser.add_option('--nfreqs', default=512, type=int,
help='Number of frequencies in a chunk (default=512)')
parser.add_option("--use-corrs", type=str,
help='Comma separated list of correlations to use (do not use spaces)')
parser.add_option("--respect-ants", type=str,
help='Comma separated list of antennas to respect (do not use spaces)')
return parser

def main():
@@ -48,20 +54,17 @@ def main():
else:
msname = args[0].rstrip('/')

from multiprocessing.pool import ThreadPool
dask.config.set(pool=ThreadPool(options.nthreads))

schema = {}
schema[options.rcol] = {'dims': ('chan', 'corr')}
schema[options.wcol] = {'dims': ('chan', 'corr')}
schema[options.fcol] = {'dims': ('chan', 'corr')}
columns = [options.rcol, options.fcol]
if options.scol is not None:
schema[options.scol] = {'dims': ('chan', 'corr')}
columns.append(options.scol)
else:
schema[options.wcol] = {'dims': ('chan', 'corr')}
columns.append(options.wcol)


xds = xds_from_ms(msname,
columns=columns,
columns=[options.rcol, options.wcol, options.fcol,
'ANTENNA1', 'ANTENNA2'],
chunks={'row': options.nrows, 'chan': options.nfreqs},
group_cols=['FIELD_ID', 'DATA_DESC_ID', 'SCAN_NUMBER'],
table_schema=schema)
@@ -73,25 +76,42 @@
else:
use_corrs = [0]
else:
use_corrs = list(map(int, options.use_corrs.split(',')))
use_corrs = tuple(map(int, options.use_corrs.split(',')))
print(f"Using correlations {use_corrs}")

if options.respect_ants is not None:
rants = list(map(int, options.respect_ants.split(',')))
else:
rants = []

out_data = []
for i, ds in enumerate(xds):
resid = ds.get(options.rcol).data
if options.scol is not None:
sigma = ds.get(options.scol).data
weight = 1/sigma**2
if options.wcol == 'SIGMA_SPECTRUM':
weight = 1.0/ds.get(options.wcol).data**2
else:
weight = ds.get(options.wcol).data
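# SIGMA_SPECTRUM holds per-visibility noise standard deviations, so the
# natural weights follow as w = 1/sigma**2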
flag = ds.get(options.fcol).data
ant1 = ds.ANTENNA1.data
ant2 = ds.ANTENNA2.data

uflag = flagchisq(resid, weight, flag, tuple(use_corrs), sigma=options.sigma)
uflag = flagchisq(resid, weight, flag, ant1, ant2,
use_corrs=tuple(use_corrs),
flag_above=options.flag_above,
unflag_below=options.unflag_below,
respect_ants=tuple(rants))

out_ds = ds.assign(**{options.fcol: (("row", "chan", "corr"), uflag)})

# update FLAG_ROW
flag_row = da.all(uflag.rechunk({1:-1, 2:-1}), axis=(1,2))
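# rechunking collapses the chan and corr axes into single chunks so da.all
# can reduce over axes 1 and 2; a row is marked flagged only when every
# channel and correlation in it is flagged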

out_ds = out_ds.assign(**{'FLAG_ROW': (("row",), flag_row)})

out_data.append(out_ds)

writes = xds_to_table(out_data, msname, columns=[options.fcol])
writes = xds_to_table(out_data, msname,
columns=[options.fcol, 'FLAG_ROW'],
rechunk=True)

with ProgressBar():
    dask.compute(writes)
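
A note on the new flagchisq call above: the implementation lives in
surfvis.utils and is not part of this diff. As a rough guide to the
two-threshold scheme the options describe (flag visibilities whose
chi-squared exceeds --flag-above, unflag those below --unflag-below, and
leave baselines to "respected" antennas untouched), here is a minimal
NumPy sketch; every internal detail is an assumption rather than the
package's actual code:

import numpy as np

def flagchisq_sketch(resid, weight, flag, ant1, ant2, use_corrs=(0,),
                     flag_above=3.0, unflag_below=1.15, respect_ants=()):
    # per-visibility chi-squared: |residual|^2 scaled by the weight (1/sigma^2)
    chisq = (resid.real**2 + resid.imag**2) * weight
    uflag = flag.copy()
    for c in use_corrs:
        uflag[:, :, c] |= chisq[:, :, c] > flag_above     # flag strong outliers
        uflag[:, :, c] &= chisq[:, :, c] >= unflag_below  # unflag well-fit data
    # baselines involving a "respected" antenna keep their original flags
    keep = np.isin(ant1, respect_ants) | np.isin(ant2, respect_ants)
    uflag[keep] = flag[keep]
    return uflag

Assuming the package installs a console script for this module (not shown
in this diff), the new options would be exercised along these lines:

flagchi2 my.ms --wcol SIGMA_SPECTRUM --flag-above 3 --unflag-below 1.15 --use-corrs 0,3 --respect-ants 0,1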
