
Publisher thread terminates, forever breaking publication when GCE metadata service blips #1173

Closed
pgcamus opened this issue May 13, 2024 · 1 comment · Fixed by #1318
pgcamus commented May 13, 2024


Environment details

  • OS type and version: Ubuntu 22.04
  • Python version: 3.10.9
  • pip version: pip --version
  • google-cloud-pubsub version: 2.21.1

Steps to reproduce

Run google-cloud-pubsub and suffer a metadata outage like https://status.cloud.google.com/incidents/u6rQ2nNVbhAFqGCcTm58.

Note that this can trigger even in a brief GCE metadata outage: once this exception is raised even once, the commit thread is dead forever. In our case, there was a short outage on the metadata server, but the retries all happened so quickly that the exception was raised before the service recovered.

2024-04-26T07:30:45.783 Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/universe/universe_domain (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7c1ca813c1c0>: Failed to establish a new connection: [Errno 111] Connection refused'))
2024-04-26T07:30:45.788 Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/universe/universe_domain (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7c1ca8290730>: Failed to establish a new connection: [Errno 111] Connection refused'))
2024-04-26T07:30:45.794 Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/universe/universe_domain (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7c1ca82918a0>: Failed to establish a new connection: [Errno 111] Connection refused'))
2024-04-26T07:30:45.801 [...]
2024-04-26T07:30:45.806 [...]

Code example

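We don't have a standalone reproduction since it depends on a metadata outage, but the publish path looks roughly like this (a sketch with placeholder project/topic names, not our exact code):

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "my-topic")  # placeholders

    # Publishing is batched; the batch's background commit thread is what dies
    # when the metadata server blips, so the futures for messages already in
    # that batch never resolve.
    future = publisher.publish(topic_path, b"some payload")
    print(future.result(timeout=60))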

Stack trace

Traceback (most recent call last):
  File "/app/device/trimark/proxy/proxy.runfiles/python3_10_x86_64-unknown-linux-gnu/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/app/device/trimark/proxy/proxy.runfiles/python3_10_x86_64-unknown-linux-gnu/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_cloud_pubsub/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py", line 274, in _commit
    response = self._client._gapic_publish(
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_cloud_pubsub/site-packages/google/cloud/pubsub_v1/publisher/client.py", line 267, in _gapic_publish
    return super().publish(*args, **kwargs)
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_cloud_pubsub/site-packages/google/pubsub_v1/services/publisher/client.py", line 1058, in publish
    self._validate_universe_domain()
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_cloud_pubsub/site-packages/google/pubsub_v1/services/publisher/client.py", line 554, in _validate_universe_domain
    or PublisherClient._compare_universes(
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_cloud_pubsub/site-packages/google/pubsub_v1/services/publisher/client.py", line 531, in _compare_universes
    credentials_universe = getattr(credentials, "universe_domain", default_universe)
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_auth/site-packages/google/auth/compute_engine/credentials.py", line 154, in universe_domain
    self._universe_domain = _metadata.get_universe_domain(
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_auth/site-packages/google/auth/compute_engine/_metadata.py", line 284, in get_universe_domain
    universe_domain = get(
  File "/app/device/trimark/proxy/proxy.runfiles/common_deps_google_auth/site-packages/google/auth/compute_engine/_metadata.py", line 217, in get
    raise exceptions.TransportError(
google.auth.exceptions.TransportError: Failed to retrieve http://metadata.google.internal/computeMetadata/v1/universe/universe_domain from the Google Compute Engine metadata service. Compute Engine Metadata server unavailable

Speculative analysis

It looks like the issue is that the google-auth library is raising a TransportError which is not caught by the batch commit thread in this library. Potential fixes include catching that in Batch._commit (e.g. here), or catching it further down in google-cloud-pubsub and wrapping it in a GoogleAPIError.
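As a rough sketch of the first option (paraphrasing the existing error handling in pubsub_v1/publisher/_batch/thread.py; the exact surrounding code may differ), the except clause in Batch._commit could be widened to also catch the auth transport error:

    import google.api_core.exceptions
    import google.auth.exceptions

    try:
        response = self._client._gapic_publish(
            topic=self._topic,
            messages=self._messages,
            retry=self._commit_retry,
            timeout=self._commit_timeout,
        )
    except (
        google.api_core.exceptions.GoogleAPIError,
        google.auth.exceptions.TransportError,
    ) as exc:
        # Same handling the GoogleAPIError path does today: mark the batch as
        # errored and fail every pending future instead of letting the thread die.
        self._status = base.BatchStatus.ERROR
        for future in self._futures:
            future.set_exception(exc)
        return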

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label May 13, 2024

mukund-ananthu commented Nov 4, 2024

Hi pgcamus, thank you for bringing this to our attention. I wanted to list some of my observations:

the commit thread is dead forever.

This is correct. But one thing I observe here is that the publishing of subsequent messages is not affected, because of this sequence of events:

  1. TransportError is thrown by _gapic_publish:
    response = self._client._gapic_publish(
  2. It is not caught by the except clause, which only catches GoogleAPIError (see the quick check after this list):
    except google.api_core.exceptions.GoogleAPIError as exc:
  3. The thread in which _commit runs dies.
  4. The status of the batch instance still remains IN_PROGRESS:
    self._status = base.BatchStatus.IN_PROGRESS
    i.e. it is not set to BatchStatus.ERROR the way it is when a GoogleAPIError is thrown:
    self._status = base.BatchStatus.ERROR

(this causes a separate issue - more on this later)

  5. For the next message that is attempted to be published to this batch, batch.publish() would return a None future because of this condition check:

    if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
        return None

  6. This would result in the creation of a new batch instance altogether in the sequencer that is trying to publish to the batch:

    future = batch.publish(wrapper)
    # batch is full, triggering commit_when_full
    if future is None:
        batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
        # At this point, we lose track of the old batch, but we don't
        # care since it's already committed (because it was full.)
        self._current_batch = batch

  7. This new batch instance would create a new commit thread altogether whenever the criteria to flush the new batch of messages are met.
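As a quick sanity check on step 2 above (my reading of the exception hierarchies; easy to verify locally), TransportError is not a subclass of GoogleAPIError, which is why the existing except clause lets it escape:

    import google.api_core.exceptions
    import google.auth.exceptions

    # TransportError derives from google.auth's GoogleAuthError, not from
    # google.api_core's GoogleAPIError, so "except GoogleAPIError" misses it.
    print(issubclass(
        google.auth.exceptions.TransportError,
        google.api_core.exceptions.GoogleAPIError,
    ))  # expected: False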

Having mentioned the above, I do observe that there are a few related issues that need to be addressed:

  1. Since the TransportError is never caught, the code path that sets exceptions on the future objects corresponding to the messages in the failed batch never runs. Consequently, the futures associated with the messages in the batch whose commit thread crashed never get notified of the exception. They hang indefinitely unless a timeout is set via future.result(timeout=x), and even then they only raise a TimeoutError without having the exception object set (see the sketch after this item):
    for future in self._futures:
        future.set_exception(exc)
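For illustration, a caller waiting on one of these futures would see something like this (hypothetical snippet with placeholder names):

    import concurrent.futures
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "my-topic")  # placeholders

    future = publisher.publish(topic_path, b"payload")
    try:
        future.result(timeout=30)
    except concurrent.futures.TimeoutError:
        # The only signal the caller ever gets is this timeout; the crashed
        # commit thread never called set_exception(), so the future stays
        # unresolved and carries no exception object.
        pass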

Potential fixes include catching that in Batch._commit

This does solve the problem of:

  1. The commit thread crashing instead of catching the exception and returning gracefully
  2. The futures of the messages in the failed batch never getting future.set_exception() called on them

The side effect of the above change, however, would be that, since the state of the batch would now be set to BatchStatus.ERROR, an AssertionError would be raised for the publish of the subsequent message on the same batch:

assert (
    self._status != base.BatchStatus.ERROR
), "Publish after stop() or publish error."

which would then bubble up, be caught by the except BaseException handler in the publisher client, and be re-raised, crashing the caller's publish() call:

except BaseException as be:
    # Exceptions can be thrown when attempting to add messages to
    # the batch. If they're thrown, record them in publisher
    # batching and create span, end the spans and bubble the
    # exception up.
    if self._open_telemetry_enabled:
        if wrapper:
            wrapper.end_publisher_batching_span(be)
            wrapper.end_create_span(be)
        else:  # pragma: NO COVER
            warnings.warn(
                message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span",
                category=RuntimeWarning,
            )
    raise be
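In other words, with only that change in place, the caller could see something like this on the next publish that lands in the errored batch (a hypothetical sketch based on the analysis above):

    # Hypothetical: after the batch is marked BatchStatus.ERROR, the next
    # publish into the same batch would surface the assertion to the caller.
    try:
        publisher.publish(topic_path, b"next payload")
    except AssertionError as exc:
        print(exc)  # "Publish after stop() or publish error."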

I'm exploring a few options to solve the above issues, without causing unintended side effects and also factoring in the behavior of the Pub/Sub client libraries of other languages to keep the behavior consistent across the libraries. I'll keep this thread posted. Thanks!
