-
Notifications
You must be signed in to change notification settings - Fork 0
/
gbmfile.py
358 lines (298 loc) · 10.5 KB
/
gbmfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# gbmfile.py: Module containing the GBM file-naming convention and operations
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import copy
import datetime
import os.path
import re
from .detectors import Detector
from .gbmtime import Met
class GbmFile:
    """Parse or construct a GBM standardized filename.

    Filenames follow the layout encoded in :attr:`REGEX_PATTERN`:
    ``glg_<data_type>_<detector>_[bn]<uid>[<meta>]_v<version>.<extension>``.

    Attributes:
        data_type (str): The datatype of the file
        detector (Detector or str): The detector with which the file is
            associated; reads back as ``'all'`` when no detector is set
        directory (str): The directory hosting the file
        extension (str): The filename extension
        meta (str): Additional metadata in the filename
        trigger (bool): True if the file is from a trigger. False otherwise.
            After :meth:`from_path` this holds the matched prefix text
            instead (an empty string when the prefix is absent), so only
            its truthiness should be relied upon.
        uid (str): The unique id of the file
        version (int): The version number of the file. After
            :meth:`from_path` this is the two-digit string taken from the
            filename.
    """
    REGEX_PATTERN = r'^glg_(?P<data_type>.+)_(?P<detector>[bn][0-9ab]|all)_(?P<trigger>(?:bn)?)(?P<uid>(?:\d{9}|\d{6}' \
                    r'_\d\dz|\d{6}))(?P<meta>(?:_.+)?)_v(?P<version>\d\d)\.(?P<extension>.+)$'

    def __init__(self):
        self.directory = ''
        self.trigger = False
        self.data_type = None
        self._detector = None
        self.uid = None
        self.meta = None
        self.version = 0
        self.extension = 'fit'

    def _init_by_dict(self, values):
        """Assign every key/value pair in ``values`` to the same-named
        attribute.

        Args:
            values (dict): Attribute names mapped to the values to assign

        Raises:
            ValueError: If a key does not name an existing attribute
        """
        for key, val in values.items():
            try:
                # Reading the attribute first rejects unknown names. A plain
                # setattr already dispatches to property setters (e.g.
                # ``detector``), so no special-casing of properties is needed.
                getattr(self, key)
                setattr(self, key, val)
            except AttributeError as err:
                raise ValueError(
                    "{} is not a valid attribute.".format(key)) from err

    @property
    def detector(self):
        """The detector of the file, or ``'all'`` when none is set.

        Assigning ``'all'`` clears the detector. A :class:`Detector` is
        stored as-is. A string is passed to ``Detector.from_str`` and the
        raw string is kept if that yields a falsy result. An int is passed
        to ``Detector.from_num`` and a falsy result raises ``ValueError``.
        Values of any other type are ignored.
        """
        if not self._detector:
            return 'all'
        return self._detector

    @detector.setter
    def detector(self, value):
        if value == 'all':
            self._detector = None
        elif isinstance(value, Detector):
            self._detector = value
        elif isinstance(value, str):
            d = Detector.from_str(value)
            # Keep the raw string when it is not a recognised detector name
            self._detector = d if d else value
        elif isinstance(value, int):
            d = Detector.from_num(value)
            if not d:
                raise ValueError("Invalid detector value")
            self._detector = d

    def version_str(self):
        """Return the file version number as a string

        Returns:
            str: The file version, zero-padded to two digits when the
            version is an int; otherwise the stored value unchanged
        """
        if isinstance(self.version, int):
            return "{:02d}".format(self.version)
        return self.version

    def basename(self):
        """The file basename

        Returns:
            str: The file basename
        """
        # Triggered files carry a 'bn' prefix in front of the uid
        u = 'bn' + self.uid if self.trigger else self.uid
        if self.meta:
            return "glg_{}_{}_{}{}_v{}.{}".format(
                self.data_type, self.detector, u, self.meta,
                self.version_str(), self.extension)
        return "glg_{}_{}_{}_v{}.{}".format(
            self.data_type, self.detector, u, self.version_str(),
            self.extension)

    def path(self):
        """The file path

        Returns:
            str: The directory joined with the basename
        """
        return os.path.join(self.directory, self.basename())

    def __str__(self):
        return self.path()

    def __repr__(self):
        return self.path()

    @classmethod
    def create(cls, **kwargs):
        """Create a GbmFile from keywords

        Args:
            **kwargs: The properties of a GbmFile

        Returns:
            :class:`GbmFile`: The new filename object

        Raises:
            ValueError: If a keyword is not an attribute of the object
        """
        obj = cls()
        obj._init_by_dict(kwargs)
        return obj

    @classmethod
    def from_path(cls, path):
        """Create a GbmFile from parsing a filename

        Args:
            path (str): A filename path

        Returns:
            :class:`GbmFile`: The new filename object, or None if the
            basename of ``path`` does not match :attr:`REGEX_PATTERN`
        """
        m = re.match(cls.REGEX_PATTERN, os.path.basename(path), re.I | re.S)
        if not m:
            return None
        result = cls.create(**m.groupdict())
        result.directory = os.path.dirname(path)
        return result

    def detector_list(self):
        """Generate a list of GbmFile objects, one for each GBM detector

        Returns:
            list of :class:`GbmFile`: Shallow copies of this object, one per
            member of :class:`Detector`
        """
        result = []
        for d in Detector:
            x = copy.copy(self)
            x.detector = d
            result.append(x)
        return result

    @classmethod
    def list_from_paths(cls, path_list, unknown=None):
        """Create many GbmFiles from a list of filepaths

        Args:
            path_list (list of str): List of filepaths
            unknown (list, optional): If given, paths that cannot be parsed
                are appended to this list instead of raising an error

        Returns:
            list of :class:`GbmFile`: The new filename object(s)

        Raises:
            ValueError: If a path cannot be parsed and ``unknown`` is None
        """
        result = []
        for p in path_list:
            # Go through cls so that subclasses get instances of themselves
            f = cls.from_path(p)
            if f:
                result.append(f)
            elif unknown is not None:
                unknown.append(p)
            else:
                raise ValueError('Unrecognized file name')
        return result
def scan_dir(path, hidden=False, recursive=False, absolute=False, regex=None):
    """
    Scans the given directory for files.

    Args:
        path (str): The root directory to scan.
        hidden (bool, optional): Set True if you want to include hidden files.
            When False, entries whose name starts with '.' are skipped,
            directories included.
        recursive (bool, optional): Set True if you want to scan subdirectories
            within the given path.
        absolute (bool, optional): Set true if you want the absolute path of
            each file returned.
        regex (str): Set if you want to only return files matching the given
            regular expression. The expression is searched for in the file
            name only, not in the directory part.

    Yields:
        str: Path to a file for each iteration: ``path`` joined with the
        entry name, made absolute when ``absolute`` is True.
    """
    for f in os.listdir(path):
        if not hidden and f.startswith('.'):
            continue
        file_path = os.path.join(path, f)
        if absolute:
            file_path = os.path.abspath(file_path)
        if os.path.isfile(file_path):
            if regex and re.search(regex, f) is None:
                continue
            yield file_path
        elif recursive and os.path.isdir(file_path):
            # Only descend into real directories: entries that are neither
            # file nor directory (dangling symlinks, FIFOs, sockets) would
            # make os.listdir raise.
            yield from scan_dir(file_path, hidden, recursive, absolute, regex)
def all_exists(file_list, parent_dir=None):
    """
    Do all the files in the list exist in the filesystem?

    Args:
        file_list (list): Files to check. Each item is either a path string
            or an object exposing a ``basename()`` method (such as a GbmFile).
        parent_dir (str, optional): parent directory. When given, only the
            base name of each item is used and it is looked up inside this
            directory; otherwise ``str(item)`` is checked as-is.

    Returns:
        bool: True if all files exist. An empty list yields False.
    """
    if not file_list:
        return False
    for f in file_list:
        if parent_dir is None:
            path = str(f)
        else:
            # Objects with a basename() method supply their own base name;
            # plain path strings are reduced with os.path.basename.
            basename = getattr(f, 'basename', None)
            if callable(basename):
                name = basename()
            else:
                name = os.path.basename(str(f))
            path = os.path.join(parent_dir, name)
        if not os.path.exists(path):
            return False
    return True
def has_detector(file_list, detector):
    """
    Report whether any entry of the list belongs to the given detector.

    Args:
        file_list (list): Entries exposing a ``detector`` attribute
        detector: Detector to look for; compared with ``==`` against each
            entry's ``detector`` attribute

    Returns:
        bool: True as soon as one entry matches, False if none does
    """
    return any(entry.detector == detector for entry in file_list)
def is_complete(file_list):
    """
    Report whether the list holds at least one entry for every detector.

    Args:
        file_list (list): Entries that together should represent a full
            detector set

    Returns:
        bool: True if ``has_detector`` succeeds for every member of
        ``Detector``, False as soon as one detector is missing
    """
    return all(has_detector(file_list, det) for det in Detector)
def max_version(file_list):
    """
    Find the highest version number among the given entries.

    Args:
        file_list (list): Entries exposing a ``version`` attribute

    Returns:
        int: Largest version in the list, or None when no entry has a
        version that converts to an int
    """
    versions = []
    for entry in file_list:
        try:
            versions.append(int(entry.version))
        except ValueError:
            # Versions that are not numeric are left out of the comparison
            continue
    return max(versions) if versions else None
def min_version(file_list):
    """
    Find the lowest version number among the given entries.

    Args:
        file_list (list): Entries exposing a ``version`` attribute

    Returns:
        int: Smallest version in the list, or None when no entry has a
        version that converts to an int
    """
    versions = []
    for entry in file_list:
        try:
            versions.append(int(entry.version))
        except ValueError:
            # Versions that are not numeric are left out of the comparison
            continue
    return min(versions) if versions else None
def ymd_path(base, name):
    """
    Build a path below ``base`` that contains a YYYY-MM-DD directory.

    Args:
        base (str): Directory under which the dated directory is placed
        name (str, GbmFile, Met, datetime.datetime or datetime.date):
            Source of the date. For a filename string or a GbmFile the
            six-digit YYMMDD field of the name is used; for a Met its
            ``datetime`` attribute; dates and datetimes are used directly.

    Returns:
        str: ``base/YYYY-MM-DD/<file basename>`` for a filename string or a
        GbmFile, ``base/YYYY-MM-DD`` for a Met, date or datetime

    Raises:
        ValueError: If ``name`` is of an unsupported type, or is a filename
            from which no date can be extracted
    """
    if isinstance(name, str) or isinstance(name, GbmFile):
        filename = name if isinstance(name, str) else name.basename()
        m = re.match(r'.*_(?:(?:bn)?)(\d{6})(?:(\d{3})|(_\d\d)?)_.*', filename,
                     re.I | re.S)
        if m:
            d = datetime.datetime.strptime(m.group(1), "%y%m%d")
            # Use the derived filename string here: os.path.basename cannot
            # take a GbmFile object.
            return os.path.join(base, d.strftime('%Y-%m-%d'),
                                os.path.basename(filename))
    elif isinstance(name, Met):
        return os.path.join(base, name.datetime.strftime('%Y-%m-%d'))
    elif isinstance(name, datetime.datetime) or isinstance(name,
                                                           datetime.date):
        return os.path.join(base, name.strftime('%Y-%m-%d'))
    raise ValueError("Can't parse a YMD value")