Merge pull request #41 from shenshan/master
Auto-update the probe insertion qc; update probe insertion uuid
Thinh Nguyen authored Mar 9, 2021
2 parents 69fa2a2 + 0fd61cf commit b729022
Showing 14 changed files with 154 additions and 1,158 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -143,3 +143,4 @@ data/*
# others
notebooks/notebooks_dev/*
clear_things_up.txt
scripts/updates/problematic_keys.npy
4 changes: 0 additions & 4 deletions ibl_pipeline/__init__.py
@@ -45,8 +45,4 @@ class S3Access(dj.Manual):
bucket='ibl-dj-external',
location='/plotting'
),
'ephys_local': dict(
protocol='file',
location='/data/ephys'
)
}
34 changes: 27 additions & 7 deletions ibl_pipeline/ephys.py
@@ -57,6 +57,7 @@ class CompleteClusterSession(dj.Computed):
'clusters.metrics.pqt',
'clusters.peakToTrough.npy',
'clusters.uuids.csv',
'clusters.metrics.pqt',
'clusters.waveforms.npy',
'clusters.waveformsChannels.npy',
'spikes.amps.npy',
@@ -124,6 +125,17 @@ class ProbeInsertion(dj.Imported):
"""


@schema
class ProbeInsertionMissingDataLog(dj.Manual):
definition = """
# probe insertion missing cluster data
-> ProbeInsertion
missing_data : varchar(255)
---
error_message : varchar(1024)
"""


@schema
class ChannelGroup(dj.Imported):
definition = """
@@ -137,7 +149,8 @@ class ChannelGroup(dj.Imported):

key_source = ProbeInsertion \
& (data.FileRecord & 'dataset_name="channels.rawInd.npy"') \
& (data.FileRecord & 'dataset_name="channels.localCoordinates.npy"')
& (data.FileRecord & 'dataset_name="channels.localCoordinates.npy"') - \
(ProbeInsertionMissingDataLog & 'missing_data="channels"')

def make(self, key):

@@ -152,8 +165,13 @@ def make(self, key):
ses_path = alf.io.get_session_path(files[0])

probe_name = (ProbeInsertion & key).fetch1('probe_label')
channels = alf.io.load_object(
ses_path.joinpath('alf', probe_name), 'channels')
try:
channels = alf.io.load_object(
ses_path.joinpath('alf', probe_name), 'channels')
except Exception as e:
ProbeInsertionMissingDataLog.insert1(
dict(**key, missing_data='channels', error_message=str(e)))
return

self.insert1(
dict(**key,
@@ -190,7 +208,8 @@ class DefaultCluster(dj.Imported):
cluster_spikes_samples=null: blob@ephys # Time of spikes, measured in units of samples in their own electrophysiology binary file.
cluster_ts=CURRENT_TIMESTAMP : timestamp
"""
key_source = ProbeInsertion & (CompleteClusterSession - ProblematicDataSet)
key_source = ProbeInsertion & (CompleteClusterSession - ProblematicDataSet) - \
(ProbeInsertionMissingDataLog & 'missing_data="clusters"')

def make(self, key):
eID = str((acquisition.Session & key).fetch1('session_uuid'))
@@ -217,8 +236,7 @@ def make(self, key):
spikes_times_dtype_name
]

files = one.load(eID, dataset_types=dtypes, download_only=True,
clobber=True)
files = one.load(eID, dataset_types=dtypes, download_only=True, clobber=True)
ses_path = alf.io.get_session_path(files[0])

probe_name = (ProbeInsertion & key).fetch1('probe_label')
@@ -228,7 +246,9 @@ def make(self, key):
ses_path.joinpath('alf', probe_name), 'clusters')
spikes = alf.io.load_object(
ses_path.joinpath('alf', probe_name), 'spikes')
except:
except Exception as e:
ProbeInsertionMissingDataLog.insert1(
dict(**key, missing_data='clusters', error_message=str(e)))
return

time_fnames = [k for k in spikes.keys() if 'times' in k]
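The change to ibl_pipeline/ephys.py above establishes the pattern reused throughout this commit: a failed ALF load is recorded once in ProbeInsertionMissingDataLog, and each downstream key_source subtracts the logged keys so populate() stops retrying insertions whose data are known to be missing. A minimal sketch of that guard, assuming a bound DataJoint schema object and the alf.io loader used in the diff (SomeImportedTable and ses_path are placeholders, not pipeline names):

import datajoint as dj
import alf.io

@schema
class SomeImportedTable(dj.Imported):
    definition = """
    -> ProbeInsertion
    """
    # skip insertions already logged as missing the 'channels' object
    key_source = ProbeInsertion - \
        (ProbeInsertionMissingDataLog & 'missing_data="channels"')

    def make(self, key):
        try:
            channels = alf.io.load_object(ses_path, 'channels')  # ses_path: placeholder
        except Exception as e:
            # log once; the key_source subtraction excludes this key on later runs
            ProbeInsertionMissingDataLog.insert1(
                dict(**key, missing_data='channels', error_message=str(e)))
            return
        self.insert1(dict(**key))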
43 changes: 32 additions & 11 deletions ibl_pipeline/histology.py
@@ -199,6 +199,12 @@ def make(self, key):
probe_trajectory.pop('provenance')
self.insert1(probe_trajectory)

if data_missing:
ephys.ProbeInsertionMissingDataLog.insert1(
dict(**key, missing_data='trajectory',
error_message='No probes.trajectory data for this probe insertion')
)


@schema
class ChannelBrainLocation(dj.Imported):
@@ -212,9 +218,10 @@ class ChannelBrainLocation(dj.Imported):
channel_dv : decimal(6, 1) # (um) dorso-ventral coordinate relative to Bregma, ventral negative
-> reference.BrainRegion
"""
key_source = ProbeTrajectory & \
key_source = (ProbeTrajectory & \
(data.FileRecord & 'dataset_name like "%channels.brainLocationIds%"') & \
(data.FileRecord & 'dataset_name like "%channels.mlapdv%"')
(data.FileRecord & 'dataset_name like "%channels.mlapdv%"')) - \
(ephys.ProbeInsertionMissingDataLog & 'missing_data="channels_brain_region"')

def make(self, key):

@@ -232,8 +239,15 @@ def make(self, key):
if not probe_label:
probe_label = 'probe0' + key['probe_idx']

channels = alf.io.load_object(
ses_path.joinpath('alf', probe_label), 'channels')
try:
channels = alf.io.load_object(
ses_path.joinpath('alf', probe_label), 'channels')
except Exception as e:
ephys.ProbeInsertionMissingDataLog.insert1(
dict(**key, missing_data='channels_brain_region',
error_message=str(e))
)
return

channel_entries = []
for ichannel, (brain_loc_id, loc) in tqdm(
@@ -287,8 +301,15 @@ def make(self, key):
if not probe_label:
probe_label = 'probe0' + key['probe_idx']

clusters = alf.io.load_object(
ses_path.joinpath('alf', probe_label), 'channels')
try:
clusters = alf.io.load_object(
ses_path.joinpath('alf', probe_label), 'clusters')
except Exception as e:
ephys.ProbeInsertionMissingDataLog.insert1(
dict(**key, missing_data='clusters_brain_region',
error_message=str(e))
)
return

cluster_entries = []
for icluster, (brain_loc_id, loc) in tqdm(
@@ -311,16 +332,16 @@


# @schema
# class SessionBrainRegion(dj.Computed):
# class ProbeBrainRegion(dj.Computed):
# definition = """
# # Brain regions assignment to each session
# # including the regions of finest granularity and their upper-level areas.
# -> acquisition.Session
# # Brain regions assignment to each probe insertion, including the regions of finest granularity and their upper-level areas.
# -> ProbeTrajectory
# -> reference.BrainRegion
# """
# key_source = acquisition.Session & ClusterBrainRegion
# key_source = ProbeTrajectory & ClusterBrainRegion

# def make(self, key):

# regions = (dj.U('acronym') & (ClusterBrainRegion & key)).fetch('acronym')

# associated_regions = [
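Because the histology loaders log their failures under distinct labels ('trajectory', 'channels_brain_region', 'clusters_brain_region'), the new table doubles as a report of what blocks each populate. A quick tally, as a sketch using standard DataJoint aggregation:

import datajoint as dj
from ibl_pipeline import ephys

# count logged failures per missing_data category
print(dj.U('missing_data').aggr(ephys.ProbeInsertionMissingDataLog, n='count(*)'))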
1 change: 1 addition & 0 deletions ibl_pipeline/ingest/histology.py
@@ -15,6 +15,7 @@
'ibl_ingest_histology')



# Temporary probe trajectories and channel brain location based on methods
@schema
class Provenance(dj.Lookup):
3 changes: 2 additions & 1 deletion ibl_pipeline/ingest/qc.py
@@ -179,7 +179,6 @@ def make(self, key):

if len(ephys_real.ProbeInsertion & key) == 1:
probe_insertion_key = (ephys_real.ProbeInsertion & key).fetch1('KEY')
self.insert1(dict(**probe_insertion_key, probe_insertion_uuid=key['uuid']))
else:
return

@@ -220,6 +219,8 @@ def make(self, key):
# Only ingest when alignment is resolved
if qc_type == 'alignment_resolved' and extended_qc_alyx[qc_type]:

# only ingest into current table if alignment is resolved
self.insert1(dict(**probe_insertion_key, probe_insertion_uuid=key['uuid']))
qc_real.ProbeInsertionExtendedQC.insert1(
dict(**probe_insertion_key, qc_type=qc_type,
insertion_extended_qc=10),
2 changes: 1 addition & 1 deletion ibl_pipeline/process/__init__.py
@@ -9,7 +9,7 @@ def get_timezone(t=datetime.datetime.now().time()):
timezone = 'European'
elif t > datetime.time(8, 30) and t < datetime.time(10, 30):
timezone = 'EST'
elif t > datetime.time(10, 30) and t < datetime.time(14, 30):
elif t > datetime.time(10, 30) and t < datetime.time(16, 30):
timezone = 'PST'
else:
timezone = 'other'
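The widened elif window above means a mid-afternoon run is now labeled as the PST ingestion slot rather than falling through to 'other'. A quick check of the behavior (assuming the import path shown in the diff header):

import datetime
from ibl_pipeline.process import get_timezone

get_timezone(datetime.time(9, 0))    # 'EST'
get_timezone(datetime.time(15, 0))   # 'PST' under the new 10:30-16:30 window; previously 'other'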
2 changes: 0 additions & 2 deletions ibl_pipeline/process/populate_ephys.py
@@ -23,8 +23,6 @@
ephys.ChannelGroup,
ephys_analyses.DepthPeth,
ephys_analyses.NormedDepthPeth,
histology.ClusterBrainRegion,
histology.SessionBrainRegion,
ephys_plotting.DepthRaster,
ephys_plotting.DepthPeth,
ephys_plotting.Raster,
95 changes: 56 additions & 39 deletions ibl_pipeline/process/process_qc.py
@@ -1,6 +1,6 @@
from ibl_pipeline.process import update_utils, ingest_alyx_raw
from ibl_pipeline.ingest import alyxraw
from ibl_pipeline import acquisition, qc
from ibl_pipeline import acquisition, ephys, qc
from ibl_pipeline.ingest import qc as qc_ingest
from ibl_pipeline.process.ingest_real import copy_table
import logging
@@ -14,35 +14,60 @@

logger = logging.getLogger(__name__)

alyx_model = 'actions.session'


def delete_qc_entries():
qc_update_models = {
'actions.session':
{
'ref_table': acquisition.Session,
'alyx_fields': ['qc', 'extended_qc'],
'uuid_name': 'session_uuid',
'ingest_tables': [qc_ingest.SessionQCIngest],
'real_tables': [
qc.SessionExtendedQC.Field,
qc.SessionExtendedQC,
qc.SessionQC
], # in the order of delete_quick()
},
'experiments.probeinsertion':
{
'ref_table': ephys.ProbeInsertion,
'alyx_fields': ['json'],
'uuid_name': 'probe_insertion_uuid',
'ingest_tables': [qc_ingest.ProbeInsertionQCIngest],
'real_tables': [
qc.ProbeInsertionExtendedQC.Field,
qc.ProbeInsertionExtendedQC,
qc.ProbeInsertionQC
] # in the order of delete_quick()
}
}


def delete_qc_entries(alyx_model):

model_info = qc_update_models[alyx_model]

qc_keys = update_utils.get_deleted_keys(alyx_model) + \
update_utils.get_updated_keys(alyx_model, fields=['qc', 'extended_qc'])

logger.log(25, 'Deleting updated qc and extended_qc from alyxraw...')
logger.log(25, f'Deleting updated entries for {alyx_model} from alyxraw fields...')
(alyxraw.AlyxRaw.Field &
'fname in ("qc", "extended_qc")' & qc_keys).delete_quick()
[dict(fname=f) for f in model_info['alyx_fields']] & qc_keys).delete_quick()

logger.log(25, f'Deleting updated qc and extended_qc for {alyx_model} from ingest tables...')
uuids_dict_list = [{model_info['uuid_name']: k['uuid']} for k in qc_keys]
q_real = model_info['ref_table'] & uuids_dict_list

logger.log(25, 'Deleting updated qc and extended_qc from shadow tables')
session_uuids = [{'session_uuid': k['uuid']} for k in qc_keys]
sessions = acquisition.Session & session_uuids
(qc_ingest.SessionQCIngest & session_uuids).delete_quick()
(qc_ingest.SessionQC & sessions).delete_quick()
(qc_ingest.SessionExtendedQC.Field & sessions).delete_quick()
(qc_ingest.SessionExtendedQC & sessions).delete_quick()
for m in model_info['ingest_tables']:
(m & uuids_dict_list).delete_quick()

logger.log(25, 'Deleting updated qc and extended_qc from real tables')
(qc.SessionExtendedQC.Field & sessions).delete_quick()
(qc.SessionExtendedQC & sessions).delete_quick()
(qc.SessionQC & sessions).delete_quick()
logger.log(25, f'Deleting updated qc and extended_qc for {alyx_model} from real tables...')
for m in model_info['real_tables']:
(m & q_real).delete_quick()


def process_alyxraw_qc(
filename='/data/alyxfull.json',
models=['actions.session']):
models=['actions.session', 'experiments.probeinsertion']):
'''
Ingest all qc entries in a particular alyx dump, regardless of the current status.
'''
@@ -56,40 +81,32 @@ def process_alyxraw_qc(
)


def ingest_tables():
def ingest_tables(alyx_model):

qc_ingest.SessionQCIngest.populate(
display_progress=True, suppress_errors=True)
qc_ingest.ProbeInsertionQCIngest.populate(
display_progress=True, suppress_errors=True)


def cleanup_qc_ingest():
'''
clean up the ProbeInsertionQC table to trigger ingestion if there is no alignment resolved entry
'''

(qc_ingest.ProbeInsertionQCIngest - (qc.ProbeInsertionExtendedQC & 'qc_type="alignment_resolved"')).delete()
for m in qc_update_models[alyx_model]['ingest_tables']:
m.populate(display_progress=True, suppress_errors=True)


def main(fpath='/data/alyxfull.json'):

alyx_models = list(qc_update_models.keys())

logger.log(25, 'Insert to update alyxraw...')
update_utils.insert_to_update_alyxraw(
filename=fpath, delete_tables=True,
models=['actions.session'])
models=alyx_models)

logger.log(25, 'Deleting updated entries...')
delete_qc_entries()

for alyx_model in alyx_models:
delete_qc_entries(alyx_model)

logger.log(25, 'Ingesting Alyxraw for QC...')
process_alyxraw_qc()
process_alyxraw_qc(models=alyx_models)

logger.log(25, 'Ingesting QC tables...')
ingest_tables()

logger.log(25, 'Cleaning up ProbeInsertionQCIngest table...')
cleanup_qc_ingest()
for alyx_model in alyx_models:
ingest_tables(alyx_model)


if __name__ == '__main__':
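After this refactor, both QC models are driven from the qc_update_models registry: main() rebuilds the update tables, deletes stale qc/extended_qc entries per model, re-ingests alyxraw, and populates each model's ingest tables in turn. Invocation stays a single call (a sketch, using the default dump path from the diff):

from ibl_pipeline.process import process_qc

# one pass refreshes session-level and probe-insertion-level QC
process_qc.main(fpath='/data/alyxfull.json')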
@@ -854,9 +854,9 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
"nbformat_minor": 4
}