Skip to content

Commit

Permalink
finally memcg_watcher uses cgroups as desired
Browse files Browse the repository at this point in the history
creating eventfd is being done with linuxfd external library
  • Loading branch information
t3rn0 committed Oct 18, 2020
1 parent 9373673 commit 669f5ce
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 52 deletions.
37 changes: 23 additions & 14 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import signal
import os
import multiprocessing
import memcg_watcher
import multiprocessing
from commons import logger, OutOfMemoryError
import sys


def signal_handler(signum, frame):
logger.critical(f'got signum {signum}, handling')
raise OutOfMemoryError()
if signum == 2:
raise KeyboardInterrupt
raise OutOfMemoryError


signal.signal(signal.SIGUSR1, signal_handler)
signal.signal(signal.SIGUSR2, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGCHLD, signal_handler)


def expernsive_function():
def expensive_work():
ttt = []
i = 0
while True:
Expand All @@ -30,23 +34,28 @@ def main():
p = os.getpid()
logger.info(f'main function started with pid {p}')
try:
expernsive_function()
except OutOfMemoryError as e:
expensive_work()
except OutOfMemoryError:
logger.critical(f'catched OutOfMemoryError, closing...', exc_info=True)
finally:
shutdown()


if __name__ == '__main__':
memcg_parameters = {
'memory_usage_factor_limit': 0.8,
import platform
if platform.system() != 'Linux':
raise EnvironmentError

params = {
'memory_usage_factor_limit': 0.6,
}

subprocess = multiprocessing.Process(
target=memcg_watcher.main, name='memcg_watcher',
kwargs=memcg_parameters,
daemon=True
target=memcg_watcher.main, kwargs=params, daemon=True
)
subprocess.start()

main()
try:
subprocess.start()
main()
finally:
subprocess.terminate()
80 changes: 42 additions & 38 deletions memcg_watcher.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
import time, os, sys
import signal
from commons import logger
import linuxfd
import select
import platform


if platform.system() != 'Linux':
raise EnvironmentError


MEMORY_USAGE_FACTOR_LIMIT = 0.5
NOTIFY_SIGNAL = signal.SIGUSR1
MEMORY_LIMIT_IN_BYTES = "/sys/fs/cgroup/memory/memory.limit_in_bytes"
MEMORY_USAGE_IN_BYTES = "/sys/fs/cgroup/memory/memory.usage_in_bytes"


def bytes_to_human(c):
if c < 1024: return "%i B" % c
S = "KMG"
i = 0
while i < len(S) - 1:
if c < 0.8 * 1024 ** (i + 2): break
i += 1
f = float(c) / (1024 ** (i + 1))
return "%.1f %sB" % (f, S[i])
CGROUP_EVENT_CONTROL = "/sys/fs/cgroup/memory/cgroup.event_control"


def get_limit_usage():
Expand All @@ -28,28 +22,38 @@ def get_current_usage():
return int(open(MEMORY_USAGE_IN_BYTES).read())


def main(memory_usage_factor_limit=MEMORY_USAGE_FACTOR_LIMIT):
def get_threshold(factor):
# Strange but necessary scaling
limit = get_limit_usage() // 2 ** 30
return int(limit * factor)


while True:
# Strange but necessary scaling.
limit = get_limit_usage() // 2 ** 30

used = get_current_usage()
fact = float(used) / limit
p = os.getppid()

if p <= 1:
# This means that our parent process has died. Stop now.
logging.info('closing watcher')
sys.exit()

logger.info("ppid: %s, mem limit: %s, current rss: %s, percentage: %s%%" % (
os.getppid(), bytes_to_human(limit), bytes_to_human(used), round(100.0*fact)))

if fact >= memory_usage_factor_limit:
logger.critical("sending signal to proc %i ..." % p)
os.kill(p, NOTIFY_SIGNAL)

time.sleep(1)

logger.warning('something is wrong')
def main(memory_usage_factor_limit=MEMORY_USAGE_FACTOR_LIMIT):
name = 'MemWatcher'
logger.info(f'starting {name}')

efd = linuxfd.eventfd(initval=0, nonBlocking=True)
mfd = open(MEMORY_USAGE_IN_BYTES)
mfd.fileno()
threshold = get_threshold(memory_usage_factor_limit)

with open(CGROUP_EVENT_CONTROL, 'w') as f:
f.write(f'{efd.fileno()} {mfd.fileno()} {threshold}')

epl = select.epoll()
epl.register(efd.fileno(), select.EPOLLIN)

try:
isrunning = True
while isrunning:
events = epl.poll(-1)
for fd, event in events:
if fd == efd.fileno() and event & select.EPOLLIN:
logger.info('event file received update')
logger.info(f'{name} sent signal to the main process')
efd.read()
isrunning = False
finally:
logger.info(f'closing {name}')
epl.unregister(efd.fileno())
epl.close()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
linuxfd==1.4.4

0 comments on commit 669f5ce

Please sign in to comment.