Skip to content

Commit

Permalink
thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
caseyjlaw committed Nov 13, 2024
1 parent 56d6672 commit 0f117d3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 68 deletions.
128 changes: 63 additions & 65 deletions T2/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from etcd3.exceptions import ConnectionFailedError
from event import names
import os
from concurrent.futures import ThreadPoolExecutor
import pandas

try:
Expand Down Expand Up @@ -95,6 +96,9 @@ def parse_socket(

logger.info(f"Reading from {len(ports)} sockets...")
print(f"Reading from {len(ports)} sockets...")

pool = ThreadPoolExecutor(max_workers=10)
futures = []
while True:
if len(ss) != len(ports):
for port in ports:
Expand Down Expand Up @@ -161,27 +165,27 @@ def parse_socket(
f"not all clients received from same gulp: {set(gulps)}. Restarting socket connections."
)

for s in ss:
s.close()
time.sleep(0.1)
ss = []
for port in ports:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind((host, port)) # assigns the socket with an address
except OSError:
print("socket bind failed.")
continue
s.listen(1) # accept no. of incoming connections
ss.append(s)
# for s in ss:
# s.close()
# time.sleep(0.1)
# ss = []
# for port in ports:
# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# try:
# s.bind((host, port)) # assigns the socket with an address
# except OSError:
# print("socket bind failed.")
# continue
# s.listen(1) # accept no. of incoming connections
# ss.append(s)
gulp_status(2)
continue
else:
gulp = set(gulps).pop() # get gulp number
ds.put_dict(
"/mon/service/T2gulp",
{"cadence": 60, "time": Time(datetime.datetime.utcnow()).mjd},
)
# else:
# gulp = set(gulps).pop() # get gulp number
# ds.put_dict(
# "/mon/service/T2gulp",
# {"cadence": 60, "time": Time(datetime.datetime.utcnow()).mjd},
# )

if candsfile == "\n" or candsfile == "": # skip empty candsfile
print(f"candsfile is empty. Skipping.")
Expand All @@ -196,53 +200,47 @@ def parse_socket(
lastname_cleared = lastname # reset to avoid continuous calls
prev_trig_time = Time.now() # pass this on to log extra triggers in second latency window

try:
tab = cluster_heimdall.parse_candsfile(candsfile)
lastname,trigtime = cluster_and_plot(
tab,
globct,
gulp=gulp,
selectcols=selectcols,
outroot=outroot,
plot_dir=plot_dir,
trigger=trigger,
lastname=lastname,
cat=source_catalog,
beam_model=model,
coords=coords,
snrs=snrs,
prev_trig_time=prev_trig_time
)
# lastname,trigtime = ft.func_timeout(4.1,
# cluster_and_plot,
# args=(tab,globct),
# kwargs={'gulp':gulp,
# 'selectcols':selectcols,
# 'outroot':outroot,
# 'plot_dir':plot_dir,
# 'trigger':trigger,
# 'lastname':lastname,
# 'cat':source_catalog,
# 'beam_model':model,
# 'coords':coords,
# 'snrs':snrs,
# 'prev_trig_time':prev_trig_time}
# )
if trigtime is not None:
prev_trig_time = trigtime
globct += 1
except KeyboardInterrupt:
print("Escaping parsing and plotting")
logger.info("Escaping parsing and plotting")
break
except OverflowError:
print("overflowing value. Skipping this gulp...")
logger.warning("overflowing value. Skipping this gulp...")
tab = cluster_heimdall.parse_candsfile(candsfile)
future = pool.submit(cluster_and_plot, tab, globct, gulp=gulp, selectcols=selectcols,
outroot=outroot, plot_dir=plot_dir, trigger=trigger, lastname=lastname,
cat=source_catalog, beam_model=model, coords=coords, snrs=snrs,
prev_trig_time=prev_trig_time)
globct += 1
futures.append(future)
lastname, trigtime, futures = manage_futures(futures) # returns latest result from iteration over futures
if trigtime is not None:
prev_trig_time = trigtime


def manage_futures(futures):
""" Take list of cluster_and_plot futures and handle the output.
Currently returns only one (lastname, trigtime) tuple for all futures that are done.
Small chance that lastname or prev_trig_time will not be updated correctly.
"""

print(candsfile)
gulp_status(3)
continue
gulp_status(0) # success!
lastname, trigtime = None, None
print(f'Managing {len(futures)} futures')
done = []
for future in futures:
if future.done():
done.append(future)
try:
lastname,trigtime = future.result()
if trigtime is not None:
print('new trigtime')
gulp_status(0) # success!
except KeyboardInterrupt:
print("Escaping parsing and plotting")
logger.info("Escaping parsing and plotting")
except OverflowError:
print("overflowing value. Skipping this gulp...")
logger.warning("overflowing value. Skipping this gulp...")
gulp_status(3)

print(f'{len(done)} futures done')
futures = [fut for fut in futures if fut not in done]

return lastname, trigtime, futures


def cluster_and_plot(tab, globct, gulp=None, selectcols=["itime", "idm", "ibox"],
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
setup(name='dsa110-T2',
version=get_git_version(),
url='http://github.com/dsa110/dsa110-T2',
requirements=['seaborn', 'astropy', 'hdbscan', 'progress'],
requirements=['seaborn', 'astropy', 'sklearn', 'progress'],
packages=['T2'],
zip_safe=False)
4 changes: 2 additions & 2 deletions version.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ def get_git_version(abbrev=7):
version = call_git_describe(abbrev).decode('UTF-8')
else:
version = '0.0.0'
if is_dirty():
version += "-dirty"
# if is_dirty():
# version += "-dirty"

# If that doesn't work, fall back on the value that's in
# RELEASE-VERSION.
Expand Down

0 comments on commit 0f117d3

Please sign in to comment.