Skip to content

Commit

Permalink
fixes #7, #8
Browse files Browse the repository at this point in the history
  • Loading branch information
prashanthellina committed Apr 16, 2015
1 parent 5777c07 commit c7269b0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ sudo pip install git+git://github.com/prashanthellina/vwserver.git
Start a VWServer by running the command below. Note that VWServer will use the specified directory "data" for storing all of its data. Make sure you do not have any of your own data there. You can change the port number to anything else you desire. The last argument is the path to the vw executable.

``` bash
vwserver --port 8889 data
vwserver --port 8889 data `which vw`
```

Now the server is running and is web accessible. Open http://localhost:8889 in the browser. You will see a web-based python console to interact with this VWServer.
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
'websocket-client',
'decorator',
'gevent',
'daemonize',
'psutil',
'funcserver',
],
dependency_links=[
Expand Down
84 changes: 72 additions & 12 deletions vwserver/vwserver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from gevent import monkey; monkey.patch_all()

import time
import os
import sys
import time
import shlex
import shutil
import subprocess
Expand All @@ -10,16 +11,46 @@
import threading
import copy

import psutil
import tornado
import websocket
from decorator import decorator
from funcserver import RPCServer, RPCClient

# As we are using gevent for IO parallelization (network RPC requests) and
# gevent does not play well with process spawning, we need to make sure
# that the spawned process is a daemon; otherwise the event loop gets stuck.
# Vowpal Wabbit does support daemonizing, but that does not work in
# active learning mode as of version 7.7. To overcome this, we use
# a Python script that wraps Vowpal Wabbit: it daemonizes itself and
# then execs into the Vowpal Wabbit binary.
VWWRAPPER = os.path.join(os.path.dirname(__file__), 'vwdaemon.py')

# Table of vw option names (without the leading "--") mapped to values.
# NOTE(review): presumably these are the defaults / the set of options a
# user may specify -- confirm against the code that reads VWOPTIONS.
VWOPTIONS = {
'passes': 3,
'bit_precision': 27,
'active_learning': False,
'active_mellowness': 8,
}

def get_vwdaemon_pid(port):
    '''
    Finds the vwwrapper daemon process that is listening on @port
    and returns pid of that process.

    Only a process whose parent has pid 1 is accepted as the daemon.
    Returns None when no matching process is found.
    '''
    for pid in psutil.pids():
        try:
            p = psutil.Process(pid)

            # local ports on which this process has a listening socket;
            # entries without a local address are skipped
            ports = [c.laddr[1] for c in p.connections()
                     if c.status == 'LISTEN' and c.laddr]
            if port not in ports:
                continue

            parent = p.parent()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # either we are not allowed to inspect this process or it
            # exited after the pid list was taken; not a candidate
            continue

        # parent() gives None for a process that has no parent
        if parent is not None and parent.pid == 1:
            return pid

    return None

def is_process_running(process_id):
try:
os.kill(process_id, 0)
Expand Down Expand Up @@ -56,12 +87,13 @@ def sleep_until(fn, timeout=25.0):
class VWSocket(object):
CHUNK_SIZE = 4096

def __init__(self, vw, on_fatal_failure=None):
def __init__(self, vw, on_fatal_failure=None, on_connect=None):
self.vw = vw
self.log = vw.log
self.port = vw.port
self.lock = threading.RLock()
self.on_fatal_failure = on_fatal_failure
self.on_connect = on_connect

sleep_until(self.connect, timeout=5.0)

Expand All @@ -86,6 +118,7 @@ def connect(self):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(('localhost', self.port))
if self.on_connect: self.on_connect()
return True
except socket.error:
return False
Expand Down Expand Up @@ -117,10 +150,11 @@ def send_commands(self, commands, num_responses=None):
class VW(object):
NUM_CHILD_PROCESSES = 8

def __init__(self, name, data_dir, log, options=None, on_fatal_failure=None):
def __init__(self, name, data_dir, vw_binary, log, options=None, on_fatal_failure=None):
self.log = log
self.name = name
self.data_dir = data_dir
self.vw_binary = vw_binary
self.port = 0
self.sock = None
self.on_fatal_failure = on_fatal_failure
Expand Down Expand Up @@ -155,6 +189,22 @@ def kill_vw_processes(self):
os.remove(self.pid_fpath)
return pid

def make_options(self):
    '''
    Renders self.options as a list of command line option strings.

    - boolean value: becomes a bare flag ("--name") when True and is
      left out entirely when False
    - string value: becomes --name "value" with backslashes and double
      quotes escaped so that the value stays one quoted argument
    - any other value: becomes "--name value"

    Returns the list of option strings (empty when there are no options).
    '''
    try:
        # Python 2: matches both str and unicode values
        string_types = basestring
    except NameError:
        string_types = str

    o = []
    for k, v in self.options.items():
        # bool has to be tested first because True/False are ints too
        if isinstance(v, bool):
            # boolean value: active=True becomes --active
            if v:
                o.append('--%s' % k)

        elif isinstance(v, string_types):
            # escape backslashes before quotes so that the backslashes
            # added for the quotes are not escaped a second time
            escaped = v.replace('\\', '\\\\').replace('"', '\\"')
            o.append('--%s "%s"' % (k, escaped))

        else:
            o.append('--%s %s' % (k, v))

    return o

def load_vw(self):
# ensure cache file
open(self.cache_fpath, 'a+').close()
Expand All @@ -166,7 +216,7 @@ def load_vw(self):
self.port = get_free_port()

# user-specifiable options
opts = [('--%s %s' % (k, v)) for k, v in self.options.iteritems()]
opts = self.make_options()

# model file option
if os.path.exists(self.model_fpath):
Expand All @@ -177,14 +227,14 @@ def load_vw(self):
# standard options
# NOTE: disabling cache because it is causing issues
# with online training
opts.extend(['--daemon', '--no_stdin', '--save_resume', '--quiet',
opts.extend(['--no_stdin', '--save_resume', '--quiet',
'--num_children %s' % self.NUM_CHILD_PROCESSES,
'--pid_file %s' % self.pid_fpath,
#'--cache_file %s' % self.cache_fpath,
'--port %s' % self.port])

# construct vw command
cmd = 'vw %s %s' % (' '.join(opts), self.dummy_input_fpath)
cmd = '%s %s %s %s %s %s' % (sys.executable, VWWRAPPER, self.pid_fpath,
self.vw_binary, ' '.join(opts), self.dummy_input_fpath)
self.log.debug('cmd = %s' % cmd)

# launch command
Expand All @@ -195,7 +245,13 @@ def load_vw(self):
raise Exception('Failed to execute vw process. Pid file not found.')

# initialize socket for communication
# NOTE: Upon successful connection, we look up the pid of the daemon
# process that is listening on the correct port number and overwrite
# the pid written into the pid file by VWWRAPPER with it. We do this
# because, for some reason, the pid written by VWWRAPPER is off by one!
self.sock = VWSocket(self, on_fatal_failure=self.on_fatal_failure,
on_connect=lambda: open(self.pid_fpath, 'w').write(str(get_vwdaemon_pid(self.port))))

def load_options(self):
if os.path.exists(self.options_fpath):
Expand Down Expand Up @@ -234,14 +290,15 @@ def ensurevw(fn, vw, *args, **kwargs):
raise Exception('vw "%s" does not exist' % vw)

if vw not in self.vws:
self.vws[vw] = VW(vw, data_dir, log=self.log,
self.vws[vw] = VW(vw, self.vw_binary, data_dir, log=self.log,
on_fatal_failure=lambda: self.unload(vw))

return fn(self, self.vws[vw], *args, **kwargs)

class VWAPI(object):
def __init__(self, data_dir):
def __init__(self, data_dir, vw_binary):
self.data_dir = data_dir
self.vw_binary = vw_binary
self.vws = {}

def _check_options(self, options):
Expand All @@ -267,7 +324,7 @@ def create(self, name, options=None):
raise Exception('vw model "%s" exists already' % name)

os.makedirs(data_dir)
self.vws[name] = VW(name, data_dir, self.log, options,
self.vws[name] = VW(name, data_dir, self.vw_binary, self.log, options,
on_fatal_failure=lambda: self.unload(name))

def unload(self, vw):
Expand Down Expand Up @@ -366,18 +423,21 @@ def __init__(self, *args, **kwargs):

# make data dir if not already present
self.data_dir = os.path.abspath(self.args.data_dir)
self.vw_binary = os.path.abspath(self.args.vw_binary)
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)

def prepare_api(self):
return VWAPI(self.data_dir)
return VWAPI(self.data_dir, self.vw_binary)

def prepare_handlers(self):
return [('/ws/vw/([^/]+)', WSVWHandler)]

def define_args(self, parser):
    '''
    Registers the positional command line arguments on @parser:
    the data directory followed by the vw executable.
    '''
    positionals = (
        ('data_dir', 'data-dir', 'Directory path where data is stored'),
        ('vw_binary', 'vw-binary', 'Absolute path of vw executable file'),
    )

    # registration order fixes the order expected on the command line
    for dest, label, description in positionals:
        parser.add_argument(dest, type=str, metavar=label, help=description)

if __name__ == '__main__':
VWServer().start()

0 comments on commit c7269b0

Please sign in to comment.