Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BIGTOP-4144: Introduce gRPC for server-agent communication #13

Merged
merged 7 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ header:
- 'mvnw.cmd'
- 'LICENSE'
- 'pnpm-lock.yaml'
- '**/*.proto'
- '**/*.txt'

comment: on-failure
13 changes: 9 additions & 4 deletions bigtop-manager-agent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,19 @@
<dependency>
<groupId>org.apache.bigtop</groupId>
<artifactId>bigtop-manager-common</artifactId>
<version>${revision}</version>
</dependency>

<dependency>
<groupId>org.apache.bigtop</groupId>
<artifactId>bigtop-manager-grpc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bigtop</groupId>
<artifactId>bigtop-manager-stack-core</artifactId>
<version>${revision}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
Expand Down Expand Up @@ -108,6 +109,10 @@
<artifactId>oshi-core</artifactId>
</dependency>

<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.MultiGauge;

@EnableAsync
@EnableScheduling
@SpringBootApplication(scanBasePackages = {"org.apache.bigtop.manager.agent", "org.apache.bigtop.manager.common"})
public class AgentApplication {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@
*/
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.agent.holder.SpringContextHolder;
import org.apache.bigtop.manager.common.constants.MessageConstants;
import org.apache.bigtop.manager.common.message.entity.command.CommandRequestMessage;
import org.apache.bigtop.manager.common.message.entity.command.CommandResponseMessage;
import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.common.utils.Environments;
import org.apache.bigtop.manager.grpc.generated.CommandReply;
import org.apache.bigtop.manager.grpc.generated.CommandRequest;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractCommandExecutor implements CommandExecutor {

protected CommandRequestMessage commandRequestMessage;
protected CommandRequest commandRequest;

protected CommandResponseMessage commandResponseMessage;
protected CommandReply.Builder commandReplyBuilder;

@Override
public void execute(CommandRequestMessage message) {
commandRequestMessage = message;
commandResponseMessage = new CommandResponseMessage();
public CommandReply execute(CommandRequest request) {
commandRequest = request;
commandReplyBuilder = CommandReply.newBuilder();

try {
if (Environments.isDevMode()) {
Expand All @@ -46,24 +45,23 @@ public void execute(CommandRequestMessage message) {
doExecute();
}
} catch (Exception e) {
commandResponseMessage.setCode(MessageConstants.FAIL_CODE);
commandResponseMessage.setResult(e.getMessage());
commandReplyBuilder.setCode(MessageConstants.FAIL_CODE);
commandReplyBuilder.setResult(e.getMessage());

log.error("Run command failed, {}", message, e);
log.error("Run command failed, {}", request, e);
}

commandResponseMessage.setCommandMessageType(message.getCommandMessageType());
commandResponseMessage.setMessageId(message.getMessageId());
commandResponseMessage.setHostname(message.getHostname());
commandResponseMessage.setTaskId(message.getTaskId());
commandResponseMessage.setStageId(message.getStageId());
commandResponseMessage.setJobId(message.getJobId());
SpringContextHolder.getAgentWebSocket().sendMessage(commandResponseMessage);
commandReplyBuilder.setType(request.getType());
commandReplyBuilder.setHostname(request.getHostname());
commandReplyBuilder.setTaskId(request.getTaskId());
commandReplyBuilder.setStageId(request.getStageId());
commandReplyBuilder.setJobId(request.getJobId());
return commandReplyBuilder.build();
}

protected void doExecuteOnDevMode() {
commandResponseMessage.setCode(MessageConstants.SUCCESS_CODE);
commandResponseMessage.setResult(ShellResult.success().getResult());
commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
commandReplyBuilder.setResult(ShellResult.success().getResult());
}

protected abstract void doExecute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.bigtop.manager.common.constants.Constants;
import org.apache.bigtop.manager.common.constants.MessageConstants;
import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.grpc.generated.CommandType;
import org.apache.bigtop.manager.stack.common.utils.linux.LinuxFileUtils;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
Expand All @@ -46,15 +46,15 @@
public class CacheDistributeCommandExecutor extends AbstractCommandExecutor {

@Override
public CommandMessageType getCommandMessageType() {
return CommandMessageType.CACHE_DISTRIBUTE;
public CommandType getCommandType() {
return CommandType.CACHE_DISTRIBUTE;
}

@Override
public void doExecute() {
CacheMessagePayload cacheMessagePayload =
JsonUtils.readFromString(commandRequestMessage.getMessagePayload(), CacheMessagePayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequestMessage);
JsonUtils.readFromString(commandRequest.getPayload(), CacheMessagePayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequest);
String cacheDir = Constants.STACK_CACHE_DIR;

LinuxFileUtils.createDirectories(cacheDir, "root", "root", "rwxr-xr-x", false);
Expand All @@ -67,8 +67,8 @@ public void doExecute() {
JsonUtils.writeToFile(cacheDir + REPOS_INFO, cacheMessagePayload.getRepoInfo());
JsonUtils.writeToFile(cacheDir + CLUSTER_INFO, cacheMessagePayload.getClusterInfo());

commandResponseMessage.setCode(MessageConstants.SUCCESS_CODE);
commandResponseMessage.setResult(
MessageFormat.format("Host [{0}] cached successful!!!", commandRequestMessage.getHostname()));
commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
commandReplyBuilder.setResult(
MessageFormat.format("Host [{0}] cached successful!!!", commandRequest.getHostname()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
*/
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.message.entity.command.CommandRequestMessage;
import org.apache.bigtop.manager.grpc.generated.CommandReply;
import org.apache.bigtop.manager.grpc.generated.CommandRequest;
import org.apache.bigtop.manager.grpc.generated.CommandType;

/**
* Interface for executing commands on agent.
*/
public interface CommandExecutor {

/**
* Get the type of the command message.
* @return CommandMessageType - the type of the command message.
* Get the type of the command.
* @return CommandType - the type of the command.
*/
CommandMessageType getCommandMessageType();
CommandType getCommandType();

/**
* Execute the command.
* @param message - the message for command that needs to be executed.
* @param request - the request for command that needs to be executed.
*/
void execute(CommandRequestMessage message);
CommandReply execute(CommandRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.agent.holder.SpringContextHolder;
import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.grpc.generated.CommandType;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -32,14 +32,14 @@ public class CommandExecutors {

private static final AtomicBoolean LOADED = new AtomicBoolean(false);

private static final Map<CommandMessageType, String> COMMAND_EXECUTORS = new HashMap<>();
private static final Map<CommandType, String> COMMAND_EXECUTORS = new HashMap<>();

public static CommandExecutor getCommandExecutor(CommandMessageType commandMessageType) {
public static CommandExecutor getCommandExecutor(CommandType commandType) {
if (!LOADED.get()) {
load();
}

String beanName = COMMAND_EXECUTORS.get(commandMessageType);
String beanName = COMMAND_EXECUTORS.get(commandType);
return SpringContextHolder.getApplicationContext().getBean(beanName, CommandExecutor.class);
}

Expand All @@ -52,16 +52,16 @@ private static synchronized void load() {
SpringContextHolder.getCommandExecutors().entrySet()) {
String beanName = entry.getKey();
CommandExecutor commandExecutor = entry.getValue();
if (COMMAND_EXECUTORS.containsKey(commandExecutor.getCommandMessageType())) {
log.error("Duplicate CommandExecutor with message type: {}", commandExecutor.getCommandMessageType());
if (COMMAND_EXECUTORS.containsKey(commandExecutor.getCommandType())) {
log.error("Duplicate CommandExecutor with message type: {}", commandExecutor.getCommandType());
continue;
}

COMMAND_EXECUTORS.put(commandExecutor.getCommandMessageType(), beanName);
COMMAND_EXECUTORS.put(commandExecutor.getCommandType(), beanName);
log.info(
"Load JobRunner: {} with identifier: {}",
commandExecutor.getClass().getName(),
commandExecutor.getCommandMessageType());
commandExecutor.getCommandType());
}

LOADED.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.message.entity.payload.CommandPayload;
import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.grpc.generated.CommandType;
import org.apache.bigtop.manager.stack.core.executor.StackExecutor;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
Expand All @@ -35,8 +35,8 @@
public class ComponentCommandExecutor extends AbstractCommandExecutor {

@Override
public CommandMessageType getCommandMessageType() {
return CommandMessageType.COMPONENT;
public CommandType getCommandType() {
return CommandType.COMPONENT;
}

@Override
Expand All @@ -46,12 +46,11 @@ protected void doExecuteOnDevMode() {

@Override
public void doExecute() {
CommandPayload commandPayload =
JsonUtils.readFromString(commandRequestMessage.getMessagePayload(), CommandPayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequestMessage);
CommandPayload commandPayload = JsonUtils.readFromString(commandRequest.getPayload(), CommandPayload.class);
log.info("[agent executeTask] taskEvent is: {}", commandRequest);
ShellResult shellResult = StackExecutor.execute(commandPayload);

commandResponseMessage.setCode(shellResult.getExitCode());
commandResponseMessage.setResult(shellResult.getResult());
commandReplyBuilder.setCode(shellResult.getExitCode());
commandReplyBuilder.setResult(shellResult.getResult());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.bigtop.manager.agent.executor;

import org.apache.bigtop.manager.common.constants.MessageConstants;
import org.apache.bigtop.manager.common.message.entity.command.CommandMessageType;
import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.common.utils.os.TimeSyncDetection;
import org.apache.bigtop.manager.grpc.generated.CommandType;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
Expand All @@ -38,15 +38,15 @@
public class HostCheckCommandExecutor extends AbstractCommandExecutor {

@Override
public CommandMessageType getCommandMessageType() {
return CommandMessageType.HOST_CHECK;
public CommandType getCommandType() {
return CommandType.HOST_CHECK;
}

@Override
public void doExecute() {
ShellResult shellResult = runChecks(List.of(this::checkTimeSync));
commandResponseMessage.setCode(shellResult.getExitCode());
commandResponseMessage.setResult(shellResult.getResult());
commandReplyBuilder.setCode(shellResult.getExitCode());
commandReplyBuilder.setResult(shellResult.getResult());
}

private ShellResult runChecks(List<Supplier<ShellResult>> suppliers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.bigtop.manager.agent.holder;

import org.apache.bigtop.manager.agent.executor.CommandExecutor;
import org.apache.bigtop.manager.agent.ws.AgentWebSocketHandler;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand All @@ -41,10 +40,6 @@ public void setApplicationContext(@Nonnull ApplicationContext applicationContext
SpringContextHolder.applicationContext = applicationContext;
}

public static AgentWebSocketHandler getAgentWebSocket() {
return applicationContext.getBean(AgentWebSocketHandler.class);
}

public static Map<String, CommandExecutor> getCommandExecutors() {
return applicationContext.getBeansOfType(CommandExecutor.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

Expand All @@ -35,21 +33,19 @@

@Slf4j
@Component
@EnableScheduling
@EnableAsync
public class MetricsCollector {

@Qualifier("diskMultiGauge") @Resource
private MultiGauge diskMultiGauge;
@Resource
@Qualifier("diskMultiGauge") private MultiGauge diskMultiGauge;

@Qualifier("memMultiGauge") @Resource
private MultiGauge memMultiGauge;
@Resource
@Qualifier("memMultiGauge") private MultiGauge memMultiGauge;

@Qualifier("cpuMultiGauge") @Resource
private MultiGauge cpuMultiGauge;
@Resource
@Qualifier("cpuMultiGauge") private MultiGauge cpuMultiGauge;

@Qualifier("zookeeperMultiGauge") @Resource
private MultiGauge zookeeperMultiGauge;
@Resource
@Qualifier("zookeeperMultiGauge") private MultiGauge zookeeperMultiGauge;

@Async
@Scheduled(cron = "*/10 * * * * ?")
Expand Down
Loading
Loading