From 25c653fba0a5e2f4ea14649f2c44cd7d97e99d73 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Tue, 27 Sep 2016 12:12:34 +0100 Subject: [PATCH 01/14] Handle requests in separate threads with volume/resource level locks in vmdk_ops.py. Introduce getLock() to dynamically create or reuse existing locks. Address @msterin comments and remove the vmci lock. Address @andrewjstone comments and fix race condition. Address @govint comments and also log unhandled exceptions in VSI code. --- esx_service/vmdk_ops.py | 139 ++++++++++++++++++++++++++----------- vmdk_plugin/sanity_test.go | 3 +- 2 files changed, 99 insertions(+), 43 deletions(-) diff --git a/esx_service/vmdk_ops.py b/esx_service/vmdk_ops.py index 45e9ad934..91567f1ef 100755 --- a/esx_service/vmdk_ops.py +++ b/esx_service/vmdk_ops.py @@ -45,6 +45,7 @@ import threading import time from ctypes import * +from weakref import WeakValueDictionary from vmware import vsi @@ -98,6 +99,10 @@ MAX_JSON_SIZE = 1024 * 4 # max buf size for query json strings. Queries are limited in size MAX_SKIP_COUNT = 16 # max retries on VMCI Get Ops failures +# Error codes +VMCI_ERROR = -1 # VMCI C code uses '-1' to indicate failures +ECONNABORTED = 103 # Error on non privileged client + # Volume data returned on Get request CAPACITY = 'capacity' SIZE = 'size' @@ -118,6 +123,10 @@ # VMCI library used to communicate with clients lib = None +# For managing resource locks in getLock. +managerLock = threading.Lock() # Serialize access to rsrcLocks +rsrcLocks = WeakValueDictionary() # WeakValueDictionary to track and reuse locks while they're alive + # Run executable on ESX as needed for vmkfstools invocation (until normal disk create is written) # Returns the integer return value and the stdout str on success and integer return value and # the stderr str on error @@ -1123,22 +1132,100 @@ def load_vmci(): else: lib = CDLL(os.path.join(LIB_LOC, "libvmci_srv.so"), use_errno=True) +def getLock(lockname): + ''' + Create or return a existing lock identified by lockname. + ''' + global rsrcLocks + + with managerLock: + try: + lock = rsrcLocks[lockname] + logging.debug("getLock(): return existing lock %s", lockname) + except KeyError as e: + lock = threading.Lock() + rsrcLocks[lockname] = lock + logging.debug("getLock(): return new lock %s", lockname) + return lock + +def execRequestThread(client_socket, cartel, request): + ''' + Execute requests in a thread context with a per volume locking. + ''' + # Before we start, block to allow main thread or other running threads to advance. + # https://docs.python.org/2/faq/library.html#none-of-my-threads-seem-to-run-why + time.sleep(0.001) + try: + # Get VM name & ID from VSI (we only get cartelID from vmci, need to convert) + vmm_leader = vsi.get("/userworld/cartel/%s/vmmLeader" % str(cartel)) + group_info = vsi.get("/vm/%s/vmmGroupInfo" % vmm_leader) + vm_name = group_info["displayName"] + cfg_path = group_info["cfgPath"] + uuid = group_info["uuid"] + # pyVmomi expects uuid like this one: 564dac12-b1a0-f735-0df3-bceb00b30340 + # to get it from uuid in VSI vms//vmmGroup, we use the following format: + UUID_FORMAT = "{0}{1}{2}{3}-{4}{5}-{6}{7}-{8}{9}-{10}{11}{12}{13}{14}{15}" + vm_uuid = UUID_FORMAT.format(*uuid.replace("-", " ").split()) + + try: + req = json.loads(request.decode('utf-8')) + except ValueError as e: + ret_string = {u'Error': "Failed to parse json '%s'." % request} + response = lib.vmci_reply(client_socket, c_char_p(ret_string.encode())) + errno = get_errno() + logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) + if response == VMCI_ERROR: + logging.warning("vmci_reply returned error %s (errno=%d)", + os.strerror(errno), errno) + else: + details = req["details"] + opts = details["Opts"] if "Opts" in details else {} + + # Lock name defaults to volume name, or socket# if request has no volume defined + lockname = details["Name"] if len(details["Name"]) > 0 else "socket{0}".format(client_socket) + # Set thread name to vm_name-lockname + threading.currentThread().setName("{0}-{1}".format(vm_name, lockname)) + + # Get a resource lock + rsrcLock = getLock(lockname) + + logging.debug("Trying to aquire lock: %s", lockname) + with rsrcLock: + logging.debug("Aquired lock: %s", lockname) + + ret = executeRequest(vm_uuid=vm_uuid, + vm_name=vm_name, + config_path=cfg_path, + cmd=req["cmd"], + full_vol_name=details["Name"], + opts=opts) + logging.info("executeRequest '%s' completed with ret=%s", req["cmd"], ret) + + ret_string = json.dumps(ret) + response = lib.vmci_reply(client_socket, c_char_p(ret_string.encode())) + logging.debug("Released lock: %s", lockname) + + errno = get_errno() + logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) + if response == VMCI_ERROR: + logging.warning("vmci_reply returned error %s (errno=%d)", + os.strerror(errno), errno) + except Exception as e: + logging.exception("message") + # load VMCI shared lib , listen on vSocket in main loop, handle requests def handleVmciRequests(port): - VMCI_ERROR = -1 # VMCI C code uses '-1' to indicate failures - ECONNABORTED = 103 # Error on non privileged client - + skip_count = MAX_SKIP_COUNT # retries for vmci_get_one_op failures bsize = MAX_JSON_SIZE txt = create_string_buffer(bsize) - cartel = c_int32() sock = lib.vmci_init(c_uint(port)) + if sock == VMCI_ERROR: errno = get_errno() raise OSError("Failed to initialize vSocket listener: %s (errno=%d)" \ % (os.strerror(errno), errno)) - skip_count = MAX_SKIP_COUNT # retries for vmci_get_one_op failures while True: c = lib.vmci_get_one_op(sock, byref(cartel), txt, c_int(bsize)) logging.debug("lib.vmci_get_one_op returns %d, buffer '%s'", @@ -1160,43 +1247,11 @@ def handleVmciRequests(port): else: skip_count = MAX_SKIP_COUNT # reset the counter, just in case - # Get VM name & ID from VSI (we only get cartelID from vmci, need to convert) - vmm_leader = vsi.get("/userworld/cartel/%s/vmmLeader" % - str(cartel.value)) - group_info = vsi.get("/vm/%s/vmmGroupInfo" % vmm_leader) - - vm_name = group_info["displayName"] - cfg_path = group_info["cfgPath"] - uuid = group_info["uuid"] - # pyVmomi expects uuid like this one: 564dac12-b1a0-f735-0df3-bceb00b30340 - # to get it from uuid in VSI vms//vmmGroup, we use the following format: - UUID_FORMAT = "{0}{1}{2}{3}-{4}{5}-{6}{7}-{8}{9}-{10}{11}{12}{13}{14}{15}" - vm_uuid = UUID_FORMAT.format(*uuid.replace("-", " ").split()) - - try: - req = json.loads(txt.value.decode('utf-8')) - except ValueError as e: - ret = {u'Error': "Failed to parse json '%s'." % txt.value} - else: - details = req["details"] - opts = details["Opts"] if "Opts" in details else {} - threading.currentThread().setName(vm_name) - ret = executeRequest(vm_uuid=vm_uuid, - vm_name=vm_name, - config_path=cfg_path, - cmd=req["cmd"], - full_vol_name=details["Name"], - opts=opts) - logging.info("executeRequest '%s' completed with ret=%s", - req["cmd"], ret) - - ret_string = json.dumps(ret) - response = lib.vmci_reply(c, c_char_p(ret_string.encode())) - errno = get_errno() - logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) - if response == VMCI_ERROR: - logging.warning("vmci_reply returned error %s (errno=%d)", - os.strerror(errno), errno) + client_socket = c # Bind to avoid race conditions. + # Fire a thread to execute the request + threading.Thread( + target=execRequestThread, + args=(client_socket, cartel.value, txt.value)).start() lib.close(sock) # close listening socket when the loop is over diff --git a/vmdk_plugin/sanity_test.go b/vmdk_plugin/sanity_test.go index 4a3c2dc9a..c340427ac 100644 --- a/vmdk_plugin/sanity_test.go +++ b/vmdk_plugin/sanity_test.go @@ -21,6 +21,8 @@ package main import ( "flag" "fmt" + "testing" + log "github.com/Sirupsen/logrus" "github.com/docker/engine-api/client" "github.com/docker/engine-api/types" @@ -29,7 +31,6 @@ import ( "github.com/docker/engine-api/types/strslice" "github.com/vmware/docker-volume-vsphere/vmdk_plugin/utils/config" "golang.org/x/net/context" - "testing" ) const ( From 522de40ce6ac37606b51f2c0091241d8cbfbe395 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Thu, 13 Oct 2016 18:52:20 +0100 Subject: [PATCH 02/14] Add basic parallel tests. --- vmdk_plugin/sanity_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/vmdk_plugin/sanity_test.go b/vmdk_plugin/sanity_test.go index c340427ac..78cd5e83d 100644 --- a/vmdk_plugin/sanity_test.go +++ b/vmdk_plugin/sanity_test.go @@ -21,6 +21,7 @@ package main import ( "flag" "fmt" + "strconv" "testing" log "github.com/Sirupsen/logrus" @@ -37,6 +38,8 @@ const ( defaultMountLocation = "/mnt/testvol" // tests are often run under regular account and have no access to /var/log defaultTestLogPath = "/tmp/test-docker-volume-vsphere.log" + // Number of volumes per client for parallel tests + parallelVolumes = 5 ) var ( @@ -231,4 +234,35 @@ func TestSanity(t *testing.T) { volumeName, elem.endPoint) } } + + fmt.Printf("Running parallel tests on %s and %s (may take a while)...\n", endPoint1, endPoint2) + // Create a short buffered channel to introduce random pauses + results := make(chan error, 5) + createRequest := types.VolumeCreateRequest{ + Name: volumeName, + Driver: driverName, + DriverOpts: map[string]string{ + "size": "1gb", + }, + } + // Create/delete routine + for idx, elem := range clients { + go func(idx int, c *client.Client) { + for i := 0; i < parallelVolumes; i++ { + volName := "volP" + strconv.Itoa(idx) + strconv.Itoa(i) + createRequest.Name = volName + _, err := c.VolumeCreate(context.Background(), createRequest) + results <- err + err = c.VolumeRemove(context.Background(), volName) + results <- err + } + }(idx, elem.client) + } + // We need to read #clients * #volumes * 2 operations from the channel + for i := 0; i < len(clients)*parallelVolumes*2; i++ { + err := <-results + if err != nil { + t.Fatalf("Parallel test failed, err: %v", err) + } + } } From 9a14d1b0dd06968b2c22852a2c6d0248bfa8447b Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Sun, 16 Oct 2016 21:25:51 +0100 Subject: [PATCH 03/14] Retry on errors in Run() on esx_vmdkcmd until maxRetryCount --- vmdk_plugin/vmdkops/esx_vmdkcmd.go | 34 ++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/vmdk_plugin/vmdkops/esx_vmdkcmd.go b/vmdk_plugin/vmdkops/esx_vmdkcmd.go index 3750dc1bd..bab53fd66 100644 --- a/vmdk_plugin/vmdkops/esx_vmdkcmd.go +++ b/vmdk_plugin/vmdkops/esx_vmdkcmd.go @@ -23,9 +23,10 @@ import ( "encoding/json" "errors" "fmt" - log "github.com/Sirupsen/logrus" "syscall" "unsafe" + + log "github.com/Sirupsen/logrus" ) /* @@ -39,7 +40,7 @@ type EsxVmdkCmd struct{} const ( commBackendName string = "vsocket" - maxRetryCount = 5 + maxRetryCount = 8 ) // A request to be passed to ESX service @@ -85,16 +86,27 @@ func (vmdkCmd EsxVmdkCmd) Run(cmd string, name string, opts map[string]string) ( ans := (*C.be_answer)(C.malloc(C.sizeof_struct_be_answer)) defer C.free(unsafe.Pointer(ans)) - _, err = C.Vmci_GetReply(C.int(EsxPort), cmdS, beS, ans) - if err != nil { - var errno syscall.Errno - errno = err.(syscall.Errno) - msg := fmt.Sprintf("'%s' failed: %v (errno=%d).", cmd, err, int(errno)) - if errno == syscall.ECONNRESET || errno == syscall.ETIMEDOUT { - msg += " Cannot communicate with ESX, please refer to the FAQ https://github.com/vmware/docker-volume-vsphere/wiki#faq" + for i := 0; i < maxRetryCount; i++ { + _, err = C.Vmci_GetReply(C.int(EsxPort), cmdS, beS, ans) + if err != nil { + var errno syscall.Errno + errno = err.(syscall.Errno) + msg := fmt.Sprintf("'%s' failed: %v (errno=%d).", cmd, err, int(errno)) + + // Still below maximum number of retries, log and continue + if i < maxRetryCount { + log.Warnf(msg + " Retrying...") + continue + } + + if errno == syscall.ECONNRESET || errno == syscall.ETIMEDOUT { + msg += " Cannot communicate with ESX, please refer to the FAQ https://github.com/vmware/docker-volume-vsphere/wiki#faq" + } + log.Warnf(msg) + return nil, errors.New(msg) } - log.Warnf(msg) - return nil, errors.New(msg) + // Received no error, exit loop + break } response := []byte(C.GoString(ans.buf)) From e708ad3ca7f403727e437a452b92f889ef0a4285 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Mon, 17 Oct 2016 12:41:07 +0100 Subject: [PATCH 04/14] Mark socket as accepting connections in vmci_init() with a queue size of 128. There's no need to call listen() on socket in every vmci_get_one_op(). Fix retry loop in esx_vmdkcmd.Run() Address @msterin comments --- esx_service/vmci/vmci_server.c | 21 ++++++++++++--------- vmdk_plugin/vmdkops/esx_vmdkcmd.go | 6 ++++-- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/esx_service/vmci/vmci_server.c b/esx_service/vmci/vmci_server.c index 8b6b80587..f3c5b458a 100644 --- a/esx_service/vmci/vmci_server.c +++ b/esx_service/vmci/vmci_server.c @@ -35,6 +35,9 @@ #include "connection_types.h" +// SO_QSIZE maximum number of connections (requests) in socket queue. +int SO_QSIZE = 128; + // Returns vSocket to listen on, or -1. // errno indicates the reason for a failure, if any. int @@ -80,6 +83,15 @@ vmci_init(unsigned int port) return CONN_FAILURE; } + /* + * listen for client connections. + */ + ret = listen(socket_fd, SO_QSIZE); + if (ret == -1) { + perror("Failed to listen on socket"); + return CONN_FAILURE; + } + return socket_fd; } @@ -104,15 +116,6 @@ vmci_get_one_op(const int s, // socket to listen on return CONN_FAILURE; } - /* - * listen for client connections. - */ - ret = listen(s, 1); - if (ret == -1) { - perror("Failed to listen on socket"); - return CONN_FAILURE; - } - addrLen = sizeof addr; client_socket = accept(s, (struct sockaddr *) &addr, &addrLen); if (client_socket == -1) { diff --git a/vmdk_plugin/vmdkops/esx_vmdkcmd.go b/vmdk_plugin/vmdkops/esx_vmdkcmd.go index bab53fd66..34dbd0359 100644 --- a/vmdk_plugin/vmdkops/esx_vmdkcmd.go +++ b/vmdk_plugin/vmdkops/esx_vmdkcmd.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "syscall" + "time" "unsafe" log "github.com/Sirupsen/logrus" @@ -40,7 +41,7 @@ type EsxVmdkCmd struct{} const ( commBackendName string = "vsocket" - maxRetryCount = 8 + maxRetryCount = 5 ) // A request to be passed to ESX service @@ -86,7 +87,7 @@ func (vmdkCmd EsxVmdkCmd) Run(cmd string, name string, opts map[string]string) ( ans := (*C.be_answer)(C.malloc(C.sizeof_struct_be_answer)) defer C.free(unsafe.Pointer(ans)) - for i := 0; i < maxRetryCount; i++ { + for i := 0; i <= maxRetryCount; i++ { _, err = C.Vmci_GetReply(C.int(EsxPort), cmdS, beS, ans) if err != nil { var errno syscall.Errno @@ -96,6 +97,7 @@ func (vmdkCmd EsxVmdkCmd) Run(cmd string, name string, opts map[string]string) ( // Still below maximum number of retries, log and continue if i < maxRetryCount { log.Warnf(msg + " Retrying...") + time.Sleep(time.Second * 1) continue } From b1d99b08c909d5914cb0ae23c5d1caf673594c76 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Sat, 22 Oct 2016 20:04:48 +0100 Subject: [PATCH 05/14] Handle busy error when opening metada in KvESX This is a ***workaround*** to the timing/locking with metadata files. issue #626 --- esx_service/utils/kvESX.py | 48 ++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/esx_service/utils/kvESX.py b/esx_service/utils/kvESX.py index 521861612..8358233bf 100644 --- a/esx_service/utils/kvESX.py +++ b/esx_service/utils/kvESX.py @@ -23,6 +23,8 @@ import json import logging import sys +import errno +import time # Python version 3.5.1 PYTHON64_VERSION = 50659824 @@ -60,6 +62,10 @@ # Flag to track the version of Python on the platform is_64bits = False +# Number of times and sleep time to retry on IOError EBUSY +EBUSY_RETRY_COUNT = 8 +EBUSY_RETRY_SLEEP = 0.5 + class disk_info(Structure): _fields_ = [('size', c_uint64), ('allocated', c_uint64), @@ -238,12 +244,21 @@ def load(volpath): meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(), DVOL_KEY.encode()) - try: - with open(meta_file, "r") as fh: - kv_str = fh.read() - except: - logging.exception("Failed to access %s", meta_file) - return None + retry_count = 0 + while True: + try: + with open(meta_file, "r") as fh: + kv_str = fh.read() + break + except Exception as open_error: + # This is a workaround to the timing/locking with metadata files issue #626 + if open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT: + logging.warning("Meta file %s busy for load(), retrying...", meta_file) + retry_count += 1 + time.sleep(EBUSY_RETRY_TIME) + else: + logging.exception("Failed to access %s", meta_file) + return None try: return json.loads(kv_str) @@ -259,12 +274,21 @@ def save(volpath, kv_dict): kv_str = json.dumps(kv_dict) - try: - with open(meta_file, "w") as fh: - fh.write(align_str(kv_str, KV_ALIGN)) - except: - logging.exception("Failed to save meta-data for %s", volpath); - return False + retry_count = 0 + while True: + try: + with open(meta_file, "w") as fh: + fh.write(align_str(kv_str, KV_ALIGN)) + break + except Exception as open_error: + # This is a workaround to the timing/locking with metadata files issue #626 + if open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT: + logging.warning("Meta file %s busy for save(), retrying...", meta_file) + retry_count += 1 + time.sleep(EBUSY_RETRY_SLEEP) + else: + logging.exception("Failed to save meta-data for %s", volpath); + return False return True From 368a91f096f989d55d3b69a4bedc4ae60c270913 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Wed, 26 Oct 2016 11:03:15 +0100 Subject: [PATCH 06/14] Make sure exception has errno before checking. --- esx_service/utils/kvESX.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/esx_service/utils/kvESX.py b/esx_service/utils/kvESX.py index 8358233bf..3f335b8d3 100644 --- a/esx_service/utils/kvESX.py +++ b/esx_service/utils/kvESX.py @@ -252,7 +252,8 @@ def load(volpath): break except Exception as open_error: # This is a workaround to the timing/locking with metadata files issue #626 - if open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT: + if (hasattr(open_error, "errno") and + (open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT)): logging.warning("Meta file %s busy for load(), retrying...", meta_file) retry_count += 1 time.sleep(EBUSY_RETRY_TIME) @@ -282,7 +283,8 @@ def save(volpath, kv_dict): break except Exception as open_error: # This is a workaround to the timing/locking with metadata files issue #626 - if open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT: + if (hasattr(open_error, "errno") and + (open_error.errno == errno.EBUSY and retry_count <= EBUSY_RETRY_COUNT)): logging.warning("Meta file %s busy for save(), retrying...", meta_file) retry_count += 1 time.sleep(EBUSY_RETRY_SLEEP) From 956af5026bf95bc79aaddbd7447d38ca8f9e5f56 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Wed, 26 Oct 2016 11:05:58 +0100 Subject: [PATCH 07/14] Rename test vols. --- vmdk_plugin/sanity_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vmdk_plugin/sanity_test.go b/vmdk_plugin/sanity_test.go index 78cd5e83d..f4764a079 100644 --- a/vmdk_plugin/sanity_test.go +++ b/vmdk_plugin/sanity_test.go @@ -249,7 +249,7 @@ func TestSanity(t *testing.T) { for idx, elem := range clients { go func(idx int, c *client.Client) { for i := 0; i < parallelVolumes; i++ { - volName := "volP" + strconv.Itoa(idx) + strconv.Itoa(i) + volName := "volTestP" + strconv.Itoa(idx) + strconv.Itoa(i) createRequest.Name = volName _, err := c.VolumeCreate(context.Background(), createRequest) results <- err From e583866daa33e442ad16b3f395c678e67a0e8b82 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Wed, 26 Oct 2016 23:24:08 +0100 Subject: [PATCH 08/14] Make sure we send a error response to volume plugin when dealing with unhandled exceptions. --- esx_service/vmdk_ops.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/esx_service/vmdk_ops.py b/esx_service/vmdk_ops.py index 91567f1ef..e9c5bd65a 100755 --- a/esx_service/vmdk_ops.py +++ b/esx_service/vmdk_ops.py @@ -1210,8 +1210,17 @@ def execRequestThread(client_socket, cartel, request): if response == VMCI_ERROR: logging.warning("vmci_reply returned error %s (errno=%d)", os.strerror(errno), errno) - except Exception as e: - logging.exception("message") + except Exception as ex_thr: + logging.exception("Unhandled Exception:") + + ret = err("Server returned an error: {0}".format(repr(ex_thr))) + ret_string = json.dumps(ret) + response = lib.vmci_reply(client_socket, c_char_p(ret_string.encode())) + errno = get_errno() + logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) + if response == VMCI_ERROR: + logging.warning("vmci_reply returned error %s (errno=%d)", + os.strerror(errno), errno) # load VMCI shared lib , listen on vSocket in main loop, handle requests def handleVmciRequests(port): From 67410390ec650552b3733fbc3e0f46170ca695b2 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Thu, 27 Oct 2016 02:11:03 +0100 Subject: [PATCH 09/14] Add send_vmci_reply() and cleanup --- esx_service/vmdk_ops.py | 41 ++++++++++++++++------------------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/esx_service/vmdk_ops.py b/esx_service/vmdk_ops.py index e9c5bd65a..f1eb97e94 100755 --- a/esx_service/vmdk_ops.py +++ b/esx_service/vmdk_ops.py @@ -1148,6 +1148,15 @@ def getLock(lockname): logging.debug("getLock(): return new lock %s", lockname) return lock +def send_vmci_reply(client_socket, reply_string): + reply = json.dumps(reply_string) + response = lib.vmci_reply(client_socket, c_char_p(reply.encode())) + errno = get_errno() + logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) + if response == VMCI_ERROR: + logging.warning("vmci_reply returned error %s (errno=%d)", + os.strerror(errno), errno) + def execRequestThread(client_socket, cartel, request): ''' Execute requests in a thread context with a per volume locking. @@ -1170,13 +1179,8 @@ def execRequestThread(client_socket, cartel, request): try: req = json.loads(request.decode('utf-8')) except ValueError as e: - ret_string = {u'Error': "Failed to parse json '%s'." % request} - response = lib.vmci_reply(client_socket, c_char_p(ret_string.encode())) - errno = get_errno() - logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) - if response == VMCI_ERROR: - logging.warning("vmci_reply returned error %s (errno=%d)", - os.strerror(errno), errno) + reply_string = {u'Error': "Failed to parse json '%s'." % request} + send_vmci_reply(client_socket, reply_string) else: details = req["details"] opts = details["Opts"] if "Opts" in details else {} @@ -1193,34 +1197,21 @@ def execRequestThread(client_socket, cartel, request): with rsrcLock: logging.debug("Aquired lock: %s", lockname) - ret = executeRequest(vm_uuid=vm_uuid, + reply_string = executeRequest(vm_uuid=vm_uuid, vm_name=vm_name, config_path=cfg_path, cmd=req["cmd"], full_vol_name=details["Name"], opts=opts) - logging.info("executeRequest '%s' completed with ret=%s", req["cmd"], ret) - ret_string = json.dumps(ret) - response = lib.vmci_reply(client_socket, c_char_p(ret_string.encode())) + logging.info("executeRequest '%s' completed with ret=%s", req["cmd"], reply_string) + send_vmci_reply(client_socket, reply_string) logging.debug("Released lock: %s", lockname) - errno = get_errno() - logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) - if response == VMCI_ERROR: - logging.warning("vmci_reply returned error %s (errno=%d)", - os.strerror(errno), errno) except Exception as ex_thr: logging.exception("Unhandled Exception:") - - ret = err("Server returned an error: {0}".format(repr(ex_thr))) - ret_string = json.dumps(ret) - response = lib.vmci_reply(client_socket, c_char_p(ret_string.encode())) - errno = get_errno() - logging.debug("lib.vmci_reply: VMCI replied with errcode %s", response) - if response == VMCI_ERROR: - logging.warning("vmci_reply returned error %s (errno=%d)", - os.strerror(errno), errno) + reply_string = err("Server returned an error: {0}".format(repr(ex_thr))) + send_vmci_reply(client_socket, reply_string) # load VMCI shared lib , listen on vSocket in main loop, handle requests def handleVmciRequests(port): From 2f654f2a6c28358454337087ce07a6f3faa2fb96 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Thu, 27 Oct 2016 23:48:37 +0100 Subject: [PATCH 10/14] Allow every short lived request thread a exclusive AuthorizationDataManager. The version of sqlite.so in place don't allow runtime change of threading mode with sqlite3_config() --- esx_service/utils/auth.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/esx_service/utils/auth.py b/esx_service/utils/auth.py index f02ff0382..fe6dedc78 100644 --- a/esx_service/utils/auth.py +++ b/esx_service/utils/auth.py @@ -34,17 +34,15 @@ SIZE = 'size' -_auth_mgr = None -def connect_auth_db(): +def get_auth_mgr(): """ Get a connection to auth DB. """ - global _auth_mgr - if not _auth_mgr: - _auth_mgr = auth_data.AuthorizationDataManager() - _auth_mgr.connect() + _auth_mgr = auth_data.AuthorizationDataManager() + _auth_mgr.connect() + return _auth_mgr def get_tenant(vm_uuid): """ Get tenant which owns this VM by querying the auth DB. """ - global _auth_mgr + _auth_mgr = get_auth_mgr() try: cur = _auth_mgr.conn.execute( "SELECT tenant_id FROM vms WHERE vm_id = ?", @@ -82,7 +80,7 @@ def get_privileges(tenant_uuid, datastore): querying the auth DB. """ - global _auth_mgr + _auth_mgr = get_auth_mgr() privileges = [] logging.debug("get_privileges tenant_uuid=%s datastore=%s", tenant_uuid, datastore) try: @@ -201,7 +199,7 @@ def check_privileges_for_command(cmd, opts, tenant_uuid, datastore, privileges): def tables_exist(): """ Check tables needed for authorization exist or not. """ - global _auth_mgr + _auth_mgr = get_auth_mgr() try: cur = _auth_mgr.conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' and name = 'tenants';") @@ -366,4 +364,4 @@ def get_row_from_privileges_table(conn, tenant_uuid): result = cur.fetchall() return None, result - \ No newline at end of file + From 2a6e343eb798599b1df533572c1de60c0910ca1c Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Thu, 27 Oct 2016 23:53:00 +0100 Subject: [PATCH 11/14] Increase the amount of parallelism in sanity_test.go --- vmdk_plugin/sanity_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vmdk_plugin/sanity_test.go b/vmdk_plugin/sanity_test.go index f4764a079..4cf9b9248 100644 --- a/vmdk_plugin/sanity_test.go +++ b/vmdk_plugin/sanity_test.go @@ -39,7 +39,7 @@ const ( // tests are often run under regular account and have no access to /var/log defaultTestLogPath = "/tmp/test-docker-volume-vsphere.log" // Number of volumes per client for parallel tests - parallelVolumes = 5 + parallelVolumes = 9 ) var ( @@ -237,7 +237,7 @@ func TestSanity(t *testing.T) { fmt.Printf("Running parallel tests on %s and %s (may take a while)...\n", endPoint1, endPoint2) // Create a short buffered channel to introduce random pauses - results := make(chan error, 5) + results := make(chan error, parallelVolumes) createRequest := types.VolumeCreateRequest{ Name: volumeName, Driver: driverName, From 9494d6a40c7f04747212f64ccfb92bf5f5bdfc92 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Fri, 28 Oct 2016 00:37:41 +0100 Subject: [PATCH 12/14] Use thread local storage to persist AuthorizationDataManager between requests in same thread. --- esx_service/utils/auth.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/esx_service/utils/auth.py b/esx_service/utils/auth.py index fe6dedc78..d9d328544 100644 --- a/esx_service/utils/auth.py +++ b/esx_service/utils/auth.py @@ -23,6 +23,7 @@ import convert import auth_data_const import volume_kv as kv +import threading # All supported vmdk commands CMD_CREATE = 'create' @@ -34,12 +35,17 @@ SIZE = 'size' +# thread local storage +thread_local = threading.local() + def get_auth_mgr(): """ Get a connection to auth DB. """ - _auth_mgr = auth_data.AuthorizationDataManager() - _auth_mgr.connect() - return _auth_mgr - + global thread_local + if not hasattr(thread_local, '_auth_mgr'): + thread_local._auth_mgr = auth_data.AuthorizationDataManager() + thread_local._auth_mgr.connect() + return thread_local._auth_mgr + def get_tenant(vm_uuid): """ Get tenant which owns this VM by querying the auth DB. """ _auth_mgr = get_auth_mgr() From 13501fe16aa0cddf6ccbecfaa177a0f9df2b1a81 Mon Sep 17 00:00:00 2001 From: Bruno Moura Date: Fri, 28 Oct 2016 07:44:38 +0100 Subject: [PATCH 13/14] Add missing calls to get_auth_manager() in auth.py --- esx_service/utils/auth.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/esx_service/utils/auth.py b/esx_service/utils/auth.py index d9d328544..7a07255c1 100644 --- a/esx_service/utils/auth.py +++ b/esx_service/utils/auth.py @@ -140,6 +140,7 @@ def get_total_storage_used(tenant_uuid, datastore): by querying auth DB. """ + _auth_mgr = get_auth_mgr() total_storage_used = 0 try: cur = _auth_mgr.conn.execute( @@ -310,6 +311,8 @@ def authorize(vm_uuid, datastore, cmd, opts): def add_volume_to_volumes_table(tenant_uuid, datastore, vol_name, vol_size_in_MB): """ Insert volume to volumes table. """ + _auth_mgr = get_auth_mgr() + logging.debug("add to volumes table(%s %s %s %s)", tenant_uuid, datastore, vol_name, vol_size_in_MB) try: From c0c6724954a48d50fdc9af7551ac971058802a08 Mon Sep 17 00:00:00 2001 From: Ritesh H Shukla Date: Sat, 29 Oct 2016 06:45:35 +0000 Subject: [PATCH 14/14] Adjust merge conflict for #612 --- esx_service/utils/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esx_service/utils/auth.py b/esx_service/utils/auth.py index 7a07255c1..97c5a9ef3 100644 --- a/esx_service/utils/auth.py +++ b/esx_service/utils/auth.py @@ -277,7 +277,7 @@ def authorize(vm_uuid, datastore, cmd, opts): logging.debug("Authorize: opt=%s", opts) try: - connect_auth_db() + get_auth_mgr() except auth_data.DbConnectionError, e: error_info = "Failed to connect auth DB({0})".format(e) return error_info, None, None