Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add grpc compression on publish #677

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
eadc43e
feature: grpc compression
acocuzzo Apr 14, 2022
7a7d62f
add compression to gapic publish, and pass compression on _commit
acocuzzo Apr 25, 2022
9d9b054
add compression parameter to publish() in generated client
acocuzzo Apr 25, 2022
c0dc436
Revert "add compression parameter to publish() in generated client"
acocuzzo Apr 25, 2022
9f70c0c
removing handwritten change to client.py
acocuzzo Apr 25, 2022
d98a8d8
Merge branch 'main' into grpc-compression
acocuzzo Apr 25, 2022
9489fb8
remove custom compression set outside of _client defaults
acocuzzo Apr 25, 2022
650a742
remove extra space
acocuzzo Apr 25, 2022
8fc311d
add handwritten changed to client.py
acocuzzo May 3, 2022
513bd0a
Merge branch 'main' into grpc-compression
acocuzzo May 11, 2022
7c498cf
remove args on sequencer
acocuzzo May 11, 2022
e7c78cc
fix lint publisher sample
acocuzzo May 11, 2022
f057617
adding compression coverage to test_thread
acocuzzo May 11, 2022
e3f4839
linter fixes
acocuzzo May 11, 2022
03f35e8
add samples
acocuzzo May 11, 2022
49eb355
fix samples
acocuzzo May 11, 2022
90efd38
fix samples tests
acocuzzo May 11, 2022
e62ce78
add compression to receive test
acocuzzo May 11, 2022
1806f53
temp log
acocuzzo May 12, 2022
ba321b9
Revert "add compression to receive test"
acocuzzo May 20, 2022
350e0ab
Revert "Revert "add compression to receive test""
acocuzzo May 20, 2022
89837bf
Revert "temp log"
acocuzzo May 20, 2022
45595be
Merge branch 'main' into grpc-compression
acocuzzo Aug 29, 2022
68c3d0b
Merge branch 'main' into grpc-compression
acocuzzo May 16, 2023
1502c4c
Merge branch 'main' into grpc-compression
acocuzzo Oct 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import base
from google.pubsub_v1 import types as gapic_types
import grpc

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud import pubsub_v1
Expand Down Expand Up @@ -118,6 +119,12 @@ def __init__(

self._commit_retry = commit_retry
self._commit_timeout = commit_timeout
self._enable_grpc_compression = (
self.client.publisher_options.enable_grpc_compression
)
self._compression_bytes_threshold = (
self.client.publisher_options.compression_bytes_threshold
)

@staticmethod
def make_lock() -> threading.Lock:
Expand Down Expand Up @@ -269,13 +276,25 @@ def _commit(self) -> None:
start = time.time()

batch_transport_succeeded = True

# Set compression if enabled.
compression = None

if (
self._enable_grpc_compression
and gapic_types.PublishRequest(messages=self._messages)._pb.ByteSize()
>= self._compression_bytes_threshold
):
compression = grpc.Compression.Gzip

try:
# Performs retries for errors defined by the retry configuration.
response = self._client._gapic_publish(
topic=self._topic,
messages=self._messages,
retry=self._commit_retry,
timeout=self._commit_timeout,
compression=compression,
)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, even after retries, so set the exception on
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ def __init__(
# The object controlling the message publishing flow
self._flow_controller = FlowController(self.publisher_options.flow_control)

self._enable_grpc_compression = self.publisher_options.enable_grpc_compression
self._compression_bytes_threshold = (
self.publisher_options.compression_bytes_threshold
)

@classmethod
def from_service_account_file( # type: ignore[override]
cls,
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/pubsub_v1/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ class PublisherOptions(NamedTuple):
"Timeout settings for message publishing by the client. It should be "
"compatible with :class:`~.pubsub_v1.types.TimeoutType`."
)
enable_grpc_compression: bool = False

compression_bytes_threshold: int = 240


# Define the type class and default values for flow control settings.
Expand Down
2 changes: 2 additions & 0 deletions google/pubsub_v1/services/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ def publish(
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
compression: grpc.Compression = None,
) -> pubsub.PublishResponse:
r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
the topic does not exist.
Expand Down Expand Up @@ -833,6 +834,7 @@ def sample_publish():
retry=retry,
timeout=timeout,
metadata=metadata,
compression=compression,
)

# Done; return the response.
Expand Down
5 changes: 5 additions & 0 deletions samples/snippets/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from google.cloud import pubsub_v1
pub_sub_client = pubsub_v1.PublisherClient()

if __name__ == '__main__':
result = pub_sub_client.publish("projects/annaco-python-lib-test/topics/annaco-python-lib-test-topic", bytes("Some message here!", "utf8")).result()
70 changes: 70 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,76 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None:
# [END pubsub_publisher_batch_settings]


def publish_messages_with_default_compression_threshold(project_id: str, topic_id: str) -> None:
"""Publishes messages to a Pub/Sub topic with grpc compression enabled."""
# [START pubsub_publisher_compression_settings]
from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure publisher with compression
publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True))
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
message_id = future.result()
print(message_id)

for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with compression settings to {topic_path}.")
# [END pubsub_publisher_compression_settings]

def publish_messages_with_low_compression_threshold(project_id: str, topic_id: str) -> None:
"""Publishes messages to a Pub/Sub topic with grpc compression enabled."""
# [START pubsub_publisher_compression_settings]
from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure publisher with compression
publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0))
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
message_id = future.result()
print(message_id)

for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with compression settings to {topic_path}.")
# [END pubsub_publisher_compression_settings]



def publish_messages_with_flow_control_settings(project_id: str, topic_id: str) -> None:
"""Publishes messages to a Pub/Sub topic with flow control settings."""
# [START pubsub_publisher_flow_control]
Expand Down
18 changes: 18 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,24 @@ def test_publish_with_ordering_keys(
assert f"Published messages with ordering keys to {topic_path}." in out


def test_publish_with_default_compression(
topic_path: str, capsys: CaptureFixture[str]
) -> None:
publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID)

out, _ = capsys.readouterr()
assert f"Published messages with compression settings to {topic_path}." in out


def test_publish_with_low_compression(
topic_path: str, capsys: CaptureFixture[str]
) -> None:
publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID)

out, _ = capsys.readouterr()
assert f"Published messages with compression settings to {topic_path}." in out


def test_resume_publish_with_error_handler(
topic_path: str, capsys: CaptureFixture[str]
) -> None:
Expand Down
1 change: 1 addition & 0 deletions samples/snippets/sponge_log.xml -v
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<?xml version="1.0" encoding="utf-8"?><testsuites><testsuite name="pytest" errors="0" failures="0" skipped="0" tests="24" time="23.343" timestamp="2022-04-22T15:12:12.032784" hostname="cs-872183299652-default"><testcase classname="samples.snippets.iam_test" name="test_get_topic_policy" time="3.244" /><testcase classname="samples.snippets.iam_test" name="test_get_subscription_policy" time="1.090" /><testcase classname="samples.snippets.iam_test" name="test_set_topic_policy" time="0.401" /><testcase classname="samples.snippets.iam_test" name="test_set_subscription_policy" time="0.400" /><testcase classname="samples.snippets.iam_test" name="test_check_topic_permissions" time="0.053" /><testcase classname="samples.snippets.iam_test" name="test_check_subscription_permissions" time="0.497" /><testcase classname="samples.snippets.publisher_test" name="test_create" time="1.904" /><testcase classname="samples.snippets.publisher_test" name="test_list" time="0.124" /><testcase classname="samples.snippets.publisher_test" name="test_publish" time="1.317" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_custom_attributes" time="0.858" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_batch_settings" time="1.261" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_flow_control_settings" time="0.836" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_retry_settings" time="0.572" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_error_handler" time="0.064" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_ordering_keys" time="1.423" /><testcase classname="samples.snippets.publisher_test" name="test_resume_publish_with_error_handler" time="0.562" /><testcase classname="samples.snippets.publisher_test" name="test_detach_subscription" time="1.520" /><testcase classname="samples.snippets.publisher_test" name="test_delete" time="0.607" /><testcase classname="samples.snippets.schema_test" name="test_create_avro_schema" time="0.522" /><testcase classname="samples.snippets.schema_test" name="test_create_proto_schema" time="0.225" /><testcase classname="samples.snippets.schema_test" name="test_get_schema" time="0.097" /><testcase classname="samples.snippets.schema_test" name="test_list_schemas" time="0.076" /><testcase classname="samples.snippets.schema_test" name="test_create_topic_with_schema" time="2.364" /><testcase classname="samples.snippets.schema_test" name="test_publish_avro_records" time="0.499" /><testcase time="0.925" /></testsuite></testsuites>
1 change: 1 addition & 0 deletions samples/snippets/sponge_log.xml -v -s
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<?xml version="1.0" encoding="utf-8"?><testsuites><testsuite name="pytest" errors="0" failures="0" skipped="0" tests="51" time="244.740" timestamp="2022-04-22T15:13:22.043387" hostname="cs-872183299652-default"><testcase classname="samples.snippets.iam_test" name="test_get_topic_policy" time="2.263" /><testcase classname="samples.snippets.iam_test" name="test_get_subscription_policy" time="1.700" /><testcase classname="samples.snippets.iam_test" name="test_set_topic_policy" time="0.394" /><testcase classname="samples.snippets.iam_test" name="test_set_subscription_policy" time="0.498" /><testcase classname="samples.snippets.iam_test" name="test_check_topic_permissions" time="0.067" /><testcase classname="samples.snippets.iam_test" name="test_check_subscription_permissions" time="0.688" /><testcase classname="samples.snippets.publisher_test" name="test_create" time="2.598" /><testcase classname="samples.snippets.publisher_test" name="test_list" time="0.118" /><testcase classname="samples.snippets.publisher_test" name="test_publish" time="1.426" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_custom_attributes" time="1.010" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_batch_settings" time="1.071" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_flow_control_settings" time="0.671" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_retry_settings" time="0.435" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_error_handler" time="0.072" /><testcase classname="samples.snippets.publisher_test" name="test_publish_with_ordering_keys" time="1.472" /><testcase classname="samples.snippets.publisher_test" name="test_resume_publish_with_error_handler" time="0.587" /><testcase classname="samples.snippets.publisher_test" name="test_detach_subscription" time="1.885" /><testcase classname="samples.snippets.publisher_test" name="test_delete" time="0.749" /><testcase classname="samples.snippets.schema_test" name="test_create_avro_schema" time="0.395" /><testcase classname="samples.snippets.schema_test" name="test_create_proto_schema" time="0.330" /><testcase classname="samples.snippets.schema_test" name="test_get_schema" time="0.092" /><testcase classname="samples.snippets.schema_test" name="test_list_schemas" time="0.087" /><testcase classname="samples.snippets.schema_test" name="test_create_topic_with_schema" time="2.238" /><testcase classname="samples.snippets.schema_test" name="test_publish_avro_records" time="0.398" /><testcase classname="samples.snippets.schema_test" name="test_subscribe_with_avro_schema" time="10.028" /><testcase classname="samples.snippets.schema_test" name="test_publish_proto_records" time="2.963" /><testcase classname="samples.snippets.schema_test" name="test_subscribe_with_proto_schema" time="10.451" /><testcase classname="samples.snippets.schema_test" name="test_delete_schema" time="2.091" /><testcase classname="samples.snippets.subscriber_test" name="test_list_in_topic" time="3.186" /><testcase classname="samples.snippets.subscriber_test" name="test_list_in_project" time="0.079" /><testcase classname="samples.snippets.subscriber_test" name="test_create_subscription" time="1.097" /><testcase classname="samples.snippets.subscriber_test" name="test_create_subscription_with_dead_letter_policy" time="4.799" /><testcase classname="samples.snippets.subscriber_test" name="test_receive_with_delivery_attempts" time="90.934" /><testcase classname="samples.snippets.subscriber_test" name="test_update_dead_letter_policy" time="0.234" /><testcase classname="samples.snippets.subscriber_test" name="test_remove_dead_letter_policy" time="0.285" /><testcase classname="samples.snippets.subscriber_test" name="test_create_subscription_with_ordering" time="1.163" /><testcase classname="samples.snippets.subscriber_test" name="test_create_subscription_with_filtering" time="1.167" /><testcase classname="samples.snippets.subscriber_test" name="test_create_subscription_with_exactly_once_delivery" time="6.503" /><testcase classname="samples.snippets.subscriber_test" name="test_create_push_subscription" time="2.292" /><testcase classname="samples.snippets.subscriber_test" name="test_update_push_suscription" time="0.380" /><testcase classname="samples.snippets.subscriber_test" name="test_delete_subscription" time="0.303" /><testcase classname="samples.snippets.subscriber_test" name="test_receive" time="6.855" /><testcase classname="samples.snippets.subscriber_test" name="test_receive_with_custom_attributes" time="5.221" /><testcase classname="samples.snippets.subscriber_test" name="test_receive_with_flow_control" time="5.257" /><testcase classname="samples.snippets.subscriber_test" name="test_receive_with_blocking_shutdown" time="8.700" /><testcase classname="samples.snippets.subscriber_test" name="test_receive_messages_with_exactly_once_delivery_enabled" time="12.036" /><testcase classname="samples.snippets.subscriber_test" name="test_listen_for_errors" time="5.240" /><testcase classname="samples.snippets.subscriber_test" name="test_receive_synchronously" time="3.541" /><testcase classname="samples.snippets.subscriber_test" name="test_receive_synchronously_with_lease" time="23.804" /><testcase classname="samples.snippets.quickstart.quickstart_test" name="test_pub" time="2.808" /><testcase classname="samples.snippets.quickstart.quickstart_test" name="test_sub" time="11.505" /></testsuite></testsuites>
37 changes: 37 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@
def publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]:
yield pubsub_v1.PublisherClient()

@pytest.fixture(scope="module")
def publisher_client_with_default_compression() -> Generator[pubsub_v1.PublisherClient, None, None]:
yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True))

@pytest.fixture(scope="module")
def publisher_client_with_low_compression() -> Generator[pubsub_v1.PublisherClient, None, None]:
yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0))

@pytest.fixture(scope="module")
def regional_publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]:
Expand Down Expand Up @@ -974,6 +981,36 @@ def test_listen_for_errors(
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_listen_for_errors_default_compression(
publisher_client_with_default_compression: pubsub_v1.PublisherClient,
topic: str,
subscription_async: str,
capsys: CaptureFixture[str],
) -> None:
_ = _publish_messages(publisher_client_with_default_compression, topic)

subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)

out, _ = capsys.readouterr()
assert subscription_async in out
assert "threw an exception" in out


def test_listen_for_errors_low_compression(
publisher_client_with_low_compression: pubsub_v1.PublisherClient,
topic: str,
subscription_async: str,
capsys: CaptureFixture[str],
) -> None:
_ = _publish_messages(publisher_client_with_low_compression, topic)

subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)

out, _ = capsys.readouterr()
assert subscription_async in out
assert "threw an exception" in out


def test_receive_synchronously(
subscriber_client: pubsub_v1.SubscriberClient,
publisher_client: pubsub_v1.PublisherClient,
Expand Down
Loading
Loading