Skip to content

Commit

Permalink
Enable executeCode to run systemically
Browse files Browse the repository at this point in the history
  • Loading branch information
abaranec committed Jan 6, 2025
1 parent 8368538 commit bd48add
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,14 @@ public static boolean isSystemicThread() {
}

/**
* Marks the current thread as systemically important, this can be changed with {@link #markThreadNotSystemic()}.
* Marks the current thread as systemically important, this is a permanent change.
*/
public static void markThreadSystemic() {
if (SYSTEMIC_OBJECT_MARKING_ENABLED) {
SYSTEMIC_CREATION_THREAD.set(true);
}
}

/**
* Marks the current thread as not systemically important, this can be changed with {@link #markThreadSystemic()}
* ()}
*/
public static void markThreadNotSystemic() {
if (SYSTEMIC_OBJECT_MARKING_ENABLED) {
SYSTEMIC_CREATION_THREAD.set(false);
}
}

/**
* Execute the supplier with the thread's systemic importance set to the value of systemicThread.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,23 @@ public interface ConsoleSession extends Closeable {
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException;
default Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCode(code, false);
}

/**
* Execute the given {@code code} against the script session. The code may be executed systemically, meaning that
* failures of the executed script constitute failures of the application and cause shut down.
*
* @param code the code
* @param systemic if the code should be executed systemically.
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code, boolean systemic)
throws InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code path path's} code against the script session.
Expand All @@ -51,23 +67,58 @@ public interface ConsoleSession extends Closeable {
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeScript(Path path) throws IOException, InterruptedException, ExecutionException, TimeoutException;
default Changes executeScript(Path path)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScript(path, false);
}

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeScript(Path path, boolean systemic)
throws IOException, InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code code} against the script session.
*
* @param code the code
* @return the changes future
*/
default CompletableFuture<Changes> executeCodeFuture(String code) {
return executeCodeFuture(code, false);
}

/**
* Execute the given {@code code} against the script session.
*
* @param code the code
* @return the changes future
*/
CompletableFuture<Changes> executeCodeFuture(String code);
CompletableFuture<Changes> executeCodeFuture(String code, boolean systemic);

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @return the changes future
*/
default CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
return executeScriptFuture(path, false);
}

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @return the changes future
*/
CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException;
CompletableFuture<Changes> executeScriptFuture(Path path, boolean systemic) throws IOException;

/**
* Closes {@code this} console session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,20 +418,22 @@ public Ticket ticket() {
}

@Override
public Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
public Changes executeCode(String code, boolean systemic)
throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code, systemic).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public Changes executeScript(Path path)
public Changes executeScript(Path path, boolean systemic)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScriptFuture(path).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
return executeScriptFuture(path, systemic).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public CompletableFuture<Changes> executeCodeFuture(String code) {
public CompletableFuture<Changes> executeCodeFuture(String code, boolean systemic) {
final ExecuteCommandRequest request =
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code).build();
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code).setSystemic(systemic)
.build();
return UnaryGrpcFuture.of(request, channel().console()::executeCommand,
response -> {
Changes.Builder builder = Changes.builder().changes(new FieldChanges(response.getChanges()));
Expand All @@ -443,9 +445,9 @@ public CompletableFuture<Changes> executeCodeFuture(String code) {
}

@Override
public CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
public CompletableFuture<Changes> executeScriptFuture(Path path, boolean systemic) throws IOException {
final String code = String.join(System.lineSeparator(), Files.readAllLines(path, StandardCharsets.UTF_8));
return executeCodeFuture(code);
return executeCodeFuture(code, systemic);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,14 @@ message LogSubscriptionData {
reserved 4;//if we can scope logs to a script session
// Ticket console_id = 4;
}

message ExecuteCommandRequest {
io.deephaven.proto.backplane.grpc.Ticket console_id = 1;
reserved 2;//if script sessions get a ticket, we will use this reserved tag
string code = 3;
bool systemic=4;
}

message ExecuteCommandResponse {
string error_message = 1;
io.deephaven.proto.backplane.grpc.FieldsChangeUpdate changes = 2;
Expand Down
7 changes: 4 additions & 3 deletions py/client/pydeephaven/_console_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
from typing import Any
from typing import Any,

from pydeephaven.dherror import DHError
from deephaven_core.proto import console_pb2_grpc, console_pb2
Expand Down Expand Up @@ -32,7 +32,7 @@ def start_console(self):
except Exception as e:
raise DHError("failed to start a console.") from e

def run_script(self, server_script: str) -> Any:
def run_script(self, server_script: str, systemic: bool = False) -> Any:
"""Runs a Python script in the console."""
self.start_console()

Expand All @@ -41,7 +41,8 @@ def run_script(self, server_script: str) -> Any:
self._grpc_console_stub.ExecuteCommand,
console_pb2.ExecuteCommandRequest(
console_id=self.console_id,
code=server_script))
code=server_script,
systemic=systemic))
return response
except Exception as e:
raise DHError("failed to execute a command in the console.") from e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.deephaven.engine.table.impl.util.RuntimeMemory.Sample;
import io.deephaven.engine.util.DelegatingScriptSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.integrations.python.PythonDeephavenSession;
import io.deephaven.internal.log.LoggerFactory;
Expand Down Expand Up @@ -189,7 +190,13 @@ public void executeCommand(
response))
.submit(() -> {
final ScriptSession scriptSession = exportedConsole.get();
final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());

final ScriptSession.Changes changes = request.getSystemic()
? SystemicObjectTracker.executeSystemically(true,
() -> scriptSession.evaluateScript(request.getCode()))
: scriptSession.evaluateScript(request.getCode());


final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
Expand Down

0 comments on commit bd48add

Please sign in to comment.