Skip to content

Commit

Permalink
use LoggedPV class for most of work, support setting adel - archive d…
Browse files Browse the repository at this point in the history
…elta - value, add headers with info at startup
  • Loading branch information
newville committed Nov 7, 2024
1 parent b02e38d commit d35408f
Showing 1 changed file with 120 additions and 42 deletions.
162 changes: 120 additions & 42 deletions epicsapps/pvlogger/pvlogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,114 @@
from ..instruments import InstrumentDB

from ..utils import (get_pvtypes, get_pvdesc, normalize_pvname,
debugtimer, fix_filename)
debugtimer, fix_filename, get_timestamp)

from .configfile import PVLoggerConfig


ARCHIVE_DELTA = 1.e-8
FLUSHTIME = 30.0
SLEEPTIME = 0.5
RUN_FOLDER = 'pvlog'
motor_fields = ('.VAL', '.OFF', '.FOFF', '.SET', '.HLS', '.LLS',
'.DIR', '_able.VAL', '.SPMG', '.DESC')

class LoggedPV():
"""wraps a PV for logging
"""
def __init__(self, pvname, desc=None, adel=ARCHIVE_DELTA):
self.desc = desc
try:
adel = float(adel)
except:
adel = ARCHIVE_DELTA
self.adel = adel

self.pvname = normalize_pvname(pvname)
logname = self.pvname.replace('.', '_') + '.log'
self.filename = Path(fix_filename(logname, new=True)
).absolute().as_posix()
self.datafile = open(self.filename, 'a')
self.needs_flush = False
self.lastflush = 0.0

self.data = deque()
self.needs_header = False
self.connected = None
self.value = None
self.pv = get_pv(self.pvname, callback=self.onChanges,
connection_callback=self.onConnect)

def onConnect(self, pvname, conn, pv):
if conn:
if self.connected is None: # initial connection
self.connected = True
self.needs_header = True
else:
buff = [f"{time.time():.3f} {self.value} <reconnected>"]
self.datafile.write('\n'.join(buff))
else:
buff = [f"{time.time():.3f} {self.value} <disconnected>"]
self.datafile.write('\n'.join(buff))
self.needs_flush = True

def onChanges(self, pvname, value, char_value='', timestamp=0.0, **kws):
try:
skip = (abs(value - self.value) < self.adel)
except:
skip = False
if not skip:
self.value = value
if timestamp is None:
timestamp = time.time()
self.data.append((timestamp, value, char_value))

def flush(self):
self.lastflush = time.time()
self.needs_flush = False
self.datafile.flush()

def process(self):
if self.needs_header:
buff = ["# pvlog data file",
f"# pvname = {self.pvname}",
f"# label = {self.desc}",
f"# delta = {self.adel}",
f"# start_time = {get_timestamp()}"]

for attr in ('count', 'nelm', 'type', 'units',
'precision', 'host', 'access'):
val = getattr(self.pv, attr, 'unknown')
buff.append(f"# {attr:10s} = {val}")
enum_strs = getattr(self.pv, 'enum_strs', None)
if enum_strs is not None:
buff.append("# enum strings:")
for index, nam in enumerate(enum_strs):
buff.append(f"# {index} = {nam}")

buff.extend(["#---------------------------------",
"# timestamp value char_value", ""])
self.datafile.write('\n'.join(buff))
self.needs_header = False
n = len(self.data)
if n > 0:
buff = []
for i in range(n):
ts, val, cval = self.data.popleft()
buff.append(f"{ts:.3f} {val} {cval}")
buff.append('')
self.datafile.write('\n'.join(buff))
self.needs_flush = True
if self.needs_flush and (time.time() > (self.lastflush + 15)):
self.flush()


class PVLogger():
about_msg = """Epics PV Logger, CLI
Matt Newville <[email protected]>
"""
def __init__(self, configfile=None, prompt=None, wxparent=None):
self.data = {}
self.datafiles = {}
self.lastflush = {}
self.pvs = []
self.pvs = {}
if configfile is not None:
self.read_configfile(configfile)

Expand All @@ -52,16 +140,23 @@ def read_configfile(self, configfile):
os.chdir(self.topfolder)

def connect_pvs(self):
_pvnames, _pvdesc = [], []
_pvnames, _pvdesc, _pvadel = [], [], []
cnf = self.config
for pvline in cnf.get('pvs', []):
name = pvline.strip()
desc = '<auto>'
adel = str(ARCHIVE_DELTA)
if '|' in pvline:
name, desc = pvline.split('|', 1)
words = pvline.split('|')
name = words[0].strip()
if len(words) > 1:
desc = words[1]
if len(words) > 2:
adel = words[2].strip()

_pvnames.append(name.strip())
_pvdesc.append(desc.strip())

_pvadel.append(adel.strip())
inst_names = cnf.get('instruments', [])
escan_cred = os.environ.get('ESCAN_CREDENTIALS', '')
inst_map = {}
Expand All @@ -74,38 +169,39 @@ def connect_pvs(self):
_pvnames.append(pvname)
inst_map[inst].append(pvname)
_pvdesc.append('<auto>')
_pvadel.append(str(ARCHIVE_DELTA))
except AttributeError:
pass

descpvs = {}
rtyppvs = {}
for ipv, pvname in enumerate(_pvnames):
dpv = pvname + '.DESC'
rpv = pvname + '.RTYP'
if pvname.endswith('.VAL'):
dpv = pvname[:-4] + '.DESC'
rpv = pvname[:-4] + '.RTYP'
pref = pvname[:-4] if pvname.endswith('.VAL') else pvname
rtyppvs[pvname] = get_pv(f"{pref}.RTYP")
if _pvdesc[ipv] == '<auto>':
descpvs[pvname] = get_pv(dpv)
rtyppvs[pvname] = get_pv(rpv)
descpvs[pvname] = get_pv(f"{pref}.DESC")

time.sleep(0.01)
desc_lines = []
motor_lines = []
self.pvs = []
self.pvs = {}
for ipv, pvname in enumerate(_pvnames):
desc = _pvdesc[ipv]
desc = _pvdesc[ipv]
adel = _pvadel[ipv]
if desc == '<auto>' and pvname in descpvs:
desc = descpvs[pvname].get()
desc_lines.append(f"{ipv:04d} | {pvname} | {desc}")
self.add_pv(pvname)
if adel == '<auto>':
adel = ARCHIVE_DELTA
desc_lines.append(f"{ipv:04d} | {pvname} | {desc} | {adel}")
self.add_pv(pvname, desc=desc, adel=adel)
if 'motor' == rtyppvs[pvname].get():
prefix = pvname
if pvname.endswith('.VAL'):
prefix = prefix[:-4]
motor_lines.append(prefix + '.VAL')
for mfield in motor_fields:
self.add_pv(f"{prefix}{mfield}")
self.add_pv(f"{prefix}{mfield}",
desc=f"{desc} {mfield}", adel=adel)

motor_lines.append('')
desc_lines.append('')
Expand All @@ -119,31 +215,13 @@ def connect_pvs(self):
with open("_Instruments.json", "w+") as fh:
json.dump(inst_map, fh)

def add_pv(self, pvname):
def add_pv(self, pvname, desc=None, adel=ARCHIVE_DELTA):
if pvname not in self.pvs:
self.pvs.append(get_pv(pvname))
self.data[pvname] = deque()
self.datafiles[pvname] = open(fix_filename(f"{pvname}.log"), 'w+')
self.lastflush[pvname] = 0.

def onChanges(self, pvname, value, char_value='', timestamp=0, **kws):
self.data[pvname].append((timestamp, value, char_value))

self.pvs[pvname] = LoggedPV(pvname, desc=desc, adel=adel)

def run(self):
self.connect_pvs()
print(" Run: ", len(self.pvs))
for pv in self.pvs:
pv.clear_callbacks()
pv.add_callback(self.onChanges)

while True:
time.sleep(SLEEPTIME)

for pvname, data in self.data.items():
if len(data) > 0:
n = len(data) # use this to permit inserts while writing
now = time.time()
print(pvname, self.datafiles[pvname], len(data))
if now > self.lastflush[pvname] + FLUSHTIME:
self.datafiles[pvname].flush()
for pv in self.pvs.values():
pv.process()

0 comments on commit d35408f

Please sign in to comment.