Commit
docstrings and cleanup, removal of redundant code.
Jhsmit committed Feb 10, 2020
1 parent 64136a2 commit 4e4ab09
Showing 1 changed file with 115 additions and 79 deletions.
194 changes: 115 additions & 79 deletions pyhdx/pyhdx.py
@@ -28,14 +28,21 @@

HEADER = 'Protein,Start,End,Sequence,Modification,Fragment,MaxUptake,MHP,State,Exposure,Center,Center SD,Uptake,Uptake SD,RT,RT SD'


class PeptideCSVFile(object):
"""
Input object for a DynamX HDX .csv file.

Parameters
----------
file_path : :obj:`str`
File path of the .csv file
drop_first : :obj:`int`
Number of N-terminal amino acids to ignore. Default is 1.
"""
def __init__(self, file_path, drop_first=1):
"""

:param file_path: filename or generator to read (StringIO)
:param drop_first: number of N-terminal amino acids to ignore
"""
names = names = [t[0] for t in CSV_DTYPE]
names = [t[0] for t in CSV_DTYPE]
self.data = np.genfromtxt(file_path, skip_header=1, delimiter=',', dtype=None, names=names, encoding='UTF-8')
if drop_first:
self.data['start'] += drop_first
@@ -44,6 +51,26 @@ def __init__(self, file_path, drop_first=1):
self.data['sequence'] = new_seq
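
As a quick orientation for the class above, a minimal usage sketch (the file name below is hypothetical):

from pyhdx.pyhdx import PeptideCSVFile

# Parse a hypothetical DynamX state data export; drop_first=1 shifts each
# peptide's start by one residue and trims the first N-terminal amino acid
# from its sequence, as done in __init__ above.
pcf = PeptideCSVFile('hdx_state_data.csv', drop_first=1)
print(pcf.data.dtype.names)  # field names taken from CSV_DTYPE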

def return_by_name(self, control_state, control_exposure):
#todo return dictionary of kinetic series instead

"""
Finds all peptides in the dataset which match the control peptides. Matching peptides are grouped by their
state and exposure and returned in a dictionary.

Parameters
----------
control_state : :obj:`str`
Name of the control state
control_exposure : :obj:`float`
Exposure time of the control

Returns
-------
out : :obj:`dict`
Dictionary of :class:`~pyhdx.pyhdx.PeptideMeasurements` objects
"""
bc1 = self.data['state'] == control_state
bc2 = self.data['exposure'] == control_exposure

@@ -52,6 +79,7 @@ def return_by_name(self, control_state, control_exposure):
exp = np.unique(self.data['exposure'])

out = {}
# Iterate over all combinations of state and exposure time to find all entries
for s, e in itertools.product(st, exp):
b1 = self.data['state'] == s
b2 = self.data['exposure'] == e
@@ -73,55 +101,83 @@ def __init__(self, file_path, drop_first=1):
assert np.all(data_final['end'] == control_final['end'])
score = 100 * data_final['uptake'] / control_final['uptake']

#TODO finalize name
#bs = score < 100
#out[name] = PeptideMeasurements(d[bs], score[bs])
if len(score) > 0:
out[name] = PeptideMeasurements(data_final, score)

return out
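
A hedged usage sketch for return_by_name, continuing from the pcf object above; the control state name and exposure time are hypothetical, and the dictionary keys are assumed to combine state and exposure:

# Score every (state, exposure) group of peptides against the control.
measurements = pcf.return_by_name('Full Deuteration control', 0.167)
for name, pm in measurements.items():
    print(name, len(pm.data), pm.scores.mean())  # number of peptides and mean score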

def get_data(self, state, exposure):
return self.data[np.logical_and(self.data['state'] == state, self.data['exposure'] == exposure)]

def return_measurements(self, i_control, i_max):
# Deprecated
others = [i for i in range(i_max) if i != i_control]

control = self.data[i_control::i_max]['uptake']
out = {}
for i in others:
d = self.data[i::i_max]
score = 100*d['uptake'] / control
name = d['state'][0] + '_' + str(d['exposure'][0])
out[name] = PeptideMeasurements(d, score)
"""
Get all data matching the supplied state and exposure.

Parameters
----------
state : :obj:`str`
Measurement state
exposure : :obj:`float`
Measurement exposure time

Returns
-------
output_data : :class:`~numpy.ndarray`
Numpy structured array with the selected peptides
"""

return out
output_data = self.data[np.logical_and(self.data['state'] == state, self.data['exposure'] == exposure)]
return output_data


class PeptideMeasurements(object):
def __init__(self, data, scores=None, min_len=0):
"""data: structured array with at least the fields 'start', 'end', 'sequence' 'exposure'
"""
"""
Class with a subset of peptides corresponding to only one state and exposure.

Parameters
----------
data : :class:`~numpy.ndarray`
Numpy structured array with input data
scores : :class:`~numpy.ndarray`
Array with D/H uptake scores, typically in percentages or absolute uptake numbers.

Attributes
----------
start : :obj:`int`
First peptide starts at this residue number (starting from 1)
stop : :obj:`int`
Last peptide ends at this residue number (inclusive)
prot_len : :obj:`int`
Total number of residues in this set of peptides, not taking regions of no coverage into account.
exposure : :obj:`float`
Exposure time of this set of peptides (minutes)
state : :obj:`str`
State describing the experiment
big_X : :class:`~numpy.ndarray`
Matrix of peptides (rows) against residues (columns), coupling each peptide to the residues it covers
X : :class:`~numpy.ndarray`
Matrix of peptides (rows) against sections of identical coverage (columns); values are the number of residues per section

Properties: big_X_norm, X_norm, scores_average, scores_nnls, scores_nnls_tikonov
"""

def __init__(self, data, scores=None):
assert len(np.unique(data['exposure'])) == 1, 'Exposure entries are not unique'
assert len(np.unique(data['state'])) == 1, 'State entries are not unique'
if scores is not None:
assert len(scores) == len(data), 'Length of scores must match the length of the data (number of peptides)'
self.data = data
self.start = np.min(self.data['start'])
self.stop = np.max(self.data['end'])
self.prot_len = self.stop - self.start + 1


#todo properties
if len(np.unique(self.data['exposure'])) == 1:
self.exposure = self.data['exposure'][0]
else:
self.exposure = None
self.stop = np.max(self.data['end']) #todo refactor to end
self.prot_len = self.stop - self.start + 1 # Total number of amino acids described by these measurements

if len(np.unique(self.data['state'])) == 1:
self.state = self.data['state'][0]
else:
self.state = None

self.scores = scores if scores is not None else None # TODO currently needs scores for len
self.exposure = self.data['exposure'][0]
self.state = self.data['state'][0]
self.scores = scores #if scores is not None else None # TODO currently needs scores for len

self.big_X = np.zeros((len(data), self.prot_len), dtype=float)
for row, entry in enumerate(self.data):
@@ -133,7 +189,6 @@ def __init__(self, data, scores=None, min_len=0):

## sectioning
combinations, num_letters = self.get_combinations(len(data))

uid_mx = np.zeros((len(data), self.prot_len), dtype='U' + str(num_letters + 4))
for c, (row, entry) in zip(combinations, enumerate(data)):
i0 = entry['start'] - self.start
@@ -148,7 +203,6 @@ def __init__(self, data, scores=None, min_len=0):
combinations, n = self.get_combinations(len(regions), prefix='gap_')
for r, c in zip(regions, combinations):
big_sum[r[0]:r[1]] = c
#big_sum[b] = combinations

vals, idx, counts = np.unique(big_sum, return_index=True, return_counts=True)
vals = vals[np.argsort(idx)] #
@@ -160,35 +214,6 @@ def __init__(self, data, scores=None, min_len=0):
gaps = vals.astype('<U4') == 'gap_'
self.states[gaps] = 0

# don't use this, it doesn't work
# this is an attempt at removing small regions
if min_len > 0:
merged_counts = np.array([], dtype=int)
rr_prev = 0
for rl, rr in contiguous_regions(self.counts <= 2):
merged_counts = np.append(merged_counts, self.counts[rr_prev:rl])
rr_prev = rr
s = self.counts[rl:rr].sum()
merged_counts = np.append(merged_counts, s)

final_counts = merged_counts.copy()
for i, e in enumerate(merged_counts):
if e <= 2:
final_counts[i] -= e
if i == 0:
final_counts[i + 1] += e
elif i == len(merged_counts) - 1:
final_counts[i - 1] += e
elif final_counts[i + 1] < final_counts[i - 1]:
final_counts[i + 1] += e
else:
final_counts[i - 1] += e
final_counts = final_counts[final_counts > 0]
assert final_counts.sum() == self.counts.sum()
self.counts = final_counts
cs = np.cumsum(self.counts)


# Matrix of N columns with N equal to sections of identical residues.
# Values are the number of residues in the blocks
self.X = np.zeros((len(data), len(self.counts)), dtype=float)
@@ -227,10 +252,6 @@ def big_X_norm(self):
def big_X_sq_norm(self):
return self.big_X**2 / np.sum(self.big_X**2, axis=0)[np.newaxis, :]

def get_scores(self, method):
if method == 'avg':
return self.scores_average

@property
def scores_average(self):
return self.big_X_norm.T.dot(self.scores)
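
To illustrate what scores_average computes, a toy example assuming big_X is a simple 0/1 coverage matrix (peptides as rows, residues as columns) and big_X_norm normalizes each column to sum to one:

import numpy as np

big_X = np.array([[1., 1., 1., 0.],   # peptide 1 covers residues 1-3
                  [0., 1., 1., 1.]])  # peptide 2 covers residues 2-4
scores = np.array([40., 60.])         # per-peptide uptake scores
big_X_norm = big_X / big_X.sum(axis=0)[np.newaxis, :]
print(big_X_norm.T.dot(scores))       # [40. 50. 50. 60.] -> per-residue averages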
@@ -249,16 +270,31 @@ def scores_nnls_tikonov(self, reg):
return np.repeat(x, self.counts)

def scores_nnls(self):
print(np.any(np.isnan(self.X_norm)))
print(np.any(np.isinf(self.X_norm)))
x = scipy.optimize.nnls(self.X_norm, self.scores,)[0]
return np.repeat(x, self.counts)
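
For reference, a self-contained sketch of the non-negative least squares step, with toy numbers standing in for the section matrix X_norm, the peptide scores and the per-section residue counts:

import numpy as np
import scipy.optimize

X_norm = np.array([[0.6, 0.4, 0.0],   # two peptides spanning three sections
                   [0.0, 0.5, 0.5]])
peptide_scores = np.array([30., 55.])
section_scores = scipy.optimize.nnls(X_norm, peptide_scores)[0]  # constrained to >= 0
counts = np.array([3, 2, 4])          # number of residues in each section
residue_scores = np.repeat(section_scores, counts)  # expand sections back to residues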

def calc_scores(self, residue_scores):
return self.big_X.dot(residue_scores)
"""
Calculates uptake scores per peptide given an array of individual residue scores.

Parameters
----------
residue_scores : :class:`~numpy.ndarray`
Array of scores per residue of length `prot_len`

Returns
-------
scores : :class:`~numpy.ndarray`
Array of scores per peptide
"""

scores = self.big_X.dot(residue_scores)
return scores
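
A round-trip sketch for calc_scores on a PeptideMeasurements instance pm (for example one of the values returned by return_by_name above), using scores_average as the per-residue input. Projecting residue-level scores back onto peptides makes it possible to compare a fitted per-residue profile against the measured per-peptide scores.

residue_scores = pm.scores_average               # shape: (prot_len,)
peptide_scores = pm.calc_scores(residue_scores)  # shape: (number of peptides,)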

@property
def sequence(self):
""":obj:`str: String of the full protein sequence. Gaps of no coverage are filled with Prolines."""
seq = np.full(self.stop, 'P', dtype='U')
for d in self.data:
i = d['start'] - 1
