Skip to content

Commit

Permalink
Add --node-ipc support
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmev committed Aug 9, 2022
1 parent 4b0889b commit 09c626e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 26 deletions.
94 changes: 71 additions & 23 deletions plugin/core/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from queue import Queue
import http
import json
import multiprocessing.connection
import os
import shutil
import socket
Expand Down Expand Up @@ -48,26 +49,33 @@ def on_stderr_message(self, message: str) -> None:

class AbstractProcessor(Generic[T]):

def write_data(self, writer: IO[bytes], data: T) -> None:
def write_data(self, writer: IO[bytes], data: T, is_node_ipc = False) -> None:
raise NotImplementedError()

def read_data(self, reader: IO[bytes]) -> Optional[T]:
def read_data(self, reader: IO[bytes], is_node_ipc = False) -> Optional[T]:
raise NotImplementedError()


class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]):

def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None:
def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc = False) -> None:
body = self._encode(data)
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
if not is_node_ipc:
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
else:
writer.write(body + b"\n")

def read_data(self, reader: IO[bytes], is_node_ipc = False) -> Optional[Dict[str, Any]]:
if not is_node_ipc:
headers = http.client.parse_headers(reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
else:
body = reader.readline()

def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]:
headers = http.client.parse_headers(reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
try:
return self._decode(body)
except Exception as ex:
Expand All @@ -79,7 +87,6 @@ def _encode(data: Dict[str, Any]) -> bytes:
return json.dumps(
data,
ensure_ascii=False,
sort_keys=False,
check_circular=False,
separators=(',', ':')
).encode('utf-8')
Expand All @@ -93,7 +100,7 @@ class ProcessTransport(Transport[T]):

def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes],
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
callback_object: TransportCallbacks[T]) -> None:
callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None:
self._closed = False
self._process = process
self._socket = socket
Expand All @@ -105,6 +112,7 @@ def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket
self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name))
self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name))
self._callback_object = weakref.ref(callback_object)
self._is_node_ipc = is_node_ipc
self._send_queue = Queue(0) # type: Queue[Union[T, None]]
self._reader_thread.start()
self._writer_thread.start()
Expand Down Expand Up @@ -137,7 +145,7 @@ def __del__(self) -> None:
def _read_loop(self) -> None:
try:
while self._reader:
payload = self._processor.read_data(self._reader)
payload = self._processor.read_data(self._reader, self._is_node_ipc)
if payload is None:
continue

Expand Down Expand Up @@ -190,8 +198,9 @@ def _write_loop(self) -> None:
d = self._send_queue.get()
if d is None:
break
self._processor.write_data(self._writer, d)
self._writer.flush()
self._processor.write_data(self._writer, d, self._is_node_ipc)
if not self._is_node_ipc:
self._writer.flush()
except (BrokenPipeError, AttributeError):
pass
except Exception as ex:
Expand Down Expand Up @@ -223,24 +232,58 @@ def _stderr_loop(self) -> None:
json_rpc_processor = JsonRpcProcessor()


class NodeIpcIO():
_buf = bytearray()
_lines = 0

def __init__(self, conn: multiprocessing.connection._ConnectionBase):
self._conn = conn

# https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392
def readline(self):
while self._lines == 0:
chunk = self._conn._read(self._conn.fileno(), 65536) # type: bytes
self._buf += chunk
self._lines += chunk.count(b'\n')

self._lines -= 1
foo, _, self._buf = self._buf.partition(b'\n')
print('READLINE: ' + str(foo))
return foo

# https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376
def write(self, data: bytes):
while len(data):
n = self._conn._write(self._conn.fileno(), data)
data = data[n:]


def create_transport(config: TransportConfig, cwd: Optional[str],
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]:
stderr = subprocess.PIPE
pass_fds = ()
if config.tcp_port is not None:
assert config.tcp_port is not None
if config.tcp_port < 0:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL
stdin = subprocess.DEVNULL
else:
elif not config.node_ipc:
stdout = subprocess.PIPE
stdin = subprocess.PIPE
else:
stdout = subprocess.PIPE
stdin = subprocess.DEVNULL
stderr = subprocess.STDOUT
pass_fds = (config.node_ipc.child_conn.fileno(),)

startupinfo = _fixup_startup_args(config.command)
sock = None # type: Optional[socket.socket]
process = None # type: Optional[subprocess.Popen]

def start_subprocess() -> subprocess.Popen:
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd)
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds)

if config.listener_socket:
assert isinstance(config.tcp_port, int) and config.tcp_port > 0
Expand All @@ -258,13 +301,16 @@ def start_subprocess() -> subprocess.Popen:
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port))
reader = sock.makefile('rwb') # type: ignore
writer = reader
else:
elif not config.node_ipc:
reader = process.stdout # type: ignore
writer = process.stdin # type: ignore
else:
reader = writer = NodeIpcIO(config.node_ipc.parent_conn)
if not reader or not writer:
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor,
callback_object)
stderr_reader = process.stdout if config.node_ipc else process.stderr
return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor,
callback_object, bool(config.node_ipc))


_subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen]
Expand Down Expand Up @@ -312,7 +358,8 @@ def _start_subprocess(
stderr: int,
startupinfo: Any,
env: Dict[str, str],
cwd: Optional[str]
cwd: Optional[str],
pass_fds: Union[Tuple[()], Tuple[int]]
) -> subprocess.Popen:
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd()))
process = subprocess.Popen(
Expand All @@ -322,7 +369,8 @@ def _start_subprocess(
stderr=stderr,
startupinfo=startupinfo,
env=env,
cwd=cwd)
cwd=cwd,
pass_fds=pass_fds)
_subprocesses.add(process)
return process

Expand Down
21 changes: 18 additions & 3 deletions plugin/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
from wcmatch.glob import BRACE
from wcmatch.glob import globmatch
from wcmatch.glob import GLOBSTAR
import collections
import contextlib
import fnmatch
import multiprocessing
import multiprocessing.connection
import os
import posixpath
import socket
Expand Down Expand Up @@ -605,24 +608,32 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]:
return _translate_path(uri, self._remote, self._local)


NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn')
NodeIpc.__annotations__ = {'parent_conn': multiprocessing.connection._ConnectionBase, 'child_conn': multiprocessing.connection._ConnectionBase}


class TransportConfig:
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket")
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc")

def __init__(
self,
name: str,
command: List[str],
tcp_port: Optional[int],
env: Dict[str, str],
listener_socket: Optional[socket.socket]
listener_socket: Optional[socket.socket],
node_ipc: Optional[NodeIpc]
) -> None:
if not command and not tcp_port:
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server')
if node_ipc and (tcp_port or listener_socket):
raise ValueError('"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; cannot start a language server')
self.name = name
self.command = command
self.tcp_port = tcp_port
self.env = env
self.listener_socket = listener_socket
self.node_ipc = node_ipc


class ClientConfig:
Expand Down Expand Up @@ -790,7 +801,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key]
else:
env[key] = sublime.expand_variables(value, variables)
return TransportConfig(self.name, command, tcp_port, env, listener_socket)
node_ipc = None
if '--node-ipc' in command:
node_ipc = NodeIpc(*multiprocessing.Pipe())
env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno())
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc)

def set_view_status(self, view: sublime.View, message: str) -> None:
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"):
Expand Down

0 comments on commit 09c626e

Please sign in to comment.