Skip to content

Commit

Permalink
Merge pull request #49 from algrebe/stats-and-logs-corrections
Browse files Browse the repository at this point in the history
Stats and logs corrections
  • Loading branch information
algrebe authored Feb 7, 2017
2 parents a1734c9 + 5376a34 commit e8cff44
Showing 1 changed file with 19 additions and 86 deletions.
105 changes: 19 additions & 86 deletions funcserver/funcserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,72 +38,6 @@ def disable_requests_debug_logs():
# otherwise it swamps with too many logs
logging.getLogger('requests').setLevel(logging.WARNING)

class StatsCollector(object):
    '''Accumulates counter, gauge and timing metrics and periodically
    flushes them to a StatsD server from a background daemon thread.

    When `stats_loc` is falsy the collector is a no-op: every public
    method returns immediately and nothing is cached or sent.
    '''

    # Seconds between automatic flushes by the background thread.
    STATS_FLUSH_INTERVAL = 1

    def __init__(self, prefix, stats_loc):
        '''
        @prefix: metric-name prefix passed through to statsd.
        @stats_loc: StatsD server location, "ip" or "ip:port";
            falsy disables stats collection entirely.
        '''
        self.cache = {}        # counter key -> pending delta
        self.gauge_cache = {}  # gauge key -> (value, is_delta)

        self.stats = None
        if not stats_loc: return

        port = None
        if ':' in stats_loc:
            ip, port = stats_loc.split(':')
            port = int(port)
        else:
            ip = stats_loc

        S = statsd.StatsClient
        self.stats = S(ip, port, prefix) if port is not None else S(ip, prefix=prefix)

        def fn():
            # Flush loop; runs for the life of the process (daemon thread).
            while 1:
                time.sleep(self.STATS_FLUSH_INTERVAL)
                self._collect_ramusage()
                self.send()

        self.stats_thread = threading.Thread(target=fn)
        self.stats_thread.daemon = True
        self.stats_thread.start()

    def incr(self, key, n=1):
        '''Increments counter `key` by `n` (buffered until next flush).'''
        if self.stats is None: return
        self.cache[key] = self.cache.get(key, 0) + n

    def decr(self, key, n=1):
        '''Decrements counter `key` by `n` (buffered until next flush).'''
        if self.stats is None: return
        self.cache[key] = self.cache.get(key, 0) - n

    def timing(self, key, ms):
        '''Records a timing of `ms` milliseconds for `key` (sent immediately).'''
        if self.stats is None: return
        return self.stats.timing(key, ms)

    def gauge(self, key, n, delta=False):
        '''Sets (or, with delta=True, adjusts) the gauge `key` to `n`.'''
        # BUGFIX: bail out like the other methods when stats are disabled.
        # Previously the gauge cache grew without bound in that case,
        # because send() never flushes it while self.stats is None.
        if self.stats is None: return
        if delta:
            v, _ = self.gauge_cache.get(key, (0, True))
            n += v
        self.gauge_cache[key] = (n, delta)

    def _collect_ramusage(self):
        '''Records the process peak RSS under the gauge "resource.maxrss".'''
        self.gauge('resource.maxrss',
            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    def send(self):
        '''Flushes all buffered counters and gauges in a single pipeline.'''
        if self.stats is None: return

        # Swap the caches out first so metrics recorded by other threads
        # while we iterate are kept for the next flush instead of being
        # lost (the old code cleared the dicts only after sending) and so
        # the dicts are never mutated mid-iteration.
        cache, self.cache = self.cache, {}
        gauge_cache, self.gauge_cache = self.gauge_cache, {}

        p = self.stats.pipeline()

        for k, v in cache.items():
            p.incr(k, v)

        for k, (v, d) in gauge_cache.items():
            p.gauge(k, v, delta=d)

        p.send()

def tag(*tags):
'''
Constructs a decorator that tags a function with specified
Expand Down Expand Up @@ -367,12 +301,11 @@ def _clean_kwargs(self, kwargs, fn):

def _handle_single_call(self, request, m):
fn_name = m.get('fn', None)
sname = 'api.%s' % fn_name
t = time.time()

tags = { "fn": fn_name or "unknown", "success": False }
try:
fn = self._get_apifn(fn_name)
self.stats.incr(sname)
args = m['args']
kwargs = self._clean_kwargs(m['kwargs'], fn)

Expand All @@ -383,24 +316,33 @@ def _handle_single_call(self, request, m):
r = fn(*args, **kwargs)
r = {'success': True, 'result': r}

tags['success'] = True

except Exception, e:
self.log.exception('Exception during RPC call. '
'fn=%s, args=%s, kwargs=%s' % \
(m.get('fn', ''), repr(m.get('args', '[]')),
repr(m.get('kwargs', '{}'))))
tags['success'] = False

self.log.exception("RPC failed",
fn=fn_name, args=m.get('args'), kwargs=m.get('kwargs'),
)
r = {'success': False, 'result': repr(e)}

finally:
tdiff = (time.time() - t) * 1000
self.stats.timing(sname, tdiff)
(
self.stats.measure('api', **tags)
.count(invoked=1)
.time(duration=tdiff)
)

try:
_r = self.server.on_api_call_end(fn_name, args, kwargs, self, r)
if _r is not None:
r = _r
except (SystemExit, KeyboardInterrupt): raise
except:
self.log.exception('In on_api_call_end for fn=%s' % fn_name)
self.log.exception("on_api_call_end failed",
fn=fn_name, args=args, kwargs=kwargs,
)

return r

Expand Down Expand Up @@ -483,10 +425,9 @@ def _handle_call_wrapper(self, request, fn, m, protocol):
try:
return self._handle_call(request, fn, m, protocol)
except Exception, e:
self.log.exception('Exception during RPC call. '
'fn=%s, args=%s, kwargs=%s' % \
(m.get('fn', ''), repr(m.get('args', '[]')),
repr(m.get('kwargs', '{}'))))
self.log.exception("RPC failed", fn=m.get('fn'),
args=m.get('args'), kwargs=m.get('kwargs'),
)
self.clear()
self.set_status(500)

Expand Down Expand Up @@ -539,10 +480,6 @@ class Server(BaseScript):

DISABLE_REQUESTS_DEBUG_LOGS = True

def create_stats(self):
    '''Builds the StatsCollector for this server.

    The metric prefix is "<hostname>.<name>", omitting whichever
    part is empty; the StatsD location comes from --statsd-server.
    '''
    parts = []
    for part in (self.hostname, self.name):
        if part:
            parts.append(part)
    prefix = '.'.join(parts)
    return StatsCollector(prefix, self.args.statsd_server)

def dump_stacks(self):
'''
Dumps the stack of all threads. This function
Expand Down Expand Up @@ -581,9 +518,6 @@ def define_args(self, parser):

parser.add_argument('--port', default=self.DEFAULT_PORT,
type=int, help='port to listen on for server')
parser.add_argument('--statsd-server', default=None,
help='Location of StatsD server to send statistics. '
'Format is ip[:port]. Eg: localhost, localhost:8125')
parser.add_argument('--debug', action='store_true',
help='When enabled, auto reloads server on code change')

Expand Down Expand Up @@ -686,7 +620,6 @@ def run(self):
if self.DISABLE_REQUESTS_DEBUG_LOGS:
disable_requests_debug_logs()

self.stats = self.create_stats()
self.threadpool = ThreadPool(self.THREADPOOL_WORKERS)

self.api = None
Expand Down

0 comments on commit e8cff44

Please sign in to comment.