Skip to content
This repository has been archived by the owner on Nov 9, 2020. It is now read-only.

Commit

Permalink
Merge pull request #612 from brunotm/brunotm.issue35
Browse files Browse the repository at this point in the history
Deserialize and handle request execution with threads in vmdk_ops.py
  • Loading branch information
Ritesh H Shukla authored Oct 29, 2016
2 parents d618cff + c0c6724 commit 190f59c
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 86 deletions.
31 changes: 19 additions & 12 deletions esx_service/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import convert
import auth_data_const
import volume_kv as kv
import threading

# All supported vmdk commands
CMD_CREATE = 'create'
Expand All @@ -34,17 +35,20 @@

SIZE = 'size'

_auth_mgr = None
def connect_auth_db():
# thread local storage
thread_local = threading.local()

def get_auth_mgr():
""" Get a connection to auth DB. """
global _auth_mgr
if not _auth_mgr:
_auth_mgr = auth_data.AuthorizationDataManager()
_auth_mgr.connect()

global thread_local
if not hasattr(thread_local, '_auth_mgr'):
thread_local._auth_mgr = auth_data.AuthorizationDataManager()
thread_local._auth_mgr.connect()
return thread_local._auth_mgr

def get_tenant(vm_uuid):
""" Get tenant which owns this VM by querying the auth DB. """
global _auth_mgr
_auth_mgr = get_auth_mgr()
try:
cur = _auth_mgr.conn.execute(
"SELECT tenant_id FROM vms WHERE vm_id = ?",
Expand Down Expand Up @@ -82,7 +86,7 @@ def get_privileges(tenant_uuid, datastore):
querying the auth DB.
"""
global _auth_mgr
_auth_mgr = get_auth_mgr()
privileges = []
logging.debug("get_privileges tenant_uuid=%s datastore=%s", tenant_uuid, datastore)
try:
Expand Down Expand Up @@ -136,6 +140,7 @@ def get_total_storage_used(tenant_uuid, datastore):
by querying auth DB.
"""
_auth_mgr = get_auth_mgr()
total_storage_used = 0
try:
cur = _auth_mgr.conn.execute(
Expand Down Expand Up @@ -201,7 +206,7 @@ def check_privileges_for_command(cmd, opts, tenant_uuid, datastore, privileges):

def tables_exist():
""" Check tables needed for authorization exist or not. """
global _auth_mgr
_auth_mgr = get_auth_mgr()

try:
cur = _auth_mgr.conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' and name = 'tenants';")
Expand Down Expand Up @@ -272,7 +277,7 @@ def authorize(vm_uuid, datastore, cmd, opts):
logging.debug("Authorize: opt=%s", opts)

try:
connect_auth_db()
get_auth_mgr()
except auth_data.DbConnectionError, e:
error_info = "Failed to connect auth DB({0})".format(e)
return error_info, None, None
Expand Down Expand Up @@ -306,6 +311,8 @@ def authorize(vm_uuid, datastore, cmd, opts):

def add_volume_to_volumes_table(tenant_uuid, datastore, vol_name, vol_size_in_MB):
""" Insert volume to volumes table. """
_auth_mgr = get_auth_mgr()

logging.debug("add to volumes table(%s %s %s %s)", tenant_uuid, datastore,
vol_name, vol_size_in_MB)
try:
Expand Down Expand Up @@ -366,4 +373,4 @@ def get_row_from_privileges_table(conn, tenant_uuid):

result = cur.fetchall()
return None, result


50 changes: 38 additions & 12 deletions esx_service/utils/kvESX.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import json
import logging
import sys
import errno
import time

# Python version 3.5.1
PYTHON64_VERSION = 50659824
Expand Down Expand Up @@ -60,6 +62,10 @@
# Flag to track the version of Python on the platform
is_64bits = False

# Number of times and sleep time to retry on IOError EBUSY
EBUSY_RETRY_COUNT = 8
EBUSY_RETRY_SLEEP = 0.5

class disk_info(Structure):
_fields_ = [('size', c_uint64),
('allocated', c_uint64),
Expand Down Expand Up @@ -238,12 +244,22 @@ def load(volpath):
meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(),
DVOL_KEY.encode())

try:
with open(meta_file, "r") as fh:
kv_str = fh.read()
except:
logging.exception("Failed to access %s", meta_file)
return None
retry_count = 0
while True:
try:
with open(meta_file, "r") as fh:
kv_str = fh.read()
break
except Exception as open_error:
# This is a workaround to the timing/locking with metadata files issue #626
if (hasattr(open_error, "errno") and
(open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT)):
logging.warning("Meta file %s busy for load(), retrying...", meta_file)
retry_count += 1
time.sleep(EBUSY_RETRY_TIME)
else:
logging.exception("Failed to access %s", meta_file)
return None

try:
return json.loads(kv_str)
Expand All @@ -259,12 +275,22 @@ def save(volpath, kv_dict):

kv_str = json.dumps(kv_dict)

try:
with open(meta_file, "w") as fh:
fh.write(align_str(kv_str, KV_ALIGN))
except:
logging.exception("Failed to save meta-data for %s", volpath);
return False
retry_count = 0
while True:
try:
with open(meta_file, "w") as fh:
fh.write(align_str(kv_str, KV_ALIGN))
break
except Exception as open_error:
# This is a workaround to the timing/locking with metadata files issue #626
if (hasattr(open_error, "errno") and
(open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT)):
logging.warning("Meta file %s busy for save(), retrying...", meta_file)
retry_count += 1
time.sleep(EBUSY_RETRY_SLEEP)
else:
logging.exception("Failed to save meta-data for %s", volpath);
return False

return True

Expand Down
21 changes: 12 additions & 9 deletions esx_service/vmci/vmci_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
#include "connection_types.h"


// SO_QSIZE maximum number of connections (requests) in socket queue.
int SO_QSIZE = 128;

// Returns vSocket to listen on, or -1.
// errno indicates the reason for a failure, if any.
int
Expand Down Expand Up @@ -80,6 +83,15 @@ vmci_init(unsigned int port)
return CONN_FAILURE;
}

/*
* listen for client connections.
*/
ret = listen(socket_fd, SO_QSIZE);
if (ret == -1) {
perror("Failed to listen on socket");
return CONN_FAILURE;
}

return socket_fd;
}

Expand All @@ -104,15 +116,6 @@ vmci_get_one_op(const int s, // socket to listen on
return CONN_FAILURE;
}

/*
* listen for client connections.
*/
ret = listen(s, 1);
if (ret == -1) {
perror("Failed to listen on socket");
return CONN_FAILURE;
}

addrLen = sizeof addr;
client_socket = accept(s, (struct sockaddr *) &addr, &addrLen);
if (client_socket == -1) {
Expand Down
139 changes: 97 additions & 42 deletions esx_service/vmdk_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import threading
import time
from ctypes import *
from weakref import WeakValueDictionary

from vmware import vsi

Expand Down Expand Up @@ -98,6 +99,10 @@
MAX_JSON_SIZE = 1024 * 4 # max buf size for query json strings. Queries are limited in size
MAX_SKIP_COUNT = 16 # max retries on VMCI Get Ops failures

# Error codes
VMCI_ERROR = -1 # VMCI C code uses '-1' to indicate failures
ECONNABORTED = 103 # Error on non privileged client

# Volume data returned on Get request
CAPACITY = 'capacity'
SIZE = 'size'
Expand All @@ -118,6 +123,10 @@
# VMCI library used to communicate with clients
lib = None

# For managing resource locks in getLock.
managerLock = threading.Lock() # Serialize access to rsrcLocks
rsrcLocks = WeakValueDictionary() # WeakValueDictionary to track and reuse locks while they're alive

# Run executable on ESX as needed for vmkfstools invocation (until normal disk create is written)
# Returns the integer return value and the stdout str on success and integer return value and
# the stderr str on error
Expand Down Expand Up @@ -1123,22 +1132,100 @@ def load_vmci():
else:
lib = CDLL(os.path.join(LIB_LOC, "libvmci_srv.so"), use_errno=True)

def getLock(lockname):
    '''
    Return the lock registered under lockname, creating and registering
    a new threading.Lock when none is currently tracked for that name.
    '''
    # managerLock serializes every lookup/insert on the shared rsrcLocks map.
    with managerLock:
        lock = rsrcLocks.get(lockname)
        if lock is not None:
            logging.debug("getLock(): return existing lock %s", lockname)
            return lock

        # No tracked lock for this name: create one and register it so
        # later callers asking for the same name share it.
        lock = threading.Lock()
        rsrcLocks[lockname] = lock
        logging.debug("getLock(): return new lock %s", lockname)
        return lock

def send_vmci_reply(client_socket, reply_string):
    '''
    Serialize reply_string as JSON and send it to client_socket through
    lib.vmci_reply. A VMCI_ERROR result is logged as a warning together
    with errno; the function returns nothing.
    '''
    payload = json.dumps(reply_string).encode()
    status = lib.vmci_reply(client_socket, c_char_p(payload))
    # Capture errno right after the library call, before any other call runs.
    err_no = get_errno()
    logging.debug("lib.vmci_reply: VMCI replied with errcode %s", status)
    if status != VMCI_ERROR:
        return
    logging.warning("vmci_reply returned error %s (errno=%d)",
                    os.strerror(err_no), err_no)

def execRequestThread(client_socket, cartel, request):
'''
Handle one client request in its own thread, using a per-volume lock.

client_socket: passed to send_vmci_reply() for the reply; also used to
build the lock name when the request names no volume.
cartel: cartel ID used to look up the calling VM in VSI.
request: raw request; decoded as UTF-8 and parsed as JSON.

Every path visible here ends by calling send_vmci_reply(): with the
executeRequest() result, with a JSON parse error, or with an err()
message when any other exception is caught.

NOTE(review): indentation is not preserved in this view, so the exact
extent of the 'with rsrcLock:' body cannot be confirmed from here.
'''
# Before we start, block to allow main thread or other running threads to advance.
# https://docs.python.org/2/faq/library.html#none-of-my-threads-seem-to-run-why
time.sleep(0.001)
try:
# Get VM name & ID from VSI (we only get cartelID from vmci, need to convert)
vmm_leader = vsi.get("/userworld/cartel/%s/vmmLeader" % str(cartel))
group_info = vsi.get("/vm/%s/vmmGroupInfo" % vmm_leader)
vm_name = group_info["displayName"]
cfg_path = group_info["cfgPath"]
uuid = group_info["uuid"]
# pyVmomi expects uuid like this one: 564dac12-b1a0-f735-0df3-bceb00b30340
# to get it from uuid in VSI vms/<id>/vmmGroup, we use the following format:
# (the VSI value is split on whitespace and '-' into 16 pieces that are
# re-joined with dashes in the positions given by UUID_FORMAT)
UUID_FORMAT = "{0}{1}{2}{3}-{4}{5}-{6}{7}-{8}{9}-{10}{11}{12}{13}{14}{15}"
vm_uuid = UUID_FORMAT.format(*uuid.replace("-", " ").split())

# Malformed JSON gets an explicit error reply instead of falling through
# to the generic exception handler at the bottom.
try:
req = json.loads(request.decode('utf-8'))
except ValueError as e:
reply_string = {u'Error': "Failed to parse json '%s'." % request}
send_vmci_reply(client_socket, reply_string)
else:
details = req["details"]
opts = details["Opts"] if "Opts" in details else {}

# Lock name defaults to volume name, or socket# if request has no volume defined
lockname = details["Name"] if len(details["Name"]) > 0 else "socket{0}".format(client_socket)
# Set thread name to vm_name-lockname
threading.currentThread().setName("{0}-{1}".format(vm_name, lockname))

# Get a resource lock (getLock returns the same lock object for the same name)
rsrcLock = getLock(lockname)

logging.debug("Trying to aquire lock: %s", lockname)
with rsrcLock:
logging.debug("Aquired lock: %s", lockname)

reply_string = executeRequest(vm_uuid=vm_uuid,
vm_name=vm_name,
config_path=cfg_path,
cmd=req["cmd"],
full_vol_name=details["Name"],
opts=opts)

logging.info("executeRequest '%s' completed with ret=%s", req["cmd"], reply_string)
send_vmci_reply(client_socket, reply_string)
logging.debug("Released lock: %s", lockname)

# Catch-all: log the traceback and still answer the client with an error reply.
except Exception as ex_thr:
logging.exception("Unhandled Exception:")
reply_string = err("Server returned an error: {0}".format(repr(ex_thr)))
send_vmci_reply(client_socket, reply_string)

# load VMCI shared lib , listen on vSocket in main loop, handle requests
def handleVmciRequests(port):
VMCI_ERROR = -1 # VMCI C code uses '-1' to indicate failures
ECONNABORTED = 103 # Error on non privileged client

skip_count = MAX_SKIP_COUNT # retries for vmci_get_one_op failures
bsize = MAX_JSON_SIZE
txt = create_string_buffer(bsize)

cartel = c_int32()
sock = lib.vmci_init(c_uint(port))

if sock == VMCI_ERROR:
errno = get_errno()
raise OSError("Failed to initialize vSocket listener: %s (errno=%d)" \
% (os.strerror(errno), errno))

skip_count = MAX_SKIP_COUNT # retries for vmci_get_one_op failures
while True:
c = lib.vmci_get_one_op(sock, byref(cartel), txt, c_int(bsize))
logging.debug("lib.vmci_get_one_op returns %d, buffer '%s'",
Expand All @@ -1160,43 +1247,11 @@ def handleVmciRequests(port):
else:
skip_count = MAX_SKIP_COUNT # reset the counter, just in case

# Get VM name & ID from VSI (we only get cartelID from vmci, need to convert)
vmm_leader = vsi.get("/userworld/cartel/%s/vmmLeader" %
str(cartel.value))
group_info = vsi.get("/vm/%s/vmmGroupInfo" % vmm_leader)

vm_name = group_info["displayName"]
cfg_path = group_info["cfgPath"]
uuid = group_info["uuid"]
# pyVmomi expects uuid like this one: 564dac12-b1a0-f735-0df3-bceb00b30340
# to get it from uuid in VSI vms/<id>/vmmGroup, we use the following format:
UUID_FORMAT = "{0}{1}{2}{3}-{4}{5}-{6}{7}-{8}{9}-{10}{11}{12}{13}{14}{15}"
vm_uuid = UUID_FORMAT.format(*uuid.replace("-", " ").split())

try:
req = json.loads(txt.value.decode('utf-8'))
except ValueError as e:
ret = {u'Error': "Failed to parse json '%s'." % txt.value}
else:
details = req["details"]
opts = details["Opts"] if "Opts" in details else {}
threading.currentThread().setName(vm_name)
ret = executeRequest(vm_uuid=vm_uuid,
vm_name=vm_name,
config_path=cfg_path,
cmd=req["cmd"],
full_vol_name=details["Name"],
opts=opts)
logging.info("executeRequest '%s' completed with ret=%s",
req["cmd"], ret)

ret_string = json.dumps(ret)
response = lib.vmci_reply(c, c_char_p(ret_string.encode()))
errno = get_errno()
logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response)
if response == VMCI_ERROR:
logging.warning("vmci_reply returned error %s (errno=%d)",
os.strerror(errno), errno)
client_socket = c # Bind to avoid race conditions.
# Fire a thread to execute the request
threading.Thread(
target=execRequestThread,
args=(client_socket, cartel.value, txt.value)).start()

lib.close(sock) # close listening socket when the loop is over

Expand Down
Loading

0 comments on commit 190f59c

Please sign in to comment.