Firebase db listener cannot handle internet disconnections #287

Open
jeffersonkr opened this issue May 17, 2019 · 13 comments

Comments

@jeffersonkr

jeffersonkr commented May 17, 2019

[READ] Step 1: Are you in the right place?

  • For issues or feature requests related to the code in this repository
    file a Github issue.
    • If this is a feature request make sure the issue title starts with "FR:".
  • For general technical questions, post a question on StackOverflow
    with the firebase tag.
  • For general Firebase discussion, use the firebase-talk
    google group.
  • For help troubleshooting your application that does not fall under one
    of the above categories, reach out to the personalized
    Firebase support channel.

[REQUIRED] Step 2: Describe your environment

  • Operating System version: Ubuntu 18 and Raspbian
  • Firebase SDK version: latest release installed with pip
  • Library version: latest release installed with pip
  • Firebase Product: firebase db stream (listener)

[REQUIRED] Step 3: Describe the problem

db.reference().listen() hangs, and the rest of the code never runs, if the internet connection drops while the listener is connecting. If the stream is already connected and the connection is lost for more than 2 to 3 minutes and then restored, no further updates arrive from Firebase.
While searching for the root cause, I noticed that SSEClient has a default retry interval of 3000 ms and that the retry value of the Event class defaults to None. I am not sure, but maybe this is the problem?
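
For context, the retry value in the SSE protocol is the delay a client waits before reconnecting after the stream drops. A minimal sketch of such a reconnect loop follows. The names listen_with_retry and connect are hypothetical and not part of firebase_admin, and the built-in ConnectionError stands in for whatever the real client raises.

```python
import time


def listen_with_retry(connect, retry_ms=3000, max_attempts=5, sleep=time.sleep):
    """Call connect() until it succeeds, waiting retry_ms between failures.

    connect is a hypothetical callable that opens the stream and raises
    ConnectionError when the network is unavailable.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise
            # Wait for the retry interval before the next attempt.
            sleep(retry_ms / 1000.0)


# Example: a fake connection that fails twice, then succeeds.
attempts = []


def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError('network unreachable')
    return 'stream-open'


# Record the requested waits instead of actually sleeping.
waits = []
result = listen_with_retry(flaky_connect, retry_ms=3000, sleep=waits.append)
```

Injecting the sleep function keeps the example fast to run; in real use the default time.sleep would apply.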

Steps to reproduce:

To simulate the error, pull the network cable or otherwise disconnect the system from the internet, wait 2 to 3 minutes, and reconnect. After reconnecting, update any endpoint that the stream is listening to. No updates are received.

The error can also be reproduced by disconnecting the internet while the streaming connection is being opened, waiting 2 to 3 minutes, and reconnecting. No updates are received from Firebase, and the code after the call that opens the stream never runs, even after a long wait.
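
The hang described above is consistent with a blocking read that has no timeout: when the link drops silently, the socket never learns that the peer is gone. The following standard-library sketch shows the difference a timeout makes. It uses plain sockets rather than the SDK's actual code path, and read_with_timeout is a name made up for the example.

```python
import socket


def read_with_timeout(sock, timeout_seconds):
    """Return received bytes, or None if nothing arrives within the timeout."""
    sock.settimeout(timeout_seconds)
    try:
        return sock.recv(1024)
    except socket.timeout:
        return None


# A connected pair of sockets stands in for the client and the server.
client, server = socket.socketpair()

# The "server" stays silent, as a peer does after the cable is pulled.
# Without the timeout this read would block indefinitely.
silent_result = read_with_timeout(client, 0.2)

# Once the "server" sends something, the same read returns promptly.
server.sendall(b'event: put\n')
live_result = read_with_timeout(client, 0.2)

client.close()
server.close()
```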

Relevant Code:

import re

class SSEClient(object):
    '''SSE client implementation.'''

    def __init__(self, url, session, retry=3000, **kwargs):
        """Initializes the SSEClient.
        Args:
          url: The remote url to connect to.
          session: The requests session.
          retry: The retry interval in milliseconds (optional).
          **kwargs: Extra kwargs that will be sent to ``requests.get()`` (optional).
        """

class Event(object):
    '''Event represents the events fired by SSE.'''

    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    def __init__(self, data='', event_type='message', event_id=None, retry=None):
        self.data = data
        self.event_type = event_type
        self.event_id = event_id
        self.retry = retry

// TODO(you): code here to reproduce the problem

import os
import time

import firebase_admin
from firebase_admin import credentials
from firebase_admin import db

# DB_URL, callback and internet_on() are defined elsewhere in the application.

# Init Firebase.
dir_path = os.path.dirname(os.path.realpath(__file__))
cred = credentials.Certificate(dir_path + '/serviceAccountKey.json')
firebase_admin.initialize_app(cred, {
    'databaseURL': DB_URL
})

# Countdown, in seconds without internet, before the listener is reopened.
timer = 100
streaming = None
try:
    streaming = db.reference('path').listen(callback)
except db.ApiCallError:
    timer = 0

# Workaround loop that tries to handle disconnections.
while True:
    if internet_on():
        if timer == 0:
            try:
                streaming.close()
            except Exception:
                # We do not have a streaming connection open.
                pass
            try:
                streaming = db.reference('path').listen(callback)
                timer = 100
            except db.ApiCallError:
                timer = 0
    else:
        time.sleep(1)
        timer -= 1 if timer > 0 else 0

@google-oss-bot

I couldn't figure out how to label this issue, so I've labeled it for a human to triage. Hang tight.

@jeffersonkr
Author

jeffersonkr commented May 17, 2019

Hi, while trying to solve the problem I realized that the timeout option is missing when the SSEClient class is instantiated, and that the requests.get() call inside SSEClient._connect() has no timeout. That is why my code gets stuck when the internet is disconnected. The call probably has no timeout because it is a stream, but when there is no connection it blocks the code. To avoid this kind of problem we should pass a timeout as an option. A timeout will also close perfectly working stream threads, but we already open new connections when the token expires after 1 hour, so should we set the default timeout to 3600?

db.py:

def _listen_with_session(self, callback, session, timeout=3600):
    url = self._client.base_url + self._add_suffix()
    try:
        # Pass the timeout by keyword so it lands in **kwargs and is not
        # bound to the positional ``retry`` parameter.
        sse = _sseclient.SSEClient(url, session, timeout=timeout)
        return ListenerRegistration(callback, sse)
    except requests.exceptions.RequestException as error:
        raise ApiCallError(_Client.extract_error_message(error), error)

_sseclient.py:

def _connect(self):
    """Connects to the server using requests."""
    if self.should_connect:
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
        self.resp = self.session.get(self.url, stream=True, **self.requests_kwargs)
        self.resp_iterator = self.resp.iter_content(decode_unicode=True)
        self.resp.raise_for_status()
    else:
        raise StopIteration()
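
Given the SSEClient signature quoted earlier in this thread (url, session, retry=3000, **kwargs), a timeout only reaches the underlying request if it is passed by keyword. A small sketch of that argument binding, using a hypothetical stand-in class rather than the real SSEClient:

```python
class MiniSSEClient:
    """Hypothetical stand-in that mirrors the SSEClient constructor signature."""

    def __init__(self, url, session, retry=3000, **kwargs):
        self.url = url
        self.session = session
        self.retry = retry
        # Extra keyword arguments are kept for the later session.get() call.
        self.requests_kwargs = kwargs


# Passed by keyword, the timeout ends up in requests_kwargs.
by_keyword = MiniSSEClient('https://example.invalid/path.json', None, timeout=3600)

# Passed positionally, a dict binds to ``retry`` and never reaches the request.
by_position = MiniSSEClient('https://example.invalid/path.json', None, {'timeout': 3600})
```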

@hiranya911
Contributor

I think the first thing to try would be to pass the httpTimeout option to the request session started by the SSE client. @jeffersonkr perhaps you can test this and provide a pull request?

@jeffersonkr
Author

jeffersonkr commented May 20, 2019

By httpTimeout, do you mean the initialize_app() options or the timeout option of requests.Session?
The best I can do is add a new parameter to the .listen(callback, timeout) method and pass it on to requests.Session, but that will end up closing the stream once the timeout elapses. I tried to trace where the httpTimeout from initialize_app ends up so I could use it, but I could not find it.
I'm a new programmer, so it's hard for me to understand how this great library works.
Maybe with your help I could do it.

@hiranya911
Contributor

httpTimeout is an option that developers can specify when calling initialize_app(). We ought to somehow capture this value (when specified by the developer), and pass it into KeepAuthSession.

Check these links to start with:

options: A dictionary of configuration options (optional). Supported options include
``databaseURL``, ``storageBucket``, ``projectId``, ``databaseAuthVariableOverride``,
``serviceAccountId`` and ``httpTimeout``. If ``httpTimeout`` is not set, HTTP
connections initiated by client modules such as ``db`` will not time out.

experimental feature. It currently does not honor the auth overrides and timeout settings.

self._timeout = app.options.get('httpTimeout')

@hiranya911
Contributor

You're setting a refresh_timeout, which is not the right field to set as far as I can tell. You need to set a connect or read timeout, but it appears the parent class AuthorizedSession does not really allow this. You will have to pass the value further down the stack, and pass it to the session.get() call made by SSEClient. The timeout parameter mentioned in this doc is what you need to set: https://2.python-requests.org/en/master/api/#requests.request
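
On the read timeout: the requests documentation describes it as the time the client waits between bytes from the server, not a cap on the total duration of the response. The sketch below illustrates that per-read behavior with plain standard-library sockets rather than requests itself, so it is only an analogy. The helper send_keepalives is a name made up for the example.

```python
import socket
import threading
import time


def send_keepalives(sock, count, interval):
    """Send `count` one-byte messages, pausing `interval` seconds before each."""
    for _ in range(count):
        time.sleep(interval)
        sock.sendall(b'.')
    sock.close()


client, server = socket.socketpair()
client.settimeout(0.5)  # per-read timeout, in seconds

# The sender needs about 1 second in total, longer than the timeout,
# but is never silent for much more than 0.1 seconds at a time.
sender = threading.Thread(target=send_keepalives, args=(server, 10, 0.1))
started = time.monotonic()
sender.start()

received = 0
timed_out = False
try:
    while True:
        chunk = client.recv(16)
        if not chunk:  # the peer closed the connection
            break
        received += len(chunk)
except socket.timeout:
    timed_out = True
elapsed = time.monotonic() - started

sender.join()
client.close()
```

Under these assumptions the reader keeps receiving for longer than its timeout without the timeout firing, because each individual read completes in time.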

@jeffersonkr
Author

jeffersonkr commented May 22, 2019

Yes, I realized what I did and removed the comment above. For now I am passing httpTimeout to the session.get() method. I don't know why, but if I set httpTimeout to 10 seconds it throws a timeout as expected, whereas if I set it to 100 seconds it does not time out.

def _connect(self):
    """Connects to the server using requests."""
    if self.should_connect:
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
        self.resp = self.session.get(self.url, stream=True, timeout=self.session.http_timeout, **self.requests_kwargs)
        self.resp_iterator = self.resp.iter_content(decode_unicode=True)
        self.resp.raise_for_status()
    else:
        raise StopIteration()

@hiranya911
Contributor

self.session.http_timeout doesn't look correct. Session in this case is an AuthorizedSession from google.auth, and I doubt they have a timeout attribute.

@jeffersonkr
Author

jeffersonkr commented May 24, 2019

Here, super(AuthorizedSession, self).request() is called, and this request method has a timeout parameter.
That .request() comes from the requests library.

def request(self, method, url, data=None, headers=None, **kwargs):
        """Implementation of Requests' request."""
        # pylint: disable=arguments-differ
        # Requests has a ton of arguments to request, but only two
        # (method, url) are required. We pass through all of the other
        # arguments to super, so no need to exhaustively list them here.

        # Use a kwarg for this instead of an attribute to maintain
        # thread-safety.
        _credential_refresh_attempt = kwargs.pop(
            '_credential_refresh_attempt', 0)

        # Make a copy of the headers. They will be modified by the credentials
        # and we want to pass the original headers if we recurse.
        request_headers = headers.copy() if headers is not None else {}

        self.credentials.before_request(
            self._auth_request, method, url, request_headers)

        response = super(AuthorizedSession, self).request(
            method, url, data=data, headers=request_headers, **kwargs)

and AuthorizedSession inherits from requests.Session, so I believe there is a timeout.

class AuthorizedSession(requests.Session):
    """A Requests Session class with credentials.

    This class is used to perform requests to API endpoints that require
    authorization::

        from google.auth.transport.requests import AuthorizedSession

        authed_session = AuthorizedSession(credentials)

        response = authed_session.request(
            'GET', 'https://www.googleapis.com/storage/v1/b')

    The underlying :meth:`request` implementation handles adding the
    credentials' headers to the request and refreshing credentials as needed.

# requests/sessions.py
def request(self, method, url,
            params=None, data=None, headers=None, cookies=None, files=None,
            auth=None, timeout=None, allow_redirects=True, proxies=None,
            hooks=None, stream=None, verify=None, cert=None, json=None):
        """Constructs a :class:`Request <Request>`, prepares it and sends it.
        Returns :class:`Response <Response>` object.

        :param method: method for the new :class:`Request` object.
        :param url: URL for the new :class:`Request` object.
        :param params: (optional) Dictionary or bytes to be sent in the query
            string for the :class:`Request`.
        :param data: (optional) Dictionary, list of tuples, bytes, or file-like
            object to send in the body of the :class:`Request`.
        :param json: (optional) json to send in the body of the
            :class:`Request`.
        :param headers: (optional) Dictionary of HTTP Headers to send with the
            :class:`Request`.
        :param cookies: (optional) Dict or CookieJar object to send with the
            :class:`Request`.
        :param files: (optional) Dictionary of ``'filename': file-like-objects``
            for multipart encoding upload.
        :param auth: (optional) Auth tuple or callable to enable
            Basic/Digest/Custom HTTP Auth.
        :param timeout: (optional) How long to wait for the server to send
            data before giving up, as a float, or a :ref:`(connect timeout,
            read timeout) <timeouts>` tuple.
        :type timeout: float or tuple
        :param allow_redirects: (optional) Set to True by default.
        :type allow_redirects: bool
        :param proxies: (optional) Dictionary mapping protocol or protocol and
            hostname to the URL of the proxy.
        :param stream: (optional) whether to immediately download the response
            content. Defaults to ``False``.
        :param verify: (optional) Either a boolean, in which case it controls whether we verify
            the server's TLS certificate, or a string, in which case it must be a path
            to a CA bundle to use. Defaults to ``True``.
        :param cert: (optional) if String, path to ssl client cert file (.pem).
            If Tuple, ('cert', 'key') pair.
        :rtype: requests.Response
        """

I am testing it now and I think this really works. When there is no connection, it throws exceptions such as NewConnectionError, ConnectionError and ReadTimeoutError once the timeout elapses, and it no longer prevents the rest of the code from executing.

I'm not sure about this, but it is still working. I would like you to guide me in writing tests for this implementation, if you agree that the implementation is correct.
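
One possible shape for such tests, sketched with unittest.mock from the standard library. The connect function is a hypothetical, simplified version of SSEClient._connect(), and the built-in TimeoutError stands in for the exceptions requests would raise; the repository's real test layout may differ.

```python
import unittest
from unittest import mock


def connect(session, url, timeout=None, **requests_kwargs):
    """Hypothetical simplified version of SSEClient._connect()."""
    resp = session.get(url, stream=True, timeout=timeout, **requests_kwargs)
    resp.raise_for_status()
    return resp


class ConnectTimeoutTest(unittest.TestCase):

    def test_timeout_is_forwarded(self):
        # The mock session records how get() was called.
        session = mock.Mock()
        connect(session, 'https://example.invalid/path.json', timeout=10)
        session.get.assert_called_once_with(
            'https://example.invalid/path.json', stream=True, timeout=10)

    def test_timeout_error_propagates(self):
        # Simulate a request that times out instead of hanging.
        session = mock.Mock()
        session.get.side_effect = TimeoutError('read timed out')
        with self.assertRaises(TimeoutError):
            connect(session, 'https://example.invalid/path.json', timeout=10)


# Run the tests programmatically so the result can be inspected.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ConnectTimeoutTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Mocking the session keeps the tests independent of the network, which matters for a bug that only shows up when the network is down.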

@hiranya911
Contributor

The request() method indeed takes a timeout argument. But there's no attribute named http_timeout on Session. So my previous comment about the following line should still stand:

self.resp = self.session.get(self.url, stream=True, timeout=self.session.http_timeout, **self.requests_kwargs)

@jeffersonkr
Author

jeffersonkr commented May 28, 2019

session = _sseclient.KeepAuthSession(self._client.credential, self._client.timeout)
return self._listen_with_session(callback, session)

class KeepAuthSession(transport.requests.AuthorizedSession):
    """A session that does not drop authentication on redirects between domains."""

    def __init__(self, credential, http_timeout):
        super(KeepAuthSession, self).__init__(credential)
        self.http_timeout = http_timeout

Can't I just pass it through KeepAuthSession?
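
One way this idea could look is a session subclass that stores the timeout and applies it as a default in request(). The sketch below uses a hypothetical BaseSession stand-in instead of requests.Session or AuthorizedSession so that it runs without third-party packages; TimeoutSession is likewise a made-up name, not the SDK's implementation.

```python
class BaseSession:
    """Hypothetical stand-in for requests.Session; records the calls it receives."""

    def __init__(self):
        self.calls = []

    def request(self, method, url, **kwargs):
        self.calls.append((method, url, kwargs))
        return 'response'

    def get(self, url, **kwargs):
        return self.request('GET', url, **kwargs)


class TimeoutSession(BaseSession):
    """Applies a default timeout to every request unless the caller sets one."""

    def __init__(self, http_timeout=None):
        super().__init__()
        self.http_timeout = http_timeout

    def request(self, method, url, **kwargs):
        # Only fill in the timeout when the caller did not pass one.
        kwargs.setdefault('timeout', self.http_timeout)
        return super().request(method, url, **kwargs)


session = TimeoutSession(http_timeout=10)
session.get('https://example.invalid/path.json', stream=True)
session.get('https://example.invalid/path.json', timeout=3)
```

With this shape the SSE client would not need to know about the timeout at all, since every call made through the session picks it up.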

@skywritergr

I've been having a similar (?) issue and I was wondering if this is related. I have a Python app in Google Cloud Run. The container is sometimes stopped (when there is not enough usage), which I guess could count as a disconnection of sorts.

I am instantiating the firebase connection like so:

from firebase_admin import firestore, initialize_app, credentials

cred = credentials.ApplicationDefault()
initialize_app(cred, {
  'projectId': 'my-project-id-1234',
})

db = firestore.client()

def function_A():
    ...
    data = db.collection(u'data').where(u'title', u'==', name).stream()
    ...

def function_B():
    ...

The weird thing is that my application works fine for some time and then randomly starts failing. After catching the exception I see the following:

Exception: 401 Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.

It looks very similar to what is being discussed here. Is there a solution to this?

@hiranya911
Contributor

@skywritergr you're using Firestore, so your problem is unrelated to this issue. I'd suggest reporting it directly at https://github.com/googleapis/python-firestore
