Skip to content

Commit

Permalink
BIGTOP-4144: Introduce gRPC for server-agent communication (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinw66 authored Jul 5, 2024
1 parent 2c82695 commit a35ffde
Show file tree
Hide file tree
Showing 80 changed files with 868 additions and 1,654 deletions.
1 change: 0 additions & 1 deletion .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ header:
- 'mvnw.cmd'
- 'LICENSE'
- 'pnpm-lock.yaml'
- '**/*.proto'
- '**/*.txt'

comment: on-failure
13 changes: 9 additions & 4 deletions bigtop-manager-agent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,19 @@
<dependency>
<groupId>org.apache.bigtop</groupId>
<artifactId>bigtop-manager-common</artifactId>
<version>${revision}</version>
</dependency>

<dependency>
<groupId>org.apache.bigtop</groupId>
<artifactId>bigtop-manager-grpc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bigtop</groupId>
<artifactId>bigtop-manager-stack-core</artifactId>
<version>${revision}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
Expand Down Expand Up @@ -108,6 +109,10 @@
<artifactId>oshi-core</artifactId>
</dependency>

<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.MultiGauge;

@EnableAsync
@EnableScheduling
@SpringBootApplication(scanBasePackages = {"org.apache.bigtop.manager.agent", "org.apache.bigtop.manager.common"})
public class AgentApplication {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@
*/
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.agent.holder.SpringContextHolder;
import org.apache.bigtop.manager.common.constants.MessageConstants;
import org.apache.bigtop.manager.common.message.entity.command.CommandRequestMessage;
import org.apache.bigtop.manager.common.message.entity.command.CommandResponseMessage;
import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.common.utils.Environments;
import org.apache.bigtop.manager.grpc.generated.CommandReply;
import org.apache.bigtop.manager.grpc.generated.CommandRequest;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractCommandExecutor implements CommandExecutor {

protected CommandRequestMessage commandRequestMessage;
protected CommandRequest commandRequest;

protected CommandResponseMessage commandResponseMessage;
protected CommandReply.Builder commandReplyBuilder;

@Override
public void execute(CommandRequestMessage message) {
commandRequestMessage = message;
commandResponseMessage = new CommandResponseMessage();
public CommandReply execute(CommandRequest request) {
commandRequest = request;
commandReplyBuilder = CommandReply.newBuilder();

try {
if (Environments.isDevMode()) {
Expand All @@ -46,24 +45,23 @@ public void execute(CommandRequestMessage message) {
doExecute();
}
} catch (Exception e) {
commandResponseMessage.setCode(MessageConstants.FAIL_CODE);
commandResponseMessage.setResult(e.getMessage());
commandReplyBuilder.setCode(MessageConstants.FAIL_CODE);
commandReplyBuilder.setResult(e.getMessage());

log.error("Run command failed, {}", message, e);
log.error("Run command failed, {}", request, e);
}

commandResponseMessage.setCommandMessageType(message.getCommandMessageType());
commandResponseMessage.setMessageId(message.getMessageId());
commandResponseMessage.setHostname(message.getHostname());
commandResponseMessage.setTaskId(message.getTaskId());
commandResponseMessage.setStageId(message.getStageId());
commandResponseMessage.setJobId(message.getJobId());
SpringContextHolder.getAgentWebSocket().sendMessage(commandResponseMessage);
commandReplyBuilder.setType(request.getType());
commandReplyBuilder.setHostname(request.getHostname());
commandReplyBuilder.setTaskId(request.getTaskId());
commandReplyBuilder.setStageId(request.getStageId());
commandReplyBuilder.setJobId(request.getJobId());
return commandReplyBuilder.build();
}

protected void doExecuteOnDevMode() {
commandResponseMessage.setCode(MessageConstants.SUCCESS_CODE);
commandResponseMessage.setResult(ShellResult.success().getResult());
commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
commandReplyBuilder.setResult(ShellResult.success().getResult());
}

protected abstract void doExecute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.bigtop.manager.common.constants.Constants;
import org.apache.bigtop.manager.common.constants.MessageConstants;
import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.grpc.generated.CommandType;
import org.apache.bigtop.manager.stack.common.utils.linux.LinuxFileUtils;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
Expand All @@ -46,15 +46,15 @@
public class CacheDistributeCommandExecutor extends AbstractCommandExecutor {

@Override
public CommandMessageType getCommandMessageType() {
return CommandMessageType.CACHE_DISTRIBUTE;
public CommandType getCommandType() {
return CommandType.CACHE_DISTRIBUTE;
}

@Override
public void doExecute() {
CacheMessagePayload cacheMessagePayload =
JsonUtils.readFromString(commandRequestMessage.getMessagePayload(), CacheMessagePayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequestMessage);
JsonUtils.readFromString(commandRequest.getPayload(), CacheMessagePayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequest);
String cacheDir = Constants.STACK_CACHE_DIR;

LinuxFileUtils.createDirectories(cacheDir, "root", "root", "rwxr-xr-x", false);
Expand All @@ -67,8 +67,8 @@ public void doExecute() {
JsonUtils.writeToFile(cacheDir + REPOS_INFO, cacheMessagePayload.getRepoInfo());
JsonUtils.writeToFile(cacheDir + CLUSTER_INFO, cacheMessagePayload.getClusterInfo());

commandResponseMessage.setCode(MessageConstants.SUCCESS_CODE);
commandResponseMessage.setResult(
MessageFormat.format("Host [{0}] cached successful!!!", commandRequestMessage.getHostname()));
commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
commandReplyBuilder.setResult(
MessageFormat.format("Host [{0}] cached successful!!!", commandRequest.getHostname()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
*/
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.message.entity.command.CommandRequestMessage;
import org.apache.bigtop.manager.grpc.generated.CommandReply;
import org.apache.bigtop.manager.grpc.generated.CommandRequest;
import org.apache.bigtop.manager.grpc.generated.CommandType;

/**
* Interface for executing commands on agent.
*/
public interface CommandExecutor {

/**
* Get the type of the command message.
* @return CommandMessageType - the type of the command message.
* Get the type of the command.
* @return CommandType - the type of the command.
*/
CommandMessageType getCommandMessageType();
CommandType getCommandType();

/**
* Execute the command.
* @param message - the message for command that needs to be executed.
* @param request - the request for command that needs to be executed.
*/
void execute(CommandRequestMessage message);
CommandReply execute(CommandRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.agent.holder.SpringContextHolder;
import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.grpc.generated.CommandType;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -32,14 +32,14 @@ public class CommandExecutors {

private static final AtomicBoolean LOADED = new AtomicBoolean(false);

private static final Map<CommandMessageType, String> COMMAND_EXECUTORS = new HashMap<>();
private static final Map<CommandType, String> COMMAND_EXECUTORS = new HashMap<>();

public static CommandExecutor getCommandExecutor(CommandMessageType commandMessageType) {
public static CommandExecutor getCommandExecutor(CommandType commandType) {
if (!LOADED.get()) {
load();
}

String beanName = COMMAND_EXECUTORS.get(commandMessageType);
String beanName = COMMAND_EXECUTORS.get(commandType);
return SpringContextHolder.getApplicationContext().getBean(beanName, CommandExecutor.class);
}

Expand All @@ -52,16 +52,16 @@ private static synchronized void load() {
SpringContextHolder.getCommandExecutors().entrySet()) {
String beanName = entry.getKey();
CommandExecutor commandExecutor = entry.getValue();
if (COMMAND_EXECUTORS.containsKey(commandExecutor.getCommandMessageType())) {
log.error("Duplicate CommandExecutor with message type: {}", commandExecutor.getCommandMessageType());
if (COMMAND_EXECUTORS.containsKey(commandExecutor.getCommandType())) {
log.error("Duplicate CommandExecutor with message type: {}", commandExecutor.getCommandType());
continue;
}

COMMAND_EXECUTORS.put(commandExecutor.getCommandMessageType(), beanName);
COMMAND_EXECUTORS.put(commandExecutor.getCommandType(), beanName);
log.info(
"Load JobRunner: {} with identifier: {}",
commandExecutor.getClass().getName(),
commandExecutor.getCommandMessageType());
commandExecutor.getCommandType());
}

LOADED.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.message.entity.payload.CommandPayload;
import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.grpc.generated.CommandType;
import org.apache.bigtop.manager.stack.core.executor.StackExecutor;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
Expand All @@ -35,8 +35,8 @@
public class ComponentCommandExecutor extends AbstractCommandExecutor {

@Override
public CommandMessageType getCommandMessageType() {
return CommandMessageType.COMPONENT;
public CommandType getCommandType() {
return CommandType.COMPONENT;
}

@Override
Expand All @@ -46,12 +46,11 @@ protected void doExecuteOnDevMode() {

@Override
public void doExecute() {
CommandPayload commandPayload =
JsonUtils.readFromString(commandRequestMessage.getMessagePayload(), CommandPayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequestMessage);
CommandPayload commandPayload = JsonUtils.readFromString(commandRequest.getPayload(), CommandPayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequest);
ShellResult shellResult = StackExecutor.execute(commandPayload);

commandResponseMessage.setCode(shellResult.getExitCode());
commandResponseMessage.setResult(shellResult.getResult());
commandReplyBuilder.setCode(shellResult.getExitCode());
commandReplyBuilder.setResult(shellResult.getResult());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.common.constants.MessageConstants;
import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.common.utils.os.TimeSyncDetection;
import org.apache.bigtop.manager.grpc.generated.CommandType;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
Expand All @@ -38,15 +38,15 @@
public class HostCheckCommandExecutor extends AbstractCommandExecutor {

@Override
public CommandMessageType getCommandMessageType() {
return CommandMessageType.HOST_CHECK;
public CommandType getCommandType() {
return CommandType.HOST_CHECK;
}

@Override
public void doExecute() {
ShellResult shellResult = runChecks(List.of(this::checkTimeSync));
commandResponseMessage.setCode(shellResult.getExitCode());
commandResponseMessage.setResult(shellResult.getResult());
commandReplyBuilder.setCode(shellResult.getExitCode());
commandReplyBuilder.setResult(shellResult.getResult());
}

private ShellResult runChecks(List<Supplier<ShellResult>> suppliers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.bigtop.manager.agent.holder;

import org.apache.bigtop.manager.agent.executor.CommandExecutor;
import org.apache.bigtop.manager.agent.ws.AgentWebSocketHandler;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand All @@ -41,10 +40,6 @@ public void setApplicationContext(@Nonnull ApplicationContext applicationContext
SpringContextHolder.applicationContext = applicationContext;
}

public static AgentWebSocketHandler getAgentWebSocket() {
return applicationContext.getBean(AgentWebSocketHandler.class);
}

public static Map<String, CommandExecutor> getCommandExecutors() {
return applicationContext.getBeansOfType(CommandExecutor.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

Expand All @@ -35,21 +33,19 @@

@Slf4j
@Component
@EnableScheduling
@EnableAsync
public class MetricsCollector {

@Qualifier("diskMultiGauge") @Resource
private MultiGauge diskMultiGauge;
@Resource
@Qualifier("diskMultiGauge") private MultiGauge diskMultiGauge;

@Qualifier("memMultiGauge") @Resource
private MultiGauge memMultiGauge;
@Resource
@Qualifier("memMultiGauge") private MultiGauge memMultiGauge;

@Qualifier("cpuMultiGauge") @Resource
private MultiGauge cpuMultiGauge;
@Resource
@Qualifier("cpuMultiGauge") private MultiGauge cpuMultiGauge;

@Qualifier("zookeeperMultiGauge") @Resource
private MultiGauge zookeeperMultiGauge;
@Resource
@Qualifier("zookeeperMultiGauge") private MultiGauge zookeeperMultiGauge;

@Async
@Scheduled(cron = "*/10 * * * * ?")
Expand Down
Loading

0 comments on commit a35ffde

Please sign in to comment.