Skip to content

Commit

Permalink
working
Browse files Browse the repository at this point in the history
  • Loading branch information
caseyjlaw committed Dec 27, 2024
1 parent 17aaa7e commit 9a07077
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 16 deletions.
6 changes: 3 additions & 3 deletions T2/cluster_heimdall.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,11 @@ def filter_clustered(
ewarr = ((df[[f'beams{i}' for i in range(nsnr)]].values <= 255)) & (df[[f'snrs{i}' for i in range(nsnr)]].values > 0)
twoarm = ewarr.any(axis=1) & nsarr.any(axis=1)
twoarm = (ewarr.any(axis=1) & nsarr.any(axis=1)) | (df['snr'].values > min_snr_1arm).any()
print(f'nsarr: {nsarr}, ewarr: {ewarr}, twoarm: {twoarm}')
#print(f'nsarr: {nsarr}, ewarr: {ewarr}, twoarm: {twoarm}')

good0 = (tab["snr"] > min_snr) * (tab["ibox"] < wide_ibox)
good1 = (tab["snr"] > min_snr_wide) * (tab["ibox"] >= wide_ibox)
print(f'good0: {good0}; good1: {good1}')
#print(f'good0: {good0}; good1: {good1}')
good0 *= twoarm
good1 *= twoarm
good *= good0 + good1
Expand Down Expand Up @@ -466,7 +466,7 @@ def dump_cluster_results_json(
sel_dm = np.abs(tab_inj["DM"] - dm) < dm_close
sel_beam = np.abs(tab_inj["Beam"] - ibeam) < beam_close
sel_beam_2 = np.abs(tab_inj["Beam"]+256 - ibeam) < beam_close
print(f"INJECTION TEST: min abs time diff {np.abs((tab_inj['MJD']-mjd)*24*3600).min()} seconds. Sel any? t {sel_t.any()}, dm {sel_dm.any()}, beam {sel_beam.any()}, beam2 {sel_beam_2.any()}")
#print(f"INJECTION TEST: min abs time diff {np.abs((tab_inj['MJD']-mjd)*24*3600).min()} seconds. Sel any? t {sel_t.any()}, dm {sel_dm.any()}, beam {sel_beam.any()}, beam2 {sel_beam_2.any()}")
sel = sel_t*sel_dm*sel_beam
sel2 = sel_t*sel_dm*sel_beam_2
if len(np.where(sel)[0]):
Expand Down
44 changes: 33 additions & 11 deletions T2/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def parse_socket(
assert isinstance(ports, list)

lastname = names.get_lastname()
# lastname_cleared = lastname
lastname_cleared = lastname

ss = []
cls = []
Expand Down Expand Up @@ -105,6 +105,10 @@ def parse_socket(
if len(futures):
lastname, trigtime, futures = manage_futures(lastname, trigtime, futures)

if trigtime is not None:
prev_trig_time = trigtime


# set up socket connections if needed
if len(ss) != len(ports):
for port in ports:
Expand Down Expand Up @@ -205,24 +209,33 @@ def parse_socket(
#gulp_status(0)
#continue

# send flush trigger after min_timedelt (once per candidate)
if (Time.now() - prev_trig_time).value > min_timedelt/86400. and lastname_cleared != lastname:
print("Sending flush trigger")
logger.info("Sending flush trigger")
ds.put_dict('/cmd/corr/0', {'cmd': 'trigger', 'val': '0-flush-'})
lastname_cleared = lastname # reset to avoid continuous calls
prev_trig_time = Time.now() # pass this on to log extra triggers in second latency window
else:
print(f"Cannot send flush: {Time.now() - prev_trig_time} {min_timedelt/86400.}")
logger.info(f"Cannot send flush: {Time.now() - prev_trig_time} {min_timedelt/86400.}")
print(f"{lastname_cleared} {lastname}")
logger.info(f"{lastname_cleared} {lastname}")


if candsfile == "\n" or candsfile == "": # skip empty candsfile
print(f"candsfile is empty. Skipping.")
logger.info(f"candsfile is empty. Skipping.")

gulp_status(0)
continue

# send flush trigger after min_timedelt (once per candidate)
# if Time.now() - prev_trig_time > min_timedelt*units.s and lastname_cleared != lastname:
# #ds.put_dict('/cmd/corr/0', {'cmd': 'trigger', 'val': '0-flush-'})
# lastname_cleared = lastname # reset to avoid continuous calls
# prev_trig_time = Time.now() # pass this on to log extra triggers in second latency window

tab = cluster_heimdall.parse_candsfile(candsfile)

# to handle too many futures
if len(futures)>1:
print(f"Waiting for >2 futures to finish -- skipping {gulps}")
print(f"Waiting for >=2 futures to finish -- skipping {gulps}")
else:
now = Time.now()
key = f'{gulp}-{globct}-{now}'
Expand All @@ -235,6 +248,7 @@ def parse_socket(
print(f'Processing {len(futures)} gulps')

try:

lastname, trigtime, futures = manage_futures(lastname, trigtime, futures) # returns latest result from iteration over futures
except:
print('Caught error in manage_futures. Closing sockets.')
Expand All @@ -245,18 +259,19 @@ def parse_socket(
prev_trig_time = trigtime


def manage_futures(lastname, trigtime, futures):
def manage_futures(old_lastname, old_trigtime, futures):
""" Take list of cluster_and_plot futures and handle the output.
Currently returns only one (lastname, trigtime) tuple for all futures that are done.
Small chance that lastname or prev_trig_time will not be updated correctly.
"""

done = []
triggered = False
for k, future in futures.items():
if future.done():
done.append(k)
try:
lastname,trigtime = future.result()
lastname,trigtime,triggered = future.result()
if trigtime is not None:
gulp_status(0) # success!
except KeyboardInterrupt:
Expand All @@ -273,7 +288,10 @@ def manage_futures(lastname, trigtime, futures):

print(f'{len(done)} gulp future(s) completed')

return lastname, trigtime, futures
if triggered is False:
return old_lastname, None, futures
else:
return lastname, trigtime, futures


def cluster_and_plot(tab, gulp=None, selectcols=["itime", "idm", "ibox"],
Expand All @@ -292,6 +310,7 @@ def cluster_and_plot(tab, gulp=None, selectcols=["itime", "idm", "ibox"],
# TODO: put these in json config file
min_timedelt = 60. ## TODO put this in etcd
trigtime = None
triggered = False
columns = ['snr','if','specnum','mjds','ibox','idm','dm','ibeam','cl','cntc','cntb','snrs0','beams0','snrs1','beams1','snrs2','beams2','snrs3','beams3','snrs4','beams4','snrs5','beams5','snrs6','beams6','snrs7','beams7','snrs8','beams8','snrs9','beams9','trigger']

# obtain this from etcd
Expand Down Expand Up @@ -429,6 +448,9 @@ def cluster_and_plot(tab, gulp=None, selectcols=["itime", "idm", "ibox"],
# write all T1 cands
outputted = cluster_heimdall.dump_cluster_results_heimdall(tab, outroot + f"T1_output{str(np.floor(time.time()).astype('int'))}.csv")

# did I trigger
triggered = True

# write T2 clustered/filtered results
if outroot is not None and len(tab2):
tab2["trigger"] = col_trigger
Expand Down Expand Up @@ -468,7 +490,7 @@ def cluster_and_plot(tab, gulp=None, selectcols=["itime", "idm", "ibox"],
dfc = pandas.concat(dfs)
dfc.to_csv(ofl, index=False)

return lastname, trigtime
return lastname, trigtime, triggered


def recvall(sock, n):
Expand Down
4 changes: 2 additions & 2 deletions scripts/dsa_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
nfrb = len(flist)

for kk in [17,18]:
for ii in range(128):
for ii in np.arange(128):

f = open(fnout,'a')
subbeam = ii
Expand All @@ -44,6 +44,6 @@
f.write(fmt_out % (imjd, beam, DM, SNR, Width_fwhm, spec_ind, frbno))
f.close()
print("Waiting to inject...")
time.sleep(3000)
time.sleep(1600)

f.close()

0 comments on commit 9a07077

Please sign in to comment.