Skip to content

Commit

Permalink
BIGTOP-4312: Adjust host add logic when adding new cluster (apache#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinw66 authored Dec 31, 2024
1 parent aa5a320 commit 67674c4
Show file tree
Hide file tree
Showing 48 changed files with 336 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class HostPO extends BasePO implements Serializable {
@Column(name = "hostname")
private String hostname;

@Column(name = "agent_dir")
private String agentDir;

@Column(name = "ssh_user")
private String sshUser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public class ServicePO extends BasePO implements Serializable {
@Column(name = "stack")
private String stack;

@Column(name = "need_restart")
private Boolean needRestart;
@Column(name = "restart_flag")
private Boolean restartFlag;

@Column(name = "cluster_id")
private Long clusterId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ServiceQuery {

private Long clusterId;

private Boolean needRestart;
private Boolean restartFlag;

private Integer status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.HostDao">

<sql id="baseColumns">
id, hostname, ipv4, ipv6, os, arch, available_processors, free_memory_size, total_memory_size, free_disk, total_disk, status, cluster_id
id, hostname, agent_dir, ssh_user, ssh_port, auth_type, ssh_password, ssh_key_string, ssh_key_filename, ssh_key_password,
grpc_port, ipv4, ipv6, os, arch, available_processors, free_memory_size, total_memory_size,
free_disk, total_disk, desc, status, err_info, cluster_id
</sql>

<sql id="baseColumnsV2">
${alias}.id, ${alias}.hostname, ${alias}.ssh_user, ${alias}.ssh_port, ${alias}.auth_type,
${alias}.id, ${alias}.hostname, ${alias}.agent_dir, ${alias}.ssh_user, ${alias}.ssh_port, ${alias}.auth_type,
${alias}.ssh_password, ${alias}.ssh_key_string, ${alias}.ssh_key_filename, ${alias}.ssh_key_password,
${alias}.grpc_port, ${alias}.ipv4, ${alias}.ipv6, ${alias}.os, ${alias}.arch,
${alias}.available_processors, ${alias}.free_memory_size, ${alias}.total_memory_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.ServiceDao">

<sql id="baseColumns">
id, name, display_name, `desc`, user, version, stack, need_restart, cluster_id, status
id, name, display_name, `desc`, user, version, stack, restart_flag, cluster_id, status
</sql>

<sql id="baseColumnsV2">
${alias}.id, ${alias}.name, ${alias}.display_name, ${alias}.`desc`, ${alias}.user, ${alias}.version, ${alias}.stack, ${alias}.need_restart, ${alias}.cluster_id, ${alias}.status
${alias}.id, ${alias}.name, ${alias}.display_name, ${alias}.`desc`, ${alias}.user, ${alias}.version, ${alias}.stack, ${alias}.restart_flag, ${alias}.cluster_id, ${alias}.status
</sql>

<select id="findByQuery" resultType="org.apache.bigtop.manager.dao.po.ServicePO">
Expand All @@ -43,8 +43,8 @@
<if test="query.clusterId != null">
and s.cluster_id = #{query.clusterId}
</if>
<if test="query.needRestart != null">
and s.need_restart = #{query.needRestart}
<if test="query.restartFlag != null">
and s.restart_flag = #{query.restartFlag}
</if>
<if test="query.status != null">
and s.status = #{query.status}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.HostDao">

<sql id="baseColumns">
id, hostname, ipv4, ipv6, os, arch, available_processors, free_memory_size, total_memory_size, free_disk, total_disk, state, cluster_id
id, hostname, agent_dir, ssh_user, ssh_port, auth_type, ssh_password, ssh_key_string, ssh_key_filename, ssh_key_password,
grpc_port, ipv4, ipv6, os, arch, available_processors, free_memory_size, total_memory_size,
free_disk, total_disk, "desc", status, err_info, cluster_id
</sql>

<sql id="baseColumnsV2">
${alias}.id, ${alias}.hostname, ${alias}.ssh_user, ${alias}.ssh_port, ${alias}.auth_type,
${alias}.id, ${alias}.hostname, ${alias}.agent_dir, ${alias}.ssh_user, ${alias}.ssh_port, ${alias}.auth_type,
${alias}.ssh_password, ${alias}.ssh_key_string, ${alias}.ssh_key_filename, ${alias}.ssh_key_password,
${alias}.grpc_port, ${alias}.ipv4, ${alias}.ipv6, ${alias}.os, ${alias}.arch,
${alias}.available_processors, ${alias}.free_memory_size, ${alias}.total_memory_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.ServiceDao">

<sql id="baseColumns">
id, name, display_name, "desc", "user", version, stack, need_restart, cluster_id, status
id, name, display_name, "desc", "user", version, stack, restart_flag, cluster_id, status
</sql>

<sql id="baseColumnsV2">
${alias}.id, ${alias}.name, ${alias}.display_name, ${alias}."desc", ${alias}."user", ${alias}.version, ${alias}.stack, ${alias}.need_restart, ${alias}.cluster_id, ${alias}.status
${alias}.id, ${alias}.name, ${alias}.display_name, ${alias}."desc", ${alias}."user", ${alias}.version, ${alias}.stack, ${alias}.restart_flag, ${alias}.cluster_id, ${alias}.status
</sql>

<select id="findByQuery" resultType="org.apache.bigtop.manager.dao.po.ServicePO">
Expand All @@ -43,8 +43,8 @@
<if test="query.clusterId != null">
and s.cluster_id = #{query.clusterId}
</if>
<if test="query.needRestart != null">
and s.need_restart = #{query.needRestart}
<if test="query.restartFlag != null">
and s.restart_flag = #{query.restartFlag}
</if>
<if test="query.status != null">
and s.status = #{query.status}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ protected void beforeCreateStages() {
super.beforeCreateStages();
}

protected StageContext createStageContext(String serviceName, String componentName, List<Long> hostIds) {
protected StageContext createStageContext(String serviceName, String componentName, List<String> hostnames) {
StageContext stageContext = StageContext.fromCommandDTO(jobContext.getCommandDTO());

ServiceDTO serviceDTO = StackUtils.getServiceDTO(serviceName);
ComponentDTO componentDTO = StackUtils.getComponentDTO(componentName);

stageContext.setHostIds(hostIds);
stageContext.setHostnames(hostnames);
stageContext.setServiceDTO(serviceDTO);
stageContext.setComponentDTO(componentDTO);

Expand Down Expand Up @@ -133,7 +133,7 @@ protected Boolean isClientComponent(String componentName) {
return componentDTO.getCategory().equalsIgnoreCase(ComponentCategories.CLIENT);
}

protected List<Long> findHostIdsByComponentName(String componentName) {
protected List<String> findHostnamesByComponentName(String componentName) {
ComponentQuery componentQuery = ComponentQuery.builder()
.clusterId(clusterPO.getId())
.name(componentName)
Expand All @@ -142,7 +142,7 @@ protected List<Long> findHostIdsByComponentName(String componentName) {
if (componentPOList == null) {
return new ArrayList<>();
} else {
return componentPOList.stream().map(ComponentPO::getHostId).toList();
return componentPOList.stream().map(ComponentPO::getHostname).toList();
}
}

Expand All @@ -158,12 +158,12 @@ protected void createAddStages() {
String[] split = componentCommand.split("-");
String componentName = split[0];
String serviceName = findServiceNameByComponentName(componentName);
List<Long> hostIds = findHostIdsByComponentName(componentName);
if (CollectionUtils.isEmpty(hostIds)) {
List<String> hostnames = findHostnamesByComponentName(componentName);
if (CollectionUtils.isEmpty(hostnames)) {
continue;
}

StageContext stageContext = createStageContext(serviceName, componentName, hostIds);
StageContext stageContext = createStageContext(serviceName, componentName, hostnames);
stages.add(new ComponentAddStage(stageContext));
}
}
Expand All @@ -173,9 +173,9 @@ protected void createConfigureStages() {
for (ComponentHostDTO componentHost : serviceCommand.getComponentHosts()) {
String serviceName = serviceCommand.getServiceName();
String componentName = componentHost.getComponentName();
List<Long> hostIds = componentHost.getHostIds();
List<String> hostnames = componentHost.getHostnames();

StageContext stageContext = createStageContext(serviceName, componentName, hostIds);
StageContext stageContext = createStageContext(serviceName, componentName, hostnames);
stages.add(new ComponentConfigureStage(stageContext));
}
}
Expand All @@ -193,12 +193,12 @@ protected void createStartStages() {
continue;
}

List<Long> hostIds = findHostIdsByComponentName(componentName);
if (CollectionUtils.isEmpty(hostIds)) {
List<String> hostnames = findHostnamesByComponentName(componentName);
if (CollectionUtils.isEmpty(hostnames)) {
continue;
}

StageContext stageContext = createStageContext(serviceName, componentName, hostIds);
StageContext stageContext = createStageContext(serviceName, componentName, hostnames);
stages.add(new ComponentStartStage(stageContext));
}
}
Expand All @@ -215,12 +215,12 @@ protected void createStopStages() {
continue;
}

List<Long> hostIds = findHostIdsByComponentName(componentName);
if (CollectionUtils.isEmpty(hostIds)) {
List<String> hostnames = findHostnamesByComponentName(componentName);
if (CollectionUtils.isEmpty(hostnames)) {
continue;
}

StageContext stageContext = createStageContext(serviceName, componentName, hostIds);
StageContext stageContext = createStageContext(serviceName, componentName, hostnames);
stages.add(new ComponentStopStage(stageContext));
}
}
Expand All @@ -237,12 +237,12 @@ protected void createCheckStages() {
continue;
}

List<Long> hostIds = findHostIdsByComponentName(componentName);
if (CollectionUtils.isEmpty(hostIds)) {
List<String> hostnames = findHostnamesByComponentName(componentName);
if (CollectionUtils.isEmpty(hostnames)) {
continue;
}

StageContext stageContext = createStageContext(serviceName, componentName, List.of(hostIds.get(0)));
StageContext stageContext = createStageContext(serviceName, componentName, List.of(hostnames.get(0)));
stages.add(new ComponentCheckStage(stageContext));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
*/
package org.apache.bigtop.manager.server.command.job;

import org.apache.bigtop.manager.dao.po.HostPO;
import org.apache.bigtop.manager.dao.po.JobPO;
import org.apache.bigtop.manager.dao.po.StagePO;
import org.apache.bigtop.manager.dao.po.TaskPO;
import org.apache.bigtop.manager.dao.repository.HostDao;
import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage;
import org.apache.bigtop.manager.server.command.stage.HostCheckStage;
import org.apache.bigtop.manager.server.command.stage.SetupJdkStage;
Expand All @@ -34,13 +32,12 @@
import org.apache.bigtop.manager.server.model.converter.ClusterConverter;
import org.apache.bigtop.manager.server.model.dto.ClusterDTO;
import org.apache.bigtop.manager.server.model.dto.CommandDTO;

import java.util.ArrayList;
import java.util.List;
import org.apache.bigtop.manager.server.model.dto.HostDTO;
import org.apache.bigtop.manager.server.service.HostService;

public class ClusterAddJob extends AbstractJob {

private HostDao hostDao;
private HostService hostService;

public ClusterAddJob(JobContext jobContext) {
super(jobContext);
Expand All @@ -50,7 +47,7 @@ public ClusterAddJob(JobContext jobContext) {
protected void injectBeans() {
super.injectBeans();

hostDao = SpringContextHolder.getBean(HostDao.class);
hostService = SpringContextHolder.getBean(HostService.class);
}

@Override
Expand All @@ -64,15 +61,14 @@ protected void createStages() {
@Override
public void beforeRun() {
super.beforeRun();
}

@Override
public void onSuccess() {
super.onSuccess();
if (jobContext.getRetryFlag()) {
return;
}

saveCluster();

linkHostToCluster();
saveHosts();

linkJobToCluster();
}
Expand All @@ -94,18 +90,11 @@ private void saveCluster() {
clusterDao.save(clusterPO);
}

private void linkHostToCluster() {
private void saveHosts() {
CommandDTO commandDTO = jobContext.getCommandDTO();
List<Long> ids = commandDTO.getClusterCommand().getHostIds();
List<HostPO> hostPOList = new ArrayList<>();
for (Long id : ids) {
HostPO hostPO = new HostPO();
hostPO.setId(id);
hostPO.setClusterId(clusterPO.getId());
hostPOList.add(hostPO);
}

hostDao.partialUpdateByIds(hostPOList);
HostDTO hostDTO = commandDTO.getClusterCommand().getHosts();
hostDTO.setClusterId(clusterPO.getId());
hostService.add(hostDTO);
}

private void linkJobToCluster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ protected List<String> getComponentNames() {
}

@Override
protected List<Long> findHostIdsByComponentName(String componentName) {
protected List<String> findHostnamesByComponentName(String componentName) {
for (ServiceCommandDTO serviceCommand : jobContext.getCommandDTO().getServiceCommands()) {
List<ComponentHostDTO> componentHosts = serviceCommand.getComponentHosts();
for (ComponentHostDTO componentHost : componentHosts) {
if (componentHost.getComponentName().equals(componentName)) {
return componentHost.getHostIds();
return componentHost.getHostnames();
}
}
}
Expand Down Expand Up @@ -156,7 +156,7 @@ private void saveService(ServiceCommandDTO serviceCommand) {
List<ComponentPO> componentPOList = new ArrayList<>();
for (ComponentHostDTO componentHostDTO : serviceCommand.getComponentHosts()) {
String componentName = componentHostDTO.getComponentName();
List<HostPO> hostPOList = hostDao.findByIds(componentHostDTO.getHostIds());
List<HostPO> hostPOList = hostDao.findAllByHostnames(componentHostDTO.getHostnames());

for (HostPO hostPO : hostPOList) {
ComponentDTO componentDTO = StackUtils.getComponentDTO(componentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.bigtop.manager.server.command.task.TaskContext;
import org.apache.bigtop.manager.server.holder.SpringContextHolder;
import org.apache.bigtop.manager.server.model.dto.ComponentDTO;
import org.apache.bigtop.manager.server.model.dto.HostDTO;
import org.apache.bigtop.manager.server.model.dto.ServiceDTO;

import java.util.HashMap;
Expand Down Expand Up @@ -61,12 +60,12 @@ protected String getComponentName() {
return stageContext.getComponentDTO().getName();
}

protected TaskContext createTaskContext(HostDTO hostDTO) {
protected TaskContext createTaskContext(String hostname) {
ServiceDTO serviceDTO = stageContext.getServiceDTO();
ComponentDTO componentDTO = stageContext.getComponentDTO();

TaskContext taskContext = new TaskContext();
taskContext.setHostDTO(hostDTO);
taskContext.setHostname(hostname);
taskContext.setClusterId(clusterPO.getId());
taskContext.setClusterName(clusterPO.getName());
taskContext.setServiceName(serviceDTO.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.bigtop.manager.dao.repository.StageDao;
import org.apache.bigtop.manager.server.command.task.Task;
import org.apache.bigtop.manager.server.holder.SpringContextHolder;
import org.apache.bigtop.manager.server.model.converter.HostConverter;
import org.apache.bigtop.manager.server.model.dto.HostDTO;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -56,9 +54,8 @@ public AbstractStage(StageContext stageContext) {

beforeCreateTasks();

List<HostDTO> hostDTOList = HostConverter.INSTANCE.fromPO2DTO(hostDao.findByIds(stageContext.getHostIds()));
for (HostDTO hostDTO : hostDTOList) {
tasks.add(createTask(hostDTO));
for (String hostname : stageContext.getHostnames()) {
tasks.add(createTask(hostname));
}
}

Expand All @@ -69,7 +66,7 @@ protected void injectBeans() {

protected abstract void beforeCreateTasks();

protected abstract Task createTask(HostDTO hostDTO);
protected abstract Task createTask(String hostname);

protected String getServiceName() {
return "cluster";
Expand Down
Loading

0 comments on commit 67674c4

Please sign in to comment.