Skip to content

Commit

Permalink
BIGTOP-4152: Add task log gRPC service to read agent logs (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinw66 authored Jul 8, 2024
1 parent a35ffde commit d053a9d
Show file tree
Hide file tree
Showing 19 changed files with 272 additions and 156 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bigtop.manager.agent.cache;

import java.util.ArrayList;
import java.util.List;

public class Caches {
public static final List<Long> RUNNING_TASKS = new ArrayList<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public CommandReply execute(CommandRequest request) {
}

protected void doExecuteOnDevMode() {
log.info("Running command on dev mode");
commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
commandReplyBuilder.setResult(ShellResult.success().getResult());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public CommandType getCommandType() {

@Override
public void doExecute() {
log.info("[agent executeTask] taskEvent is: {}", commandRequest);
ShellResult shellResult = runChecks(List.of(this::checkTimeSync));
commandReplyBuilder.setCode(shellResult.getExitCode());
commandReplyBuilder.setResult(shellResult.getResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bigtop.manager.agent.service;

import org.apache.bigtop.manager.agent.cache.Caches;
import org.apache.bigtop.manager.agent.executor.CommandExecutor;
import org.apache.bigtop.manager.agent.executor.CommandExecutors;
import org.apache.bigtop.manager.grpc.generated.CommandReply;
Expand All @@ -39,6 +40,7 @@ public class CommandServiceGrpcImpl extends CommandServiceGrpc.CommandServiceImp
public void exec(CommandRequest request, StreamObserver<CommandReply> responseObserver) {
try {
MDC.put("taskId", String.valueOf(request.getTaskId()));
Caches.RUNNING_TASKS.add(request.getTaskId());
CommandExecutor commandExecutor = CommandExecutors.getCommandExecutor(request.getType());
CommandReply reply = commandExecutor.execute(request);
responseObserver.onNext(reply);
Expand All @@ -48,6 +50,7 @@ public void exec(CommandRequest request, StreamObserver<CommandReply> responseOb
Status status = Status.UNKNOWN.withDescription(e.getMessage());
responseObserver.onError(status.asRuntimeException());
} finally {
Caches.RUNNING_TASKS.remove(request.getTaskId());
MDC.clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bigtop.manager.agent.service;

import org.apache.bigtop.manager.agent.cache.Caches;
import org.apache.bigtop.manager.grpc.generated.TaskLogReply;
import org.apache.bigtop.manager.grpc.generated.TaskLogRequest;
import org.apache.bigtop.manager.grpc.generated.TaskLogServiceGrpc;

import org.apache.commons.lang3.SystemUtils;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

import java.io.File;
import java.io.RandomAccessFile;

@Slf4j
@GrpcService
public class TaskLogServiceGrpcImpl extends TaskLogServiceGrpc.TaskLogServiceImplBase {

@Override
public void getLog(TaskLogRequest request, StreamObserver<TaskLogReply> responseObserver) {
String path = getLogFilePath(request.getTaskId());
try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
// Read from beginning
long fileLength = file.length();
while (file.getFilePointer() < fileLength) {
String line = file.readLine();
if (line != null) {
responseObserver.onNext(
TaskLogReply.newBuilder().setText(line).build());
}
}

// Waiting for new logs
boolean isTaskRunning = true;
while (isTaskRunning) {
isTaskRunning = Caches.RUNNING_TASKS.contains(request.getTaskId());
readNewLogs(file, responseObserver);
Thread.sleep(1000);
}
} catch (Exception e) {
String errMsg = "Error when reading task log: " + e.getMessage() + ", please fix it";
responseObserver.onNext(TaskLogReply.newBuilder().setText(errMsg).build());

log.error("Error reading task log", e);
Status status = Status.UNKNOWN.withDescription(e.getMessage());
responseObserver.onError(status.asRuntimeException());
}
}

private void readNewLogs(RandomAccessFile file, StreamObserver<TaskLogReply> responseObserver) throws Exception {
long position = file.getFilePointer();
if (position < file.length()) {
// Read new logs
file.seek(position);
if (file.readByte() != '\n') {
file.seek(position);
}

String line = file.readLine();
while (line != null) {
responseObserver.onNext(TaskLogReply.newBuilder().setText(line).build());
line = file.readLine();
}
}
}

private String getLogFilePath(Long taskId) {
String baseDir;
if (SystemUtils.IS_OS_WINDOWS) {
baseDir = SystemUtils.getUserDir().getPath();
} else {
File file = new File(this.getClass()
.getProtectionDomain()
.getCodeSource()
.getLocation()
.getPath());
baseDir = file.getParentFile().getParentFile().getPath();
}

return baseDir + File.separator + "tasklogs" + File.separator + "task-" + taskId + ".log";
}
}
1 change: 0 additions & 1 deletion bigtop-manager-grpc/src/main/resources/proto/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ enum CommandType {
COMPONENT = 0;
HOST_CHECK = 1;
CACHE_DISTRIBUTE = 2;
TASK_LOG = 3;
}

message CommandRequest {
Expand Down
35 changes: 35 additions & 0 deletions bigtop-manager-grpc/src/main/resources/proto/task_log.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.bigtop.manager.grpc.generated";
option java_outer_classname = "TaskLogProto";

service TaskLogService {
rpc GetLog (TaskLogRequest) returns (stream TaskLogReply) {}
}

message TaskLogRequest {
int64 task_id = 1;
}

message TaskLogReply {
string text = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.bigtop.manager.grpc.utils.ProtobufUtil;
import org.apache.bigtop.manager.server.command.stage.factory.StageContext;
import org.apache.bigtop.manager.server.grpc.GrpcClient;
import org.apache.bigtop.manager.server.service.CommandLogService;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -42,9 +41,6 @@
@Slf4j
public abstract class AbstractStageRunner implements StageRunner {

@Resource
private CommandLogService commandLogService;

@Resource
protected StageRepository stageRepository;

Expand Down Expand Up @@ -81,7 +77,6 @@ public void run() {
CommandRequest request = builder.build();

futures.add(CompletableFuture.supplyAsync(() -> {
commandLogService.onLogStarted(task.getId(), task.getHostname());
CommandServiceGrpc.CommandServiceBlockingStub stub = GrpcClient.getBlockingStub(
task.getHostname(), CommandServiceGrpc.CommandServiceBlockingStub.class);
CommandReply reply = stub.exec(request);
Expand All @@ -90,14 +85,11 @@ public void run() {
boolean taskSuccess = reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE;

if (taskSuccess) {
commandLogService.onLogReceived(task.getId(), task.getHostname(), "Success!");
onTaskSuccess(task);
} else {
commandLogService.onLogReceived(task.getId(), task.getHostname(), "Failed!");
onTaskFailure(task);
}

commandLogService.onLogEnded(task.getId(), task.getHostname());
return taskSuccess;
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.bigtop.manager.server.controller;

import org.apache.bigtop.manager.server.service.CommandLogService;
import org.apache.bigtop.manager.server.service.TaskLogService;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -32,37 +32,35 @@
import reactor.core.publisher.FluxSink;

import jakarta.annotation.Resource;
import java.io.IOException;

@Tag(name = "Sse Controller")
@RestController
@RequestMapping("/sse/clusters/{clusterId}")
public class SseController {

@Resource
private CommandLogService commandLogService;
private TaskLogService taskLogService;

@Operation(summary = "get task log", description = "Get a task log")
@GetMapping("/tasks/{id}/log")
public SseEmitter log(@PathVariable Long id, @PathVariable Long clusterId) {
SseEmitter emitter = new SseEmitter();
// Default timeout to 5 minutes
SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);

Flux<String> flux =
Flux.create(sink -> commandLogService.registerSink(id, sink), FluxSink.OverflowStrategy.BUFFER);
Flux.create(sink -> taskLogService.registerSink(id, sink), FluxSink.OverflowStrategy.BUFFER);
flux.subscribe(
s -> {
try {
emitter.send(s);
} catch (IOException e) {
commandLogService.unregisterSink(id);
} catch (Exception e) {
emitter.completeWithError(e);
}
},
Throwable::printStackTrace,
emitter::complete);

emitter.onTimeout(emitter::complete);
emitter.onCompletion(() -> commandLogService.unregisterSink(id));
return emitter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static ManagedChannel getChannel(String host) {
if (isChannelAlive(host)) {
return CHANNELS.get(host);
} else {
throw new ApiException(ApiExceptionEnum.HOST_NOT_CONNECTED, host);
return createChannel(host);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ public void execute() {
private void getHostInfo(Host host) {
String hostname = host.getHostname();
try {
if (!GrpcClient.isChannelAlive(hostname)) {
GrpcClient.createChannel(hostname);
}

HostInfoServiceGrpc.HostInfoServiceBlockingStub stub =
GrpcClient.getBlockingStub(hostname, HostInfoServiceGrpc.HostInfoServiceBlockingStub.class);
HostInfoReply reply = stub.getHostInfo(HostInfoRequest.newBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,7 @@

import reactor.core.publisher.FluxSink;

public interface CommandLogService {
public interface TaskLogService {

void registerSink(Long taskId, FluxSink<String> sink);

void unregisterSink(Long taskId);

void onLogStarted(Long taskId, String hostname);

void onLogReceived(Long taskId, String hostname, String log);

void onLogEnded(Long taskId, String hostname);
}
Loading

0 comments on commit d053a9d

Please sign in to comment.