spectrum_1d_joint.py
import os
import time
from pathlib import Path

import astropy.units as u
import numpy as np
import yaml
from astropy.coordinates import Angle, SkyCoord
from regions import CircleSkyRegion

from gammapy.data import DataStore
from gammapy.datasets import Datasets, SpectrumDataset
from gammapy.estimators import FluxPointsEstimator
from gammapy.makers import (
    ReflectedRegionsBackgroundMaker,
    SafeMaskMaker,
    SpectrumDatasetMaker,
)
from gammapy.maps import MapAxis, RegionGeom, WcsGeom
from gammapy.modeling import Fit
from gammapy.modeling.models import (
    PointSpatialModel,
    PowerLawSpectralModel,
    SkyModel,
)

N_OBS = int(os.environ.get("GAMMAPY_BENCH_N_OBS", 10))
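
# data_prep() below repeats a single Crab run (obs id 23523) N_OBS times so
# that the joint fit scales with the number of datasets; run_benchmark()
# times each stage (reduction, I/O, fit, flux points) separately.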
def data_prep():
    data_store = DataStore.from_dir("$GAMMAPY_DATA/hess-dl3-dr1/")
    OBS_ID = 23523
    obs_ids = OBS_ID * np.ones(N_OBS)
    observations = data_store.get_observations(obs_ids)

    # ON region and exclusion mask for the reflected-regions background
    target_position = SkyCoord(ra=83.63, dec=22.01, unit="deg", frame="icrs")
    on_region_radius = Angle("0.11 deg")
    on_region = CircleSkyRegion(center=target_position, radius=on_region_radius)

    exclusion_region = CircleSkyRegion(
        center=SkyCoord(183.604, -8.708, unit="deg", frame="galactic"),
        radius=0.5 * u.deg,
    )
    skydir = target_position.galactic
    mask_geom = WcsGeom.create(
        npix=(150, 150), binsz=0.05, skydir=skydir, proj="TAN", frame="galactic"
    )
    exclusion_mask = mask_geom.region_mask([exclusion_region], inside=False)

    # Reconstructed and true energy axes
    e_reco = MapAxis.from_bounds(
        0.1, 40, nbin=40, interp="log", unit="TeV", name="energy"
    )
    e_true = MapAxis.from_bounds(
        0.05, 100, nbin=200, interp="log", unit="TeV", name="energy_true"
    )
    geom = RegionGeom(on_region, axes=[e_reco])
    empty = SpectrumDataset.create(geom, energy_axis_true=e_true)

    dataset_maker = SpectrumDatasetMaker(
        containment_correction=True, selection=["counts", "exposure", "edisp"]
    )
    bkg_maker = ReflectedRegionsBackgroundMaker(exclusion_mask=exclusion_mask)
    safe_mask_masker = SafeMaskMaker(methods=["aeff-max"], aeff_percent=10)

    # Point source with a power-law spectrum; the position is kept fixed
    spectral_model = PowerLawSpectralModel(
        index=2, amplitude=2e-11 * u.Unit("cm-2 s-1 TeV-1"), reference=1 * u.TeV
    )
    spatial_model = PointSpatialModel(
        lon_0=target_position.ra, lat_0=target_position.dec, frame="icrs"
    )
    spatial_model.lon_0.frozen = True
    spatial_model.lat_0.frozen = True
    sky_model = SkyModel(
        spatial_model=spatial_model, spectral_model=spectral_model, name=""
    )

    # Data reduction: one on/off dataset per observation
    datasets = []
    for idx, observation in enumerate(observations):
        dataset = empty.copy(name=f"dataset{idx}")
        dataset = dataset_maker.run(dataset=dataset, observation=observation)
        dataset_on_off = bkg_maker.run(dataset, observation)
        dataset_on_off = safe_mask_masker.run(dataset_on_off, observation)
        dataset_on_off.models = sky_model
        datasets.append(dataset_on_off)
    return Datasets(datasets)
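
# data_prep() returns one on/off dataset per observation, so the fit below is
# a joint likelihood over all datasets. A stacked variant would reduce them to
# a single dataset first, e.g. (sketch, not used in this benchmark):
#
#     stacked = Datasets(datasets).stack_reduce(name="stacked")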

def write(stacked, filename):
    path = Path.cwd()
    stacked.write(
        path / f"{filename}_datasets.yaml",
        filename_models=path / f"{filename}_models.yaml",
        overwrite=True,
    )


def read(filename):
    path = Path.cwd()
    return Datasets.read(
        path / f"{filename}_datasets.yaml",
        filename_models=path / f"{filename}_models.yaml",
    )
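
# The write/read pair round-trips the reduced datasets and their models
# through YAML index files (plus the per-dataset files they reference) in the
# working directory, so the "writing"/"reading" timings below include
# serialization overhead.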

def data_fit(datasets):
    fit = Fit(optimize_opts={"print_level": 1})
    result = fit.run(datasets=datasets)


def flux_point(datasets):
    e_edges = MapAxis.from_bounds(0.7, 30, nbin=11, interp="log", unit="TeV").edges
    fpe = FluxPointsEstimator(energy_edges=e_edges)
    fpe.run(datasets=datasets)
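
# flux_point() estimates flux points in 11 log-spaced bins between 0.7 and
# 30 TeV; FluxPointsEstimator refits the model norm in each energy bin.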

def run_benchmark():
    info = {"n_obs": N_OBS}
    filename = "joint"

    t = time.time()
    datasets = data_prep()
    info["data_preparation"] = time.time() - t

    t = time.time()
    write(datasets, filename)
    info["writing"] = time.time() - t

    t = time.time()
    datasets = read(filename)
    info["reading"] = time.time() - t

    t = time.time()
    data_fit(datasets)
    info["data_fitting"] = time.time() - t

    t = time.time()
    flux_point(datasets)
    info["flux_point"] = time.time() - t

    Path("bench.yaml").write_text(yaml.dump(info, sort_keys=False, indent=4))


if __name__ == "__main__":
    run_benchmark()
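
# ---------------------------------------------------------------------------
# Usage sketch (illustration, not part of the benchmark). Assuming
# $GAMMAPY_DATA points at the public gammapy datasets, run the script and
# then read back the timings it dumps to bench.yaml:
#
#     GAMMAPY_BENCH_N_OBS=20 python spectrum_1d_joint.py
#
#     import yaml
#     from pathlib import Path
#     info = yaml.safe_load(Path("bench.yaml").read_text())
#     print(info["data_fitting"])
# ---------------------------------------------------------------------------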