Merge pull request #93 from bwheelz36/re_grid
Re grid
bwheelz36 authored Feb 23, 2023
2 parents 5a5a535 + a0d1ccc commit cdc2355
Showing 8 changed files with 1,070 additions and 314 deletions.
230 changes: 190 additions & 40 deletions ParticlePhaseSpace/_ParticlePhaseSpace.py

174 changes: 87 additions & 87 deletions examples/basic_example.ipynb

507 changes: 507 additions & 0 deletions examples/grid_merge.ipynb

46 changes: 23 additions & 23 deletions examples/new_data_exporter.ipynb

28 changes: 14 additions & 14 deletions examples/new_data_loader.ipynb

182 changes: 108 additions & 74 deletions examples/resampling_via_gaussian_kde.ipynb

100 changes: 50 additions & 50 deletions examples/units.ipynb

117 changes: 91 additions & 26 deletions tests/test_ParticlePhaseSpace.py
@@ -3,6 +3,7 @@
import json
import numpy as np
import pandas as pd

this_file_loc = Path(__file__).parent
sys.path.insert(0, str(this_file_loc.parent))
from ParticlePhaseSpace import DataLoaders
@@ -17,45 +18,43 @@


def test_all_allowed_columns_can_be_filled():

for col in list(ps_cfg.allowed_columns.keys()):
try:
PS.__getattribute__(ps_cfg.allowed_columns[col])()
except AttributeError:
raise AttributeError(f'unable to fill required column {col}')

def test_downsample_phase_space():

def test_downsample_phase_space():
PS_downsample = PS.get_downsampled_phase_space(downsample_factor=10)
assert len(PS)/len(PS_downsample) > 9.5 and len(PS)/len(PS_downsample) < 10.5 # to account for rounding
assert len(PS) / len(PS_downsample) > 9.5 and len(PS) / len(PS_downsample) < 10.5 # to account for rounding

def test_twiss_stability():

def test_twiss_stability():
PS.calculate_twiss_parameters(beam_direction='z')
# compare with previous calc
with open(test_data_loc / 'twiss_stability.json') as f:
old_data = json.load(f)

for particle_key in old_data:
for direction_key in old_data[particle_key]:
for twiss_key in old_data[particle_key][direction_key]:
for twiss_key in old_data[particle_key][direction_key]:
np.allclose(old_data[particle_key][direction_key][twiss_key],
PS.twiss_parameters[particle_key][direction_key][twiss_key])

def test_energy_stats_stability():

def test_energy_stats_stability():
PS.calculate_energy_statistics()
# compare with previous calc
with open(test_data_loc / 'energy_stats.json') as f:
old_data = json.load(f)
for particle_key in old_data:
for energy_key in old_data[particle_key]:
for energy_key in old_data[particle_key]:
np.allclose(old_data[particle_key][energy_key],
PS.energy_stats[particle_key][energy_key])

def test_project_particles():


def test_project_particles():
# manual projection in z direction:
project_dist = 100
x1 = PS.ps_data['x [mm]'][0]
@@ -65,12 +64,13 @@ def test_project_particles():
py1 = PS.ps_data['py [MeV/c]'][0]
pz1 = PS.ps_data['pz [MeV/c]'][0]

x2 = x1 + ((px1/pz1) * project_dist)
x2 = x1 + ((px1 / pz1) * project_dist)
y2 = y1 + ((py1 / pz1) * project_dist)
z2 = z1 + ((pz1 / pz1) * project_dist)
PS_projected = PS.project_particles(beam_direction='z', distance=project_dist)
# compare:
assert np.allclose([x2, y2, z2], [PS_projected.ps_data['x [mm]'][0], PS_projected.ps_data['y [mm]'][0], PS_projected.ps_data['z [mm]'][0]])
assert np.allclose([x2, y2, z2], [PS_projected.ps_data['x [mm]'][0], PS_projected.ps_data['y [mm]'][0],
PS_projected.ps_data['z [mm]'][0]])

# manual projection in x direction:
project_dist = 100
@@ -81,12 +81,13 @@ def test_project_particles():
py1 = PS.ps_data['py [MeV/c]'][0]
pz1 = PS.ps_data['pz [MeV/c]'][0]

x2 = x1 + ((px1/px1) * project_dist)
x2 = x1 + ((px1 / px1) * project_dist)
y2 = y1 + ((py1 / px1) * project_dist)
z2 = z1 + ((pz1 / px1) * project_dist)
PS_projected = PS.project_particles(beam_direction='x', distance=project_dist)
# compare:
assert np.allclose([x2, y2, z2], [PS_projected.ps_data['x [mm]'][0], PS_projected.ps_data['y [mm]'][0], PS_projected.ps_data['z [mm]'][0]])
assert np.allclose([x2, y2, z2], [PS_projected.ps_data['x [mm]'][0], PS_projected.ps_data['y [mm]'][0],
PS_projected.ps_data['z [mm]'][0]])

# manual projection in y direction:
project_dist = 100
@@ -97,12 +98,14 @@ def test_project_particles():
py1 = PS.ps_data['py [MeV/c]'][0]
pz1 = PS.ps_data['pz [MeV/c]'][0]

x2 = x1 + ((px1/py1) * project_dist)
x2 = x1 + ((px1 / py1) * project_dist)
y2 = y1 + ((py1 / py1) * project_dist)
z2 = z1 + ((pz1 / py1) * project_dist)
PS_projected = PS.project_particles(beam_direction='y', distance=project_dist)
# compare:
assert np.allclose([x2, y2, z2], [PS_projected.ps_data['x [mm]'][0], PS_projected.ps_data['y [mm]'][0], PS_projected.ps_data['z [mm]'][0]])
assert np.allclose([x2, y2, z2], [PS_projected.ps_data['x [mm]'][0], PS_projected.ps_data['y [mm]'][0],
PS_projected.ps_data['z [mm]'][0]])


def test_reset_phase_space():
PS.reset_phase_space()
@@ -111,25 +114,26 @@ def test_reset_phase_space():
if not col_name in ps_cfg.get_required_column_names(PS._units):
raise AttributeError(f'non allowed column "{col_name}" in data.')

def test_get_particle_density():

def test_get_particle_density():
particle_density = PS.assess_density_versus_r(verbose=False)
old_density = pd.read_csv(test_data_loc / 'particle_density.csv', index_col=0).squeeze("columns")
np.allclose(particle_density, old_density)

def test_get_seperated_phase_space():

def test_get_seperated_phase_space():
# had to reload the data for some reason:
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
PS = PhaseSpace(data)

electron_PS = PS('electrons')
electron_PS2= PS(11)
electron_PS2 = PS(11)
assert np.allclose(electron_PS.ps_data, electron_PS2.ps_data)
assert len(electron_PS) == 2853

no_electron_PS = PS - electron_PS


def test_add_phase_space():
# had to reload the data for some reason:
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
@@ -142,18 +146,20 @@ def test_add_phase_space():
with pytest.raises(Exception):
wont_work_PS = gamma_PS + gamma_PS

def test_manipulate_data():

def test_manipulate_data():
old_x_mean = PS.ps_data['x [mm]'].mean()
PS.ps_data['x [mm]'] = PS.ps_data['x [mm]'] + 2
assert np.allclose(PS.ps_data['x [mm]'].mean(), old_x_mean + 2)


def test_filter_by_time():
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
PS = PhaseSpace(data)
PS.ps_data['time [ps]'] = np.arange(len(PS))
filtered_PS = PS.filter_by_time(t_start=0, t_finish=np.floor(len(PS)/2))
assert len(filtered_PS) == np.ceil(len(PS)/2)
filtered_PS = PS.filter_by_time(t_start=0, t_finish=np.floor(len(PS) / 2))
assert len(filtered_PS) == np.ceil(len(PS) / 2)


def test_PS_reset():
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
@@ -167,6 +173,7 @@ def test_PS_reset():
assert not PS.twiss_parameters
assert not PS.energy_stats


def test_beta_gamma_momentum_relation():
"""
for charged particles, should have
@@ -181,23 +188,26 @@ def test_beta_gamma_momentum_relation():

PS_electrons.fill_beta_and_gamma()
PS_electrons.fill_rest_mass()
px = np.multiply(np.multiply(PS_electrons.ps_data['beta_x'], PS_electrons.ps_data['gamma']), PS_electrons.ps_data['rest mass [MeV/c^2]'])
px = np.multiply(np.multiply(PS_electrons.ps_data['beta_x'], PS_electrons.ps_data['gamma']),
PS_electrons.ps_data['rest mass [MeV/c^2]'])
assert np.allclose(px, PS_electrons.ps_data['px [MeV/c]'])
py = np.multiply(np.multiply(PS_electrons.ps_data['beta_y'], PS_electrons.ps_data['gamma']), PS_electrons.ps_data['rest mass [MeV/c^2]'])
py = np.multiply(np.multiply(PS_electrons.ps_data['beta_y'], PS_electrons.ps_data['gamma']),
PS_electrons.ps_data['rest mass [MeV/c^2]'])
assert np.allclose(py, PS_electrons.ps_data['py [MeV/c]'])
pz = np.multiply(np.multiply(PS_electrons.ps_data['beta_z'], PS_electrons.ps_data['gamma']), PS_electrons.ps_data['rest mass [MeV/c^2]'])
pz = np.multiply(np.multiply(PS_electrons.ps_data['beta_z'], PS_electrons.ps_data['gamma']),
PS_electrons.ps_data['rest mass [MeV/c^2]'])
assert np.allclose(pz, PS_electrons.ps_data['pz [MeV/c]'])

def test_resample_kde():

def test_resample_kde():
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
PS = PhaseSpace(data)
PS = PS('gammas') # take only the gammas for this example

resample_factor = 0.5
new_PS = PS.resample_via_gaussian_kde(n_new_particles_factor=resample_factor, interpolate_weights=False)

assert np.allclose(resample_factor, len(new_PS)/len(PS))
assert np.allclose(resample_factor, len(new_PS) / len(PS))
PS.calculate_energy_statistics()
new_PS.calculate_energy_statistics()
original_energy_stats = PS.energy_stats
@@ -210,3 +220,58 @@ def test_resample_kde():
assert abs(original_energy_stats['gammas'][energy_key] - new_energy_stats['gammas'][energy_key]) \
< energy_cut_off


def test_regrid():
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
PS = PhaseSpace(data)
# regrid 100 bins
quantities = ['x', 'y', 'px', 'py', 'pz']
new_PS = PS.regrid(n_bins=100, quantities=quantities)

columns = new_PS._quantities_to_column_names(quantities)
for col in columns:
assert len(np.unique(new_PS.ps_data[col])) <= 100

# regrid x into 10 bins
new_PS = PS.regrid(quantities='x', n_bins=10)
assert len(np.unique(new_PS.ps_data['y [mm]'])) == len(np.unique(PS.ps_data['y [mm]']))
assert len(np.unique(new_PS.ps_data['x [mm]'])) == 10
# compare in-place regrid to not-in-place regrid
new_PS = PS.regrid()
PS.regrid(in_place=True)
assert np.allclose(new_PS.ps_data, PS.ps_data)


def test_merge():
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
PS = PhaseSpace(data)

new_PS = PS.regrid(n_bins=10, in_place=False)
new_PS.merge(in_place=True)

# test that merge didn't affect energy stats:
PS.calculate_energy_statistics()
new_PS.calculate_energy_statistics()
original_energy_stats = PS.energy_stats
for particle_key in original_energy_stats:
for energy_key in original_energy_stats[particle_key]:
np.allclose(original_energy_stats[particle_key][energy_key],
new_PS.energy_stats[particle_key][energy_key])
# test weight preservation
assert np.isclose(PS.ps_data['weight'].sum(), new_PS.ps_data['weight'].sum())

# test size of merge
assert len(new_PS) == 2043 # stability check


def test_sort():
data = DataLoaders.Load_TopasData(test_data_loc / 'coll_PhaseSpace_xAng_0.00_yAng_0.00_angular_error_0.0.phsp')
PS = PhaseSpace(data)
x_temp = PS.ps_data['x [mm]']
PS.sort('x')
assert np.allclose(np.sort(x_temp), PS.ps_data['x [mm]'])
# test default
PS.sort()
# test quantities
quantities = ['x', 'y', 'px', 'py', 'pz']
PS.sort(quantities_to_sort=quantities)
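
A minimal usage sketch of the regrid/merge workflow exercised by the new tests above, assuming PhaseSpace is importable from the ParticlePhaseSpace package as in the test module; the phase-space file path is a placeholder, and only calls that appear in this diff (regrid, merge, sort, ps_data) are used:

from pathlib import Path
from ParticlePhaseSpace import DataLoaders
from ParticlePhaseSpace import PhaseSpace  # assumed import location

# placeholder path: any Topas phase-space file readable by Load_TopasData
data = DataLoaders.Load_TopasData(Path('some_phase_space.phsp'))
PS = PhaseSpace(data)

# snap the listed quantities onto a regular grid with 10 bins per quantity
gridded_PS = PS.regrid(n_bins=10, quantities=['x', 'y', 'px', 'py', 'pz'], in_place=False)

# merge particles that now share identical coordinates; total weight is preserved
gridded_PS.merge(in_place=True)
print(f'{len(PS)} -> {len(gridded_PS)} particles, total weight {gridded_PS.ps_data["weight"].sum():.1f}')

# sort the merged phase space by position and momentum quantities
gridded_PS.sort(quantities_to_sort=['x', 'y', 'px', 'py', 'pz'])

As in test_merge above, regridding followed by merging reduces the particle count while keeping the summed weight (and hence the energy statistics) approximately unchanged.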
