Skip to content

Commit

Permalink
Add some docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Apr 18, 2024
1 parent 8320ded commit 0beb368
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ public static BarrageSession of(
return new BarrageSession(session, client, channel);
}

public static BarrageSession create(
SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) {
final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel(
incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session)));
return new BarrageSession(session, client, channel);
}

protected BarrageSession(
final SessionImpl session, final FlightClient client, final ManagedChannel channel) {
super(session, client);
Expand Down
4 changes: 2 additions & 2 deletions py/client/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ def test_blink_input_table(self):


def test_share_table(self):
pub_session = Session()
t = pub_session.empty_table(1000).update("X = i")
pub_session = Session("localhost", 10008)
t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"])
self.assertEqual(t.size, 1000)
shared_ticket = SharedTicket.random_ticket()
pub_session.publish_table(t, shared_ticket)
Expand Down
125 changes: 102 additions & 23 deletions py/server/deephaven/remote.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
from __future__ import annotations
import threading
from typing import Dict

import jpy

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.table import Table

_JURI = jpy.get_type("java.net.URI")
Expand All @@ -16,52 +19,124 @@
_JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl")
_JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl")
_JExecutors = jpy.get_type("java.util.concurrent.Executors")
# _JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession")
# _JFlightSession = jpy.get_type("io.deephaven.client.impl.FlightSession")
# _JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator")
_JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession")
_JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator")
_JSharedId = jpy.get_type("io.deephaven.client.impl.SharedId")
_JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver")

_session_cache: Dict[str, RemoteSession] = {} # use WeakValueDictionary to avoid memory leak?
_remote_session_lock = threading.Lock()

class Session:
"""
A Deephaven gRPC session.

def remote_session(host: str,
port: int = 10000,
auth_type: str = "Anonymous",
auth_token: str = "",
# never_timeout: bool = True,
# session_type: str = 'python',
# use_tls: bool = False,
# tls_root_certs: bytes = None,
# client_cert_chain: bytes = None,
) -> RemoteSession:
"""Returns a Deephaven gRPC session to a remote server if a cached session is available; otherwise, creates a new
session.
Args:
host (str): the host name or IP address of the Deephaven server.
port (int): the port number that the remote Deephaven server is listening on, default is 10000.
auth_type (str): the authentication type string, can be "Anonymous', 'Basic", or any custom-built
authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler",
default is 'Anonymous'.
auth_token (str): the authentication token string. When auth_type is 'Basic', it must be
"user:password"; when auth_type is "Anonymous', it will be ignored; when auth_type is a custom-built
authenticator, it must conform to the specific requirement of the authenticator
Returns:
a Deephaven gRPC session
Raises:
DHError
"""
def __init__(self, host: str = None,
port: int = None,
with _remote_session_lock:
uri = f"dh+plain://{host}:{port}"
session = _session_cache.get(uri)
if session:
if session.is_alive: # doesn't guarantee the session is still alive, just that it hasn't been explicitly
# closed
return session
else:
del _session_cache[uri]
session = RemoteSession(host, port, auth_type, auth_token)
_session_cache[uri] = session
return session


class RemoteSession (JObjectWrapper):
""" A Deephaven gRPC session to a remote server."""
# def __init__(self, j_barrage_session):
# self.j_barrage_session = j_barrage_session
#
# def subscribe(self, ticket: bytes):
# j_table_handle = self._j_session.of(_JSharedId(ticket).ticketId().table())
# j_barrage_subscription = self._j_barrage_session.subscribe(j_table_handle,
# _JBarrageTableResolver.SUB_OPTIONS)
# j_table = j_barrage_subscription.entireTable().get()
# return Table(j_table)
#
# def snapshot(self):
# return self.j_barrage_session.snapshot()
def __init__(self, host: str,
port: int = 10000,
auth_type: str = "Anonymous",
auth_token: str = ""):
auth_token: str = "",
# never_timeout: bool = True,
# session_type: str = 'python',
# use_tls: bool = False,
# tls_root_certs: bytes = None,
# client_cert_chain: bytes = None,
):
"""Creates a Deephaven gRPC session to a remote server.
Args:
host (str): the host name or IP address of the Deephaven server.
port (int): the port number that the remote Deephaven server is listening on, default is 10000.
auth_type (str): the authentication type string, can be "Anonymous', 'Basic", or any custom-built
authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler",
default is 'Anonymous'.
auth_token (str): the authentication token string. When auth_type is 'Basic', it must be
"user:password"; when auth_type is "Anonymous', it will be ignored; when auth_type is a custom-built
authenticator, it must conform to the specific requirement of the authenticator
"""
self.host = host
self.port = port
self._auth_type = auth_type
self._auth_token = auth_token
self.grpc_channel = None
self._r_lock = threading.RLock()
self._is_alive = False
self._uri = f"dh+plain://{host}:{port}"
self._connect()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@property
def is_alive(self) -> bool:
"""if the session is alive."""
return self._is_alive

def _connect(self) -> None:
target = ":".join([self.host, str(self.port)])
try:
_j_host_config = (_JClientConfig.builder()
.target(_JDeephavenTarget.of(_JURI("dh+plain://" + target)))
.target(_JDeephavenTarget.of(_JURI(self._uri)))
.build())
_j_channel = _JChannelHelper.channel(_j_host_config)
_j_dh_channel = _JDeephavenChannelImpl(_j_channel)
self._j_channel = _JChannelHelper.channel(_j_host_config)
_j_dh_channel = _JDeephavenChannelImpl(self._j_channel)

_j_session_config = (_JSessionImplConfig.builder()
.executor(_JExecutors.newScheduledThreadPool(4))
.authenticationTypeAndValue(f"{self._auth_type} {self._auth_token}")
.channel(_j_dh_channel)
.build())
self._j_session = _JSessionImpl.create(_j_session_config)
self._j_barrage_session = _JBarrageSession.create(self._j_session, _JRootAllocator(), self._j_channel)
self._is_alive = True


except Exception as e:
raise DHError("failed to create a session to a remote Deephaven server.") from e

Expand All @@ -80,11 +155,15 @@ def close(self):
finally:
self._is_alive = False

def fetch(self, shared_ticket) -> Table:
def fetch(self, shared_ticket: bytes) -> Table:
"""Fetches data from the Deephaven server."""
if not self._is_alive:
raise DHError("the session is not alive.")
try:
return Table(self._j_session.of(shared_ticket).table())
j_table_handle = self._j_session.of(_JSharedId(shared_ticket).ticketId().table())
j_barrage_subscription = self._j_barrage_session.subscribe(j_table_handle,
_JBarrageTableResolver.SUB_OPTIONS)
j_table = j_barrage_subscription.entireTable().get()
return Table(j_table)
except Exception as e:
raise DHError("failed to fetch data from the server.") from e
12 changes: 9 additions & 3 deletions py/server/tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
#

import unittest

from deephaven.remote import Session
from deephaven.remote import remote_session

from tests.testbase import BaseTestCase


class RemoteTestCase(BaseTestCase):
def test_session(self):
session = Session(host="core-server-2-1", port=10000, auth_type="Anonymous")
session = remote_session(host="core-server-2-1", port=10000, auth_type="Anonymous")
t = session.fetch(b'hgd\xc6\xf3\xea\xd3\x15\xbabB#\x1e-\x94\xfcI')
self.assertEqual(t.size, 1000)
self.assertEqual(len(t.columns), 2)
sp = t.snapshot()
self.assertEqual(sp.size, 1000)
t1 = t.update("Z = X + Y")
self.assertEqual(t1.size, 1000)


if __name__ == "__main__":
Expand Down

0 comments on commit 0beb368

Please sign in to comment.