From d053a9d98d2002764685629940b1631a5053fda7 Mon Sep 17 00:00:00 2001 From: Zhiguo Wu Date: Mon, 8 Jul 2024 09:48:43 +0800 Subject: [PATCH] BIGTOP-4152: Add task log gRPC service to read agent logs (#14) --- .../bigtop/manager/agent/cache/Caches.java | 26 +++++ .../executor/AbstractCommandExecutor.java | 1 + .../executor/HostCheckCommandExecutor.java | 1 + .../agent/service/CommandServiceGrpcImpl.java | 3 + .../agent/service/TaskLogServiceGrpcImpl.java | 103 ++++++++++++++++++ .../src/main/resources/proto/command.proto | 1 - .../src/main/resources/proto/task_log.proto | 35 ++++++ .../stage/runner/AbstractStageRunner.java | 8 -- .../server/controller/SseController.java | 14 +-- .../manager/server/grpc/GrpcClient.java | 2 +- .../server/scheduler/HostInfoScheduler.java | 4 - ...andLogService.java => TaskLogService.java} | 10 +- .../service/impl/CommandLogServiceImpl.java | 91 ---------------- .../service/impl/TaskLogServiceImpl.java | 81 ++++++++++++++ .../main/resources/ddl/DaMeng-DDL-CREATE.sql | 21 ---- .../nop/v1_0_0/kafka/KafkaBrokerScript.java | 12 +- .../zookeeper/ZookeeperClientScript.java | 4 +- .../zookeeper/ZookeeperServerScript.java | 10 +- .../src/components/job-info/job.vue | 1 + 19 files changed, 272 insertions(+), 156 deletions(-) create mode 100644 bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java create mode 100644 bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java create mode 100644 bigtop-manager-grpc/src/main/resources/proto/task_log.proto rename bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/{CommandLogService.java => TaskLogService.java} (79%) delete mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java new file mode 100644 index 00000000..568a057d --- /dev/null +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.agent.cache; + +import java.util.ArrayList; +import java.util.List; + +public class Caches { + public static final List RUNNING_TASKS = new ArrayList<>(); +} diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java index 8a44bb46..fda8b755 100644 --- a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java @@ -60,6 +60,7 @@ public CommandReply execute(CommandRequest request) { } protected void doExecuteOnDevMode() { + log.info("Running command on dev mode"); commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE); commandReplyBuilder.setResult(ShellResult.success().getResult()); } diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java index 82eb1e93..90b2f437 100644 --- a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java @@ -44,6 +44,7 @@ public CommandType getCommandType() { @Override public void doExecute() { + log.info("[agent executeTask] taskEvent is: {}", commandRequest); ShellResult shellResult = runChecks(List.of(this::checkTimeSync)); commandReplyBuilder.setCode(shellResult.getExitCode()); commandReplyBuilder.setResult(shellResult.getResult()); diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java index 42591bab..ed1f829f 100644 --- a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java @@ -18,6 +18,7 @@ */ package org.apache.bigtop.manager.agent.service; +import org.apache.bigtop.manager.agent.cache.Caches; import org.apache.bigtop.manager.agent.executor.CommandExecutor; import org.apache.bigtop.manager.agent.executor.CommandExecutors; import org.apache.bigtop.manager.grpc.generated.CommandReply; @@ -39,6 +40,7 @@ public class CommandServiceGrpcImpl extends CommandServiceGrpc.CommandServiceImp public void exec(CommandRequest request, StreamObserver responseObserver) { try { MDC.put("taskId", String.valueOf(request.getTaskId())); + Caches.RUNNING_TASKS.add(request.getTaskId()); CommandExecutor commandExecutor = CommandExecutors.getCommandExecutor(request.getType()); CommandReply reply = commandExecutor.execute(request); responseObserver.onNext(reply); @@ -48,6 +50,7 @@ public void exec(CommandRequest request, StreamObserver responseOb Status status = Status.UNKNOWN.withDescription(e.getMessage()); responseObserver.onError(status.asRuntimeException()); } finally { + Caches.RUNNING_TASKS.remove(request.getTaskId()); MDC.clear(); } } diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java new file mode 100644 index 00000000..45855272 --- /dev/null +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.agent.service; + +import org.apache.bigtop.manager.agent.cache.Caches; +import org.apache.bigtop.manager.grpc.generated.TaskLogReply; +import org.apache.bigtop.manager.grpc.generated.TaskLogRequest; +import org.apache.bigtop.manager.grpc.generated.TaskLogServiceGrpc; + +import org.apache.commons.lang3.SystemUtils; + +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import net.devh.boot.grpc.server.service.GrpcService; + +import java.io.File; +import java.io.RandomAccessFile; + +@Slf4j +@GrpcService +public class TaskLogServiceGrpcImpl extends TaskLogServiceGrpc.TaskLogServiceImplBase { + + @Override + public void getLog(TaskLogRequest request, StreamObserver responseObserver) { + String path = getLogFilePath(request.getTaskId()); + try (RandomAccessFile file = new RandomAccessFile(path, "r")) { + // Read from beginning + long fileLength = file.length(); + while (file.getFilePointer() < fileLength) { + String line = file.readLine(); + if (line != null) { + responseObserver.onNext( + TaskLogReply.newBuilder().setText(line).build()); + } + } + + // Waiting for new logs + boolean isTaskRunning = true; + while (isTaskRunning) { + isTaskRunning = Caches.RUNNING_TASKS.contains(request.getTaskId()); + readNewLogs(file, responseObserver); + Thread.sleep(1000); + } + } catch (Exception e) { + String errMsg = "Error when reading task log: " + e.getMessage() + ", please fix it"; + responseObserver.onNext(TaskLogReply.newBuilder().setText(errMsg).build()); + + log.error("Error reading task log", e); + Status status = Status.UNKNOWN.withDescription(e.getMessage()); + responseObserver.onError(status.asRuntimeException()); + } + } + + private void readNewLogs(RandomAccessFile file, StreamObserver responseObserver) throws Exception { + long position = file.getFilePointer(); + if (position < file.length()) { + // Read new logs + file.seek(position); + if (file.readByte() != '\n') { + file.seek(position); + } + + String line = file.readLine(); + while (line != null) { + responseObserver.onNext(TaskLogReply.newBuilder().setText(line).build()); + line = file.readLine(); + } + } + } + + private String getLogFilePath(Long taskId) { + String baseDir; + if (SystemUtils.IS_OS_WINDOWS) { + baseDir = SystemUtils.getUserDir().getPath(); + } else { + File file = new File(this.getClass() + .getProtectionDomain() + .getCodeSource() + .getLocation() + .getPath()); + baseDir = file.getParentFile().getParentFile().getPath(); + } + + return baseDir + File.separator + "tasklogs" + File.separator + "task-" + taskId + ".log"; + } +} diff --git a/bigtop-manager-grpc/src/main/resources/proto/command.proto b/bigtop-manager-grpc/src/main/resources/proto/command.proto index c86c571a..e2c09a43 100644 --- a/bigtop-manager-grpc/src/main/resources/proto/command.proto +++ b/bigtop-manager-grpc/src/main/resources/proto/command.proto @@ -30,7 +30,6 @@ enum CommandType { COMPONENT = 0; HOST_CHECK = 1; CACHE_DISTRIBUTE = 2; - TASK_LOG = 3; } message CommandRequest { diff --git a/bigtop-manager-grpc/src/main/resources/proto/task_log.proto b/bigtop-manager-grpc/src/main/resources/proto/task_log.proto new file mode 100644 index 00000000..af7bff4c --- /dev/null +++ b/bigtop-manager-grpc/src/main/resources/proto/task_log.proto @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.bigtop.manager.grpc.generated"; +option java_outer_classname = "TaskLogProto"; + +service TaskLogService { + rpc GetLog (TaskLogRequest) returns (stream TaskLogReply) {} +} + +message TaskLogRequest { + int64 task_id = 1; +} + +message TaskLogReply { + string text = 1; +} \ No newline at end of file diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java index f508fddd..30145da8 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java @@ -30,7 +30,6 @@ import org.apache.bigtop.manager.grpc.utils.ProtobufUtil; import org.apache.bigtop.manager.server.command.stage.factory.StageContext; import org.apache.bigtop.manager.server.grpc.GrpcClient; -import org.apache.bigtop.manager.server.service.CommandLogService; import lombok.extern.slf4j.Slf4j; @@ -42,9 +41,6 @@ @Slf4j public abstract class AbstractStageRunner implements StageRunner { - @Resource - private CommandLogService commandLogService; - @Resource protected StageRepository stageRepository; @@ -81,7 +77,6 @@ public void run() { CommandRequest request = builder.build(); futures.add(CompletableFuture.supplyAsync(() -> { - commandLogService.onLogStarted(task.getId(), task.getHostname()); CommandServiceGrpc.CommandServiceBlockingStub stub = GrpcClient.getBlockingStub( task.getHostname(), CommandServiceGrpc.CommandServiceBlockingStub.class); CommandReply reply = stub.exec(request); @@ -90,14 +85,11 @@ public void run() { boolean taskSuccess = reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE; if (taskSuccess) { - commandLogService.onLogReceived(task.getId(), task.getHostname(), "Success!"); onTaskSuccess(task); } else { - commandLogService.onLogReceived(task.getId(), task.getHostname(), "Failed!"); onTaskFailure(task); } - commandLogService.onLogEnded(task.getId(), task.getHostname()); return taskSuccess; })); } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java index 9ffccefd..7ed8642d 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java @@ -18,7 +18,7 @@ */ package org.apache.bigtop.manager.server.controller; -import org.apache.bigtop.manager.server.service.CommandLogService; +import org.apache.bigtop.manager.server.service.TaskLogService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -32,7 +32,6 @@ import reactor.core.publisher.FluxSink; import jakarta.annotation.Resource; -import java.io.IOException; @Tag(name = "Sse Controller") @RestController @@ -40,21 +39,21 @@ public class SseController { @Resource - private CommandLogService commandLogService; + private TaskLogService taskLogService; @Operation(summary = "get task log", description = "Get a task log") @GetMapping("/tasks/{id}/log") public SseEmitter log(@PathVariable Long id, @PathVariable Long clusterId) { - SseEmitter emitter = new SseEmitter(); + // Default timeout to 5 minutes + SseEmitter emitter = new SseEmitter(5 * 60 * 1000L); Flux flux = - Flux.create(sink -> commandLogService.registerSink(id, sink), FluxSink.OverflowStrategy.BUFFER); + Flux.create(sink -> taskLogService.registerSink(id, sink), FluxSink.OverflowStrategy.BUFFER); flux.subscribe( s -> { try { emitter.send(s); - } catch (IOException e) { - commandLogService.unregisterSink(id); + } catch (Exception e) { emitter.completeWithError(e); } }, @@ -62,7 +61,6 @@ public SseEmitter log(@PathVariable Long id, @PathVariable Long clusterId) { emitter::complete); emitter.onTimeout(emitter::complete); - emitter.onCompletion(() -> commandLogService.unregisterSink(id)); return emitter; } } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java index 61422456..f759e16a 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java @@ -122,7 +122,7 @@ private static ManagedChannel getChannel(String host) { if (isChannelAlive(host)) { return CHANNELS.get(host); } else { - throw new ApiException(ApiExceptionEnum.HOST_NOT_CONNECTED, host); + return createChannel(host); } } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java index 81705abe..99442468 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java @@ -54,10 +54,6 @@ public void execute() { private void getHostInfo(Host host) { String hostname = host.getHostname(); try { - if (!GrpcClient.isChannelAlive(hostname)) { - GrpcClient.createChannel(hostname); - } - HostInfoServiceGrpc.HostInfoServiceBlockingStub stub = GrpcClient.getBlockingStub(hostname, HostInfoServiceGrpc.HostInfoServiceBlockingStub.class); HostInfoReply reply = stub.getHostInfo(HostInfoRequest.newBuilder().build()); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java similarity index 79% rename from bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java rename to bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java index e0bcdfd9..cc1bee72 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java @@ -20,15 +20,7 @@ import reactor.core.publisher.FluxSink; -public interface CommandLogService { +public interface TaskLogService { void registerSink(Long taskId, FluxSink sink); - - void unregisterSink(Long taskId); - - void onLogStarted(Long taskId, String hostname); - - void onLogReceived(Long taskId, String hostname, String log); - - void onLogEnded(Long taskId, String hostname); } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java deleted file mode 100644 index 945eb47d..00000000 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bigtop.manager.server.service.impl; - -import org.apache.bigtop.manager.server.service.CommandLogService; - -import org.apache.commons.collections4.CollectionUtils; - -import org.springframework.stereotype.Service; - -import reactor.core.publisher.FluxSink; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Service -public class CommandLogServiceImpl implements CommandLogService { - - private final Map> taskSinks = new HashMap<>(); - - private final Map> logs = new HashMap<>(); - - public void registerSink(Long taskId, FluxSink sink) { - List list = logs.get(taskId); - if (CollectionUtils.isNotEmpty(list)) { - synchronized (list) { - taskSinks.put(taskId, sink); - for (String log : list) { - sink.next(log); - } - } - } else { - // Task already completed, get logs from database - sink.next("Task finished, please check the log details on agent machine."); - sink.complete(); - } - } - - public void unregisterSink(Long taskId) { - taskSinks.remove(taskId); - } - - @Override - public void onLogStarted(Long taskId, String hostname) { - logs.put(taskId, new ArrayList<>()); - } - - @Override - public void onLogReceived(Long taskId, String hostname, String log) { - List list = logs.get(taskId); - - synchronized (list) { - list.add(log); - FluxSink sink = taskSinks.get(taskId); - if (sink != null) { - sink.next(log); - } - } - } - - @Override - public void onLogEnded(Long taskId, String hostname) { - List list = logs.get(taskId); - synchronized (list) { - FluxSink sink = taskSinks.remove(taskId); - if (sink != null) { - sink.complete(); - } - } - - logs.remove(taskId); - } -} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java new file mode 100644 index 00000000..7d91360a --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.service.impl; + +import org.apache.bigtop.manager.common.enums.JobState; +import org.apache.bigtop.manager.dao.entity.Task; +import org.apache.bigtop.manager.dao.repository.TaskRepository; +import org.apache.bigtop.manager.grpc.generated.TaskLogReply; +import org.apache.bigtop.manager.grpc.generated.TaskLogRequest; +import org.apache.bigtop.manager.grpc.generated.TaskLogServiceGrpc; +import org.apache.bigtop.manager.server.grpc.GrpcClient; +import org.apache.bigtop.manager.server.service.TaskLogService; + +import org.springframework.stereotype.Service; + +import io.grpc.stub.StreamObserver; +import reactor.core.publisher.FluxSink; + +import jakarta.annotation.Resource; + +@Service +public class TaskLogServiceImpl implements TaskLogService { + + @Resource + private TaskRepository taskRepository; + + public void registerSink(Long taskId, FluxSink sink) { + Task task = taskRepository.getReferenceById(taskId); + String hostname = task.getHostname(); + + if (task.getState() == JobState.PENDING || task.getState() == JobState.CANCELED) { + new Thread(() -> { + sink.next("There is no log when task is in status: " + + task.getState().name().toLowerCase() + + ", please reopen the window when status changed"); + sink.complete(); + }) + .start(); + } else { + TaskLogServiceGrpc.TaskLogServiceStub asyncStub = + GrpcClient.getAsyncStub(hostname, TaskLogServiceGrpc.TaskLogServiceStub.class); + TaskLogRequest request = + TaskLogRequest.newBuilder().setTaskId(taskId).build(); + asyncStub.getLog(request, new LogReader(sink)); + } + } + + private record LogReader(FluxSink sink) implements StreamObserver { + + @Override + public void onNext(TaskLogReply reply) { + sink.next(reply.getText()); + } + + @Override + public void onError(Throwable t) { + sink.error(t); + } + + @Override + public void onCompleted() { + sink.complete(); + } + } +} diff --git a/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql b/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql index d86816e6..f2eabdd9 100644 --- a/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql +++ b/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql @@ -62,27 +62,6 @@ CREATE UNIQUE INDEX INDEX33555701 ON "bigtop_manager"."cluster" ("cluster_name") CREATE INDEX "idx_cluster_stack_id" ON "bigtop_manager"."cluster" ("stack_id"); --- "bigtop_manager"."command_log" definition - -CREATE TABLE "bigtop_manager"."command_log" ( - "id" BIGINT NOT NULL, - "create_by" BIGINT NULL, - "create_time" DATETIME NULL, - "update_by" BIGINT NULL, - "update_time" DATETIME NULL, - "hostname" VARCHAR(255) NULL, - "result" CLOB NULL, - "job_id" BIGINT NULL, - "stage_id" BIGINT NULL, - "task_id" BIGINT NULL, - CONSTRAINT CONS134218865 PRIMARY KEY ("id") -); -CREATE UNIQUE INDEX INDEX33555669 ON "bigtop_manager"."command_log" ("id"); -CREATE INDEX "idx_cl_job_id" ON "bigtop_manager"."command_log" ("job_id"); -CREATE INDEX "idx_cl_stage_id" ON "bigtop_manager"."command_log" ("stage_id"); -CREATE INDEX "idx_cl_task_id" ON "bigtop_manager"."command_log" ("task_id"); - - -- "bigtop_manager"."component" definition CREATE TABLE "bigtop_manager"."component" ( diff --git a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java index fc173c32..dbb18eb5 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java +++ b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java @@ -31,36 +31,36 @@ public class KafkaBrokerScript implements Script { @Override public ShellResult install(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult configure(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult start(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult stop(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult status(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } public ShellResult test(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } } diff --git a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java index 08e3bf06..fced816e 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java +++ b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java @@ -32,13 +32,13 @@ public class ZookeeperClientScript implements ClientScript { @Override public ShellResult install(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult configure(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } } diff --git a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java index 48d75dac..24871503 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java +++ b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java @@ -31,31 +31,31 @@ public class ZookeeperServerScript implements Script { @Override public ShellResult install(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult configure(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult start(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult stop(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } @Override public ShellResult status(Params params) { - log.info("Default to success in dev mode"); + log.info("Default to success on dev mode"); return ShellResult.success(); } } diff --git a/bigtop-manager-ui/src/components/job-info/job.vue b/bigtop-manager-ui/src/components/job-info/job.vue index 7abb2b7a..9ca85680 100644 --- a/bigtop-manager-ui/src/components/job-info/job.vue +++ b/bigtop-manager-ui/src/components/job-info/job.vue @@ -223,6 +223,7 @@ const clickTask = (record: TaskVO) => { breadcrumbs.value.push(record) currTaskInfo.value = record + logTextOrigin.value = '' getLogsInfo(record.id) }