Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved fix for iPOPO issue 100 #101

Merged
merged 2 commits into from
Sep 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ install:
- sudo prosodyctl register user1 localhost foobar
- sudo prosodyctl register user2 localhost foobar
- sudo prosodyctl restart
- curl -qL http://apache.mediamirrors.org/karaf/4.2.0/apache-karaf-4.2.0.tar.gz | tar xz -C /tmp
- curl -qL http://apache.mediamirrors.org/karaf/4.2.1/apache-karaf-4.2.1.tar.gz | tar xz -C /tmp
- pip install nose coverage coverage_enable_subprocess coveralls
- pip install https://github.com/tcalmant/jsonrpclib/archive/master.zip
- pip install -r requirements.txt
Expand Down
73 changes: 56 additions & 17 deletions pelix/rsa/providers/distribution/py4j.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Thread
from threading import Thread, RLock
import logging

from osgiservicebridge.bridge import (
Expand All @@ -52,13 +52,20 @@
Instantiate,
Property,
Validate,
ValidateComponent,
Invalidate,
PostRegistration,
)

from pelix.ipopo.constants import (
ARG_BUNDLE_CONTEXT,
ARG_PROPERTIES,
)

# Providers API
from pelix.rsa import prop_dot_suffix
from pelix.rsa.providers.distribution import (
Container,
ExportContainer,
ImportContainer,
DistributionProvider,
Expand Down Expand Up @@ -126,10 +133,23 @@
@ComponentFactory(ECF_PY4J_CONTAINER_CONFIG_TYPE)
@Provides([SERVICE_EXPORT_CONTAINER, SERVICE_IMPORT_CONTAINER])
class Py4jContainer(ExportContainer, ImportContainer):
def __init__(self):
def __init__(self, max_workers=5):
ExportContainer.__init__(self)
ImportContainer.__init__(self)
self._executor = ThreadPoolExecutor(max_workers=2)
self._max_workers = max_workers
self._executor = None

@ValidateComponent(ARG_BUNDLE_CONTEXT, ARG_PROPERTIES)
def _validate_component(self, bundle_context, container_props):
Container._validate_component(self, bundle_context, container_props)
self._executor = ThreadPoolExecutor(max_workers=self._max_workers)

@Invalidate
def _invalidate_component(self, _):
Container._invalidate_component(self, _)
if self._executor:
self._executor.shutdown()
self._executor = None

def get_connected_id(self):
return ExportContainer.get_connected_id(self)
Expand Down Expand Up @@ -167,7 +187,11 @@ def _export_service(self, svc, ed):

def _unexport_service(self, ed):
# pylint: disable=W0212
self._get_distribution_provider()._get_bridge().unexport(ed.get_id())
dp = self._get_distribution_provider()
if dp:
bridge = dp._get_bridge()
if bridge:
bridge.unexport(ed.get_id())
ExportContainer._unexport_service(self, ed)
return True

Expand Down Expand Up @@ -203,13 +227,12 @@ def _prepare_proxy(self, endpoint_description):

def unimport_service(self, endpoint_description):
# pylint: disable=W0212
self._get_distribution_provider()._get_bridge().remove_import_endpoint(
endpoint_description.get_id()
)
dp = self._get_distribution_provider()
if dp:
bridge = dp._get_bridge()
if bridge:
bridge.remove_import_endpoint(endpoint_description.get_id())
ImportContainer.unimport_service(self, endpoint_description)
if self._executor:
self._executor.shutdown(False)
self._executor = None


@ComponentFactory("py4j-distribution-provider-factory")
Expand All @@ -223,7 +246,8 @@ def unimport_service(self, endpoint_description):
"supported_configs",
[ECF_PY4J_PYTHON_HOST_CONFIG_TYPE, ECF_PY4J_PYTHON_CONSUMER_CONFIG_TYPE],
)
@Property("_supported_intents", "supported_intents", ECF_PY4J_SUPPORTED_INTENTS)
@Property("_supported_intents", "supported_intents",
ECF_PY4J_SUPPORTED_INTENTS)
@Property(
"_supported_pb_intents",
"supported_pb_intents",
Expand Down Expand Up @@ -257,6 +281,8 @@ def __init__(self):
self._queue = Queue()
self._thread = Thread(target=self._worker)
self._thread.daemon = True
self._done = False
self._lock = RLock()
self._py4jcontainer = self._supported_pb_intents = None
self._javaport = self._pythonport = self._default_service_timeout = None

Expand Down Expand Up @@ -330,13 +356,19 @@ def _validate(self, _):
@Invalidate
def _invalidate(self, _):
if self._bridge:
with self._lock:
# Set done flag to True
self._done = True
# Trigger reading from queue in self._worker
# with empty task
self._queue.put((None, None, None))
try:
self._ipopo.invalidate(self._bridge.get_id())
except ValueError:
pass
try:
self._bridge.disconnect()
except:
except Exception:
pass
self._bridge = None
self._container = None
Expand All @@ -362,7 +394,10 @@ def service_unimported(
):
# _logger.info('_service_unimported endpointid='+endpointid+";proxy="+str(proxy)+";endpoint_props="+str(endpoint_props))
# put on task queue so no blocking, but fifo delivery to rsa
self._queue.put((endpointid, endpoint_props, self._handle_import_close))
self._queue.put(
(endpointid,
endpoint_props,
self._handle_import_close))

@PostRegistration
def _post_reg(self, _):
Expand All @@ -373,23 +408,27 @@ def _post_reg(self, _):
# read from queue, and import/unregister imported the discovered service
def _worker(self):
while True:
# block to get items from queue placed by service_imported,
with self._lock:
# If self._done flag is set, return and that's it
if self._done:
return
# otherwise block to get items from queue placed by service_imported,
# service_modified, and service_unimported
# called by Py4j handler thread
item = self._queue.get()
f = None
try:
# get the function from item[2]
f = item[2]
except:
except Exception:
logging.error("Exception getting code in item=%s", item)

if f:
try:
# get the endpoint description properties from item[1]
# and create EndpointDescription instance
ed = EndpointDescription(properties=item[1])
except:
except Exception:
logging.error(
"Exception creating endpoint description from props=%s",
item[1],
Expand All @@ -398,7 +437,7 @@ def _worker(self):
# call appropriate function
try:
f(ed)
except:
except Exception:
logging.error("Exception invoking function=%s", f)

# no matter what, we are done with this task
Expand Down