From 091b420f46a7db6001293c55b06c1683130a33b0 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 10:23:00 -0400 Subject: [PATCH 01/19] Use numba in for loop of _update_word_topic_assignments and _update_peak_assignments --- nimare/annotate/gclda.py | 233 ++++++++++++++++++++++++++++++--------- 1 file changed, 179 insertions(+), 54 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 4668f8b6b..88dd42be9 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -6,6 +6,7 @@ import numpy as np import pandas as pd from nilearn._utils import load_niimg +from numba import jit from scipy.stats import multivariate_normal from nimare.base import NiMAREBase @@ -408,11 +409,49 @@ def _update(self, loglikely_freq=1): LGR.debug(f"Iter {self.iter:04d}: Sampling z") self.seed += 1 - self._update_word_topic_assignments(self.seed) # Update z-assignments + ( + self.topics["wtoken_topic_idx"], + self.topics["n_word_tokens_word_by_topic"], + self.topics["total_n_word_tokens_by_topic"], + self.topics["n_word_tokens_doc_by_topic"], + ) = self._update_word_topic_assignments( + self.topics["wtoken_topic_idx"], + self.topics["n_word_tokens_word_by_topic"], + self.topics["total_n_word_tokens_by_topic"], + self.topics["n_word_tokens_doc_by_topic"], + self.topics["n_peak_tokens_doc_by_topic"], + np.array(self.data["wtoken_doc_idx"]), + np.array(self.data["wtoken_word_idx"]), + len(self.vocabulary), + self.params["beta"], + self.params["gamma"], + self.seed, + ) # Update z-assignments LGR.debug(f"Iter {self.iter:04d}: Sampling y|r") self.seed += 1 - self._update_peak_assignments(self.seed) # Update y-assignments + # Retrieve p(x|r,y) for all subregions + peak_probs = self._get_peak_probs(self) + ( + self.topics["n_peak_tokens_region_by_topic"], + self.topics["n_peak_tokens_doc_by_topic"], + self.topics["peak_topic_idx"], + self.topics["peak_region_idx"], + ) = self._update_peak_assignments( + self.topics["n_peak_tokens_region_by_topic"], + self.topics["n_peak_tokens_doc_by_topic"], + self.topics["peak_topic_idx"], + self.topics["peak_region_idx"], + self.topics["n_word_tokens_doc_by_topic"], + peak_probs, + np.array(self.data["ptoken_doc_idx"]), + self.params["n_regions"], + self.params["n_topics"], + self.params["alpha"], + self.params["delta"], + self.params["gamma"], + self.seed, + ) # Update y-assignments LGR.debug(f"Iter {self.iter:04d}: Updating spatial params") self._update_regions() # Update gaussian estimates for all subregions @@ -430,46 +469,84 @@ def _update(self, loglikely_freq=1): f"tot = {self.loglikelihood['total'][-1]:10.1f}" ) - def _update_word_topic_assignments(self, randseed): + @staticmethod + @jit(nopython=True) + def _update_word_topic_assignments( + word_topic_idx, + nz_word_topic, + nz_sum_topic, + nz_doc_topic, + ny_doc_topic, + word_doc_idx, + word_idx, + voc_len, + beta, + gamma, + randseed, + ): """Update wtoken_topic_idx (z) indicator variables assigning words->topics. Parameters ---------- + word_topic_idx : :obj:`numpy.ndarray` + Topic assignment for word tokens. + nz_word_topic : (W x T) :obj:`numpy.ndarray` + Number of word-tokens assigned to each topic (T) per word-type (W). + nz_sum_topic : (1 x T) :obj:`numpy.ndarray` + Total number of word-tokens assigned to each topic (T) (across all docs). + nz_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of word-tokens assigned to each topic (T) per document (D). + ny_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to each topic (T) per document (D). 
+ word_doc_idx : :obj:`numpy.ndarray` + Document index. + word_idx : :obj:`numpy.ndarray` + Word tokens. + voc_len : :obj:`int` + Total number of word-tokens in the vocabulary. + beta : :obj:`float` + Prior count on word-types for each topic. + gamma : :obj:`float` + Prior count added to y-counts when sampling z assignments. randseed : :obj:`int` Random seed for this iteration. + + Returns + ------- + word_topic_idx : :obj:`numpy.ndarray` + Updated topic assignment for word tokens. + nz_word_topic : (W x T) :obj:`numpy.ndarray` + Updated number of word-tokens assigned to each topic (T) per word-type (W). + nz_sum_topic : (1 x T) :obj:`numpy.ndarray` + Updated total number of word-tokens assigned to each topic (T) (across all docs). + nz_doc_topic : (D x T) :obj:`numpy.ndarray` + Updated number of word-tokens assigned to each topic (T) per document (D). """ # --- Seed random number generator np.random.seed(randseed) # Loop over all word tokens - for i_wtoken, word in enumerate(self.data["wtoken_word_idx"]): + for i_wtoken, word in enumerate(word_idx): # Find document in which word token (not just word) appears - doc = self.data["wtoken_doc_idx"][i_wtoken] + doc = word_doc_idx[i_wtoken] # current topic assignment for word token w_i - topic = self.topics["wtoken_topic_idx"][i_wtoken] + topic = word_topic_idx[i_wtoken] # Decrement count-matrices to remove current wtoken_topic_idx # because wtoken will be reassigned to a new topic - self.topics["n_word_tokens_word_by_topic"][word, topic] -= 1 - self.topics["total_n_word_tokens_by_topic"][0, topic] -= 1 - self.topics["n_word_tokens_doc_by_topic"][doc, topic] -= 1 + nz_word_topic[word, topic] -= 1 + nz_sum_topic[0, topic] -= 1 + nz_doc_topic[doc, topic] -= 1 # Get sampling distribution: # p(z_i|z,d,w) ~ p(w|t) * p(t|d) # ~ p_w_t * p_topic_g_doc - p_word_g_topic = ( - self.topics["n_word_tokens_word_by_topic"][word, :] + self.params["beta"] - ) / ( - self.topics["total_n_word_tokens_by_topic"] - + (self.params["beta"] * len(self.vocabulary)) - ) - p_topic_g_doc = ( - self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["gamma"] - ) + p_word_g_topic = (nz_word_topic[word, :] + beta) / (nz_sum_topic + (beta * voc_len)) + p_topic_g_doc = ny_doc_topic[doc, :] + gamma probs = p_word_g_topic * p_topic_g_doc # The unnormalized sampling distribution - + # print(probs.shape) # Sample a z_i assignment for the current word-token from the sampling distribution - probs = np.squeeze(probs) / np.sum(probs) # Normalize the sampling distribution + probs = probs.reshape(-1) / np.sum(probs) # Normalize the sampling distribution # Numpy returns a binary [1 x T] vector with a '1' in the index of sampled topic # and zeros everywhere else assigned_topic_vec = np.random.multinomial(1, probs) @@ -477,34 +554,84 @@ def _update_word_topic_assignments(self, randseed): topic = np.where(assigned_topic_vec)[0][0] # Update the indices and the count matrices using the sampled z assignment - self.topics["wtoken_topic_idx"][i_wtoken] = topic # Update w_i topic-assignment - self.topics["n_word_tokens_word_by_topic"][word, topic] += 1 - self.topics["total_n_word_tokens_by_topic"][0, topic] += 1 - self.topics["n_word_tokens_doc_by_topic"][doc, topic] += 1 - - def _update_peak_assignments(self, randseed): + word_topic_idx[i_wtoken] = topic # Update w_i topic-assignment + nz_word_topic[word, topic] += 1 + nz_sum_topic[0, topic] += 1 + nz_doc_topic[doc, topic] += 1 + + return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic + + @staticmethod + @jit(nopython=True) + 
def _update_peak_assignments( + ny_region_topic, + ny_doc_topic, + peak_topic_idx, + peak_region_idx, + nz_doc_topic, + peak_probs, + doc_idx, + n_regions, + n_topics, + alpha, + delta, + gamma, + randseed, + ): """Update y / r indicator variables assigning peaks->topics/subregions. Parameters ---------- + ny_region_topic : (R x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to each subregion (R) per topic (T). + ny_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to each topic (T) per document (D). + peak_topic_idx : :obj:`numpy.ndarray` + Topic index. + peak_region_idx : :obj:`numpy.ndarray` + Region index. + nz_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of word-tokens assigned to each topic (T) per document (D). + peak_probs : (D x T x R) :obj:`numpy.ndarray` + Matrix of probabilities of all peaks given all topic/subregion spatial distributions + doc_idx : :obj:`numpy.ndarray` + Document index. + n_regions : :obj:`int` + Total number of regions. + n_topics : :obj:`int` + Total number of topics. + alpha : :obj:`float` + Prior count on topics for each document. + delta : :obj:`float` + Prior count on subregions for each topic. + gamma : :obj:`float` + Prior count added to y-counts when sampling z assignments. randseed : :obj:`int` Random seed for this iteration. + + Returns + ------- + ny_region_topic : (R x T) :obj:`numpy.ndarray` + Updated number of peak-tokens assigned to each subregion (R) per topic (T). + ny_doc_topic : (D x T) :obj:`numpy.ndarray` + Updated number of peak-tokens assigned to each topic (T) per document (D). + peak_topic_idx : :obj:`numpy.ndarray` + Updated topic index. + peak_region_idx : :obj:`numpy.ndarray` + Updated region index. """ # Seed random number generator np.random.seed(randseed) - # Retrieve p(x|r,y) for all subregions - peak_probs = self._get_peak_probs(self) - # Iterate over all peaks x, and sample a new y and r assignment for each - for i_ptoken, doc in enumerate(self.data["ptoken_doc_idx"]): - topic = self.topics["peak_topic_idx"][i_ptoken] - region = self.topics["peak_region_idx"][i_ptoken] + for i_ptoken, doc in enumerate(doc_idx): + topic = peak_topic_idx[i_ptoken] + region = peak_region_idx[i_ptoken] # Decrement count-matrices to remove current ptoken_topic_idx # because ptoken will be reassigned to a new topic - self.topics["n_peak_tokens_region_by_topic"][region, topic] -= 1 - self.topics["n_peak_tokens_doc_by_topic"][doc, topic] -= 1 + ny_region_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx + ny_doc_topic[doc, topic] -= 1 # Decrement count in Document x Topic count matrix # Retrieve the probability of generating current x from all # subregions: [R x T] array of probs @@ -513,19 +640,17 @@ def _update_peak_assignments(self, randseed): # Compute the probabilities of all subregions given doc # p(r|d) ~ p(r|t) * p(t|d) # Counts of subregions per topic + prior: p(r|t) - p_region_g_topic = self.topics["n_peak_tokens_region_by_topic"] + self.params["delta"] + p_region_g_topic = ny_region_topic + delta # Normalize the columns such that each topic's distribution over subregions sums to 1 - p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) + p_region_g_topic_sum = np.sum(p_region_g_topic, axis=0) + p_region_g_topic = np.divide(p_region_g_topic, p_region_g_topic_sum) # Counts of topics per document + prior: p(t|d) - p_topic_g_doc = ( - self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["alpha"] - ) + p_topic_g_doc = ny_doc_topic[doc, 
:] + alpha # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - # Makes it the same shape as p_region_g_topic - p_topic_g_doc = np.array([p_topic_g_doc] * self.params["n_regions"]) + p_topic_g_doc = p_topic_g_doc.repeat(n_regions).reshape((-1, n_regions)).T # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) # [R x T] array of probs @@ -536,25 +661,26 @@ def _update_peak_assignments(self, randseed): # The multinomial from which z is sampled is proportional to number # of y assigned to each topic, plus constant gamma # Compute the proportional probabilities in log-space - logp = self.topics["n_word_tokens_doc_by_topic"][doc, :] * np.log( - (self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["gamma"] + 1) - / (self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["gamma"]) + logp = nz_doc_topic[doc, :] * np.log( + (ny_doc_topic[doc, :] + gamma + 1) / (ny_doc_topic[doc, :] + gamma) ) # Add a constant before exponentiating to avoid any underflow issues p_peak_g_topic = np.exp(logp - np.max(logp)) # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_peak_g_topic = np.array([p_peak_g_topic] * self.params["n_regions"]) + p_peak_g_topic = p_peak_g_topic.repeat(n_regions).reshape((-1, n_regions)).T # Get the full sampling distribution: # [R x T] array containing the proportional probability of all y/r combinations probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from - probs_pdf = np.reshape(probs_pdf, (self.params["n_regions"] * self.params["n_topics"])) + probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) # Normalize the sampling distribution probs_pdf = probs_pdf / np.sum(probs_pdf) + # Float precision issue in Numba: https://github.com/numba/numba/issues/3426 + probs_pdf = np.trunc(probs_pdf * (10**12)) / (10**12) # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) # from the sampling distribution @@ -563,10 +689,7 @@ def _update_peak_assignments(self, randseed): assignment_vec = np.random.multinomial(1, probs_pdf) # Reshape 1D back to [R x T] 2D - assignment_arr = np.reshape( - assignment_vec, - (self.params["n_regions"], self.params["n_topics"]), - ) + assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) # Transform the linear index of the sampled element into the # subregion/topic (r/y) assignment indices assignment_idx = np.where(assignment_arr) @@ -577,13 +700,15 @@ def _update_peak_assignments(self, randseed): # Update the indices and the count matrices using the sampled y/r assignments # Increment count in Subregion x Topic count matrix - self.topics["n_peak_tokens_region_by_topic"][region, topic] += 1 + ny_region_topic[region, topic] += 1 # Increment count in Document x Topic count matrix - self.topics["n_peak_tokens_doc_by_topic"][doc, topic] += 1 + ny_doc_topic[doc, topic] += 1 # Update y->topic assignment - self.topics["peak_topic_idx"][i_ptoken] = topic + peak_topic_idx[i_ptoken] = topic # Update y->subregion assignment - self.topics["peak_region_idx"][i_ptoken] = region + peak_region_idx[i_ptoken] = region + + return ny_region_topic, ny_doc_topic, peak_topic_idx, peak_region_idx def _update_regions(self): """Update spatial distribution parameters (Gaussians params for all subregions). 
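
Note on the pattern this first patch adopts: numba's nopython mode cannot compile code that reads `self` attributes or dictionaries of arrays, which is why the Gibbs-sampling loop is pulled out into a function that receives and returns plain NumPy arrays and scalars. A minimal, self-contained sketch of that pattern under those assumptions — the names `toy_resample`, `assignments`, and `counts` are illustrative only, not NiMARE's API:

import numpy as np
from numba import jit


@jit(nopython=True)
def toy_resample(assignments, counts, prior, randseed):
    """Gibbs-style reassignment loop over plain arrays, compilable in nopython mode."""
    np.random.seed(randseed)  # seeds numba's internal RNG state inside jitted code
    for i in range(assignments.shape[0]):
        counts[assignments[i]] -= 1             # remove the token's current assignment
        weights = counts + prior                # smoothed counts ~ unnormalized p(topic)
        probs = weights / np.sum(weights)       # normalize the sampling distribution
        new_topic = np.argmax(np.random.multinomial(1, probs))  # draw a single topic
        counts[new_topic] += 1                  # add the token back under the new topic
        assignments[i] = new_topic
    return assignments, counts


assignments = np.zeros(10, dtype=np.int64)      # every token starts in topic 0
counts = np.array([10.0, 0.0, 0.0])             # token counts per topic
toy_resample(assignments, counts, 0.1, 42)

The same constraint explains why the caller in `_update` converts list attributes such as `self.data["wtoken_doc_idx"]` to arrays before passing them in, and why the count matrices are returned rather than mutated through `self`.
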
From 4e7ca79c7e10dc813bfa15a98071eaeeef3d8946 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 15:02:16 -0400 Subject: [PATCH 02/19] Convert staticmethod to function --- nimare/annotate/gclda.py | 486 +++++++++++++++++++-------------------- 1 file changed, 243 insertions(+), 243 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 88dd42be9..396749206 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -15,6 +15,247 @@ LGR = logging.getLogger(__name__) +@jit(nopython=True) +def _update_word_topic_assignments( + word_topic_idx, + nz_word_topic, + nz_sum_topic, + nz_doc_topic, + ny_doc_topic, + word_doc_idx, + word_idx, + voc_len, + beta, + gamma, + randseed, +): + """Update wtoken_topic_idx (z) indicator variables assigning words->topics. + + Parameters + ---------- + word_topic_idx : :obj:`numpy.ndarray` + Topic assignment for word tokens. + nz_word_topic : (W x T) :obj:`numpy.ndarray` + Number of word-tokens assigned to each topic (T) per word-type (W). + nz_sum_topic : (1 x T) :obj:`numpy.ndarray` + Total number of word-tokens assigned to each topic (T) (across all docs). + nz_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of word-tokens assigned to each topic (T) per document (D). + ny_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to each topic (T) per document (D). + word_doc_idx : :obj:`numpy.ndarray` + Document index. + word_idx : :obj:`numpy.ndarray` + Word tokens. + voc_len : :obj:`int` + Total number of word-tokens in the vocabulary. + beta : :obj:`float` + Prior count on word-types for each topic. + gamma : :obj:`float` + Prior count added to y-counts when sampling z assignments. + randseed : :obj:`int` + Random seed for this iteration. + + Returns + ------- + word_topic_idx : :obj:`numpy.ndarray` + Updated topic assignment for word tokens. + nz_word_topic : (W x T) :obj:`numpy.ndarray` + Updated number of word-tokens assigned to each topic (T) per word-type (W). + nz_sum_topic : (1 x T) :obj:`numpy.ndarray` + Updated total number of word-tokens assigned to each topic (T) (across all docs). + nz_doc_topic : (D x T) :obj:`numpy.ndarray` + Updated number of word-tokens assigned to each topic (T) per document (D). 
+ """ + # --- Seed random number generator + np.random.seed(randseed) + + # Loop over all word tokens + for i_wtoken, word in enumerate(word_idx): + # Find document in which word token (not just word) appears + doc = word_doc_idx[i_wtoken] + # current topic assignment for word token w_i + topic = word_topic_idx[i_wtoken] + + # Decrement count-matrices to remove current wtoken_topic_idx + # because wtoken will be reassigned to a new topic + nz_word_topic[word, topic] -= 1 + nz_sum_topic[0, topic] -= 1 + nz_doc_topic[doc, topic] -= 1 + + # Get sampling distribution: + # p(z_i|z,d,w) ~ p(w|t) * p(t|d) + # ~ p_w_t * p_topic_g_doc + p_word_g_topic = (nz_word_topic[word, :] + beta) / (nz_sum_topic + (beta * voc_len)) + p_topic_g_doc = ny_doc_topic[doc, :] + gamma + probs = p_word_g_topic * p_topic_g_doc # The unnormalized sampling distribution + # print(probs.shape) + # Sample a z_i assignment for the current word-token from the sampling distribution + probs = probs.reshape(-1) / np.sum(probs) # Normalize the sampling distribution + # Numpy returns a binary [1 x T] vector with a '1' in the index of sampled topic + # and zeros everywhere else + assigned_topic_vec = np.random.multinomial(1, probs) + # Extract selected topic from vector + topic = np.where(assigned_topic_vec)[0][0] + + # Update the indices and the count matrices using the sampled z assignment + word_topic_idx[i_wtoken] = topic # Update w_i topic-assignment + nz_word_topic[word, topic] += 1 + nz_sum_topic[0, topic] += 1 + nz_doc_topic[doc, topic] += 1 + + return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic + + +@jit(nopython=True) +def _update_peak_assignments( + ny_region_topic, + ny_doc_topic, + peak_topic_idx, + peak_region_idx, + nz_doc_topic, + peak_probs, + doc_idx, + n_regions, + n_topics, + alpha, + delta, + gamma, + randseed, +): + """Update y / r indicator variables assigning peaks->topics/subregions. + + Parameters + ---------- + ny_region_topic : (R x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to each subregion (R) per topic (T). + ny_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to each topic (T) per document (D). + peak_topic_idx : :obj:`numpy.ndarray` + Topic index. + peak_region_idx : :obj:`numpy.ndarray` + Region index. + nz_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of word-tokens assigned to each topic (T) per document (D). + peak_probs : (D x T x R) :obj:`numpy.ndarray` + Matrix of probabilities of all peaks given all topic/subregion spatial distributions + doc_idx : :obj:`numpy.ndarray` + Document index. + n_regions : :obj:`int` + Total number of regions. + n_topics : :obj:`int` + Total number of topics. + alpha : :obj:`float` + Prior count on topics for each document. + delta : :obj:`float` + Prior count on subregions for each topic. + gamma : :obj:`float` + Prior count added to y-counts when sampling z assignments. + randseed : :obj:`int` + Random seed for this iteration. + + Returns + ------- + ny_region_topic : (R x T) :obj:`numpy.ndarray` + Updated number of peak-tokens assigned to each subregion (R) per topic (T). + ny_doc_topic : (D x T) :obj:`numpy.ndarray` + Updated number of peak-tokens assigned to each topic (T) per document (D). + peak_topic_idx : :obj:`numpy.ndarray` + Updated topic index. + peak_region_idx : :obj:`numpy.ndarray` + Updated region index. 
+ """ + # Seed random number generator + np.random.seed(randseed) + + # Iterate over all peaks x, and sample a new y and r assignment for each + for i_ptoken, doc in enumerate(doc_idx): + topic = peak_topic_idx[i_ptoken] + region = peak_region_idx[i_ptoken] + + # Decrement count-matrices to remove current ptoken_topic_idx + # because ptoken will be reassigned to a new topic + ny_region_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx + ny_doc_topic[doc, topic] -= 1 # Decrement count in Document x Topic count matrix + + # Retrieve the probability of generating current x from all + # subregions: [R x T] array of probs + p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() + + # Compute the probabilities of all subregions given doc + # p(r|d) ~ p(r|t) * p(t|d) + # Counts of subregions per topic + prior: p(r|t) + p_region_g_topic = ny_region_topic + delta + + # Normalize the columns such that each topic's distribution over subregions sums to 1 + p_region_g_topic_sum = np.sum(p_region_g_topic, axis=0) + p_region_g_topic = np.divide(p_region_g_topic, p_region_g_topic_sum) + + # Counts of topics per document + prior: p(t|d) + p_topic_g_doc = ny_doc_topic[doc, :] + alpha + + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_topic_g_doc = p_topic_g_doc.repeat(n_regions).reshape((-1, n_regions)).T + + # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) + # [R x T] array of probs + p_region_g_doc = p_topic_g_doc * p_region_g_topic + + # Compute the multinomial probability: p(z|y) + # Need the current vector of all z and y assignments for current doc + # The multinomial from which z is sampled is proportional to number + # of y assigned to each topic, plus constant gamma + # Compute the proportional probabilities in log-space + logp = nz_doc_topic[doc, :] * np.log( + (ny_doc_topic[doc, :] + gamma + 1) / (ny_doc_topic[doc, :] + gamma) + ) + # Add a constant before exponentiating to avoid any underflow issues + p_peak_g_topic = np.exp(logp - np.max(logp)) + + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_peak_g_topic = p_peak_g_topic.repeat(n_regions).reshape((-1, n_regions)).T + + # Get the full sampling distribution: + # [R x T] array containing the proportional probability of all y/r combinations + probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic + + # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from + probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) + + # Normalize the sampling distribution + probs_pdf = probs_pdf / np.sum(probs_pdf) + # Float precision issue in Numba: https://github.com/numba/numba/issues/3426 + probs_pdf = np.trunc(probs_pdf * (10**12)) / (10**12) + + # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) + # from the sampling distribution + # Returns a binary [1 x R*T] vector with a '1' in location that was sampled + # and zeros everywhere else + assignment_vec = np.random.multinomial(1, probs_pdf) + + # Reshape 1D back to [R x T] 2D + assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) + # Transform the linear index of the sampled element into the + # subregion/topic (r/y) assignment indices + assignment_idx = np.where(assignment_arr) + # Subregion sampled (r) + region = assignment_idx[0][0] + # Topic sampled (y) + topic = assignment_idx[1][0] + + # Update the indices and the count matrices using the sampled y/r assignments + # Increment count in Subregion x Topic count matrix + 
ny_region_topic[region, topic] += 1 + # Increment count in Document x Topic count matrix + ny_doc_topic[doc, topic] += 1 + # Update y->topic assignment + peak_topic_idx[i_ptoken] = topic + # Update y->subregion assignment + peak_region_idx[i_ptoken] = region + + return ny_region_topic, ny_doc_topic, peak_topic_idx, peak_region_idx + + class GCLDAModel(NiMAREBase): """Generate a generalized correspondence latent Dirichlet allocation (GCLDA) topic model. @@ -414,7 +655,7 @@ def _update(self, loglikely_freq=1): self.topics["n_word_tokens_word_by_topic"], self.topics["total_n_word_tokens_by_topic"], self.topics["n_word_tokens_doc_by_topic"], - ) = self._update_word_topic_assignments( + ) = _update_word_topic_assignments( self.topics["wtoken_topic_idx"], self.topics["n_word_tokens_word_by_topic"], self.topics["total_n_word_tokens_by_topic"], @@ -437,7 +678,7 @@ def _update(self, loglikely_freq=1): self.topics["n_peak_tokens_doc_by_topic"], self.topics["peak_topic_idx"], self.topics["peak_region_idx"], - ) = self._update_peak_assignments( + ) = _update_peak_assignments( self.topics["n_peak_tokens_region_by_topic"], self.topics["n_peak_tokens_doc_by_topic"], self.topics["peak_topic_idx"], @@ -469,247 +710,6 @@ def _update(self, loglikely_freq=1): f"tot = {self.loglikelihood['total'][-1]:10.1f}" ) - @staticmethod - @jit(nopython=True) - def _update_word_topic_assignments( - word_topic_idx, - nz_word_topic, - nz_sum_topic, - nz_doc_topic, - ny_doc_topic, - word_doc_idx, - word_idx, - voc_len, - beta, - gamma, - randseed, - ): - """Update wtoken_topic_idx (z) indicator variables assigning words->topics. - - Parameters - ---------- - word_topic_idx : :obj:`numpy.ndarray` - Topic assignment for word tokens. - nz_word_topic : (W x T) :obj:`numpy.ndarray` - Number of word-tokens assigned to each topic (T) per word-type (W). - nz_sum_topic : (1 x T) :obj:`numpy.ndarray` - Total number of word-tokens assigned to each topic (T) (across all docs). - nz_doc_topic : (D x T) :obj:`numpy.ndarray` - Number of word-tokens assigned to each topic (T) per document (D). - ny_doc_topic : (D x T) :obj:`numpy.ndarray` - Number of peak-tokens assigned to each topic (T) per document (D). - word_doc_idx : :obj:`numpy.ndarray` - Document index. - word_idx : :obj:`numpy.ndarray` - Word tokens. - voc_len : :obj:`int` - Total number of word-tokens in the vocabulary. - beta : :obj:`float` - Prior count on word-types for each topic. - gamma : :obj:`float` - Prior count added to y-counts when sampling z assignments. - randseed : :obj:`int` - Random seed for this iteration. - - Returns - ------- - word_topic_idx : :obj:`numpy.ndarray` - Updated topic assignment for word tokens. - nz_word_topic : (W x T) :obj:`numpy.ndarray` - Updated number of word-tokens assigned to each topic (T) per word-type (W). - nz_sum_topic : (1 x T) :obj:`numpy.ndarray` - Updated total number of word-tokens assigned to each topic (T) (across all docs). - nz_doc_topic : (D x T) :obj:`numpy.ndarray` - Updated number of word-tokens assigned to each topic (T) per document (D). 
- """ - # --- Seed random number generator - np.random.seed(randseed) - - # Loop over all word tokens - for i_wtoken, word in enumerate(word_idx): - # Find document in which word token (not just word) appears - doc = word_doc_idx[i_wtoken] - # current topic assignment for word token w_i - topic = word_topic_idx[i_wtoken] - - # Decrement count-matrices to remove current wtoken_topic_idx - # because wtoken will be reassigned to a new topic - nz_word_topic[word, topic] -= 1 - nz_sum_topic[0, topic] -= 1 - nz_doc_topic[doc, topic] -= 1 - - # Get sampling distribution: - # p(z_i|z,d,w) ~ p(w|t) * p(t|d) - # ~ p_w_t * p_topic_g_doc - p_word_g_topic = (nz_word_topic[word, :] + beta) / (nz_sum_topic + (beta * voc_len)) - p_topic_g_doc = ny_doc_topic[doc, :] + gamma - probs = p_word_g_topic * p_topic_g_doc # The unnormalized sampling distribution - # print(probs.shape) - # Sample a z_i assignment for the current word-token from the sampling distribution - probs = probs.reshape(-1) / np.sum(probs) # Normalize the sampling distribution - # Numpy returns a binary [1 x T] vector with a '1' in the index of sampled topic - # and zeros everywhere else - assigned_topic_vec = np.random.multinomial(1, probs) - # Extract selected topic from vector - topic = np.where(assigned_topic_vec)[0][0] - - # Update the indices and the count matrices using the sampled z assignment - word_topic_idx[i_wtoken] = topic # Update w_i topic-assignment - nz_word_topic[word, topic] += 1 - nz_sum_topic[0, topic] += 1 - nz_doc_topic[doc, topic] += 1 - - return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic - - @staticmethod - @jit(nopython=True) - def _update_peak_assignments( - ny_region_topic, - ny_doc_topic, - peak_topic_idx, - peak_region_idx, - nz_doc_topic, - peak_probs, - doc_idx, - n_regions, - n_topics, - alpha, - delta, - gamma, - randseed, - ): - """Update y / r indicator variables assigning peaks->topics/subregions. - - Parameters - ---------- - ny_region_topic : (R x T) :obj:`numpy.ndarray` - Number of peak-tokens assigned to each subregion (R) per topic (T). - ny_doc_topic : (D x T) :obj:`numpy.ndarray` - Number of peak-tokens assigned to each topic (T) per document (D). - peak_topic_idx : :obj:`numpy.ndarray` - Topic index. - peak_region_idx : :obj:`numpy.ndarray` - Region index. - nz_doc_topic : (D x T) :obj:`numpy.ndarray` - Number of word-tokens assigned to each topic (T) per document (D). - peak_probs : (D x T x R) :obj:`numpy.ndarray` - Matrix of probabilities of all peaks given all topic/subregion spatial distributions - doc_idx : :obj:`numpy.ndarray` - Document index. - n_regions : :obj:`int` - Total number of regions. - n_topics : :obj:`int` - Total number of topics. - alpha : :obj:`float` - Prior count on topics for each document. - delta : :obj:`float` - Prior count on subregions for each topic. - gamma : :obj:`float` - Prior count added to y-counts when sampling z assignments. - randseed : :obj:`int` - Random seed for this iteration. - - Returns - ------- - ny_region_topic : (R x T) :obj:`numpy.ndarray` - Updated number of peak-tokens assigned to each subregion (R) per topic (T). - ny_doc_topic : (D x T) :obj:`numpy.ndarray` - Updated number of peak-tokens assigned to each topic (T) per document (D). - peak_topic_idx : :obj:`numpy.ndarray` - Updated topic index. - peak_region_idx : :obj:`numpy.ndarray` - Updated region index. 
- """ - # Seed random number generator - np.random.seed(randseed) - - # Iterate over all peaks x, and sample a new y and r assignment for each - for i_ptoken, doc in enumerate(doc_idx): - topic = peak_topic_idx[i_ptoken] - region = peak_region_idx[i_ptoken] - - # Decrement count-matrices to remove current ptoken_topic_idx - # because ptoken will be reassigned to a new topic - ny_region_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx - ny_doc_topic[doc, topic] -= 1 # Decrement count in Document x Topic count matrix - - # Retrieve the probability of generating current x from all - # subregions: [R x T] array of probs - p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() - - # Compute the probabilities of all subregions given doc - # p(r|d) ~ p(r|t) * p(t|d) - # Counts of subregions per topic + prior: p(r|t) - p_region_g_topic = ny_region_topic + delta - - # Normalize the columns such that each topic's distribution over subregions sums to 1 - p_region_g_topic_sum = np.sum(p_region_g_topic, axis=0) - p_region_g_topic = np.divide(p_region_g_topic, p_region_g_topic_sum) - - # Counts of topics per document + prior: p(t|d) - p_topic_g_doc = ny_doc_topic[doc, :] + alpha - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_topic_g_doc = p_topic_g_doc.repeat(n_regions).reshape((-1, n_regions)).T - - # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) - # [R x T] array of probs - p_region_g_doc = p_topic_g_doc * p_region_g_topic - - # Compute the multinomial probability: p(z|y) - # Need the current vector of all z and y assignments for current doc - # The multinomial from which z is sampled is proportional to number - # of y assigned to each topic, plus constant gamma - # Compute the proportional probabilities in log-space - logp = nz_doc_topic[doc, :] * np.log( - (ny_doc_topic[doc, :] + gamma + 1) / (ny_doc_topic[doc, :] + gamma) - ) - # Add a constant before exponentiating to avoid any underflow issues - p_peak_g_topic = np.exp(logp - np.max(logp)) - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_peak_g_topic = p_peak_g_topic.repeat(n_regions).reshape((-1, n_regions)).T - - # Get the full sampling distribution: - # [R x T] array containing the proportional probability of all y/r combinations - probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic - - # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from - probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) - - # Normalize the sampling distribution - probs_pdf = probs_pdf / np.sum(probs_pdf) - # Float precision issue in Numba: https://github.com/numba/numba/issues/3426 - probs_pdf = np.trunc(probs_pdf * (10**12)) / (10**12) - - # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) - # from the sampling distribution - # Returns a binary [1 x R*T] vector with a '1' in location that was sampled - # and zeros everywhere else - assignment_vec = np.random.multinomial(1, probs_pdf) - - # Reshape 1D back to [R x T] 2D - assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) - # Transform the linear index of the sampled element into the - # subregion/topic (r/y) assignment indices - assignment_idx = np.where(assignment_arr) - # Subregion sampled (r) - region = assignment_idx[0][0] - # Topic sampled (y) - topic = assignment_idx[1][0] - - # Update the indices and the count matrices using the sampled y/r assignments - # Increment count in Subregion x Topic count matrix - 
ny_region_topic[region, topic] += 1 - # Increment count in Document x Topic count matrix - ny_doc_topic[doc, topic] += 1 - # Update y->topic assignment - peak_topic_idx[i_ptoken] = topic - # Update y->subregion assignment - peak_region_idx[i_ptoken] = region - - return ny_region_topic, ny_doc_topic, peak_topic_idx, peak_region_idx - def _update_regions(self): """Update spatial distribution parameters (Gaussians params for all subregions). From daf496253695206831843b1c4a6e50ce66cc61dd Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 15:36:28 -0400 Subject: [PATCH 03/19] Disable numba execution to allow pytest-cov to detect coverage in functions with @jit --- nimare/annotate/gclda.py | 4 ++-- nimare/tests/conftest.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 396749206..c9d7c2233 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -15,7 +15,7 @@ LGR = logging.getLogger(__name__) -@jit(nopython=True) +@jit(nopython=True, cache=True) def _update_word_topic_assignments( word_topic_idx, nz_word_topic, @@ -107,7 +107,7 @@ def _update_word_topic_assignments( return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic -@jit(nopython=True) +@jit(nopython=True, cache=True) def _update_peak_assignments( ny_region_topic, ny_doc_topic, diff --git a/nimare/tests/conftest.py b/nimare/tests/conftest.py index bc1f5c2e2..6f57f9eaa 100644 --- a/nimare/tests/conftest.py +++ b/nimare/tests/conftest.py @@ -6,6 +6,7 @@ import numpy as np import pytest from nilearn.image import resample_img +from numba import config import nimare from nimare.tests.utils import get_test_data_path @@ -14,6 +15,9 @@ # Only enable the following once in a while for a check for SettingWithCopyWarnings # pd.options.mode.chained_assignment = "raise" +# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit +config.DISABLE_JIT = True + @pytest.fixture(scope="session") def testdata_ibma(tmp_path_factory): From 0d86fba78e7876c30794f020708297895ebec477 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 15:43:45 -0400 Subject: [PATCH 04/19] `DISABLE_JIT=True` only for GCLDA tests --- nimare/tests/conftest.py | 4 ---- nimare/tests/test_annotate_gclda.py | 4 ++++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nimare/tests/conftest.py b/nimare/tests/conftest.py index 6f57f9eaa..bc1f5c2e2 100644 --- a/nimare/tests/conftest.py +++ b/nimare/tests/conftest.py @@ -6,7 +6,6 @@ import numpy as np import pytest from nilearn.image import resample_img -from numba import config import nimare from nimare.tests.utils import get_test_data_path @@ -15,9 +14,6 @@ # Only enable the following once in a while for a check for SettingWithCopyWarnings # pd.options.mode.chained_assignment = "raise" -# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit -config.DISABLE_JIT = True - @pytest.fixture(scope="session") def testdata_ibma(tmp_path_factory): diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index 0d5facda6..8e9ce9206 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py @@ -3,9 +3,13 @@ import numpy as np import pandas as pd import pytest +from numba import config from nimare import annotate, decode +# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit +config.DISABLE_JIT = True + def test_gclda_symmetric(testdata_laird): """A smoke 
test for GCLDA with symmetric regions.""" From 1502f2bbbbc7c286c07f11f98e53b92fe525e09a Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 15:54:49 -0400 Subject: [PATCH 05/19] `DISABLE_JIT=True` inside GCLDA tests --- nimare/tests/test_annotate_gclda.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index 8e9ce9206..709d100a7 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py @@ -7,12 +7,12 @@ from nimare import annotate, decode -# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit -config.DISABLE_JIT = True - def test_gclda_symmetric(testdata_laird): """A smoke test for GCLDA with symmetric regions.""" + # Disable numba execution to allow pytest-cov to detect coverage in functions with @jit + config.DISABLE_JIT = True + counts_df = annotate.text.generate_counts( testdata_laird.texts, text_column="abstract", From 6844ce0495c1d9b8276fe6b22f77fcf3c0e1cbcf Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 17:12:28 -0400 Subject: [PATCH 06/19] `DISABLE_JIT=False` after GCLDA tests --- nimare/tests/test_annotate_gclda.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index 709d100a7..a62e8cea2 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py @@ -7,12 +7,12 @@ from nimare import annotate, decode +# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit +config.DISABLE_JIT = True + def test_gclda_symmetric(testdata_laird): """A smoke test for GCLDA with symmetric regions.""" - # Disable numba execution to allow pytest-cov to detect coverage in functions with @jit - config.DISABLE_JIT = True - counts_df = annotate.text.generate_counts( testdata_laird.texts, text_column="abstract", @@ -87,3 +87,7 @@ def test_gclda_asymmetric(testdata_laird): # Encode text encoded_img, _ = decode.encode.gclda_encode(model, "fmri activation") assert isinstance(encoded_img, nib.Nifti1Image) + + +# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit +config.DISABLE_JIT = False From d61fff7b9a6c81d28dc3cc97cbc315e791fe5762 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 18:20:55 -0400 Subject: [PATCH 07/19] Fix comments --- nimare/tests/test_annotate_gclda.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index a62e8cea2..1887097fe 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py @@ -89,5 +89,5 @@ def test_gclda_asymmetric(testdata_laird): assert isinstance(encoded_img, nib.Nifti1Image) -# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit +# Enable numba execution for tests that use sparse arrays config.DISABLE_JIT = False From 63b46d4dc718b38a842e258ea69f5ddb738f9f25 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 18:57:36 -0400 Subject: [PATCH 08/19] Disable JIT using environment variable --- nimare/tests/test_annotate_gclda.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index 1887097fe..450ac28ff 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py 
@@ -1,4 +1,6 @@ """Test nimare.annotate.gclda (GCLDA).""" +import os + import nibabel as nib import numpy as np import pandas as pd @@ -7,12 +9,12 @@ from nimare import annotate, decode -# Disable numba execution to allow pytest-cov to detect coverage in functions with @jit -config.DISABLE_JIT = True - def test_gclda_symmetric(testdata_laird): """A smoke test for GCLDA with symmetric regions.""" + # Disable numba execution to allow pytest-cov to detect coverage in functions with @jit + os.environ["NUMBA_DISABLE_JIT"] = "1" + counts_df = annotate.text.generate_counts( testdata_laird.texts, text_column="abstract", @@ -54,9 +56,15 @@ def test_gclda_symmetric(testdata_laird): encoded_img, _ = decode.encode.gclda_encode(model, "fmri activation") assert isinstance(encoded_img, nib.Nifti1Image) + # Enable numba execution for tests that use sparse arrays + os.environ["NUMBA_DISABLE_JIT"] = "0" + def test_gclda_asymmetric(testdata_laird): """A smoke test for GCLDA with three asymmetric regions.""" + # Disable numba execution to allow pytest-cov to detect coverage in functions with @jit + os.environ["NUMBA_DISABLE_JIT"] = "1" + counts_df = annotate.text.generate_counts( testdata_laird.texts, text_column="abstract", @@ -88,6 +96,5 @@ def test_gclda_asymmetric(testdata_laird): encoded_img, _ = decode.encode.gclda_encode(model, "fmri activation") assert isinstance(encoded_img, nib.Nifti1Image) - -# Enable numba execution for tests that use sparse arrays -config.DISABLE_JIT = False + # Enable numba execution for tests that use sparse arrays + os.environ["NUMBA_DISABLE_JIT"] = "0" From 2abaf9a1ff6e2dabc11faec8a091e64d1eb86b3c Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 18:59:41 -0400 Subject: [PATCH 09/19] Lint issue --- nimare/tests/test_annotate_gclda.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index 450ac28ff..285a863e4 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py @@ -5,7 +5,6 @@ import numpy as np import pandas as pd import pytest -from numba import config from nimare import annotate, decode From 1e985e0710fbf138087622d40c52be3005754298 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Wed, 24 Aug 2022 19:15:33 -0400 Subject: [PATCH 10/19] Try using `config.DISABLE_JIT` again --- nimare/tests/test_annotate_gclda.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index 285a863e4..190ade77a 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py @@ -1,10 +1,9 @@ """Test nimare.annotate.gclda (GCLDA).""" -import os - import nibabel as nib import numpy as np import pandas as pd import pytest +from numba import config from nimare import annotate, decode @@ -12,7 +11,7 @@ def test_gclda_symmetric(testdata_laird): """A smoke test for GCLDA with symmetric regions.""" # Disable numba execution to allow pytest-cov to detect coverage in functions with @jit - os.environ["NUMBA_DISABLE_JIT"] = "1" + config.DISABLE_JIT = True counts_df = annotate.text.generate_counts( testdata_laird.texts, @@ -56,13 +55,13 @@ def test_gclda_symmetric(testdata_laird): assert isinstance(encoded_img, nib.Nifti1Image) # Enable numba execution for tests that use sparse arrays - os.environ["NUMBA_DISABLE_JIT"] = "0" + config.DISABLE_JIT = False def test_gclda_asymmetric(testdata_laird): """A smoke test for GCLDA with three asymmetric regions.""" # 
Disable numba execution to allow pytest-cov to detect coverage in functions with @jit - os.environ["NUMBA_DISABLE_JIT"] = "1" + config.DISABLE_JIT = True counts_df = annotate.text.generate_counts( testdata_laird.texts, @@ -96,4 +95,4 @@ def test_gclda_asymmetric(testdata_laird): assert isinstance(encoded_img, nib.Nifti1Image) # Enable numba execution for tests that use sparse arrays - os.environ["NUMBA_DISABLE_JIT"] = "0" + config.DISABLE_JIT = False From 694a632173287c4a87a721f0196ce226867022d8 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Thu, 25 Aug 2022 15:38:11 -0400 Subject: [PATCH 11/19] Set `nopython` mode to False --- nimare/annotate/gclda.py | 4 ++-- nimare/tests/test_annotate_gclda.py | 13 ------------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index c9d7c2233..82824a8a9 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -15,7 +15,7 @@ LGR = logging.getLogger(__name__) -@jit(nopython=True, cache=True) +@jit(nopython=False, cache=True) def _update_word_topic_assignments( word_topic_idx, nz_word_topic, @@ -107,7 +107,7 @@ def _update_word_topic_assignments( return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic -@jit(nopython=True, cache=True) +@jit(nopython=False, cache=True) def _update_peak_assignments( ny_region_topic, ny_doc_topic, diff --git a/nimare/tests/test_annotate_gclda.py b/nimare/tests/test_annotate_gclda.py index 190ade77a..0d5facda6 100644 --- a/nimare/tests/test_annotate_gclda.py +++ b/nimare/tests/test_annotate_gclda.py @@ -3,16 +3,12 @@ import numpy as np import pandas as pd import pytest -from numba import config from nimare import annotate, decode def test_gclda_symmetric(testdata_laird): """A smoke test for GCLDA with symmetric regions.""" - # Disable numba execution to allow pytest-cov to detect coverage in functions with @jit - config.DISABLE_JIT = True - counts_df = annotate.text.generate_counts( testdata_laird.texts, text_column="abstract", @@ -54,15 +50,9 @@ def test_gclda_symmetric(testdata_laird): encoded_img, _ = decode.encode.gclda_encode(model, "fmri activation") assert isinstance(encoded_img, nib.Nifti1Image) - # Enable numba execution for tests that use sparse arrays - config.DISABLE_JIT = False - def test_gclda_asymmetric(testdata_laird): """A smoke test for GCLDA with three asymmetric regions.""" - # Disable numba execution to allow pytest-cov to detect coverage in functions with @jit - config.DISABLE_JIT = True - counts_df = annotate.text.generate_counts( testdata_laird.texts, text_column="abstract", @@ -93,6 +83,3 @@ def test_gclda_asymmetric(testdata_laird): # Encode text encoded_img, _ = decode.encode.gclda_encode(model, "fmri activation") assert isinstance(encoded_img, nib.Nifti1Image) - - # Enable numba execution for tests that use sparse arrays - config.DISABLE_JIT = False From f263e32d0c1c5265b4f476bf832fbeb0d57ccdd5 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Tue, 30 Aug 2022 11:51:48 -0400 Subject: [PATCH 12/19] Remove truncation of `probs_pdf` --- nimare/annotate/gclda.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 82824a8a9..9c5e9cb72 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -15,7 +15,7 @@ LGR = logging.getLogger(__name__) -@jit(nopython=False, cache=True) +@jit(nopython=True, cache=True) def _update_word_topic_assignments( word_topic_idx, nz_word_topic, @@ -107,7 +107,7 @@ def 
_update_word_topic_assignments( return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic -@jit(nopython=False, cache=True) +@jit(nopython=True, cache=True) def _update_peak_assignments( ny_region_topic, ny_doc_topic, @@ -224,8 +224,6 @@ def _update_peak_assignments( # Normalize the sampling distribution probs_pdf = probs_pdf / np.sum(probs_pdf) - # Float precision issue in Numba: https://github.com/numba/numba/issues/3426 - probs_pdf = np.trunc(probs_pdf * (10**12)) / (10**12) # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) # from the sampling distribution From e818cf91ed160b1330ff227cea7252320135045f Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Sat, 3 Sep 2022 10:06:55 -0400 Subject: [PATCH 13/19] Use Parallel in _update_peak_assignments --- nimare/annotate/gclda.py | 258 +++++++++++++++++++++------------------ 1 file changed, 137 insertions(+), 121 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 9c5e9cb72..b97aa4fb3 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -5,17 +5,18 @@ import nibabel as nib import numpy as np import pandas as pd +from joblib import Parallel, delayed from nilearn._utils import load_niimg from numba import jit from scipy.stats import multivariate_normal from nimare.base import NiMAREBase -from nimare.utils import get_template +from nimare.utils import _check_ncores, get_template LGR = logging.getLogger(__name__) -@jit(nopython=True, cache=True) +@jit(nopython=True) def _update_word_topic_assignments( word_topic_idx, nz_word_topic, @@ -107,26 +108,29 @@ def _update_word_topic_assignments( return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic -@jit(nopython=True, cache=True) def _update_peak_assignments( + i_ptoken, + doc, ny_region_topic, ny_doc_topic, peak_topic_idx, peak_region_idx, nz_doc_topic, peak_probs, - doc_idx, n_regions, n_topics, alpha, delta, gamma, - randseed, ): """Update y / r indicator variables assigning peaks->topics/subregions. Parameters ---------- + i_ptoken : :obj:`int` + Peak token index. + doc : :obj:`int` + Document i_ptoken index. ny_region_topic : (R x T) :obj:`numpy.ndarray` Number of peak-tokens assigned to each subregion (R) per topic (T). ny_doc_topic : (D x T) :obj:`numpy.ndarray` @@ -139,8 +143,6 @@ def _update_peak_assignments( Number of word-tokens assigned to each topic (T) per document (D). peak_probs : (D x T x R) :obj:`numpy.ndarray` Matrix of probabilities of all peaks given all topic/subregion spatial distributions - doc_idx : :obj:`numpy.ndarray` - Document index. n_regions : :obj:`int` Total number of regions. n_topics : :obj:`int` @@ -156,102 +158,90 @@ def _update_peak_assignments( Returns ------- - ny_region_topic : (R x T) :obj:`numpy.ndarray` - Updated number of peak-tokens assigned to each subregion (R) per topic (T). - ny_doc_topic : (D x T) :obj:`numpy.ndarray` - Updated number of peak-tokens assigned to each topic (T) per document (D). - peak_topic_idx : :obj:`numpy.ndarray` - Updated topic index. - peak_region_idx : :obj:`numpy.ndarray` - Updated region index. + doc : :obj:`int` + Document i index. + ny_region_topic : :obj:`int` + Updated number of peak-tokens assigned to region i and topic i. + ny_doc_topic : :obj:`int` + Updated number of peak-tokens assigned to topic i and document i. 
""" - # Seed random number generator - np.random.seed(randseed) - # Iterate over all peaks x, and sample a new y and r assignment for each - for i_ptoken, doc in enumerate(doc_idx): - topic = peak_topic_idx[i_ptoken] - region = peak_region_idx[i_ptoken] - - # Decrement count-matrices to remove current ptoken_topic_idx - # because ptoken will be reassigned to a new topic - ny_region_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx - ny_doc_topic[doc, topic] -= 1 # Decrement count in Document x Topic count matrix - - # Retrieve the probability of generating current x from all - # subregions: [R x T] array of probs - p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() - - # Compute the probabilities of all subregions given doc - # p(r|d) ~ p(r|t) * p(t|d) - # Counts of subregions per topic + prior: p(r|t) - p_region_g_topic = ny_region_topic + delta - - # Normalize the columns such that each topic's distribution over subregions sums to 1 - p_region_g_topic_sum = np.sum(p_region_g_topic, axis=0) - p_region_g_topic = np.divide(p_region_g_topic, p_region_g_topic_sum) - - # Counts of topics per document + prior: p(t|d) - p_topic_g_doc = ny_doc_topic[doc, :] + alpha - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_topic_g_doc = p_topic_g_doc.repeat(n_regions).reshape((-1, n_regions)).T - - # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) - # [R x T] array of probs - p_region_g_doc = p_topic_g_doc * p_region_g_topic - - # Compute the multinomial probability: p(z|y) - # Need the current vector of all z and y assignments for current doc - # The multinomial from which z is sampled is proportional to number - # of y assigned to each topic, plus constant gamma - # Compute the proportional probabilities in log-space - logp = nz_doc_topic[doc, :] * np.log( - (ny_doc_topic[doc, :] + gamma + 1) / (ny_doc_topic[doc, :] + gamma) - ) - # Add a constant before exponentiating to avoid any underflow issues - p_peak_g_topic = np.exp(logp - np.max(logp)) - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_peak_g_topic = p_peak_g_topic.repeat(n_regions).reshape((-1, n_regions)).T - - # Get the full sampling distribution: - # [R x T] array containing the proportional probability of all y/r combinations - probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic - - # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from - probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) - - # Normalize the sampling distribution - probs_pdf = probs_pdf / np.sum(probs_pdf) - - # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) - # from the sampling distribution - # Returns a binary [1 x R*T] vector with a '1' in location that was sampled - # and zeros everywhere else - assignment_vec = np.random.multinomial(1, probs_pdf) - - # Reshape 1D back to [R x T] 2D - assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) - # Transform the linear index of the sampled element into the - # subregion/topic (r/y) assignment indices - assignment_idx = np.where(assignment_arr) - # Subregion sampled (r) - region = assignment_idx[0][0] - # Topic sampled (y) - topic = assignment_idx[1][0] - - # Update the indices and the count matrices using the sampled y/r assignments - # Increment count in Subregion x Topic count matrix - ny_region_topic[region, topic] += 1 - # Increment count in Document x Topic count matrix - ny_doc_topic[doc, topic] += 1 - # Update y->topic assignment - 
peak_topic_idx[i_ptoken] = topic - # Update y->subregion assignment - peak_region_idx[i_ptoken] = region - - return ny_region_topic, ny_doc_topic, peak_topic_idx, peak_region_idx + topic = peak_topic_idx[i_ptoken] + region = peak_region_idx[i_ptoken] + + # Make ny_region_topic and ny_doc_topic writeable when running parallel + if not ny_region_topic.flags.writeable: + ny_region_topic = ny_region_topic.copy() + if not ny_doc_topic.flags.writeable: + ny_doc_topic = ny_doc_topic.copy() + + # Decrement count-matrices to remove current ptoken_topic_idx + # because ptoken will be reassigned to a new topic + ny_region_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx + ny_doc_topic[doc, topic] -= 1 # Decrement count in Document x Topic count matrix + + # Retrieve the probability of generating current x from all + # subregions: [R x T] array of probs + p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() + + # Compute the probabilities of all subregions given doc + # p(r|d) ~ p(r|t) * p(t|d) + # Counts of subregions per topic + prior: p(r|t) + p_region_g_topic = ny_region_topic + delta + + # Normalize the columns such that each topic's distribution over subregions sums to 1 + p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) + + # Counts of topics per document + prior: p(t|d) + p_topic_g_doc = ny_doc_topic[doc, :] + alpha + + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_topic_g_doc = np.array([p_topic_g_doc] * n_regions) + + # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) + # [R x T] array of probs + p_region_g_doc = p_topic_g_doc * p_region_g_topic + + # Compute the multinomial probability: p(z|y) + # Need the current vector of all z and y assignments for current doc + # The multinomial from which z is sampled is proportional to number + # of y assigned to each topic, plus constant gamma + # Compute the proportional probabilities in log-space + logp = nz_doc_topic[doc, :] * np.log( + (ny_doc_topic[doc, :] + gamma + 1) / (ny_doc_topic[doc, :] + gamma) + ) + # Add a constant before exponentiating to avoid any underflow issues + p_peak_g_topic = np.exp(logp - np.max(logp)) + + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_peak_g_topic = np.array([p_peak_g_topic] * n_regions) + + # Get the full sampling distribution: + # [R x T] array containing the proportional probability of all y/r combinations + probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic + + # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from + probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) + + # Normalize the sampling distribution + probs_pdf = probs_pdf / np.sum(probs_pdf) + # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) + # from the sampling distribution + # Returns a binary [1 x R*T] vector with a '1' in location that was sampled + # and zeros everywhere else + assignment_vec = np.random.multinomial(1, probs_pdf) + + # Reshape 1D back to [R x T] 2D + assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) + # Transform the linear index of the sampled element into the + # subregion/topic (r/y) assignment indices + assignment_idx = np.where(assignment_arr) + # Subregion sampled (r) + region = assignment_idx[0][0] + # Topic sampled (y) + topic = assignment_idx[1][0] + + return doc, region, topic class GCLDAModel(NiMAREBase): @@ -303,6 +293,10 @@ class GCLDAModel(NiMAREBase): requires n_regions = 2. 
The default is False. seed_init : :obj:`int`, optional Initial value of random seed. The default is 1. + n_cores : :obj:`int`, optional + Number of cores to use for parallelization. + If <=0, defaults to using all available cores. + Default is 1. Attributes ---------- @@ -341,11 +335,14 @@ def __init__( dobs=25, roi_size=50.0, seed_init=1, + n_cores=1, ): LGR.info("Constructing/Initializing GCLDA Model") count_df = count_df.copy() coordinates_df = coordinates_df.copy() + self.n_cores = _check_ncores(n_cores) + # Check IDs from DataFrames count_df.index = count_df.index.astype(str) count_df["id"] = count_df.index @@ -668,29 +665,44 @@ def _update(self, loglikely_freq=1): ) # Update z-assignments LGR.debug(f"Iter {self.iter:04d}: Sampling y|r") - self.seed += 1 # Retrieve p(x|r,y) for all subregions peak_probs = self._get_peak_probs(self) - ( - self.topics["n_peak_tokens_region_by_topic"], - self.topics["n_peak_tokens_doc_by_topic"], - self.topics["peak_topic_idx"], - self.topics["peak_region_idx"], - ) = _update_peak_assignments( - self.topics["n_peak_tokens_region_by_topic"], - self.topics["n_peak_tokens_doc_by_topic"], - self.topics["peak_topic_idx"], - self.topics["peak_region_idx"], - self.topics["n_word_tokens_doc_by_topic"], - peak_probs, - np.array(self.data["ptoken_doc_idx"]), - self.params["n_regions"], - self.params["n_topics"], - self.params["alpha"], - self.params["delta"], - self.params["gamma"], - self.seed, + self.seed += 1 # Seed random number generator + np.random.seed(self.seed) + old_regions, old_topics = self.topics["peak_region_idx"], self.topics["peak_topic_idx"] + docs, regions, topics = zip( + *Parallel(n_jobs=self.n_cores)( + delayed(_update_peak_assignments)( + i_ptoken, + doc, + self.topics["n_peak_tokens_region_by_topic"].copy(), + self.topics["n_peak_tokens_doc_by_topic"].copy(), + self.topics["peak_topic_idx"], + self.topics["peak_region_idx"], + self.topics["n_word_tokens_doc_by_topic"], + peak_probs, + self.params["n_regions"], + self.params["n_topics"], + self.params["alpha"], + self.params["delta"], + self.params["gamma"], + ) + for i_ptoken, doc in enumerate(self.data["ptoken_doc_idx"]) + ) ) # Update y-assignments + # Decrement count-matrices to remove current ptoken_topic_idx + # because ptoken will be reassigned to a new topic + np.subtract.at(self.topics["n_peak_tokens_region_by_topic"], (old_regions, old_topics), 1) + np.subtract.at(self.topics["n_peak_tokens_doc_by_topic"], (docs, old_topics), 1) + + # Update the indices and the count matrices using the sampled y/r assignments + np.add.at(self.topics["n_peak_tokens_region_by_topic"], (regions, topics), 1) + np.add.at(self.topics["n_peak_tokens_doc_by_topic"], (docs, topics), 1) + + # Update y->topic assignment + self.topics["peak_topic_idx"] = np.array(topics) + # Update y->subregion assignment + self.topics["peak_region_idx"] = np.array(regions) LGR.debug(f"Iter {self.iter:04d}: Updating spatial params") self._update_regions() # Update gaussian estimates for all subregions @@ -766,6 +778,10 @@ def _update_regions(self): ) / (n_obs1 + n_obs2) weighted_mean_otherdims = np.mean(all_xyz[:, 1:], axis=0) + if (n_obs1 == 0) and (n_obs2 == 0): + weighted_mean_dim1 = np.nan_to_num(weighted_mean_dim1) + weighted_mean_otherdims = np.nan_to_num(weighted_mean_otherdims) + # Store weighted mean estimates mu1 = np.zeros([1, self.data["ptoken_coords"].shape[1]]) mu2 = np.zeros([1, self.data["ptoken_coords"].shape[1]]) From 0c07abc5709dc4a48cfc5545b40e506c2b6c12be Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: 
Fri, 23 Sep 2022 21:53:39 -0400 Subject: [PATCH 14/19] Speed parallelization by avoiding excessive `.copy()` across jobs --- nimare/annotate/gclda.py | 134 +++++++++++++++++---------------------- 1 file changed, 59 insertions(+), 75 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index b97aa4fb3..15bf80406 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -109,40 +109,35 @@ def _update_word_topic_assignments( def _update_peak_assignments( - i_ptoken, - doc, + region, + topic, ny_region_topic, ny_doc_topic, - peak_topic_idx, - peak_region_idx, nz_doc_topic, - peak_probs, + p_x_subregions, n_regions, n_topics, alpha, delta, gamma, + rng, ): """Update y / r indicator variables assigning peaks->topics/subregions. Parameters ---------- - i_ptoken : :obj:`int` - Peak token index. - doc : :obj:`int` - Document i_ptoken index. - ny_region_topic : (R x T) :obj:`numpy.ndarray` - Number of peak-tokens assigned to each subregion (R) per topic (T). - ny_doc_topic : (D x T) :obj:`numpy.ndarray` - Number of peak-tokens assigned to each topic (T) per document (D). - peak_topic_idx : :obj:`numpy.ndarray` - Topic index. - peak_region_idx : :obj:`numpy.ndarray` - Region index. - nz_doc_topic : (D x T) :obj:`numpy.ndarray` + region : :obj:`int` + Region. + topic : :obj:`int` + Topic. + ny_region_topic : (1 x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to subregion (region) per topic (T). + ny_doc_topic : (1 x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to document (D) per topic (T). + nz_doc_topic : (1 x T) :obj:`numpy.ndarray` Number of word-tokens assigned to each topic (T) per document (D). - peak_probs : (D x T x R) :obj:`numpy.ndarray` - Matrix of probabilities of all peaks given all topic/subregion spatial distributions + p_x_subregions : (R x T) :obj:`numpy.ndarray` + Probability of generating current x from all subregions. n_regions : :obj:`int` Total number of regions. n_topics : :obj:`int` @@ -153,47 +148,30 @@ def _update_peak_assignments( Prior count on subregions for each topic. gamma : :obj:`float` Prior count added to y-counts when sampling z assignments. - randseed : :obj:`int` - Random seed for this iteration. + rng : :obj:`numpy.random.default_rng` + default_rng() instance Returns ------- - doc : :obj:`int` - Document i index. - ny_region_topic : :obj:`int` - Updated number of peak-tokens assigned to region i and topic i. - ny_doc_topic : :obj:`int` - Updated number of peak-tokens assigned to topic i and document i. + region : :obj:`int` + Subregion sampled (r). + topic : :obj:`int` + Topic sampled (y). 
""" - # Iterate over all peaks x, and sample a new y and r assignment for each - topic = peak_topic_idx[i_ptoken] - region = peak_region_idx[i_ptoken] - - # Make ny_region_topic and ny_doc_topic writeable when running parallel - if not ny_region_topic.flags.writeable: - ny_region_topic = ny_region_topic.copy() - if not ny_doc_topic.flags.writeable: - ny_doc_topic = ny_doc_topic.copy() - - # Decrement count-matrices to remove current ptoken_topic_idx - # because ptoken will be reassigned to a new topic - ny_region_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx - ny_doc_topic[doc, topic] -= 1 # Decrement count in Document x Topic count matrix - - # Retrieve the probability of generating current x from all - # subregions: [R x T] array of probs - p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() - # Compute the probabilities of all subregions given doc # p(r|d) ~ p(r|t) * p(t|d) # Counts of subregions per topic + prior: p(r|t) p_region_g_topic = ny_region_topic + delta + p_region_g_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx + # Decrement count-matrices to remove current ptoken_topic_idx + # because ptoken will be reassigned to a new topic # Normalize the columns such that each topic's distribution over subregions sums to 1 p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) # Counts of topics per document + prior: p(t|d) - p_topic_g_doc = ny_doc_topic[doc, :] + alpha + p_topic_g_doc = ny_doc_topic + alpha + p_topic_g_doc[topic] -= 1 # Decrement count in Document x Topic count matrix # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows p_topic_g_doc = np.array([p_topic_g_doc] * n_regions) @@ -207,9 +185,10 @@ def _update_peak_assignments( # The multinomial from which z is sampled is proportional to number # of y assigned to each topic, plus constant gamma # Compute the proportional probabilities in log-space - logp = nz_doc_topic[doc, :] * np.log( - (ny_doc_topic[doc, :] + gamma + 1) / (ny_doc_topic[doc, :] + gamma) - ) + doc_y_counts = ny_doc_topic + gamma + doc_y_counts[topic] -= 1 # Decrement count in Document x Topic count matrix + logp = nz_doc_topic * np.log((doc_y_counts + 1) / (doc_y_counts)) + # Add a constant before exponentiating to avoid any underflow issues p_peak_g_topic = np.exp(logp - np.max(logp)) @@ -229,7 +208,7 @@ def _update_peak_assignments( # from the sampling distribution # Returns a binary [1 x R*T] vector with a '1' in location that was sampled # and zeros everywhere else - assignment_vec = np.random.multinomial(1, probs_pdf) + assignment_vec = rng.multinomial(1, probs_pdf) # Reshape 1D back to [R x T] 2D assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) @@ -241,7 +220,7 @@ def _update_peak_assignments( # Topic sampled (y) topic = assignment_idx[1][0] - return doc, region, topic + return region, topic class GCLDAModel(NiMAREBase): @@ -666,38 +645,43 @@ def _update(self, loglikely_freq=1): LGR.debug(f"Iter {self.iter:04d}: Sampling y|r") # Retrieve p(x|r,y) for all subregions - peak_probs = self._get_peak_probs(self) + peak_probs = self._get_peak_probs(self).T self.seed += 1 # Seed random number generator - np.random.seed(self.seed) + rng = np.random.default_rng(self.seed) + + # Iterate over all peaks x, and sample a new y and r assignment for each old_regions, old_topics = self.topics["peak_region_idx"], self.topics["peak_topic_idx"] - docs, regions, topics = zip( - *Parallel(n_jobs=self.n_cores)( - 
delayed(_update_peak_assignments)( - i_ptoken, - doc, - self.topics["n_peak_tokens_region_by_topic"].copy(), - self.topics["n_peak_tokens_doc_by_topic"].copy(), - self.topics["peak_topic_idx"], - self.topics["peak_region_idx"], - self.topics["n_word_tokens_doc_by_topic"], - peak_probs, - self.params["n_regions"], - self.params["n_topics"], - self.params["alpha"], - self.params["delta"], - self.params["gamma"], - ) - for i_ptoken, doc in enumerate(self.data["ptoken_doc_idx"]) + results = Parallel(n_jobs=self.n_cores, verbose=20)( + delayed(_update_peak_assignments)( + old_regions[i_ptoken], + old_topics[i_ptoken], + self.topics["n_peak_tokens_region_by_topic"], + self.topics["n_peak_tokens_doc_by_topic"][doc, :], + self.topics["n_word_tokens_doc_by_topic"][doc, :], + peak_probs[:, :, i_ptoken], + self.params["n_regions"], + self.params["n_topics"], + self.params["alpha"], + self.params["delta"], + self.params["gamma"], + rng, ) + for i_ptoken, doc in enumerate(self.data["ptoken_doc_idx"]) ) # Update y-assignments + regions, topics = zip(*results) + # Decrement count-matrices to remove current ptoken_topic_idx # because ptoken will be reassigned to a new topic np.subtract.at(self.topics["n_peak_tokens_region_by_topic"], (old_regions, old_topics), 1) - np.subtract.at(self.topics["n_peak_tokens_doc_by_topic"], (docs, old_topics), 1) + np.subtract.at( + self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], old_topics), 1 + ) # Update the indices and the count matrices using the sampled y/r assignments np.add.at(self.topics["n_peak_tokens_region_by_topic"], (regions, topics), 1) - np.add.at(self.topics["n_peak_tokens_doc_by_topic"], (docs, topics), 1) + np.add.at( + self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], topics), 1 + ) # Update y->topic assignment self.topics["peak_topic_idx"] = np.array(topics) From 5870c3d080b58ef0135a6827d46f8edb7afe01e0 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Mon, 3 Oct 2022 15:59:41 -0400 Subject: [PATCH 15/19] Create chunks of data to speed up Parallel --- nimare/annotate/gclda.py | 186 ++++++++++++++++++++++----------------- 1 file changed, 106 insertions(+), 80 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 15bf80406..b9a2d13ee 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -1,5 +1,7 @@ """Topic modeling with generalized correspondence latent Dirichlet allocation.""" +import itertools import logging +import math import os.path as op import nibabel as nib @@ -109,9 +111,9 @@ def _update_word_topic_assignments( def _update_peak_assignments( - region, - topic, ny_region_topic, + regions, + topics, ny_doc_topic, nz_doc_topic, p_x_subregions, @@ -126,15 +128,15 @@ def _update_peak_assignments( Parameters ---------- - region : :obj:`int` - Region. - topic : :obj:`int` - Topic. - ny_region_topic : (1 x T) :obj:`numpy.ndarray` + ny_region_topic : (R x T) :obj:`numpy.ndarray` Number of peak-tokens assigned to subregion (region) per topic (T). - ny_doc_topic : (1 x T) :obj:`numpy.ndarray` + regions : :obj:`int` + Region indices. + topics : :obj:`int` + Topic indices. + ny_doc_topic : (D x T) :obj:`numpy.ndarray` Number of peak-tokens assigned to document (D) per topic (T). - nz_doc_topic : (1 x T) :obj:`numpy.ndarray` + nz_doc_topic : (D x T) :obj:`numpy.ndarray` Number of word-tokens assigned to each topic (T) per document (D). p_x_subregions : (R x T) :obj:`numpy.ndarray` Probability of generating current x from all subregions. 
@@ -158,69 +160,76 @@ def _update_peak_assignments( topic : :obj:`int` Topic sampled (y). """ - # Compute the probabilities of all subregions given doc - # p(r|d) ~ p(r|t) * p(t|d) - # Counts of subregions per topic + prior: p(r|t) - p_region_g_topic = ny_region_topic + delta - p_region_g_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx - # Decrement count-matrices to remove current ptoken_topic_idx - # because ptoken will be reassigned to a new topic - - # Normalize the columns such that each topic's distribution over subregions sums to 1 - p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) - - # Counts of topics per document + prior: p(t|d) - p_topic_g_doc = ny_doc_topic + alpha - p_topic_g_doc[topic] -= 1 # Decrement count in Document x Topic count matrix - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_topic_g_doc = np.array([p_topic_g_doc] * n_regions) - - # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) - # [R x T] array of probs - p_region_g_doc = p_topic_g_doc * p_region_g_topic - - # Compute the multinomial probability: p(z|y) - # Need the current vector of all z and y assignments for current doc - # The multinomial from which z is sampled is proportional to number - # of y assigned to each topic, plus constant gamma - # Compute the proportional probabilities in log-space - doc_y_counts = ny_doc_topic + gamma - doc_y_counts[topic] -= 1 # Decrement count in Document x Topic count matrix - logp = nz_doc_topic * np.log((doc_y_counts + 1) / (doc_y_counts)) - - # Add a constant before exponentiating to avoid any underflow issues - p_peak_g_topic = np.exp(logp - np.max(logp)) - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_peak_g_topic = np.array([p_peak_g_topic] * n_regions) - - # Get the full sampling distribution: - # [R x T] array containing the proportional probability of all y/r combinations - probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic - - # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from - probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) - - # Normalize the sampling distribution - probs_pdf = probs_pdf / np.sum(probs_pdf) - # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) - # from the sampling distribution - # Returns a binary [1 x R*T] vector with a '1' in location that was sampled - # and zeros everywhere else - assignment_vec = rng.multinomial(1, probs_pdf) - - # Reshape 1D back to [R x T] 2D - assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) - # Transform the linear index of the sampled element into the - # subregion/topic (r/y) assignment indices - assignment_idx = np.where(assignment_arr) - # Subregion sampled (r) - region = assignment_idx[0][0] - # Topic sampled (y) - topic = assignment_idx[1][0] - - return region, topic + new_regions = [] + new_topics = [] + for idx, region in enumerate(regions): + topic = topics[idx] + # Compute the probabilities of all subregions given doc + # p(r|d) ~ p(r|t) * p(t|d) + # Counts of subregions per topic + prior: p(r|t) + p_region_g_topic = ny_region_topic + delta + p_region_g_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx + # Decrement count-matrices to remove current ptoken_topic_idx + # because ptoken will be reassigned to a new topic + + # Normalize the columns such that each topic's distribution over subregions sums to 1 + p_region_g_topic = p_region_g_topic / 
np.sum(p_region_g_topic, axis=0, keepdims=True) + + # Counts of topics per document + prior: p(t|d) + p_topic_g_doc = ny_doc_topic[idx, :] + alpha + p_topic_g_doc[topic] -= 1 # Decrement count in Document x Topic count matrix + + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_topic_g_doc = np.array([p_topic_g_doc] * n_regions) + + # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) + # [R x T] array of probs + p_region_g_doc = p_topic_g_doc * p_region_g_topic + + # Compute the multinomial probability: p(z|y) + # Need the current vector of all z and y assignments for current doc + # The multinomial from which z is sampled is proportional to number + # of y assigned to each topic, plus constant gamma + # Compute the proportional probabilities in log-space + doc_y_counts = ny_doc_topic[idx, :] + gamma + doc_y_counts[topic] -= 1 # Decrement count in Document x Topic count matrix + logp = nz_doc_topic[idx, :] * np.log((doc_y_counts + 1) / (doc_y_counts)) + + # Add a constant before exponentiating to avoid any underflow issues + p_peak_g_topic = np.exp(logp - np.max(logp)) + + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_peak_g_topic = np.array([p_peak_g_topic] * n_regions) + + # Get the full sampling distribution: + # [R x T] array containing the proportional probability of all y/r combinations + probs_pdf = p_x_subregions[:, :, idx] * p_region_g_doc * p_peak_g_topic + + # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from + probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) + + # Normalize the sampling distribution + probs_pdf = probs_pdf / np.sum(probs_pdf) + # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) + # from the sampling distribution + # Returns a binary [1 x R*T] vector with a '1' in location that was sampled + # and zeros everywhere else + assignment_vec = rng.multinomial(1, probs_pdf) + + # Reshape 1D back to [R x T] 2D + assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) + # Transform the linear index of the sampled element into the + # subregion/topic (r/y) assignment indices + assignment_idx = np.where(assignment_arr) + # Subregion sampled (r) + region = assignment_idx[0][0] + # Topic sampled (y) + topic = assignment_idx[1][0] + + new_regions.append(region) + new_topics.append(topic) + + return new_regions, new_topics class GCLDAModel(NiMAREBase): @@ -649,16 +658,31 @@ def _update(self, loglikely_freq=1): self.seed += 1 # Seed random number generator rng = np.random.default_rng(self.seed) - # Iterate over all peaks x, and sample a new y and r assignment for each old_regions, old_topics = self.topics["peak_region_idx"], self.topics["peak_topic_idx"] - results = Parallel(n_jobs=self.n_cores, verbose=20)( + + # Create chunks of data to speed up Parallel()(delayed(f)(x) for x in X) + n_chunks = self.n_cores + n_elements = len(self.data["ptoken_doc_idx"]) + chunk_size = math.ceil(n_elements / n_chunks) + chunks = [ + [ + old_regions[chunk_i : chunk_i + chunk_size], + old_topics[chunk_i : chunk_i + chunk_size], + self.topics["n_peak_tokens_doc_by_topic"][ + self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size], : + ], + self.topics["n_word_tokens_doc_by_topic"][ + self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size], : + ], + peak_probs[:, :, chunk_i : chunk_i + chunk_size], + ] + for chunk_i in range(0, n_elements, chunk_size) + ] + + results = Parallel(n_jobs=self.n_cores)( delayed(_update_peak_assignments)( - 
old_regions[i_ptoken], - old_topics[i_ptoken], self.topics["n_peak_tokens_region_by_topic"], - self.topics["n_peak_tokens_doc_by_topic"][doc, :], - self.topics["n_word_tokens_doc_by_topic"][doc, :], - peak_probs[:, :, i_ptoken], + *chunk, self.params["n_regions"], self.params["n_topics"], self.params["alpha"], @@ -666,9 +690,11 @@ def _update(self, loglikely_freq=1): self.params["gamma"], rng, ) - for i_ptoken, doc in enumerate(self.data["ptoken_doc_idx"]) + for chunk in chunks ) # Update y-assignments regions, topics = zip(*results) + regions = list(itertools.chain(*regions)) + topics = list(itertools.chain(*topics)) # Decrement count-matrices to remove current ptoken_topic_idx # because ptoken will be reassigned to a new topic From 4e12046705a554f1ccb3ce42018c5051d1704b26 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Mon, 10 Oct 2022 11:25:39 -0400 Subject: [PATCH 16/19] Clean up the code --- nimare/annotate/gclda.py | 121 +++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 67 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index b9a2d13ee..1fd21e5d7 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -92,7 +92,7 @@ def _update_word_topic_assignments( p_word_g_topic = (nz_word_topic[word, :] + beta) / (nz_sum_topic + (beta * voc_len)) p_topic_g_doc = ny_doc_topic[doc, :] + gamma probs = p_word_g_topic * p_topic_g_doc # The unnormalized sampling distribution - # print(probs.shape) + # Sample a z_i assignment for the current word-token from the sampling distribution probs = probs.reshape(-1) / np.sum(probs) # Normalize the sampling distribution # Numpy returns a binary [1 x T] vector with a '1' in the index of sampled topic @@ -116,7 +116,7 @@ def _update_peak_assignments( topics, ny_doc_topic, nz_doc_topic, - p_x_subregions, + peak_probs, n_regions, n_topics, alpha, @@ -138,8 +138,8 @@ def _update_peak_assignments( Number of peak-tokens assigned to document (D) per topic (T). nz_doc_topic : (D x T) :obj:`numpy.ndarray` Number of word-tokens assigned to each topic (T) per document (D). - p_x_subregions : (R x T) :obj:`numpy.ndarray` - Probability of generating current x from all subregions. + peak_probs : (D x T x R) :obj:`numpy.ndarray` + Matrix of probabilities of all peaks given all topic/subregion spatial distributions. n_regions : :obj:`int` Total number of regions. 
n_topics : :obj:`int` @@ -164,52 +164,50 @@ def _update_peak_assignments( new_topics = [] for idx, region in enumerate(regions): topic = topics[idx] - # Compute the probabilities of all subregions given doc - # p(r|d) ~ p(r|t) * p(t|d) - # Counts of subregions per topic + prior: p(r|t) - p_region_g_topic = ny_region_topic + delta - p_region_g_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matx - # Decrement count-matrices to remove current ptoken_topic_idx - # because ptoken will be reassigned to a new topic - # Normalize the columns such that each topic's distribution over subregions sums to 1 + # Retrieve the probability of generating current x from all + # subregions: [R x T] array of probs + p_x_subregions = (peak_probs[idx, :, :]).transpose() + + # Compute the probability of region given topic p(r|t) + # Counts of subregions per topic + prior: p(r|t) + # Decrement count in Subregion x Topic count matrix + # Normalize the columns such that each topic's distribution over subregions sums to 1 + p_region_g_topic = ny_region_topic + delta + p_region_g_topic[region, topic] -= 1 p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) - # Counts of topics per document + prior: p(t|d) + # Compute the probability of topic given doc p(t|d) + # Counts of topics per document + prior: p(t|d) + # Decrement count in Document x Topic count matrix + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows p_topic_g_doc = ny_doc_topic[idx, :] + alpha - p_topic_g_doc[topic] -= 1 # Decrement count in Document x Topic count matrix - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_topic_g_doc[topic] -= 1 p_topic_g_doc = np.array([p_topic_g_doc] * n_regions) - # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) - # [R x T] array of probs + # Compute the probability of region given doc: p(r|d) ~ p(r|t) * p(t|d) p_region_g_doc = p_topic_g_doc * p_region_g_topic # Compute the multinomial probability: p(z|y) - # Need the current vector of all z and y assignments for current doc - # The multinomial from which z is sampled is proportional to number - # of y assigned to each topic, plus constant gamma - # Compute the proportional probabilities in log-space + # The multinomial from which z is sampled is proportional to number + # of y assigned to each topic, plus constant gamma + # Decrement count in Document x Topic count matrix + # Compute the proportional probabilities in log-space + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows doc_y_counts = ny_doc_topic[idx, :] + gamma - doc_y_counts[topic] -= 1 # Decrement count in Document x Topic count matrix + doc_y_counts[topic] -= 1 logp = nz_doc_topic[idx, :] * np.log((doc_y_counts + 1) / (doc_y_counts)) - - # Add a constant before exponentiating to avoid any underflow issues p_peak_g_topic = np.exp(logp - np.max(logp)) - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows p_peak_g_topic = np.array([p_peak_g_topic] * n_regions) # Get the full sampling distribution: # [R x T] array containing the proportional probability of all y/r combinations - probs_pdf = p_x_subregions[:, :, idx] * p_region_g_doc * p_peak_g_topic - - # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from + # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from + # Normalize the sampling distribution + probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) 
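The flatten/normalize/sample/unflatten pattern in the loop above can be exercised on its own. The snippet below is a minimal standalone sketch of that joint (subregion, topic) draw; the array sizes, the generator seed, and the random stand-in for the [R x T] product are illustrative only and are not taken from the model.

import numpy as np

rng = np.random.default_rng(0)  # illustrative seed
n_regions, n_topics = 2, 5      # illustrative sizes

# Stand-in for the unnormalized [R x T] product p(x|r,t) * p(r|d) * p(z|y)
probs_rt = rng.random((n_regions, n_topics))

probs_pdf = np.reshape(probs_rt, (n_regions * n_topics))  # flatten to length R*T
probs_pdf = probs_pdf / np.sum(probs_pdf)                 # normalize to a proper pdf

assignment_vec = rng.multinomial(1, probs_pdf)            # one-hot vector of length R*T
assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics))
assignment_idx = np.where(assignment_arr)
region = assignment_idx[0][0]  # subregion sampled (r)
topic = assignment_idx[1][0]   # topic sampled (y)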
- - # Normalize the sampling distribution probs_pdf = probs_pdf / np.sum(probs_pdf) + # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) # from the sampling distribution # Returns a binary [1 x R*T] vector with a '1' in location that was sampled @@ -218,13 +216,12 @@ def _update_peak_assignments( # Reshape 1D back to [R x T] 2D assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) + # Transform the linear index of the sampled element into the # subregion/topic (r/y) assignment indices assignment_idx = np.where(assignment_arr) - # Subregion sampled (r) - region = assignment_idx[0][0] - # Topic sampled (y) - topic = assignment_idx[1][0] + region = assignment_idx[0][0] # Subregion sampled (r) + topic = assignment_idx[1][0] # Topic sampled (y) new_regions.append(region) new_topics.append(topic) @@ -630,7 +627,6 @@ def _update(self, loglikely_freq=1): is 1 (log-likelihood is updated every iteration). """ self.iter += 1 # Update total iteration count - LGR.debug(f"Iter {self.iter:04d}: Sampling z") self.seed += 1 ( @@ -653,33 +649,29 @@ def _update(self, loglikely_freq=1): ) # Update z-assignments LGR.debug(f"Iter {self.iter:04d}: Sampling y|r") - # Retrieve p(x|r,y) for all subregions - peak_probs = self._get_peak_probs(self).T - self.seed += 1 # Seed random number generator - rng = np.random.default_rng(self.seed) - + self.seed += 1 old_regions, old_topics = self.topics["peak_region_idx"], self.topics["peak_topic_idx"] + peak_probs = self._get_peak_probs(self) # Retrieve p(x|r,y) for all subregions + rng = np.random.default_rng(self.seed) # Seed random number generator - # Create chunks of data to speed up Parallel()(delayed(f)(x) for x in X) + # When the size of X is large and the task is fast, Parallel()(delayed(f)(x) for x in X) + # is very slow. Split data into smaller chuncks to speed up Parallel(). 
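The overhead argument in the comment above can be reproduced with a toy task; everything in this sketch (the square_chunk function, the data size, and the core count) is made up for illustration and is not part of the patch.

import math
from joblib import Parallel, delayed

def square_chunk(values):
    # Toy per-chunk task standing in for _update_peak_assignments
    return [v * v for v in values]

data = list(range(10_000))
n_cores = 4  # illustrative
chunk_size = math.ceil(len(data) / n_cores)
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]

# One delayed call per chunk (a few dispatches) instead of one per element (10,000 dispatches)
results = Parallel(n_jobs=n_cores)(delayed(square_chunk)(chunk) for chunk in chunks)
flat = [y for chunk in results for y in chunk]
assert flat == [v * v for v in data]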
+ doc_idx = self.data["ptoken_doc_idx"] + n_docs = len(doc_idx) n_chunks = self.n_cores - n_elements = len(self.data["ptoken_doc_idx"]) - chunk_size = math.ceil(n_elements / n_chunks) + chunk_size = math.ceil(n_docs / n_chunks) chunks = [ [ - old_regions[chunk_i : chunk_i + chunk_size], - old_topics[chunk_i : chunk_i + chunk_size], - self.topics["n_peak_tokens_doc_by_topic"][ - self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size], : - ], - self.topics["n_word_tokens_doc_by_topic"][ - self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size], : - ], - peak_probs[:, :, chunk_i : chunk_i + chunk_size], + old_regions[chk_i : chk_i + chunk_size], + old_topics[chk_i : chk_i + chunk_size], + self.topics["n_peak_tokens_doc_by_topic"][doc_idx[chk_i : chk_i + chunk_size], :], + self.topics["n_word_tokens_doc_by_topic"][doc_idx[chk_i : chk_i + chunk_size], :], + peak_probs[chk_i : chk_i + chunk_size, :, :], ] - for chunk_i in range(0, n_elements, chunk_size) + for chk_i in range(0, n_docs, chunk_size) ] - results = Parallel(n_jobs=self.n_cores)( + results = Parallel(n_jobs=self.n_cores, max_nbytes=None)( delayed(_update_peak_assignments)( self.topics["n_peak_tokens_region_by_topic"], *chunk, @@ -692,26 +684,21 @@ def _update(self, loglikely_freq=1): ) for chunk in chunks ) # Update y-assignments + regions, topics = zip(*results) regions = list(itertools.chain(*regions)) topics = list(itertools.chain(*topics)) - # Decrement count-matrices to remove current ptoken_topic_idx - # because ptoken will be reassigned to a new topic + # Decrement count-matrices to remove previous ptoken_topic_idx (doc_idx) np.subtract.at(self.topics["n_peak_tokens_region_by_topic"], (old_regions, old_topics), 1) - np.subtract.at( - self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], old_topics), 1 - ) + np.subtract.at(self.topics["n_peak_tokens_doc_by_topic"], (doc_idx, old_topics), 1) - # Update the indices and the count matrices using the sampled y/r assignments + # Update the indices and the count matrices using the regions and topics assignments np.add.at(self.topics["n_peak_tokens_region_by_topic"], (regions, topics), 1) - np.add.at( - self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], topics), 1 - ) + np.add.at(self.topics["n_peak_tokens_doc_by_topic"], (doc_idx, topics), 1) - # Update y->topic assignment + # Update y->topic and y->subregion assignment self.topics["peak_topic_idx"] = np.array(topics) - # Update y->subregion assignment self.topics["peak_region_idx"] = np.array(regions) LGR.debug(f"Iter {self.iter:04d}: Updating spatial params") From dc2ac6c967a16cb1a05465dcca182e2dd6d49516 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Tue, 11 Oct 2022 11:55:17 -0400 Subject: [PATCH 17/19] Use an array of seeds for multi-core reproducibility of default_rng() instance --- nimare/annotate/gclda.py | 109 ++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 47 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 1fd21e5d7..f542062b1 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -111,35 +111,40 @@ def _update_word_topic_assignments( def _update_peak_assignments( - ny_region_topic, + doc_idx, regions, topics, + peak_probs, + seeds, + ny_region_topic, ny_doc_topic, nz_doc_topic, - peak_probs, n_regions, n_topics, alpha, delta, gamma, - rng, ): """Update y / r indicator variables assigning peaks->topics/subregions. 
Parameters ---------- - ny_region_topic : (R x T) :obj:`numpy.ndarray` - Number of peak-tokens assigned to subregion (region) per topic (T). - regions : :obj:`int` + doc_idx : (P) :obj:`list` + Document indices. + regions : (P) :obj:`numpy.ndarray` Region indices. - topics : :obj:`int` + topics : (P) :obj:`numpy.ndarray` Topic indices. + peak_probs : (P x T x R) :obj:`numpy.ndarray` + Matrix of probabilities of all peaks given all topic/subregion spatial distributions. + seeds : (P) :obj:`numpy.ndarray` + Array of seeds for multi-core reproducibility of default_rng() instance. + ny_region_topic : (R x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to subregion (region) per topic (T). ny_doc_topic : (D x T) :obj:`numpy.ndarray` Number of peak-tokens assigned to document (D) per topic (T). nz_doc_topic : (D x T) :obj:`numpy.ndarray` Number of word-tokens assigned to each topic (T) per document (D). - peak_probs : (D x T x R) :obj:`numpy.ndarray` - Matrix of probabilities of all peaks given all topic/subregion spatial distributions. n_regions : :obj:`int` Total number of regions. n_topics : :obj:`int` @@ -150,39 +155,40 @@ def _update_peak_assignments( Prior count on subregions for each topic. gamma : :obj:`float` Prior count added to y-counts when sampling z assignments. - rng : :obj:`numpy.random.default_rng` - default_rng() instance Returns ------- - region : :obj:`int` - Subregion sampled (r). - topic : :obj:`int` - Topic sampled (y). + sampled_regions : :obj:`list` + Subregions sampled (r). + sampled_topics : :obj:`list` + Topics sampled (y). """ - new_regions = [] - new_topics = [] - for idx, region in enumerate(regions): - topic = topics[idx] + + sampled_regions = [] + sampled_topics = [] + for i_ptoken, doc in enumerate(doc_idx): + # --- Seed random number generator for reproducibility + rng = np.random.default_rng(seeds[i_ptoken]) + + topic = topics[i_ptoken] + region = regions[i_ptoken] # Retrieve the probability of generating current x from all # subregions: [R x T] array of probs - p_x_subregions = (peak_probs[idx, :, :]).transpose() + p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() # Compute the probability of region given topic p(r|t) # Counts of subregions per topic + prior: p(r|t) - # Decrement count in Subregion x Topic count matrix # Normalize the columns such that each topic's distribution over subregions sums to 1 p_region_g_topic = ny_region_topic + delta - p_region_g_topic[region, topic] -= 1 + p_region_g_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matrix p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) # Compute the probability of topic given doc p(t|d) # Counts of topics per document + prior: p(t|d) - # Decrement count in Document x Topic count matrix # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_topic_g_doc = ny_doc_topic[idx, :] + alpha - p_topic_g_doc[topic] -= 1 + p_topic_g_doc = ny_doc_topic[doc, :] + alpha + p_topic_g_doc[topic] -= 1 # Decrement count in Document (doc) x Topic count matrix p_topic_g_doc = np.array([p_topic_g_doc] * n_regions) # Compute the probability of region given doc: p(r|d) ~ p(r|t) * p(t|d) @@ -191,12 +197,11 @@ def _update_peak_assignments( # Compute the multinomial probability: p(z|y) # The multinomial from which z is sampled is proportional to number # of y assigned to each topic, plus constant gamma - # Decrement count in Document x Topic count matrix # Compute the proportional probabilities in log-space # Reshape from 
(ntopics,) to (nregions, ntopics) with duplicated rows - doc_y_counts = ny_doc_topic[idx, :] + gamma - doc_y_counts[topic] -= 1 - logp = nz_doc_topic[idx, :] * np.log((doc_y_counts + 1) / (doc_y_counts)) + doc_y_counts = ny_doc_topic[doc, :] + gamma + doc_y_counts[topic] -= 1 # Decrement count in Document (doc) x Topic count matrix + logp = nz_doc_topic[doc, :] * np.log((doc_y_counts + 1) / (doc_y_counts)) p_peak_g_topic = np.exp(logp - np.max(logp)) p_peak_g_topic = np.array([p_peak_g_topic] * n_regions) @@ -223,10 +228,10 @@ def _update_peak_assignments( region = assignment_idx[0][0] # Subregion sampled (r) topic = assignment_idx[1][0] # Topic sampled (y) - new_regions.append(region) - new_topics.append(topic) + sampled_regions.append(region) + sampled_topics.append(topic) - return new_regions, new_topics + return sampled_regions, sampled_topics class GCLDAModel(NiMAREBase): @@ -234,6 +239,10 @@ class GCLDAModel(NiMAREBase): This model was originally described in :footcite:t:`rubin2017decoding`. + .. versionchanged:: 0.0.13 + + * New parameter: `n_cores`. Number of cores to use for parallelization. + .. versionchanged:: 0.0.8 * [ENH] Support symmetric GC-LDA topics with more than two subregions. @@ -652,35 +661,37 @@ def _update(self, loglikely_freq=1): self.seed += 1 old_regions, old_topics = self.topics["peak_region_idx"], self.topics["peak_topic_idx"] peak_probs = self._get_peak_probs(self) # Retrieve p(x|r,y) for all subregions - rng = np.random.default_rng(self.seed) # Seed random number generator + np.random.seed(self.seed) # Seed random number generator # When the size of X is large and the task is fast, Parallel()(delayed(f)(x) for x in X) # is very slow. Split data into smaller chuncks to speed up Parallel(). - doc_idx = self.data["ptoken_doc_idx"] - n_docs = len(doc_idx) + n_ptoken = len(self.data["ptoken_doc_idx"]) n_chunks = self.n_cores - chunk_size = math.ceil(n_docs / n_chunks) + chunk_size = math.ceil(n_ptoken / n_chunks) + # Array of seeds for multi-core reproducibility + seeds = np.random.randint(1e8, size=n_ptoken) chunks = [ [ - old_regions[chk_i : chk_i + chunk_size], - old_topics[chk_i : chk_i + chunk_size], - self.topics["n_peak_tokens_doc_by_topic"][doc_idx[chk_i : chk_i + chunk_size], :], - self.topics["n_word_tokens_doc_by_topic"][doc_idx[chk_i : chk_i + chunk_size], :], - peak_probs[chk_i : chk_i + chunk_size, :, :], + self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size], + old_regions[chunk_i : chunk_i + chunk_size], + old_topics[chunk_i : chunk_i + chunk_size], + peak_probs[chunk_i : chunk_i + chunk_size, :, :], + seeds[chunk_i : chunk_i + chunk_size], ] - for chk_i in range(0, n_docs, chunk_size) + for chunk_i in range(0, n_ptoken, chunk_size) ] results = Parallel(n_jobs=self.n_cores, max_nbytes=None)( delayed(_update_peak_assignments)( - self.topics["n_peak_tokens_region_by_topic"], *chunk, + self.topics["n_peak_tokens_region_by_topic"], + self.topics["n_peak_tokens_doc_by_topic"], + self.topics["n_word_tokens_doc_by_topic"], self.params["n_regions"], self.params["n_topics"], self.params["alpha"], self.params["delta"], self.params["gamma"], - rng, ) for chunk in chunks ) # Update y-assignments @@ -689,13 +700,17 @@ def _update(self, loglikely_freq=1): regions = list(itertools.chain(*regions)) topics = list(itertools.chain(*topics)) - # Decrement count-matrices to remove previous ptoken_topic_idx (doc_idx) + # Decrement count-matrices to remove previous self.data["ptoken_doc_idx"] np.subtract.at(self.topics["n_peak_tokens_region_by_topic"], 
(old_regions, old_topics), 1) - np.subtract.at(self.topics["n_peak_tokens_doc_by_topic"], (doc_idx, old_topics), 1) + np.subtract.at( + self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], old_topics), 1 + ) # Update the indices and the count matrices using the regions and topics assignments np.add.at(self.topics["n_peak_tokens_region_by_topic"], (regions, topics), 1) - np.add.at(self.topics["n_peak_tokens_doc_by_topic"], (doc_idx, topics), 1) + np.add.at( + self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], topics), 1 + ) # Update y->topic and y->subregion assignment self.topics["peak_topic_idx"] = np.array(topics) From fc7c5e637b3e4c1d2f297bab19eca14b467180b7 Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Tue, 11 Oct 2022 11:58:17 -0400 Subject: [PATCH 18/19] Update gclda.py --- nimare/annotate/gclda.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index f542062b1..058bb45ef 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -163,7 +163,6 @@ def _update_peak_assignments( sampled_topics : :obj:`list` Topics sampled (y). """ - sampled_regions = [] sampled_topics = [] for i_ptoken, doc in enumerate(doc_idx): From 0d2939cabdf2bc367a5d79453f95e598b832cace Mon Sep 17 00:00:00 2001 From: JulioAPeraza Date: Mon, 24 Oct 2022 14:52:19 -0400 Subject: [PATCH 19/19] Parallelize `_update_word_topic_assignments()` --- nimare/annotate/gclda.py | 170 ++++++++++++++++++++++----------------- 1 file changed, 98 insertions(+), 72 deletions(-) diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py index 058bb45ef..91fc0865a 100755 --- a/nimare/annotate/gclda.py +++ b/nimare/annotate/gclda.py @@ -9,7 +9,6 @@ import pandas as pd from joblib import Parallel, delayed from nilearn._utils import load_niimg -from numba import jit from scipy.stats import multivariate_normal from nimare.base import NiMAREBase @@ -18,63 +17,54 @@ LGR = logging.getLogger(__name__) -@jit(nopython=True) def _update_word_topic_assignments( + word_idx, + word_doc_idx, word_topic_idx, + seeds, nz_word_topic, nz_sum_topic, - nz_doc_topic, ny_doc_topic, - word_doc_idx, - word_idx, voc_len, beta, gamma, - randseed, ): """Update wtoken_topic_idx (z) indicator variables assigning words->topics. Parameters ---------- - word_topic_idx : :obj:`numpy.ndarray` + word_idx : (WT) :obj:`list` + Word tokens. + word_doc_idx : (WT) :obj:`list` + Document index. + word_topic_idx : (WT) :obj:`numpy.ndarray` Topic assignment for word tokens. + seeds : (WT) :obj:`numpy.ndarray` + Array of seeds for multi-core reproducibility of default_rng() instance. nz_word_topic : (W x T) :obj:`numpy.ndarray` Number of word-tokens assigned to each topic (T) per word-type (W). nz_sum_topic : (1 x T) :obj:`numpy.ndarray` Total number of word-tokens assigned to each topic (T) (across all docs). - nz_doc_topic : (D x T) :obj:`numpy.ndarray` - Number of word-tokens assigned to each topic (T) per document (D). ny_doc_topic : (D x T) :obj:`numpy.ndarray` Number of peak-tokens assigned to each topic (T) per document (D). - word_doc_idx : :obj:`numpy.ndarray` - Document index. - word_idx : :obj:`numpy.ndarray` - Word tokens. voc_len : :obj:`int` Total number of word-tokens in the vocabulary. beta : :obj:`float` Prior count on word-types for each topic. gamma : :obj:`float` Prior count added to y-counts when sampling z assignments. - randseed : :obj:`int` - Random seed for this iteration. 
Returns ------- - word_topic_idx : :obj:`numpy.ndarray` - Updated topic assignment for word tokens. - nz_word_topic : (W x T) :obj:`numpy.ndarray` - Updated number of word-tokens assigned to each topic (T) per word-type (W). - nz_sum_topic : (1 x T) :obj:`numpy.ndarray` - Updated total number of word-tokens assigned to each topic (T) (across all docs). - nz_doc_topic : (D x T) :obj:`numpy.ndarray` - Updated number of word-tokens assigned to each topic (T) per document (D). + sampled_topics : :obj:`list` + Topics sampled (z). """ - # --- Seed random number generator - np.random.seed(randseed) - + sampled_topics = [] # Loop over all word tokens for i_wtoken, word in enumerate(word_idx): + # --- Seed random number generator for reproducibility + rng = np.random.default_rng(seeds[i_wtoken]) + # Find document in which word token (not just word) appears doc = word_doc_idx[i_wtoken] # current topic assignment for word token w_i @@ -82,14 +72,15 @@ def _update_word_topic_assignments( # Decrement count-matrices to remove current wtoken_topic_idx # because wtoken will be reassigned to a new topic - nz_word_topic[word, topic] -= 1 - nz_sum_topic[0, topic] -= 1 - nz_doc_topic[doc, topic] -= 1 + nz_topic = nz_word_topic[word, :] + beta + nz_topic[topic] -= 1 + sum_topic_counts = nz_sum_topic + (beta * voc_len) + sum_topic_counts[0, topic] -= 1 # Get sampling distribution: # p(z_i|z,d,w) ~ p(w|t) * p(t|d) # ~ p_w_t * p_topic_g_doc - p_word_g_topic = (nz_word_topic[word, :] + beta) / (nz_sum_topic + (beta * voc_len)) + p_word_g_topic = nz_topic / sum_topic_counts p_topic_g_doc = ny_doc_topic[doc, :] + gamma probs = p_word_g_topic * p_topic_g_doc # The unnormalized sampling distribution @@ -97,17 +88,13 @@ def _update_word_topic_assignments( probs = probs.reshape(-1) / np.sum(probs) # Normalize the sampling distribution # Numpy returns a binary [1 x T] vector with a '1' in the index of sampled topic # and zeros everywhere else - assigned_topic_vec = np.random.multinomial(1, probs) + assigned_topic_vec = rng.multinomial(1, probs) # Extract selected topic from vector topic = np.where(assigned_topic_vec)[0][0] - # Update the indices and the count matrices using the sampled z assignment - word_topic_idx[i_wtoken] = topic # Update w_i topic-assignment - nz_word_topic[word, topic] += 1 - nz_sum_topic[0, topic] += 1 - nz_doc_topic[doc, topic] += 1 + sampled_topics.append(topic) - return word_topic_idx, nz_word_topic, nz_sum_topic, nz_doc_topic + return sampled_topics def _update_peak_assignments( @@ -637,60 +624,99 @@ def _update(self, loglikely_freq=1): self.iter += 1 # Update total iteration count LGR.debug(f"Iter {self.iter:04d}: Sampling z") self.seed += 1 - ( - self.topics["wtoken_topic_idx"], + np.random.seed(self.seed) # Seed random number generator + + old_wt_idx = self.topics["wtoken_topic_idx"] + + n_wtoken = len(self.data["wtoken_word_idx"]) + chunk_size = math.ceil(n_wtoken / self.n_cores) + seeds = np.random.randint(1e8, size=n_wtoken) # Array of seeds for multicore reproducib... 
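As a standalone check of the per-token seed idea used here: creating a fresh default_rng from a pre-drawn seed for each token makes the sampled values independent of how the tokens are later grouped into chunks. The distribution, sizes, and split point below are invented for illustration.

import numpy as np

np.random.seed(42)                      # plays the role of self.seed
seeds = np.random.randint(1e8, size=6)  # one seed per token, as in the patch
probs = np.full(4, 0.25)                # toy sampling distribution over 4 topics

def draw(seed_chunk):
    # Each token gets its own generator, so the draws do not depend on chunking
    return [
        int(np.where(np.random.default_rng(s).multinomial(1, probs))[0][0])
        for s in seed_chunk
    ]

serial = draw(seeds)
chunked = draw(seeds[:3]) + draw(seeds[3:])
assert serial == chunked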
+ chunks = [ + { + "word_idx": self.data["wtoken_word_idx"][chunk_i : chunk_i + chunk_size], + "word_doc_idx": self.data["wtoken_doc_idx"][chunk_i : chunk_i + chunk_size], + "word_topic_idx": self.topics["wtoken_topic_idx"][chunk_i : chunk_i + chunk_size], + "seeds": seeds[chunk_i : chunk_i + chunk_size], + } + for chunk_i in range(0, n_wtoken, chunk_size) + ] + + results = Parallel(n_jobs=self.n_cores, max_nbytes=None)( + delayed(_update_word_topic_assignments)( + **chunk, + nz_word_topic=self.topics["n_word_tokens_word_by_topic"], + nz_sum_topic=self.topics["total_n_word_tokens_by_topic"], + ny_doc_topic=self.topics["n_peak_tokens_doc_by_topic"], + voc_len=len(self.vocabulary), + beta=self.params["beta"], + gamma=self.params["gamma"], + ) + for chunk in chunks + ) # Update z-assignments + sampled_wt_idx = list(itertools.chain(*results)) + + # Decrement count-matrices to remove current wtoken_topic_idx + # because wtoken will be reassigned to a new topic + np.subtract.at( self.topics["n_word_tokens_word_by_topic"], - self.topics["total_n_word_tokens_by_topic"], + (self.data["wtoken_word_idx"], old_wt_idx), + 1, + ) + np.subtract.at(self.topics["total_n_word_tokens_by_topic"], (0, old_wt_idx), 1) + np.subtract.at( self.topics["n_word_tokens_doc_by_topic"], - ) = _update_word_topic_assignments( - self.topics["wtoken_topic_idx"], + (self.data["wtoken_doc_idx"], old_wt_idx), + 1, + ) + # Update the indices and the count matrices using the sampled z assignment + np.add.at( self.topics["n_word_tokens_word_by_topic"], - self.topics["total_n_word_tokens_by_topic"], + (self.data["wtoken_word_idx"], sampled_wt_idx), + 1, + ) + np.add.at(self.topics["total_n_word_tokens_by_topic"], (0, sampled_wt_idx), 1) + np.add.at( self.topics["n_word_tokens_doc_by_topic"], - self.topics["n_peak_tokens_doc_by_topic"], - np.array(self.data["wtoken_doc_idx"]), - np.array(self.data["wtoken_word_idx"]), - len(self.vocabulary), - self.params["beta"], - self.params["gamma"], - self.seed, - ) # Update z-assignments + (self.data["wtoken_doc_idx"], sampled_wt_idx), + 1, + ) + # Update w_i topic-assignment + self.topics["wtoken_topic_idx"] = np.array(sampled_wt_idx) LGR.debug(f"Iter {self.iter:04d}: Sampling y|r") self.seed += 1 + np.random.seed(self.seed) # Seed random number generator + old_regions, old_topics = self.topics["peak_region_idx"], self.topics["peak_topic_idx"] peak_probs = self._get_peak_probs(self) # Retrieve p(x|r,y) for all subregions - np.random.seed(self.seed) # Seed random number generator # When the size of X is large and the task is fast, Parallel()(delayed(f)(x) for x in X) # is very slow. Split data into smaller chuncks to speed up Parallel(). n_ptoken = len(self.data["ptoken_doc_idx"]) - n_chunks = self.n_cores - chunk_size = math.ceil(n_ptoken / n_chunks) - # Array of seeds for multi-core reproducibility - seeds = np.random.randint(1e8, size=n_ptoken) + chunk_size = math.ceil(n_ptoken / self.n_cores) + seeds = np.random.randint(1e8, size=n_ptoken) # Array of seeds for multicore reproducib... 
chunks = [ - [ - self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size], - old_regions[chunk_i : chunk_i + chunk_size], - old_topics[chunk_i : chunk_i + chunk_size], - peak_probs[chunk_i : chunk_i + chunk_size, :, :], - seeds[chunk_i : chunk_i + chunk_size], - ] + { + "doc_idx": self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size], + "regions": old_regions[chunk_i : chunk_i + chunk_size], + "topics": old_topics[chunk_i : chunk_i + chunk_size], + "peak_probs": peak_probs[chunk_i : chunk_i + chunk_size, :, :], + "seeds": seeds[chunk_i : chunk_i + chunk_size], + } for chunk_i in range(0, n_ptoken, chunk_size) ] results = Parallel(n_jobs=self.n_cores, max_nbytes=None)( delayed(_update_peak_assignments)( - *chunk, - self.topics["n_peak_tokens_region_by_topic"], - self.topics["n_peak_tokens_doc_by_topic"], - self.topics["n_word_tokens_doc_by_topic"], - self.params["n_regions"], - self.params["n_topics"], - self.params["alpha"], - self.params["delta"], - self.params["gamma"], + **chunk, + ny_region_topic=self.topics["n_peak_tokens_region_by_topic"], + ny_doc_topic=self.topics["n_peak_tokens_doc_by_topic"], + nz_doc_topic=self.topics["n_word_tokens_doc_by_topic"], + n_regions=self.params["n_regions"], + n_topics=self.params["n_topics"], + alpha=self.params["alpha"], + delta=self.params["delta"], + gamma=self.params["gamma"], ) for chunk in chunks ) # Update y-assignments
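Once the per-chunk results are gathered, the count matrices are updated in bulk; np.subtract.at and np.add.at are used because they accumulate correctly when the same (row, column) pair appears more than once, which plain fancy-indexed assignment does not. A minimal illustration with made-up counts and indices:

import numpy as np

counts = np.zeros((3, 4), dtype=int)  # e.g. a Document x Topic count matrix
docs = np.array([0, 0, 2])            # note the repeated document index
topics = np.array([1, 1, 3])

counts[docs, topics] += 1             # buffered: the duplicate (0, 1) pair is counted only once
print(counts[0, 1])                   # 1

counts[:] = 0
np.add.at(counts, (docs, topics), 1)  # unbuffered: duplicates accumulate
print(counts[0, 1])                   # 2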