Skip to content

Commit

Permalink
refactor into data and control grpc channel option methods with type …
Browse files Browse the repository at this point in the history
…alias return type
  • Loading branch information
anitarua committed Mar 5, 2024
1 parent a4c6973 commit bc5c719
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 28 deletions.
38 changes: 30 additions & 8 deletions src/momento/internal/_utilities/_grpc_channel_options.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from __future__ import annotations

import grpc
from typing import Sequence, Tuple, Union

from momento.config.transport.grpc_configuration import GrpcConfiguration
from momento.config.transport.transport_strategy import StaticGrpcConfiguration
from momento.internal._utilities import _timedelta_to_ms

DEFAULT_MAX_MESSAGE_SIZE = 5_243_000 # bytes

ChannelArguments = Sequence[Tuple[str, Union[int, None]]]

def grpc_channel_options_from_grpc_config(
grpc_config: GrpcConfiguration, is_control_client: bool = False
) -> grpc.aio.ChannelArgumentType:

def grpc_data_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments:
"""Create gRPC channel options from a GrpcConfiguration.
Args:
grpc_config (GrpcConfiguration): the gRPC configuration.
is_control_client (bool, optional): whether the client is a control client, in which case we want to disable keepalives. Defaults to False.
Returns:
grpc.aio.ChannelArgumentType: a list of gRPC channel options as key-value tuples.
Expand All @@ -36,15 +36,37 @@ def grpc_channel_options_from_grpc_config(
)

keepalive_permit = grpc_config.get_keepalive_permit_without_calls()
if not is_control_client and keepalive_permit is not None:
if keepalive_permit is not None:
channel_options.append(("grpc.keepalive_permit_without_calls", keepalive_permit))

keepalive_time = grpc_config.get_keepalive_time()
if not is_control_client and keepalive_time is not None:
if keepalive_time is not None:
channel_options.append(("grpc.keepalive_time_ms", _timedelta_to_ms(keepalive_time)))

keepalive_timeout = grpc_config.get_keepalive_timeout()
if not is_control_client and keepalive_timeout is not None:
if keepalive_timeout is not None:
channel_options.append(("grpc.keepalive_timeout_ms", _timedelta_to_ms(keepalive_timeout)))

return channel_options


def grpc_control_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments:
"""Create gRPC channel options from a GrpcConfiguration, but disable keepalives.
Args:
grpc_config (GrpcConfiguration): the gRPC configuration.
Returns:
grpc.aio.ChannelArgumentType: a list of gRPC channel options as key-value tuples.
"""
# Override the keepalive options to disable keepalives
control_grpc_config = StaticGrpcConfiguration(
deadline=grpc_config.get_deadline(),
root_certificates_pem=grpc_config.get_root_certificates_pem(),
max_send_message_length=grpc_config.get_max_send_message_length(),
max_receive_message_length=grpc_config.get_max_receive_message_length(),
keepalive_permit_without_calls=None,
keepalive_time=None,
keepalive_timeout=None,
)
return grpc_data_channel_options_from_grpc_config(control_grpc_config)
14 changes: 8 additions & 6 deletions src/momento/internal/aio/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
from momento.internal._utilities._channel_credentials import (
channel_credentials_from_root_certs_or_default,
)
from momento.internal._utilities._grpc_channel_options import grpc_channel_options_from_grpc_config
from momento.internal._utilities._grpc_channel_options import (
grpc_control_channel_options_from_grpc_config,
grpc_data_channel_options_from_grpc_config,
)
from momento.retry import RetryStrategy

from ... import logs
Expand All @@ -38,9 +41,8 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
target=credential_provider.control_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
interceptors=_interceptors(credential_provider.auth_token, configuration.get_retry_strategy()),
options=grpc_channel_options_from_grpc_config(
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
is_control_client=True,
),
)

Expand Down Expand Up @@ -76,7 +78,7 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
# ('grpc.use_local_subchannel_pool', 1),
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
# ],
options=grpc_channel_options_from_grpc_config(
options=grpc_data_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
Expand Down Expand Up @@ -134,7 +136,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(credential_provider.auth_token, None),
options=grpc_channel_options_from_grpc_config(grpc_config),
options=grpc_data_channel_options_from_grpc_config(grpc_config),
)

async def close(self) -> None:
Expand All @@ -157,7 +159,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_stream_interceptors(credential_provider.auth_token),
options=grpc_channel_options_from_grpc_config(grpc_config),
options=grpc_data_channel_options_from_grpc_config(grpc_config),
)

async def close(self) -> None:
Expand Down
10 changes: 6 additions & 4 deletions src/momento/internal/aio/_vector_index_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from momento.internal._utilities._channel_credentials import (
channel_credentials_from_root_certs_or_default,
)
from momento.internal._utilities._grpc_channel_options import grpc_channel_options_from_grpc_config
from momento.internal._utilities._grpc_channel_options import (
grpc_control_channel_options_from_grpc_config,
grpc_data_channel_options_from_grpc_config,
)

from ._add_header_client_interceptor import AddHeaderClientInterceptor, Header

Expand All @@ -25,9 +28,8 @@ def __init__(self, configuration: VectorIndexConfiguration, credential_provider:
target=credential_provider.control_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
interceptors=_interceptors(credential_provider.auth_token),
options=grpc_channel_options_from_grpc_config(
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
is_control_client=True,
),
)

Expand All @@ -48,7 +50,7 @@ def __init__(self, configuration: VectorIndexConfiguration, credential_provider:
target=credential_provider.vector_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
interceptors=_interceptors(credential_provider.auth_token),
options=grpc_channel_options_from_grpc_config(
options=grpc_data_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
Expand Down
14 changes: 8 additions & 6 deletions src/momento/internal/synchronous/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
from momento.internal._utilities._channel_credentials import (
channel_credentials_from_root_certs_or_default,
)
from momento.internal._utilities._grpc_channel_options import grpc_channel_options_from_grpc_config
from momento.internal._utilities._grpc_channel_options import (
grpc_control_channel_options_from_grpc_config,
grpc_data_channel_options_from_grpc_config,
)
from momento.internal.synchronous._add_header_client_interceptor import (
AddHeaderClientInterceptor,
AddHeaderStreamingClientInterceptor,
Expand All @@ -36,9 +39,8 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
self._secure_channel = grpc.secure_channel(
target=credential_provider.control_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
options=grpc_channel_options_from_grpc_config(
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
is_control_client=True,
),
)
intercept_channel = grpc.intercept_channel(
Expand All @@ -63,7 +65,7 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
self._secure_channel = grpc.secure_channel(
target=credential_provider.cache_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
options=grpc_channel_options_from_grpc_config(
options=grpc_data_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
Expand Down Expand Up @@ -152,7 +154,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
self._secure_channel = grpc.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
options=grpc_channel_options_from_grpc_config(grpc_config),
options=grpc_data_channel_options_from_grpc_config(grpc_config),
)
intercept_channel = grpc.intercept_channel(
self._secure_channel, *_interceptors(credential_provider.auth_token, None)
Expand All @@ -178,7 +180,7 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
self._secure_channel = grpc.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
options=grpc_channel_options_from_grpc_config(grpc_config),
options=grpc_data_channel_options_from_grpc_config(grpc_config),
)
intercept_channel = grpc.intercept_channel(
self._secure_channel, *_stream_interceptors(credential_provider.auth_token)
Expand Down
10 changes: 6 additions & 4 deletions src/momento/internal/synchronous/_vector_index_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from momento.internal._utilities._channel_credentials import (
channel_credentials_from_root_certs_or_default,
)
from momento.internal._utilities._grpc_channel_options import grpc_channel_options_from_grpc_config
from momento.internal._utilities._grpc_channel_options import (
grpc_control_channel_options_from_grpc_config,
grpc_data_channel_options_from_grpc_config,
)
from momento.internal.synchronous._add_header_client_interceptor import (
AddHeaderClientInterceptor,
Header,
Expand All @@ -26,9 +29,8 @@ def __init__(self, configuration: VectorIndexConfiguration, credential_provider:
self._secure_channel = grpc.secure_channel(
target=credential_provider.control_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
options=grpc_channel_options_from_grpc_config(
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
is_control_client=True,
),
)
intercept_channel = grpc.intercept_channel(self._secure_channel, *_interceptors(credential_provider.auth_token))
Expand All @@ -50,7 +52,7 @@ def __init__(self, configuration: VectorIndexConfiguration, credential_provider:
self._secure_channel = grpc.secure_channel(
target=credential_provider.vector_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
options=grpc_channel_options_from_grpc_config(
options=grpc_data_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
Expand Down

0 comments on commit bc5c719

Please sign in to comment.