From 4e4ab09de503ccb3605fb617b9dfa333ac1f8952 Mon Sep 17 00:00:00 2001
From: Jochem Smit
Date: Mon, 10 Feb 2020 16:24:43 +0100
Subject: [PATCH] docstrings and cleanup, removal of redundant code.

---
 pyhdx/pyhdx.py | 194 +++++++++++++++++++++++++++++--------------------
 1 file changed, 115 insertions(+), 79 deletions(-)

diff --git a/pyhdx/pyhdx.py b/pyhdx/pyhdx.py
index bc0e4157..068aedbe 100644
--- a/pyhdx/pyhdx.py
+++ b/pyhdx/pyhdx.py
@@ -28,14 +28,21 @@
 HEADER = 'Protein,Start,End,Sequence,Modification,Fragment,MaxUptake,MHP,State,Exposure,Center,Center SD,Uptake,Uptake SD,RT,RT SD'
+
 class PeptideCSVFile(object):
+    """
+    Input object for a DynamX HDX .csv file
+
+    Parameters
+    ----------
+    file_path : :obj:`str`
+        File path of the .csv file
+    drop_first : :obj:`int`
+        Number of N-terminal amino acids to ignore. Default is 1.
+    """
     def __init__(self, file_path, drop_first=1):
-        """
-        :param file_path: filename or generator to read (StringIO)
-        :param drop_first: number of N-terminal amino acids to ignore
-        """
-        names = names = [t[0] for t in CSV_DTYPE]
+        names = [t[0] for t in CSV_DTYPE]
         self.data = np.genfromtxt(file_path, skip_header=1, delimiter=',', dtype=None, names=names, encoding='UTF-8')
         if drop_first:
             self.data['start'] += drop_first
@@ -44,6 +51,26 @@ def __init__(self, file_path, drop_first=1):
         self.data['sequence'] = new_seq
 
     def return_by_name(self, control_state, control_exposure):
+        #todo return dictionary of kinetic series instead
+
+        """
+
+        Finds all peptides in the dataset which match the control peptides. The peptides are grouped by their state
+        and exposure and returned in a dictionary.
+
+        Parameters
+        ----------
+        control_state : :obj:`str`
+            Name of the control state
+        control_exposure : :obj:`float`
+            Exposure time of the control
+
+        Returns
+        -------
+        out : :obj:`dict`
+            Dictionary of :class:`~pyhdx.pyhdx.PeptideMeasurements` objects
+
+        """
         bc1 = self.data['state'] == control_state
         bc2 = self.data['exposure'] == control_exposure
@@ -52,6 +79,7 @@ def return_by_name(self, control_state, control_exposure):
         exp = np.unique(self.data['exposure'])
 
         out = {}
+        # Iterate over all combinations of state and exposure time to find all entries
         for s, e in itertools.product(st, exp):
             b1 = self.data['state'] == s
             b2 = self.data['exposure'] == e
@@ -73,55 +101,83 @@
             assert np.all(data_final['end'] == control_final['end'])
 
             score = 100 * data_final['uptake'] / control_final['uptake']
-            #TODO finalize name
-            #bs = score < 100
-            #out[name] = PeptideMeasurements(d[bs], score[bs])
             if len(score) > 0:
                 out[name] = PeptideMeasurements(data_final, score)
 
         return out
 
     def get_data(self, state, exposure):
-        return self.data[np.logical_and(self.data['state'] == state, self.data['exposure'] == exposure)]
-
-    def return_measurements(self, i_control, i_max):
-        # Deprecated
-        others = [i for i in range(i_max) if i != i_control]
-
-        control = self.data[i_control::i_max]['uptake']
-        out = {}
-        for i in others:
-            d = self.data[i::i_max]
-            score = 100*d['uptake'] / control
-            name = d['state'][0] + '_' + str(d['exposure'][0])
-            out[name] = PeptideMeasurements(d, score)
+        """
+        Get all data matching the supplied state and exposure.
+
+        Parameters
+        ----------
+        state : :obj:`str`
+            Measurement state
+        exposure : :obj:`float`
+            Measurement exposure time
+
+        Returns
+        -------
+        output_data : :class:`~numpy.ndarray`
+            Numpy structured array with selected peptides
+        """
-        return out
+        output_data = self.data[np.logical_and(self.data['state'] == state, self.data['exposure'] == exposure)]
+        return output_data
 
 
 class PeptideMeasurements(object):
-    def __init__(self, data, scores=None, min_len=0):
-        """data: structured array with at least the fields 'start', 'end', 'sequence' 'exposure'
-
-        """
+    """
+    Class with a subset of peptides corresponding to a single state and exposure
+
+    Parameters
+    ----------
+    data : :class:`~numpy.ndarray`
+        Numpy structured array with input data
+    scores : :class:`~numpy.ndarray`
+        Array with D/H uptake scores, typically in percentages or absolute uptake numbers.
+
+    Attributes
+    ----------
+    start : :obj:`int`
+        First peptide starts at this residue number (starting from 1)
+    stop : :obj:`int`
+        Last peptide ends at this residue number (inclusive)
+    prot_len : :obj:`int`
+        Total number of residues in this set of peptides, not taking regions of no coverage into account.
+    exposure : :obj:`float`
+        Exposure time of this set of peptides (minutes)
+    state : :obj:`str`
+        State describing the experiment
+    big_X : :class:`~numpy.ndarray`
+        Matrix of shape (number of peptides, prot_len) coupling each peptide to the residues it covers
+    X : :class:`~numpy.ndarray`
+        Matrix of shape (number of peptides, number of sections) with the number of residues each peptide covers per section
+    """
+
+    def __init__(self, data, scores=None):
+        assert len(np.unique(data['exposure'])) == 1, 'Exposure entries are not unique'
+        assert len(np.unique(data['state'])) == 1, 'State entries are not unique'
+        if scores is not None:
+            assert len(scores) == len(data), 'Length of scores must match the length of the data (number of peptides)'
         self.data = data
         self.start = np.min(self.data['start'])
-        self.stop = np.max(self.data['end'])
-        self.prot_len = self.stop - self.start + 1
-
-
-        #todo properties
-        if len(np.unique(self.data['exposure'])) == 1:
-            self.exposure = self.data['exposure'][0]
-        else:
-            self.exposure = None
+        self.stop = np.max(self.data['end'])  #todo refactor to end
+        self.prot_len = self.stop - self.start + 1  # Total number of amino acids described by these measurements
-        if len(np.unique(self.data['state'])) == 1:
-            self.state = self.data['state'][0]
-        else:
-            self.state = None
-
-        self.scores = scores if scores is not None else None # TODO currently needs scores for len
+        self.exposure = self.data['exposure'][0]
+        self.state = self.data['state'][0]
+        self.scores = scores  #if scores is not None else None  # TODO currently needs scores for len
 
         self.big_X = np.zeros((len(data), self.prot_len), dtype=float)
         for row, entry in enumerate(self.data):
@@ -133,7 +189,6 @@ def __init__(self, data, scores=None, min_len=0):
         ## sectioning
         combinations, num_letters = self.get_combinations(len(data))
-
         uid_mx = np.zeros((len(data), self.prot_len), dtype='U' + str(num_letters + 4))
         for c, (row, entry) in zip(combinations, enumerate(data)):
             i0 = entry['start'] - self.start
@@ -148,7 +203,6 @@ def __init__(self, data, scores=None, min_len=0):
         combinations, n = self.get_combinations(len(regions), prefix='gap_')
         for r, c in zip(regions, combinations):
             big_sum[r[0]:r[1]] = c
-            #big_sum[b] = combinations
 
         vals, idx, counts = np.unique(big_sum, return_index=True, return_counts=True)
         vals = vals[np.argsort(idx)]  #
@@ -160,35 +214,6 @@ def __init__(self, data, scores=None, min_len=0):
         gaps = vals.astype('
-        if min_len > 0:
-            merged_counts = np.array([], dtype=int)
-            rr_prev = 0
-            for rl, rr in contiguous_regions(self.counts <= 2):
-                merged_counts = np.append(merged_counts, self.counts[rr_prev:rl])
-                rr_prev = rr
-                s = self.counts[rl:rr].sum()
-                merged_counts = np.append(merged_counts, s)
-
-            final_counts = merged_counts.copy()
-            for i, e in enumerate(merged_counts):
-                if e <= 2:
-                    final_counts[i] -= e
-                    if i == 0:
-                        final_counts[i + 1] += e
-                    elif i == len(merged_counts) - 1:
-                        final_counts[i - 1] += e
-                    elif final_counts[i + 1] < final_counts[i - 1]:
-                        final_counts[i + 1] += e
-                    else:
-                        final_counts[i - 1] += e
-            final_counts = final_counts[final_counts > 0]
-            assert final_counts.sum() == self.counts.sum()
-            self.counts = final_counts
-            cs = np.cumsum(self.counts)
-
-
         # Matrix of N columns with N equal to sections of identical residues.
         # Values are the number of residues in the blocks
         self.X = np.zeros((len(data), len(self.counts)), dtype=float)
@@ -227,10 +252,6 @@ def big_X_norm(self):
     def big_X_sq_norm(self):
         return self.big_X**2 / np.sum(self.big_X**2, axis=0)[np.newaxis, :]
 
-    def get_scores(self, method):
-        if method == 'avg':
-            return self.scores_average
-
     @property
     def scores_average(self):
         return self.big_X_norm.T.dot(self.scores)
@@ -249,16 +270,31 @@ def scores_nnls_tikonov(self, reg):
         return np.repeat(x, self.counts)
 
     def scores_nnls(self):
-        print(np.any(np.isnan(self.X_norm)))
-        print(np.any(np.isinf(self.X_norm)))
         x = scipy.optimize.nnls(self.X_norm, self.scores,)[0]
         return np.repeat(x, self.counts)
 
     def calc_scores(self, residue_scores):
-        return self.big_X.dot(residue_scores)
+        """
+        Calculates uptake scores per peptide given an array of individual residue scores.
+
+        Parameters
+        ----------
+        residue_scores : :class:`~numpy.ndarray`
+            Array of scores per residue of length `prot_len`
+
+        Returns
+        -------
+        scores : :class:`~numpy.ndarray`
+            Array of scores per peptide
+        """
+
+        scores = self.big_X.dot(residue_scores)
+        return scores
 
     @property
     def sequence(self):
+        """:obj:`str`: String of the full protein sequence. Gaps of no coverage are filled with prolines."""
         seq = np.full(self.stop, 'P', dtype='U')
         for d in self.data:
             i = d['start'] - 1
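
Usage sketch (not part of the patch): a minimal example of how the API documented above fits together, assuming the classes are importable from pyhdx.pyhdx. The input file name, control state name and exposure value are hypothetical placeholders.

    from pyhdx.pyhdx import PeptideCSVFile

    # Hypothetical input file and control definition; replace with values from a real dataset.
    pcf = PeptideCSVFile('peptides.csv', drop_first=1)

    # Group peptides by state and exposure, scored against the chosen control peptides.
    measurements = pcf.return_by_name('FD_control', 0.167)

    for name, pm in measurements.items():
        # scores_average projects peptide-level uptake onto individual residues via big_X_norm.
        residue_scores = pm.scores_average
        # calc_scores maps per-residue scores back to per-peptide scores (big_X dot product).
        peptide_scores = pm.calc_scores(residue_scores)
        print(name, pm.state, pm.exposure, peptide_scores.shape)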