Skip to content

Commit

Permalink
Move away from ClientMiddleware and ClientAuthHandler in pydeephaven (#…
Browse files Browse the repository at this point in the history
…5489)

* For debugging the gRPC error seen at optiver

* Move away from ClientMiddleware and ClientAuthHandler in pydeephaven.

* Cleanups.

* Cleanups.

* Remove unused import.

* type annotation for _refresh_token return.

* Remove test_mt.

* Fix issues with bi-streaming calls and typos.

* Followup to review comments from Jianfeng.

* Update _session_service.py after _auth_value rename.

* Followup to Corey's comments.

* Followup to Corey's comments.

* Limit refresh retries.

* Fixed forwarding.

* Do something more sane for wrap_rpc with pre-existing metadata.

* Again.

* Followup to review comments from Colin.

* Removed comment that didn't apply anymore.

* Followup to review comments.

* Removed file that I did not intend to add.

* Followup to review comments.

* Fix typos, thanks Jianfeng.

* Shorter.

* Followup to review comments.

* Followup to Colin's comments.

* Rework wrap.

* Fix typo, hat tip Jianfeng.

* remove unused imports.

* Moved to _services_lock for service initialization.

* Restore newline.

* remove newline

* remove spurious whitespace

* Add newline

* Add newline.

* Something.

* Ensure the program terminates if we run out of refresh attempts.

* Tweak retry failure case.

* Spurious whitespace.

* Fix types on update_matadata.

* Add type annotations

* One more type hint.

---------

Co-authored-by: jianfengmao <[email protected]>
  • Loading branch information
jcferretti and jmao-denver committed Jun 4, 2024
1 parent 21b6351 commit 82fc24b
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 116 deletions.
6 changes: 3 additions & 3 deletions py/client/pydeephaven/_app_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ def __init__(self, session):
def list_fields(self) -> Any:
"""Fetches the current application fields."""
try:
fields = self._grpc_app_stub.ListFields(
application_pb2.ListFieldsRequest(),
metadata=self.session.grpc_metadata
fields = self.session.wrap_bidi_rpc(
self._grpc_app_stub.ListFields,
application_pb2.ListFieldsRequest()
)
return fields
except Exception as e:
Expand Down
19 changes: 11 additions & 8 deletions py/client/pydeephaven/_arrow_flight_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pyarrow as pa
import pyarrow.flight as paflight

from pyarrow.flight import FlightCallOptions
from pydeephaven._arrow import map_arrow_type
from pydeephaven.dherror import DHError
from pydeephaven.table import Table
Expand All @@ -26,10 +27,10 @@ def import_table(self, data: pa.Table) -> Table:
dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type)))
dh_schema = pa.schema(dh_fields)

# No need to add headers/metadata here via the options argument;
# or middleware is already doing it for every call.
writer, reader = self._flight_client.do_put(
pa.flight.FlightDescriptor.for_path("export", str(ticket)), dh_schema)
pa.flight.FlightDescriptor.for_path("export", str(ticket)),
dh_schema,
FlightCallOptions(headers=self.session.grpc_metadata))
writer.write_table(data)
# Note that pyarrow's write_table completes the gRPC. If we send another gRPC close
# it is possible that by the time the request arrives at the server that it no longer
Expand All @@ -44,9 +45,10 @@ def do_get_table(self, table: Table) -> pa.Table:
"""Gets a snapshot of a Table via Flight do_get."""
try:
flight_ticket = paflight.Ticket(table.ticket.ticket)
# No need to add headers/metadata here via the options argument;
# or middleware is already doing it for every call.
reader = self._flight_client.do_get(flight_ticket)
reader = self._flight_client.do_get(
flight_ticket,
FlightCallOptions(headers=self.session.grpc_metadata))

return reader.read_all()
except Exception as e:
raise DHError("failed to perform a flight DoGet on the table.") from e
Expand All @@ -59,8 +61,9 @@ def do_exchange(self):
"""
try:
desc = pa.flight.FlightDescriptor.for_command(b"dphn")
options = paflight.FlightCallOptions(headers=self.session.grpc_metadata)
writer, reader = self._flight_client.do_exchange(desc, options)
writer, reader = self._flight_client.do_exchange(
desc,
FlightCallOptions(headers=self.session.grpc_metadata))
return writer, reader

except Exception as e:
Expand Down
6 changes: 3 additions & 3 deletions py/client/pydeephaven/_config_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ def __init__(self, session):
def get_configuration_constants(self) -> Dict[str, Any]:
"""Fetches the server configuration as a dict."""
try:
response = self._grpc_app_stub.GetConfigurationConstants(config_pb2.ConfigurationConstantsRequest(),
metadata=self.session.grpc_metadata
)
response = self.session.wrap_rpc(
self._grpc_app_stub.GetConfigurationConstants,
config_pb2.ConfigurationConstantsRequest())
return dict(response.config_values)
except Exception as e:
raise DHError("failed to get the configuration constants.") from e
37 changes: 21 additions & 16 deletions py/client/pydeephaven/_console_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,29 @@ def start_console(self):
if self.console_id:
return

try:
result_id = self.session.make_ticket()
response = self._grpc_console_stub.StartConsole(
console_pb2.StartConsoleRequest(result_id=result_id, session_type=self.session._session_type),
metadata=self.session.grpc_metadata)
self.console_id = response.result_id
except Exception as e:
raise DHError("failed to start a console.") from e
with self.session._r_lock:
if not self.console_id:
try:
result_id = self.session.make_ticket()
response = self.session.wrap_rpc(
self._grpc_console_stub.StartConsole,
console_pb2.StartConsoleRequest(
result_id=result_id,
session_type=self.session._session_type))
self.console_id = response.result_id
except Exception as e:
raise DHError("failed to start a console.") from e

def run_script(self, server_script: str) -> Any:
"""Runs a Python script in the console."""
self.start_console()

try:
response = self._grpc_console_stub.ExecuteCommand(
response = self.session.wrap_rpc(
self._grpc_console_stub.ExecuteCommand,
console_pb2.ExecuteCommandRequest(
console_id=self.console_id,
code=server_script),
metadata=self.session.grpc_metadata)
code=server_script))
return response
except Exception as e:
raise DHError("failed to execute a command in the console.") from e
Expand All @@ -47,10 +51,11 @@ def bind_table(self, table: Table, variable_name: str):
if not table or not variable_name:
raise DHError("invalid table and/or variable_name values.")
try:
response = self._grpc_console_stub.BindTableToVariable(
console_pb2.BindTableToVariableRequest(console_id=self.console_id,
table_id=table.ticket,
variable_name=variable_name),
metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_console_stub.BindTableToVariable,
console_pb2.BindTableToVariableRequest(
console_id=self.console_id,
table_id=table.ticket,
variable_name=variable_name))
except Exception as e:
raise DHError("failed to bind a table to a variable on the server.") from e
15 changes: 8 additions & 7 deletions py/client/pydeephaven/_input_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ def __init__(self, session):
def add(self, input_table: InputTable, table: Table):
"""Adds a table to the InputTable."""
try:
response = self._grpc_input_table_stub.AddTableToInputTable(
self.session.wrap_rpc(
self._grpc_input_table_stub.AddTableToInputTable,
inputtable_pb2.AddTableRequest(input_table=input_table.ticket,
table_to_add=table.ticket),
metadata=self.session.grpc_metadata)
table_to_add=table.ticket))
except Exception as e:
raise DHError("failed to add to InputTable") from e

def delete(self, input_table: InputTable, table: Table):
"""Deletes a table from an InputTable."""
try:
response = self._grpc_input_table_stub.DeleteTableFromInputTable(
inputtable_pb2.DeleteTableRequest(input_table=input_table.ticket,
table_to_remove=table.ticket),
metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_input_table_stub.DeleteTableFromInputTable,
inputtable_pb2.DeleteTableRequest(
input_table=input_table.ticket,
table_to_remove=table.ticket))
except Exception as e:
raise DHError("failed to delete from InputTable") from e
4 changes: 3 additions & 1 deletion py/client/pydeephaven/_plugin_obj_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def __init__(self, session: 'pydeephaven.session.Session'):
def message_stream(self, req_stream: PluginRequestStream) -> Any:
"""Opens a connection to the server-side implementation of this plugin."""
try:
resp = self._grpc_app_stub.MessageStream(req_stream, metadata=self.session.grpc_metadata)
resp = self.session.wrap_bidi_rpc(
self._grpc_app_stub.MessageStream,
req_stream)
return resp
except Exception as e:
raise DHError("failed to establish bidirectional stream with the server.") from e
12 changes: 8 additions & 4 deletions py/client/pydeephaven/_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@ def connect(self) -> grpc.Channel:
def close(self):
"""Closes the gRPC connection."""
try:
self._grpc_session_stub.CloseSession(
session_pb2.HandshakeRequest(auth_protocol=0, payload=self.session._auth_token),
metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_session_stub.CloseSession,
session_pb2.HandshakeRequest(
auth_protocol=0,
payload=self.session._auth_header_value))
except Exception as e:
raise DHError("failed to close the session.") from e

def release(self, ticket):
"""Releases an exported ticket."""
try:
self._grpc_session_stub.Release(session_pb2.ReleaseRequest(id=ticket), metadata=self.session.grpc_metadata)
self.session.wrap_rpc(
self._grpc_session_stub.Release,
session_pb2.ReleaseRequest(id=ticket))
except Exception as e:
raise DHError("failed to release a ticket.") from e
22 changes: 13 additions & 9 deletions py/client/pydeephaven/_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def batch(self, ops: List[TableOp]) -> Table:
batch_ops = BatchOpAssembler(self.session, table_ops=ops).build_batch()

try:
response = self._grpc_table_stub.Batch(
table_pb2.BatchTableRequest(ops=batch_ops),
metadata=self.session.grpc_metadata)
response = self.session.wrap_bidi_rpc(
self._grpc_table_stub.Batch,
table_pb2.BatchTableRequest(ops=batch_ops))

exported_tables = []
for exported in response:
Expand All @@ -46,9 +46,11 @@ def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table) ->
else:
table_reference = None
stub_func = op.__class__.get_stub_func(self._grpc_table_stub)
response = stub_func(op.make_grpc_request(result_id=result_id, source_id=table_reference),
metadata=self.session.grpc_metadata)

response = self.session.wrap_rpc(
stub_func,
op.make_grpc_request(
result_id=result_id,
source_id=table_reference))
if response.success:
return table_class(self.session, ticket=response.result_id.ticket,
schema_header=response.schema_header,
Expand All @@ -61,11 +63,13 @@ def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table) ->

def fetch_etcr(self, ticket) -> Table:
"""Given a ticket, constructs a table around it, by fetching metadata from the server."""
response = self._grpc_table_stub.GetExportedTableCreationResponse(ticket, metadata=self.session.grpc_metadata)
response = self.session.wrap_rpc(
self._grpc_table_stub.GetExportedTableCreationResponse,
ticket)
if response.success:
return Table(self.session, ticket=response.result_id.ticket,
schema_header=response.schema_header,
size=response.size,
is_static=response.is_static)
else:
raise DHError(f"Server error received for ExportedTableCreationResponse: {response.error_info}")
raise DHError(
f"Server error received for ExportedTableCreationResponse: {response.error_info}")
Loading

0 comments on commit 82fc24b

Please sign in to comment.