-
Notifications
You must be signed in to change notification settings - Fork 0
/
xnat_scan_info
executable file
·164 lines (118 loc) · 5.66 KB
/
xnat_scan_info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python
# Copyright (c) 2016 Washington University
# Author(s) Timothy B. Brown (tbbrown at wustl.edu)
# Session type that a session must report (via XnatAccess.get_session_type)
# for the sub-commands in this script to proceed; any other type is rejected.
PROCESSING_SESSION_TYPE = 'xnat:mrSessionData'
import argparse # For parsing command line arguments
import csv # Comma Separated Value (csv) reading
import StringIO # Read string buffers as if they were files (i.e. read in-memory files)
import sys # Process exit status (sys.exit) and stderr output

from xnat_access import XnatAccess # The XnatAccess class allows us to interact with an XNAT instance
def create_resource_file_name_to_scan_number_map(xnat_access):
    """
    Create a map/dict which, when given a file name, returns the scan number
    that corresponds to that file name.

    The mapping is read from the 'filescans.csv' file, if that name appears in
    the list returned by xnat_access.get_resource_file_name_list(). Each CSV
    row is read as: first column = scan number, second column = file name.
    Single quote characters are removed from both values. Rows whose first
    column is 'Scan' (the header row) are skipped, as are rows with fewer than
    two columns (for example blank lines), which previously raised IndexError.

    :param xnat_access: object providing get_resource_file_name_list() and
        get_named_resource_file_content(file_name)
    :return: dict mapping file name -> scan number (both strings); empty when
        'filescans.csv' is not among the resource files
    """
    scan_number_mapping_file_name = 'filescans.csv'
    file_name_to_scan_number = dict()

    if scan_number_mapping_file_name not in xnat_access.get_resource_file_name_list():
        return file_name_to_scan_number

    scan_mapping_str = xnat_access.get_named_resource_file_content(scan_number_mapping_file_name)

    # csv.reader accepts any iterable of lines, so the in-memory text can be
    # fed to it directly. Line endings are kept (keepends=True) so that quoted
    # fields spanning several lines are still reassembled by the reader.
    reader = csv.reader(scan_mapping_str.splitlines(True), delimiter=',')

    for row in reader:
        # Blank or malformed lines do not carry a (scan, file name) pair.
        if len(row) < 2:
            continue

        key = row[1].replace("'", "")
        val = row[0].replace("'", "")

        # The header row has 'Scan' in the scan number column.
        if val != 'Scan':
            file_name_to_scan_number[key] = val

    return file_name_to_scan_number
def check_resource_exists(args):
    """
    Print "TRUE" if the requested resource exists for the session, otherwise
    print "FALSE".

    Connects to the XNAT server, selects the project, subject and session
    named in args, and verifies that the session is of type
    PROCESSING_SESSION_TYPE before querying for the resource.

    :param args: parsed command line arguments; reads debug, server, username,
        password, project, subject, session and resource
    :raises SystemExit: with status 1 when the session is not of type
        PROCESSING_SESSION_TYPE
    """
    # Establish connection to XNAT
    if args.debug:
        print("\nxnat_scan_info.check_resource_exists: Connecting to: " + args.server)
    xnat_access = XnatAccess(args.server, args.username, args.password)

    # Establish project
    if args.debug:
        print("xnat_scan_info.check_resource_exists: Establishing project: " + args.project)
    xnat_access.project = args.project

    # Establish subject
    if args.debug:
        print("xnat_scan_info.check_resource_exists: Establishing subject: " + args.subject)
    xnat_access.subject = args.subject

    # Validate appropriate MR session exists and establish session
    if args.debug:
        print("xnat_scan_info.check_resource_exists: Validating session: " + args.session)
    session_type = xnat_access.get_session_type(args.session)

    if args.debug:
        # %-formatting rather than concatenation so that a non-string session
        # type does not raise TypeError before the type check below can
        # report the problem.
        print("xnat_scan_info.check_resource_exists: Session Type: %s" % (session_type,))

    if session_type != PROCESSING_SESSION_TYPE:
        print("xnat_scan_info.check_resource_exists: Session: " + args.session + " is not of correct session type.")
        sys.exit(1)

    if args.debug:
        print("xnat_scan_info.check_resource_exists: Establishing session: " + args.session)
    xnat_access.session = args.session

    # Determine if resource exists
    if xnat_access.does_resource_exist(args.resource):
        print("TRUE")
    else:
        print("FALSE")
def get_data(args):
    """
    Print the value of one scan data field for the scan that produced a given
    resource file.

    Connects to the XNAT server, selects the project, subject and session
    named in args, and verifies that the session is of type
    PROCESSING_SESSION_TYPE. If the resource exists, the scan number for
    args.filename is looked up in the resource's file-name-to-scan-number map
    and the value of the field args.item for that scan is printed. If the
    resource does not exist, nothing is printed.

    :param args: parsed command line arguments; reads debug, server, username,
        password, project, subject, session, resource, filename and item
    :raises SystemExit: with status 1 when the session is not of type
        PROCESSING_SESSION_TYPE, or when args.filename has no entry in the
        file-name-to-scan-number map
    """
    # Establish connection to XNAT
    if args.debug:
        print("\nxnat_scan_info.get_data: Connecting to: " + args.server)
    xnat_access = XnatAccess(args.server, args.username, args.password)

    # Establish project
    if args.debug:
        print("xnat_scan_info.get_data: Establishing project: " + args.project)
    xnat_access.project = args.project

    # Establish subject
    if args.debug:
        print("xnat_scan_info.get_data: Establishing subject: " + args.subject)
    xnat_access.subject = args.subject

    # Validate appropriate MR session exists and establish session
    if args.debug:
        print("xnat_scan_info.get_data: Validating session: " + args.session)
    session_type = xnat_access.get_session_type(args.session)

    if args.debug:
        # %-formatting rather than concatenation so that a non-string session
        # type does not raise TypeError before the type check below can
        # report the problem.
        print("xnat_scan_info.get_data: Session Type: %s" % (session_type,))

    if session_type != PROCESSING_SESSION_TYPE:
        print("xnat_scan_info.get_data: Session: " + args.session + " is not of correct session type.")
        sys.exit(1)

    if args.debug:
        print("xnat_scan_info.get_data: Establishing session: " + args.session)
    xnat_access.session = args.session

    # Determine if resource exists; without it there is nothing to look up,
    # so no value is printed.
    if not xnat_access.does_resource_exist(args.resource):
        return

    if args.debug:
        print("xnat_scan_info.get_data: Establishing resource: " + args.resource)
    xnat_access.resource = args.resource

    file_name_to_scan_number = create_resource_file_name_to_scan_number_map(xnat_access)

    try:
        scan_number = file_name_to_scan_number[args.filename]
    except KeyError:
        # Report an unknown file name on stderr (stdout carries only the
        # requested value) instead of ending with a bare KeyError traceback.
        sys.stderr.write("xnat_scan_info.get_data: File: " + args.filename +
                         " has no scan number entry in resource: " + args.resource + "\n")
        sys.exit(1)

    if args.debug:
        print("xnat_scan_info.get_data: scan_number: " + scan_number)

    item_value = xnat_access.get_scan_data_field_value(scan_number, args.item)
    print(item_value)
def main():
    """
    Parse the command line and dispatch to the selected sub-command.

    Connection and target options (server, username, password, project,
    subject, session, resource) are defined on the top-level parser. Two
    sub-commands are available: 'check_resource_exists', and 'get_data' which
    additionally requires --filename and --item. The handler function for the
    chosen sub-command is stored on the parsed arguments as 'func' and is
    called with the parsed arguments.
    """
    parser = argparse.ArgumentParser(prog="xnat_scan_info",
                                     description="Script to get XNAT scan info")

    parser.add_argument("-d", "--debug", dest="debug", action='store_true')
    parser.add_argument("-s", "--server", dest="server", default="db.humanconnectome.org", type=str)
    parser.add_argument("-u", "--username", dest="username", required=True, type=str)
    parser.add_argument("-p", "--password", dest="password", required=True, type=str)
    parser.add_argument("-pr", "--project", dest="project", required=True, type=str,
                        help="e.g. PipelineTest, HCP_Staging, HCP_Staging_RT, etc.")
    parser.add_argument("-su", "--subject", dest="subject", required=True, type=str)
    parser.add_argument("-se", "--session", dest="session", required=True, type=str)
    parser.add_argument("-r", "--resource", dest="resource", required=True, type=str)

    subparsers = parser.add_subparsers(help='xnat_scan_info sub-commands')

    # setup the exists sub-command
    parser_check_resource_exists = subparsers.add_parser('check_resource_exists', help='determine whether the scan exists')
    parser_check_resource_exists.set_defaults(func=check_resource_exists)

    # setup the get_data sub-command
    parser_get_data = subparsers.add_parser('get_data', help='get_data about a scan')
    parser_get_data.add_argument("-f", "--filename", dest="filename", required=True, type=str)
    parser_get_data.add_argument("-i", "--item", dest="item", required=True, type=str)
    parser_get_data.set_defaults(func=get_data)

    # parse the arguments and dispatch the call
    args = parser.parse_args()

    # Where argparse treats sub-commands as optional, omitting one leaves
    # 'func' unset; report that as a usage error rather than failing with
    # AttributeError on the dispatch below.
    if not hasattr(args, 'func'):
        parser.error("a sub-command is required: check_resource_exists or get_data")

    args.func(args)
# Run the command line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()