Skip to content

Commit

Permalink
mrqart: inotify use CLOSE_WRITE, CurSeqStation class
Browse files Browse the repository at this point in the history
  • Loading branch information
WillForan committed Nov 20, 2024
1 parent 4d6b451 commit 96deaf8
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 20 deletions.
88 changes: 70 additions & 18 deletions mrqart.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,53 @@

from template_checker import TemplateChecker

Station = str
Sequence = str
class CurSeqStation:
"Current Sequence settings at a MR Scanner station"
series_seqname : str
station : str
count : int
def __init__(self, station: Station):
"initialize new series"
self.station = station
self.series_seqname=""
self.count = 0

def update_isnew(self, series, seqname: Sequence) -> bool:
"""
Maintain count of repeats seen
:return: True if is new
"""
serseq=f"{series}{seqname}"
if self.series_seqname == serseq:
self.count += 1
return False

self.series_seqname=serseq
self.count = 0
return True

def __repr__(self) -> str:
return f"{self.station} {self.series_seqname} {self.count}"

#: Websocket port used to send updates to browser
WS_PORT = 5000
#: HTTP port used to serve static/index.html
HTTP_PORT = 8080

FOLLOW_FLAGS = aionotify.Flags.CLOSE_WRITE | aionotify.Flags.CREATE
#: list of all web socket connections to broadcast to
#: TODO: will eventually need to track station id when serving multiple scanners
WS_CONNECTIONS = set()

FILEDIR = os.path.dirname(__file__)
logging.basicConfig(level=os.environ.get("LOGLEVEL", logging.INFO))

Station = str
Sequence = str

#: track the current state of each scanner based on filename
#: we can skip parsing a dicoms (and spamming the browser) if we've already seen the session
STATE: dict[Station, Sequence] = {}
STATE: dict[Station, CurSeqStation] = {}


class WebServer(Application):
Expand Down Expand Up @@ -91,6 +120,8 @@ async def track_ws(websocket):
####
def session_from_fname(dcm_fname: os.PathLike) -> Sequence:
"""
We can use the file name to see if session name has changed.
Don't need to read the dicom header -- if we know the station name
extract
ls /data/dicomstream/20241016.MRQART_test.24.10.16_16_50_16_DST_1.3.12.2.1107.5.2.43.67078/|head
001_000001_000001.dcm
Expand All @@ -112,30 +143,49 @@ async def monitor_dirs(watcher, dcm_checker):
logging.debug("watching for new files")
while True:
event = await watcher.get_event()
logging.info("got event %s", event)
logging.debug("got event %s", event)
file = os.path.join(event.alias, event.name)
if(os.path.isdir(file)):
watcher.watch(path=file, flags=aionotify.Flags.CREATE)
logging.info("that's a dir! following")
if os.path.isdir(file):
watcher.watch(path=file, flags=FOLLOW_FLAGS)
logging.info("%s is a dir! following with %d", file, FOLLOW_FLAGS)
continue
if event.flags == aionotify.Flags.CREATE:
logging.debug("file created but waiting for WRITE finish")
continue
# Event(flags=256, cookie=0, name='a', alias='/home/foranw/src/work/mrrc-hdr-qa/./sim')
if re.search("^MR.|.dcm$|.IMA$", event.name):
msg = dcm_checker.check_file(file)
logging.debug(msg)
seq = msg["input"]
sequence_info: Sequence = f'{seq["SeriesNumber"]}{seq["SequenceName"]}'
current_ses = STATE.get(seq["Station"])

# NB. we might be able to look at the file project_seqnum_seriesnum.dcm
# and skip without having to read the header
# not sure how we'd get station
hdr = dcm_checker.reader.read_dicom_tags(file)
current_ses = STATE.get(hdr["Station"])
if not current_ses:
STATE[hdr["Station"]] = CurSeqStation(hdr["Station"])
current_ses = STATE.get(hdr["Station"])

# only send to browser if new
if current_ses != sequence_info:
logging.debug("already have %s", sequence_info)
if current_ses.update_isnew(hdr["SeriesNumber"], hdr["SequenceName"]):
logging.debug("first time seeing %s", current_ses)
msg = {'station': hdr["Station"],
'type': 'new',
'content': dcm_checker.check_header(hdr)}
logging.debug(msg)
broadcast(WS_CONNECTIONS, json.dumps(msg, default=list))
else:
msg = {'station': hdr["Station"],
'type': 'update',
'content': current_ses.count}
broadcast(WS_CONNECTIONS, json.dumps(msg, default=list))
STATE[seq["Station"]] = sequence_info
logging.debug("already have %s", STATE[seq["Station"]])

# TODO: if epi maybe try plotting motion?
# async alignment

else:
logging.warning("non dicom file %s", event.name)
broadcast(WS_CONNECTIONS, f"non-dicom file: {event}")
# if we want to do this, we need msg formated
#broadcast(WS_CONNECTIONS, f"non-dicom file: {event}")


async def main(path):
Expand All @@ -146,7 +196,8 @@ async def main(path):
dcm_checker = TemplateChecker()
watcher = aionotify.Watcher()
watcher.watch(
path=path, flags=aionotify.Flags.CREATE
path=path, flags=FOLLOW_FLAGS
# NB. prev had just aionotify.Flags.CREATE but that triggers too early (partial file)
) # aionotify.Flags.MODIFY|aionotify.Flags.CREATE |aionotify.Flags.DELETE)
asyncio.create_task(monitor_dirs(watcher, dcm_checker))

Expand All @@ -163,5 +214,6 @@ async def main(path):

if __name__ == "__main__":
# TODO: watch based on input argument
watch_dir = os.path.join(FILEDIR, "sim")
# TODO: watch all sub directories?
watch_dir = os.path.join(FILEDIR, "/data/dicomstream/20241119.testMRQARAT.testMRQARAT/")
asyncio.run(main(watch_dir))
6 changes: 6 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ See
* should look at n echos 0018,0086 and collapse across dicoms to make protocol mutliecho parameter
* precision changes between realtime streaming and offline dicom writes? see `check_template.py`

* inotify `CREATE` is catches files before they finish writting. Watch `CLOSE_WRITE` instead.
Test slower write with `smbclient`
```
smbclient -U mrqart //localhost/dicomstream/ -c 'put 001_000001_000002.dcm sim/y.dcm'
```

## Prior Art
* mrQA
* sister project https://github.com/NPACore/mrqart/
Expand Down
15 changes: 13 additions & 2 deletions static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,29 @@
// TODO:
// 1. build a sub element id= sessionDate (need to store that field still?) if el w/id doesn't exist
// 2. append to that element instead of directly to "sequences". then we can later hide finished sequences
let el = document.createElement("li");

data = JSON.parse(msg.data)
console.log("...data:", data);
if(data['type'] == 'new'){
add_new_series(data['content'])
}
}
function add_new_series(data) {
let el = document.createElement("li");
el.className = data['conforms']?'conform':'no-conform';
dcm_in = data['input'];
errors = data['errors'];
identity = `${dcm_in['Project']}/${dcm_in['SequenceName']} @ ${dcm_in['SeriesNumber']}`;
note = Object.keys(errors).length==0?'💯':JSON.stringify(errors);
console.log("...note:", note);
el.textContent = `${identity}: ${note}`;
document.getElementById("sequences").prepend(el); //appendChild(el);
// TODO: per scanner tab
let seq = document.getElementById("sequences");
// clear waiting
if(seq.innerHTML == "waiting for scanner"){
seq.innerHTML = ""
}
seq.prepend(el); //appendChild(el);
}

function update_via_ws() {
Expand Down
4 changes: 4 additions & 0 deletions template_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def find_errors(template: TagValues, current_hdr: TagValues) -> dict[str, ErrorC
# TODO: more checks for specific headers
#: TR is in milliseconds. no need to keep decimals precision
if k == "TR":
if t_k == "null":
t_k = 0
if h_k == "null":
h_k = 0
check = int(float(t_k)) == int(float(h_k))
elif k == "iPAT":
check = t_k == h_k
Expand Down

0 comments on commit 96deaf8

Please sign in to comment.