From efcdbcaecbe3520a1a884d76d62115862d38d70c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 15 Sep 2023 15:02:56 +0200 Subject: [PATCH] [docker run] Some improvements (#420) --- .../python-processor-exclamation/README.md | 7 ++++ examples/applications/query-jdbc/README.md | 15 ++----- .../applications/query-jdbc/gateways.yaml | 39 ++++++++++++++++++ .../langstream/api/runtime/ExecutionPlan.java | 12 +++--- .../docker/LocalRunApplicationCmd.java | 41 +++++++++++++++---- .../apps/SerializedApplicationInstance.java | 10 +++-- .../impl/k8s/KubernetesClusterRuntime.java | 10 +++-- .../langstream/runtime/agent/AgentRunner.java | 11 +++-- .../src/main/assemble/entrypoint.sh | 5 +-- .../assemble/langstream-runtime-tester.xml | 8 ++++ .../src/main/assemble/logback.xml | 38 +++++++++++++++++ .../src/main/docker/Dockerfile | 1 + .../tester/LocalApplicationRunner.java | 33 ++++++++++++--- .../resources/gateway.application.properties | 1 + .../webservice.application.properties | 2 +- 15 files changed, 186 insertions(+), 47 deletions(-) create mode 100644 examples/applications/query-jdbc/gateways.yaml create mode 100644 langstream-runtime/langstream-runtime-tester/src/main/assemble/logback.xml diff --git a/examples/applications/python-processor-exclamation/README.md b/examples/applications/python-processor-exclamation/README.md index 6ddd9535a..ec1d1a060 100644 --- a/examples/applications/python-processor-exclamation/README.md +++ b/examples/applications/python-processor-exclamation/README.md @@ -9,6 +9,13 @@ The code in `example.py` adds an exclamation mark to the end of a string message ./bin/langstream apps deploy test -app examples/applications/python-processor-exclamation -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml ``` +## Talk with the Chat bot using the CLI +Since the application opens a gateway, we can use the gateway API to send and consume messages. + +``` +./bin/langstream gateway chat test -cg consume-output -pg produce-input -p sessionId=$(uuidgen) +``` + ## Start a Producer ``` kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic diff --git a/examples/applications/query-jdbc/README.md b/examples/applications/query-jdbc/README.md index 394459524..141300913 100644 --- a/examples/applications/query-jdbc/README.md +++ b/examples/applications/query-jdbc/README.md @@ -60,9 +60,11 @@ Insert some data: ./bin/langstream apps deploy test -app examples/applications/query-jdbc -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml ``` -## Start a Producer +## Talk with the Chat bot using the CLI +Since the application opens a gateway, we can use the gateway API to send and consume messages. + ``` -kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic +./bin/langstream gateway chat test -cg consume-output -pg produce-input -p sessionId=$(uuidgen) ``` Insert a JSON with "id", "name" and "description": @@ -71,12 +73,3 @@ Insert a JSON with "id", "name" and "description": {"id": 1, "name": "test", "description": "test"} ``` - -## Start a Consumer - -Start a Kafka Consumer on a terminal - -``` -kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic output-topic --from-beginning -``` - diff --git a/examples/applications/query-jdbc/gateways.yaml b/examples/applications/query-jdbc/gateways.yaml new file mode 100644 index 000000000..9c077e554 --- /dev/null +++ b/examples/applications/query-jdbc/gateways.yaml @@ -0,0 +1,39 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: produce-input + type: produce + topic: input-topic + parameters: + - sessionId + produceOptions: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + - id: consume-output + type: consume + topic: output-topic + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + diff --git a/langstream-api/src/main/java/ai/langstream/api/runtime/ExecutionPlan.java b/langstream-api/src/main/java/ai/langstream/api/runtime/ExecutionPlan.java index 246992055..6ea295f4f 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runtime/ExecutionPlan.java +++ b/langstream-api/src/main/java/ai/langstream/api/runtime/ExecutionPlan.java @@ -119,11 +119,13 @@ public AgentNode getAgentImplementation(Module module, String id) { public void registerAgent(Module module, String id, AgentNode agentImplementation) { String internalId = module.getId() + "#" + id; - log.info( - "registering agent {} for module {} with id {}", - agentImplementation, - module.getId(), - id); + if (log.isDebugEnabled()) { + log.debug( + "registering agent {} for module {} with id {}", + agentImplementation, + module.getId(), + id); + } agents.put(internalId, agentImplementation); } diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java index aa5b670c0..177ac707a 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java @@ -39,8 +39,7 @@ public class LocalRunApplicationCmd extends BaseDockerCmd { @CommandLine.Option( names = {"-i", "--instance"}, - description = "Instance file path", - required = true) + description = "Instance file path") private String instanceFilePath; @CommandLine.Option( @@ -118,7 +117,13 @@ public void run() { } final File appDirectory = checkFileExistsOrDownload(appPath); - final File instanceFile = checkFileExistsOrDownload(instanceFilePath); + final File instanceFile; + + if (instanceFilePath != null) { + instanceFile = checkFileExistsOrDownload(instanceFilePath); + } else { + instanceFile = null; + } final File secretsFile = checkFileExistsOrDownload(secretFilePath); log("Tenant " + getTenant()); @@ -129,7 +134,7 @@ public void run() { } else { log("Running all the agents in the application"); } - log("Instance file: " + instanceFile.getAbsolutePath()); + log("Instance file: " + instanceFile); if (secretsFile != null) { log("Secrets file: " + secretsFile.getAbsolutePath()); } @@ -137,8 +142,8 @@ public void run() { log("Start S3: " + startS3); log("Start Webservices " + startWebservices); - if ((appDirectory == null || instanceFile == null)) { - throw new IllegalArgumentException("application and instance files are required"); + if (appDirectory == null) { + throw new IllegalArgumentException("application files are required"); } downloadDependencies(appDirectory.toPath(), getClient(), this::log); @@ -147,9 +152,27 @@ public void run() { final String instanceContents; try { - instanceContents = - LocalFileReferenceResolver.resolveFileReferencesInYAMLFile( - instanceFile.toPath()); + if (instanceFile != null) { + + instanceContents = + LocalFileReferenceResolver.resolveFileReferencesInYAMLFile( + instanceFile.toPath()); + } else { + if (startBroker) { + instanceContents = + "instance:\n" + + " streamingCluster:\n" + + " type: \"kafka\"\n" + + " configuration:\n" + + " admin:\n" + + " bootstrap.servers: localhost:9092"; + log( + "Using default instance file that connects to the Kafka broker inside the docker container"); + } else { + throw new IllegalArgumentException( + "instance file is required if broker is not started"); + } + } } catch (Exception e) { log( "Failed to resolve instance file references. Please double check the file path: " diff --git a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/SerializedApplicationInstance.java b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/SerializedApplicationInstance.java index 8c323a77f..cf2de2166 100644 --- a/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/SerializedApplicationInstance.java +++ b/langstream-k8s-deployer/langstream-k8s-deployer-api/src/main/java/ai/langstream/deployer/k8s/api/crds/apps/SerializedApplicationInstance.java @@ -42,10 +42,12 @@ public SerializedApplicationInstance( this.instance = applicationInstance.getInstance(); this.gateways = applicationInstance.getGateways(); this.agentRunners = new HashMap<>(); - log.info( - "Serializing application instance {} executionPlan {}", - applicationInstance, - executionPlan); + if (log.isDebugEnabled()) { + log.debug( + "Serializing application instance {} executionPlan {}", + applicationInstance, + executionPlan); + } if (executionPlan != null && executionPlan.getAgents() != null) { for (Map.Entry entry : executionPlan.getAgents().entrySet()) { AgentNode agentNode = entry.getValue(); diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java index a9d806be7..d9d89efee 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntime.java @@ -248,10 +248,12 @@ private void collectAgentCustomResourceAndSecret( StreamingClusterRuntime streamingClusterRuntime, ExecutionPlan applicationInstance, String codeStorageArchiveId) { - log.info( - "Building configuration for Agent {}, codeStorageArchiveId {}", - agent, - codeStorageArchiveId); + if (log.isDebugEnabled()) { + log.debug( + "Building configuration for Agent {}, codeStorageArchiveId {}", + agent, + codeStorageArchiveId); + } if (!(agent instanceof DefaultAgentNode defaultAgentImplementation)) { throw new UnsupportedOperationException( "Only default agent implementations are supported"); diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java index 3b59826c9..2c03cf0a4 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java @@ -147,14 +147,16 @@ public void run( Runnable beforeStopSource, boolean startHttpServer) throws Exception { - log.info("Pod Configuration {}", configuration); + if (log.isDebugEnabled()) { + log.debug("Pod Configuration {}", configuration); + } // agentId is the identity of the agent in the cluster // it is shared by all the instances of the agent String agentId = configuration.agent().applicationId() + "-" + configuration.agent().agentId(); - log.info("Starting agent {} with configuration {}", agentId, configuration.agent()); + log.info("Starting agent {}", agentId); List customLibClasspath = buildCustomLibClasspath(codeDirectory); try (NarFileHandler narFileHandler = @@ -915,7 +917,10 @@ public PermanentFailureException(Throwable cause) { private static AgentCodeAndLoader initAgent( RuntimePodConfiguration configuration, AgentCodeRegistry agentCodeRegistry) throws Exception { - log.info("Bootstrapping agent with configuration {}", configuration.agent()); + log.info( + "Bootstrapping agent {} type {}", + configuration.agent().agentId(), + configuration.agent().agentType()); return initAgent( configuration.agent().agentId(), configuration.agent().agentType(), diff --git a/langstream-runtime/langstream-runtime-tester/src/main/assemble/entrypoint.sh b/langstream-runtime/langstream-runtime-tester/src/main/assemble/entrypoint.sh index 493f65bf5..6972216e0 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/assemble/entrypoint.sh +++ b/langstream-runtime/langstream-runtime-tester/src/main/assemble/entrypoint.sh @@ -15,9 +15,6 @@ # limitations under the License. # -echo "Hosts:" -cat /etc/hosts - START_KAFKA=${START_BROKER:-true} if [ "$START_BROKER" = "true" ]; then echo "Starting Broker" @@ -30,4 +27,4 @@ if [ "$START_MINIO" = "true" ]; then /minio/minio server /tmp & fi -exec java ${JAVA_OPTS} -Djdk.lang.Process.launchMechanism=vfork -cp "/app/lib/*:/app/tester/lib/*" "ai.langstream.runtime.tester.Main" +exec java ${JAVA_OPTS} -Dlogging.config=/app/logback.xml -Djdk.lang.Process.launchMechanism=vfork -cp "/app/lib/*:/app/tester/lib/*" "ai.langstream.runtime.tester.Main" diff --git a/langstream-runtime/langstream-runtime-tester/src/main/assemble/langstream-runtime-tester.xml b/langstream-runtime/langstream-runtime-tester/src/main/assemble/langstream-runtime-tester.xml index 7566da1cd..f86649d86 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/assemble/langstream-runtime-tester.xml +++ b/langstream-runtime/langstream-runtime-tester/src/main/assemble/langstream-runtime-tester.xml @@ -33,6 +33,14 @@ unix 755 + + src/main/assemble/ + . + + logback.xml + + unix + diff --git a/langstream-runtime/langstream-runtime-tester/src/main/assemble/logback.xml b/langstream-runtime/langstream-runtime-tester/src/main/assemble/logback.xml new file mode 100644 index 000000000..a23d4058a --- /dev/null +++ b/langstream-runtime/langstream-runtime-tester/src/main/assemble/logback.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n + + + + + + + + + + + diff --git a/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile b/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile index 97090cfea..5de408787 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile +++ b/langstream-runtime/langstream-runtime-tester/src/main/docker/Dockerfile @@ -40,6 +40,7 @@ RUN mkdir /minio \ ADD maven/lib /app/tester/lib RUN rm -f /app/tester/lib/*netty* ADD maven/entrypoint.sh /app/entrypoint.sh +ADD maven/logback.xml /app/logback.xml WORKDIR /app diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java index e9f778d86..3a9ff189f 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java @@ -28,7 +28,12 @@ import ai.langstream.runtime.agent.AgentRunner; import ai.langstream.runtime.agent.api.AgentInfo; import ai.langstream.runtime.api.agent.RuntimePodConfiguration; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.fabric8.kubernetes.api.model.Secret; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -44,11 +49,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; @Slf4j public class LocalApplicationRunner implements AutoCloseable, InMemoryApplicationStore.AgentInfoCollector { + private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory()); + final KubeTestServer kubeServer = new KubeTestServer(); final InMemoryApplicationStore applicationStore = new InMemoryApplicationStore(); final ApplicationDeployer applicationDeployer; @@ -162,11 +170,13 @@ public AgentRunResult executeAgentRunners(ApplicationRuntime runtime, List futures = new ArrayList<>(); for (RuntimePodConfiguration podConfiguration : pods) { + Path podRuntimeConfigurationFile = persistPodConfiguration(podConfiguration); CompletableFuture handle = new CompletableFuture<>(); futures.add(handle); executorService.submit( @@ -196,7 +207,7 @@ public AgentRunResult executeAgentRunners(ApplicationRuntime runtime, List