From 96deaf8ef2df085e67954df27b98eba9970d28b2 Mon Sep 17 00:00:00 2001 From: Will Foran Date: Tue, 19 Nov 2024 21:42:50 -0500 Subject: [PATCH] mrqart: inotify use CLOSE_WRITE, CurSeqStation class --- mrqart.py | 88 +++++++++++++++++++++++++++++++++++---------- readme.md | 6 ++++ static/index.html | 15 ++++++-- template_checker.py | 4 +++ 4 files changed, 93 insertions(+), 20 deletions(-) diff --git a/mrqart.py b/mrqart.py index a965b7f8..59821f31 100755 --- a/mrqart.py +++ b/mrqart.py @@ -17,11 +17,42 @@ from template_checker import TemplateChecker +Station = str +Sequence = str +class CurSeqStation: + "Current Sequence settings at a MR Scanner station" + series_seqname : str + station : str + count : int + def __init__(self, station: Station): + "initialize new series" + self.station = station + self.series_seqname="" + self.count = 0 + + def update_isnew(self, series, seqname: Sequence) -> bool: + """ + Maintain count of repeats seen + :return: True if is new + """ + serseq=f"{series}{seqname}" + if self.series_seqname == serseq: + self.count += 1 + return False + + self.series_seqname=serseq + self.count = 0 + return True + + def __repr__(self) -> str: + return f"{self.station} {self.series_seqname} {self.count}" + #: Websocket port used to send updates to browser WS_PORT = 5000 #: HTTP port used to serve static/index.html HTTP_PORT = 8080 +FOLLOW_FLAGS = aionotify.Flags.CLOSE_WRITE | aionotify.Flags.CREATE #: list of all web socket connections to broadcast to #: TODO: will eventually need to track station id when serving multiple scanners WS_CONNECTIONS = set() @@ -29,12 +60,10 @@ FILEDIR = os.path.dirname(__file__) logging.basicConfig(level=os.environ.get("LOGLEVEL", logging.INFO)) -Station = str -Sequence = str #: track the current state of each scanner based on filename #: we can skip parsing a dicoms (and spamming the browser) if we've already seen the session -STATE: dict[Station, Sequence] = {} +STATE: dict[Station, CurSeqStation] = {} class 
WebServer(Application): @@ -91,6 +120,8 @@ async def track_ws(websocket): #### def session_from_fname(dcm_fname: os.PathLike) -> Sequence: """ + We can use the file name to see if session name has changed. + Don't need to read the dicom header -- if we know the station name extract ls /data/dicomstream/20241016.MRQART_test.24.10.16_16_50_16_DST_1.3.12.2.1107.5.2.43.67078/|head 001_000001_000001.dcm @@ -112,30 +143,49 @@ async def monitor_dirs(watcher, dcm_checker): logging.debug("watching for new files") while True: event = await watcher.get_event() - logging.info("got event %s", event) + logging.debug("got event %s", event) file = os.path.join(event.alias, event.name) - if(os.path.isdir(file)): - watcher.watch(path=file, flags=aionotify.Flags.CREATE) - logging.info("that's a dir! following") + if os.path.isdir(file): + watcher.watch(path=file, flags=FOLLOW_FLAGS) + logging.info("%s is a dir! following with %d", file, FOLLOW_FLAGS) + continue + if event.flags == aionotify.Flags.CREATE: + logging.debug("file created but waiting for WRITE finish") continue # Event(flags=256, cookie=0, name='a', alias='/home/foranw/src/work/mrrc-hdr-qa/./sim') if re.search("^MR.|.dcm$|.IMA$", event.name): - msg = dcm_checker.check_file(file) - logging.debug(msg) - seq = msg["input"] - sequence_info: Sequence = f'{seq["SeriesNumber"]}{seq["SequenceName"]}' - current_ses = STATE.get(seq["Station"]) + + # NB. 
we might be able to look at the file project_seqnum_seriesnum.dcm + # and skip without having to read the header + # not sure how we'd get station + hdr = dcm_checker.reader.read_dicom_tags(file) + current_ses = STATE.get(hdr["Station"]) + if not current_ses: + STATE[hdr["Station"]] = CurSeqStation(hdr["Station"]) + current_ses = STATE.get(hdr["Station"]) + # only send to browser if new - if current_ses != sequence_info: - logging.debug("already have %s", sequence_info) + if current_ses.update_isnew(hdr["SeriesNumber"], hdr["SequenceName"]): + logging.debug("first time seeing %s", current_ses) + msg = {'station': hdr["Station"], + 'type': 'new', + 'content': dcm_checker.check_header(hdr)} + logging.debug(msg) + broadcast(WS_CONNECTIONS, json.dumps(msg, default=list)) + else: + msg = {'station': hdr["Station"], + 'type': 'update', + 'content': current_ses.count} broadcast(WS_CONNECTIONS, json.dumps(msg, default=list)) - STATE[seq["Station"]] = sequence_info + logging.debug("already have %s", STATE[hdr["Station"]]) # TODO: if epi maybe try plotting motion? + # async alignment else: logging.warning("non dicom file %s", event.name) - broadcast(WS_CONNECTIONS, f"non-dicom file: {event}") + # if we want to do this, we need msg formatted + #broadcast(WS_CONNECTIONS, f"non-dicom file: {event}") async def main(path): @@ -146,7 +196,8 @@ async def main(path): dcm_checker = TemplateChecker() watcher = aionotify.Watcher() watcher.watch( - path=path, flags=aionotify.Flags.CREATE + path=path, flags=FOLLOW_FLAGS + # NB. prev had just aionotify.Flags.CREATE but that triggers too early (partial file) ) # aionotify.Flags.MODIFY|aionotify.Flags.CREATE |aionotify.Flags.DELETE) asyncio.create_task(monitor_dirs(watcher, dcm_checker)) @@ -163,5 +214,6 @@ async def main(path): if __name__ == "__main__": # TODO: watch based on input argument - watch_dir = os.path.join(FILEDIR, "sim") + # TODO: watch all sub directories?
+ watch_dir = os.path.join(FILEDIR, "/data/dicomstream/20241119.testMRQARAT.testMRQARAT/") asyncio.run(main(watch_dir)) diff --git a/readme.md b/readme.md index 07c0d4ae..bd04b4b4 100644 --- a/readme.md +++ b/readme.md @@ -19,6 +19,12 @@ See * should look at n echos 0018,0086 and collapse across dicoms to make protocol mutliecho parameter * precision changes between realtime streaming and offline dicom writes? see `check_template.py` + * inotify `CREATE` catches files before they finish writing. Watch `CLOSE_WRITE` instead. + Test slower write with `smbclient` + ``` + smbclient -U mrqart //localhost/dicomstream/ -c 'put 001_000001_000002.dcm sim/y.dcm' + ``` + ## Prior Art * mrQA * sister project https://github.com/NPACore/mrqart/ diff --git a/static/index.html b/static/index.html index 75ff021e..07726e37 100644 --- a/static/index.html +++ b/static/index.html @@ -10,10 +10,15 @@ // TODO: // 1. build a sub element id= sessionDate (need to store that field still?) if el w/id doesn't exist // 2. append to that element instead of directly to "sequences".
then we can later hide finished sequences - let el = document.createElement("li"); data = JSON.parse(msg.data) console.log("...data:", data); + if(data['type'] == 'new'){ + add_new_series(data['content']) + } +} +function add_new_series(data) { + let el = document.createElement("li"); el.className = data['conforms']?'conform':'no-conform'; dcm_in = data['input']; errors = data['errors']; @@ -21,7 +26,13 @@ note = Object.keys(errors).length==0?'💯':JSON.stringify(errors); console.log("...note:", note); el.textContent = `${identity}: ${note}`; - document.getElementById("sequences").prepend(el); //appendChild(el); + // TODO: per scanner tab + let seq = document.getElementById("sequences"); + // clear waiting + if(seq.innerHTML == "waiting for scanner"){ + seq.innerHTML = "" + } + seq.prepend(el); //appendChild(el); } function update_via_ws() { diff --git a/template_checker.py b/template_checker.py index 9bcf1cc2..834e1417 100644 --- a/template_checker.py +++ b/template_checker.py @@ -34,6 +34,10 @@ def find_errors(template: TagValues, current_hdr: TagValues) -> dict[str, ErrorC # TODO: more checks for specific headers #: TR is in milliseconds. no need to keep decimals precision if k == "TR": + if t_k == "null": + t_k = 0 + if h_k == "null": + h_k = 0 check = int(float(t_k)) == int(float(h_k)) elif k == "iPAT": check = t_k == h_k