Skip to content

Commit

Permalink
truncation
Browse files Browse the repository at this point in the history
  • Loading branch information
sidd3888 committed Jul 10, 2024
1 parent 5640890 commit bcc19d9
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 269 deletions.
306 changes: 66 additions & 240 deletions HARK/distribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,13 +905,12 @@ def rvs(self, size: int = 1, random_state=None):

return np.exp(Z.rvs(size, random_state=random_state))

def _approx_equiprobable2(
def _approx_equiprobable(
self,
N: int,
tail_N: int = 0,
tail_bound: Union[float, list, tuple] = None,
tail_order: float = np.e,
endpoints: bool = False,
decomp: str = "cholesky",
):
"""
Makes a discrete approximation using the equiprobable method to this multivariate lognormal distribution.
Expand All @@ -920,8 +919,12 @@ def _approx_equiprobable2(
----------
N : int
The number of points in the discrete approximation.
tail_bound : Union[float, list, tuple], optional
The values of the CDF according to which the distribution is truncated. If only a single number is specified, it is the lower tail bound and a symmetric upper bound is chosen. Can make one-tailed approximations with 0.0 or 1.0 as the lower and upper bound respectively. By default the distribution is not truncated.
endpoints : bool
To be added
decomp : str, optional
The method of decomposing the covariance matrix. Available options are the Cholesky decomposition, the positive-definite square root, and the eigendecomposition. By default the Cholesky decomposition is used. NOTE: The method of decomposition might affect the expectations of the discretized distribution along each dimension dfferently.
Returns
-------
Expand All @@ -933,11 +936,10 @@ def _approx_equiprobable2(
if endpoints:
raise NotImplementedError("Endpoints have not yet been implemented")

if tail_N < 0:
raise ValueError("tail_N must be a non-negative integer")

if tail_order < 1:
raise ValueError("tail_order must be greater than or equal to 1")
if decomp not in ["cholesky", "sqrt", "eig"]:
raise NotImplementedError(
"Decomposition method must be 'cholesky', 'sqrt' or 'eig'"
)

if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))):
ind_atoms = np.empty((self.M, N))
Expand All @@ -960,7 +962,7 @@ def _approx_equiprobable2(
).T
pmv = np.repeat(1 / (N**self.M), N**self.M)
else:
if tail_bound is not None and tail_N > 0:
if tail_bound is not None:
if type(tail_bound) is float:
tail_bound = [tail_bound, 1 - tail_bound]

Expand All @@ -969,10 +971,12 @@ def _approx_equiprobable2(

cdf_cuts = np.linspace(tail_bound[0], tail_bound[1], N + 1)
int_prob = tail_bound[1] - tail_bound[0]
tail_N = 1

else:
cdf_cuts = np.linspace(0, 1, N + 1)
int_prob = 1.0
tail_N = 0

Z = Normal()

Expand All @@ -994,8 +998,6 @@ def _approx_equiprobable2(
inners[:tail_N] = [(tail_N - i) for i in range(tail_N)]
inners[-tail_N:] = [(i + 1) for i in range(tail_N)]

L = np.linalg.cholesky(self.Sigma)

def eval(params, z):
inds = []
excl = []
Expand Down Expand Up @@ -1030,62 +1032,87 @@ def eval(params, z):

return x_exp * x_others * (N / int_prob) ** dim

for i in range(self.M):
mui = self.mu[i]
params = L[i, 0 : i + 1]
if decomp == "cholesky":
L = np.linalg.cholesky(self.Sigma)

Z_list = [z_bins for _ in range(i + 1)]
for i in range(self.M):
mui = self.mu[i]
params = L[i, 0 : i + 1]

Z_bins = [np.array(x) for x in list(product(*Z_list))]
Z_list = [z_bins for _ in range(i + 1)]

xi_atoms = []
Z_bins = [np.array(x) for x in list(product(*Z_list))]

for z_bin in Z_bins:
atom = np.exp(mui) * eval(params, z_bin)
xi_atoms.append(atom)
xi_atoms = []

xi_atoms_arr = np.repeat(
xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1))
)
for z_bin in Z_bins:
atom = np.exp(mui) * eval(params, z_bin)
xi_atoms.append(atom)

inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]
xi_atoms_arr = np.repeat(
xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1))
)

interiors[i] = np.repeat(
[*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
)
inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

interiors[i] = np.repeat(
[*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
)

atoms[i] = xi_atoms_arr
else:
Λ, Q = np.linalg.eig(self.Sigma)

A = Q @ np.diag(np.sqrt(Λ))

if decomp == "sqrt":
A = A @ Q.T

for i in range(self.M):
mui = self.mu[i]
params = A[i]

Z_list = [z_bins for _ in range(self.M)]

Z_bins = [np.array(x) for x in list(product(*Z_list))]

xi_atoms = []

for z_bin in Z_bins:
atom = np.exp(mui) * eval(params, z_bin)
xi_atoms.append(atom)

inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

interiors[i] = np.repeat(
[*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
)

atoms[i] = xi_atoms_arr
atoms[i] = xi_atoms

max_locs = np.argmax(np.abs(interiors), axis=0)

max_inds = np.stack([max_locs, np.arange(len(max_locs))], axis=1)

prob_locs = interiors[max_inds[:, 0], max_inds[:, 1]]

low_sum = np.sum(tail_order ** np.abs(prob_locs[prob_locs > 0]))

high_sum = np.sum(tail_order ** np.abs(prob_locs[prob_locs < 0]))

def prob_assign(x):
if x == 0:
return (int_prob) / (N**self.M)
elif x < 0:
return (1 - tail_bound[1]) * (tail_order ** (-x)) / high_sum
return 1 / (N**self.M)
else:
return tail_bound[0] * (tail_order**x) / low_sum
return 0.0

prob_vec = np.vectorize(prob_assign)

pmv = prob_vec(prob_locs)

limit = {
"dist": self,
"method": "equiprobable2",
"method": "equiprobable",
"N": N,
"endpoints": endpoints,
"tail_N": tail_N,
"tail_bound": tail_bound,
"tail_order": tail_order,
"decomp": decomp,
}

return DiscreteDistribution(
Expand All @@ -1095,207 +1122,6 @@ def prob_assign(x):
limit=limit,
)

def _approx_equiprobable(self, N, endpoints: bool = False):
"""
Makes a discrete approximation using the equiprobable method to this multivariate lognormal distribution.
Parameters
----------
N : int
The number of points in the discrete approximation.
endpoints : bool
To be added
Returns
-------
d : DiscreteDistribution
Probability associated with each point in array of discrete
points for discrete probability mass function.
"""

if endpoints:
raise NotImplementedError("Endpoints have not yet been implemented")

if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))):
ind_atoms = np.empty((self.M, N))

for i in range(self.M):
if self.Sigma[i, i] == 0.0:
x_atoms = np.repeat(np.exp(self.mu[i]), N)
ind_atoms[i] = x_atoms
else:
x_atoms = (
Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i]))
._approx_equiprobable(N)
.atoms
)
ind_atoms[i] = x_atoms

atoms_list = [ind_atoms[i] for i in range(self.M)]
atoms = np.stack(
[ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1
).T
pmv = np.repeat(1 / (N**self.M), N**self.M)
else:
cdf_cuts = np.linspace(0, 1, N + 1)

Z = Normal()

z_cuts = Z.ppf(cdf_cuts)
z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N)]

atoms = np.empty([self.M, N**self.M])

L = np.linalg.cholesky(self.Sigma)

def eval(params, z):
dim = len(params)

p = np.repeat(params, 2).reshape(dim, 2)

Z = np.multiply(p, z)

bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T

x_exp = (
-0.5
* np.exp(np.square(params) / 2)
* (special.erf(bounds[1]) - special.erf(bounds[0]))
)

return np.prod(x_exp) * N**dim

for i in range(self.M):
mui = self.mu[i]
params = L[i, 0 : i + 1]

Z_list = [z_bins for _ in range(i + 1)]

Z_bins = [np.array(x) for x in list(product(*Z_list))]

xi_atoms = []

for z_bin in Z_bins:
atom = np.exp(mui) * eval(params, z_bin)
xi_atoms.append(atom)

xi_atoms_arr = np.repeat(xi_atoms, N ** (self.M - (i + 1)))

atoms[i] = xi_atoms_arr

pmv = np.repeat(1 / (N**self.M), N**self.M)

limit = {"dist": self, "method": "equiprobable", "N": N}

return DiscreteDistribution(
pmv,
atoms,
seed=self._rng.integers(0, 2**31 - 1, dtype="int32"),
limit=limit,
)

def _approx_equiprobable_eig(self, N, endpoints: bool = False, sqrt: bool = True):
"""
Makes a discrete approximation using the equiprobable method to this multivariate lognormal distribution.
Parameters
----------
N : int
The number of points in the discrete approximation.
endpoints : bool
To be added
Returns
-------
d : DiscreteDistribution
Probability associated with each point in array of discrete
points for discrete probability mass function.
"""

if endpoints:
raise NotImplementedError("Endpoints have not yet been implemented")

if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))):
ind_atoms = np.empty((self.M, N))

for i in range(self.M):
if self.Sigma[i, i] == 0.0:
x_atoms = np.repeat(np.exp(self.mu[i]), N)
ind_atoms[i] = x_atoms
else:
x_atoms = (
Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i]))
._approx_equiprobable(N)
.atoms
)
ind_atoms[i] = x_atoms

atoms_list = [ind_atoms[i] for i in range(self.M)]
atoms = np.stack(
[ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1
).T
pmv = np.repeat(1 / (N**self.M), N**self.M)
else:
cdf_cuts = np.linspace(0, 1, N + 1)

Z = Normal()

z_cuts = Z.ppf(cdf_cuts)
z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N)]

atoms = np.empty([self.M, N**self.M])

def eval(params, z):
dim = len(params)

p = np.repeat(params, 2).reshape(dim, 2)

Z = np.multiply(p, z)

bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T

x_exp = (
-0.5
* np.exp(np.square(params) / 2)
* (special.erf(bounds[1]) - special.erf(bounds[0]))
)

return np.prod(x_exp) * N**dim

Λ, Q = np.linalg.eig(self.Sigma)

A = Q @ np.diag(np.sqrt(Λ))

if sqrt:
A = A @ Q.T

for i in range(self.M):
mui = self.mu[i]
params = A[i]

Z_list = [z_bins for _ in range(self.M)]

Z_bins = [np.array(x) for x in list(product(*Z_list))]

xi_atoms = []

for z_bin in Z_bins:
atom = np.exp(mui) * eval(params, z_bin)
xi_atoms.append(atom)

atoms[i] = xi_atoms

pmv = np.repeat(1 / (N**self.M), N**self.M)

limit = {"dist": self, "method": "equiprobable_eig", "N": N}

return DiscreteDistribution(
pmv,
atoms,
seed=self._rng.integers(0, 2**31 - 1, dtype="int32"),
limit=limit,
)


# DISCRETE DISTRIBUTIONS

Expand Down
Loading

0 comments on commit bcc19d9

Please sign in to comment.