From 384451bfd8de1ca4b249ab92a2563ad83323988b Mon Sep 17 00:00:00 2001
From: chenxu14
Date: Mon, 30 Dec 2019 18:53:30 +0800
Subject: [PATCH 1/2] Retain the original recovery logic if serverName is HMaster

---
 .../apache/hadoop/hbase/util/KafkaUtil.java   | 38 +++++++++-
 .../hadoop/hbase/util/TestKafkaUtil.java      | 73 +++++++++++++++++++
 .../org/apache/hadoop/hbase/SplitLogTask.java | 13 ++++
 .../SplitLogManagerCoordination.java          |  8 +-
 .../ZKSplitLogManagerCoordination.java        | 67 ++++++++++-------
 .../ZkCoordinatedStateManager.java            |  6 +-
 .../ZkSplitLogWorkerCoordination.java         | 20 +++--
 .../hbase/master/KafkaRecoveryManager.java    | 64 ++++++++++++++--
 .../hbase/master/LogRecoveryManager.java      | 39 ++++++----
 .../hadoop/hbase/master/ServerManager.java    |  2 +-
 .../hadoop/hbase/master/SplitLogManager.java  | 10 ++-
 .../procedure/ServerCrashProcedure.java       | 28 ++++++-
 .../hbase/regionserver/HRegionServer.java     |  4 +-
 .../hbase/wal/AbstractFSWALProvider.java      |  1 -
 .../hbase/master/TestCatalogJanitor.java      |  3 +-
 pom.xml                                       |  2 +-
 16 files changed, 306 insertions(+), 72 deletions(-)
 create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKafkaUtil.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KafkaUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KafkaUtil.java
index 30b67fa..15a47f1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KafkaUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KafkaUtil.java
@@ -17,6 +17,9 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.util;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -28,6 +31,18 @@ public class KafkaUtil {
   private static final Log LOG = LogFactory.getLog(KafkaUtil.class);
   public static final String KAFKA_BROKER_SERVERS = "kafka.bootstrap.servers";

+  private static final String LOCAL_ZONE;
+  static {
+    String hostname = null;
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.warn("could not determine local hostname.", e);
+    }
+    int index = hostname == null ? -1 : hostname.indexOf("-");
+    LOCAL_ZONE = index == -1 ? "rz" : hostname.substring(0, index);
+  }
+
   /**
    * Each HBase table corresponding to an KAFKA topic
    */
@@ -35,16 +50,32 @@ public static String getTableTopic(String tableName) {
     return tableName;
   }

+  public static String getConsumerGroup(String tableName) {
+    return new StringBuilder("connect-").append(tableName).append("-").append(LOCAL_ZONE).toString();
+  }
+
   public static String getTopicTable(String topicName) {
     return topicName;
   }

   /**
+   * Return the partition this row will go to.
    * Each HBase Region corresponding to an KAFKA partition,
    * rowkey should contains partition info, split by '_'
    */
   public static int getTablePartition(String rowkey, int partitionCount) {
-    return getPrefix(rowkey) % partitionCount;
+    String cntStr = String.valueOf(partitionCount);
+    int len = cntStr.length();
+    assert(len <= 5); // partitionCount's upper limit is 10000, so len is at most 5
+    int interval = (partitionCount == Math.pow(10, len-1)) ?
+        1 : (int) Math.pow(10, len) / partitionCount;
+    int prefix = getPrefix(rowkey);
+    if (interval == 1) { // power of 10
+      return prefix / (int) Math.pow(10, 5 - len);
+    } else {
+      String preStr = String.format("%04d", prefix).substring(0, len);
+      int value = Integer.parseInt(preStr) / interval;
+      return value;
+    }
   }

   public static int getPrefix(String rowkey) {
@@ -54,7 +85,10 @@ public static int getPrefix(String rowkey) {
     try {
       int index = rowkey.indexOf("_");
       String prefix = (index == -1) ? rowkey : rowkey.substring(0, index);
-      return Integer.parseInt(prefix);
+      for (int i = prefix.length(); i < 4; i++) { // rowkey has a 4-digit prefix
+        prefix += "0";
+      }
+      return Integer.parseInt(prefix.substring(0, 4)); // only retain 4 digits
     } catch (NumberFormatException e) {
       LOG.warn(e.getMessage(), e);
       return 0;
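Note on the routing math above: the rule compresses to the self-contained sketch below (illustrative only, not part of the patch; class and method names are invented). It reproduces the mappings that TestKafkaUtil (next file) asserts — the rowkey's leading digits, padded or truncated to 4, are spread across up to 10000 partitions, assuming partition counts that divide a power of ten evenly, as the tests do.

    // Condensed model of KafkaUtil.getTablePartition/getPrefix, for review purposes only.
    public class PartitionRuleSketch {
      // Pad or truncate the digits before '_' to exactly 4 digits.
      static int prefix4(String rowkey) {
        int idx = rowkey.indexOf('_');
        String p = idx == -1 ? rowkey : rowkey.substring(0, idx);
        while (p.length() < 4) {
          p += "0";
        }
        return Integer.parseInt(p.substring(0, 4));
      }

      static int partition(String rowkey, int partitionCount) {
        int len = String.valueOf(partitionCount).length();
        int prefix = prefix4(rowkey);
        if (partitionCount == (int) Math.pow(10, len - 1)) { // 10, 100, 1000, 10000
          return prefix / (int) Math.pow(10, 5 - len);
        }
        int interval = (int) Math.pow(10, len) / partitionCount;
        return Integer.parseInt(String.format("%04d", prefix).substring(0, len)) / interval;
      }

      public static void main(String[] args) {
        System.out.println(partition("1247_rowkey", 10));    // 1
        System.out.println(partition("1247_rowkey", 100));   // 12
        System.out.println(partition("1247_rowkey", 200));   // 24 (interval 10000/200 = 50)
        System.out.println(partition("1247_rowkey", 10000)); // 1247
      }
    }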
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKafkaUtil.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKafkaUtil.java
new file mode 100644
index 0000000..71d1cae
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKafkaUtil.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestKafkaUtil {
+  @Test
+  public void testGetTablePartition() {
+    assertEquals(1, KafkaUtil.getTablePartition("1247_rowkey", 10));
+    assertEquals(12, KafkaUtil.getTablePartition("1247_rowkey", 100));
+    assertEquals(124, KafkaUtil.getTablePartition("1247_rowkey", 1000));
+    assertEquals(1247, KafkaUtil.getTablePartition("1247_rowkey", 10000));
+
+    // partition count is 100
+    for (int i = 0; i < 10000; i++) {
+      String rowkey = String.format("%04d", i);
+      int partition = KafkaUtil.getTablePartition(rowkey, 100);
+      if (partition > 9) {
+        assertTrue(rowkey.startsWith(String.valueOf(partition)));
+      } else {
+        assertTrue(rowkey.startsWith("0" + String.valueOf(partition)));
+      }
+    }
+
+    // partition count is 200
+    Map<Integer, Integer> partInfo = new HashMap<>();
+    int interval = 10000 / 200;
+    for (int i = 0; i < 200; i++) {
+      partInfo.put(i, i * interval);
+    }
+    partInfo.put(200, Integer.MAX_VALUE);
+    for (int i = 0; i < 10000; i++) {
+      String rowkey = String.format("%04d", i);
+      int partition = KafkaUtil.getTablePartition(rowkey, 200);
+      assertTrue(partInfo.get(partition) <= i && i < partInfo.get(partition + 1));
+    }
+  }
+
+  @Test
+  public void testGetConsumerGroup() throws UnknownHostException {
+    String table = "usertable";
+    String hostname = InetAddress.getLocalHost().getHostName();
+    String groupName = "connect-" + table + "-" + hostname.substring(0, hostname.indexOf("-"));
+    assertEquals(groupName, KafkaUtil.getConsumerGroup("usertable"));
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
index 1feb417..5d474b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -35,10 +35,15 @@
  */
 @InterfaceAudience.Private
 public class SplitLogTask {
+  public static final int KAFKA_TASK_FIELD_LEN = 5;
   private final ServerName originServer;
   private final ZooKeeperProtos.SplitLogTask.State state;
   private final ZooKeeperProtos.SplitLogTask.RecoveryMode mode;

+  public enum Type {
+    HDFS, KAFKA;
+  }
+
   public static class Unassigned extends SplitLogTask {
     public Unassigned(final ServerName originServer, final RecoveryMode mode) {
       super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, mode);
@@ -168,6 +173,14 @@ public static SplitLogTask parseFrom(final byte [] data) throws DeserializationE
     }
   }

+  public static Type getTaskType(String taskName) {
+    if (taskName.split("_").length == KAFKA_TASK_FIELD_LEN) {
+      return Type.KAFKA;
+    } else {
+      return Type.HDFS;
+    }
+  }
+
   /**
    * @return This instance serialized into a byte array
    * @see #parseFrom(byte[])
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
index c90549e..bef6015 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
@@ -21,11 +21,13 @@

 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;

 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher;
 import org.apache.hadoop.hbase.master.LogRecoveryManager.ResubmitDirective;
@@ -105,12 +107,12 @@ public ServerName getServerName() {
   /**
    * Provide the configuration from the SplitLogManager
    */
-  void setDetails(SplitLogManagerDetails details);
+  void addDetails(SplitLogTask.Type type, SplitLogManagerDetails details);

   /**
    * Returns the configuration that was provided previously
    */
-  SplitLogManagerDetails getDetails();
+  Collection<SplitLogManagerDetails> getDetails();

   /**
    * Prepare the new task
@@ -211,5 +213,5 @@ void removeStaleRecoveringRegions(Set<String> knownServers) throws IOException,
   @VisibleForTesting
   void init() throws IOException;

-  public void setTaskFinisher(TaskFinisher taskFinisher);
+  public void addTaskFinisher(TaskFinisher taskFinisher);
 }
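The 5-field naming contract that getTaskType() keys on is easiest to see end to end. Below is a hedged sketch (not part of the patch; topic, offsets and region fields are made-up values, and the builder mirrors the StringBuilder in KafkaRecoveryManager.splitLogs further down). It assumes neither the topic name nor the region fields contain extra underscores:

    // Round-trip of the Kafka recovery-task naming scheme:
    //   topic_partition_startOffset_endOffset_regionName-startKey-endKey
    public class TaskNameSketch {
      static String kafkaTaskName(String topic, int partition, long startOffset,
          long endOffset, String region, String startKey, String endKey) {
        return new StringBuilder(topic).append("_").append(partition)
            .append("_").append(startOffset).append("_").append(endOffset)
            .append("_").append(region).append("-").append(startKey).append("-").append(endKey)
            .toString();
      }

      public static void main(String[] args) {
        String task = kafkaTaskName("usertable", 3, 100, 200, "e4f1", "0100", "0150");
        System.out.println(task);                        // usertable_3_100_200_e4f1-0100-0150
        System.out.println(task.split("_").length == 5); // true -> SplitLogTask.Type.KAFKA
      }
    }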
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 2f5e4c3..6191669 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -24,11 +24,15 @@
 import static org.apache.hadoop.hbase.master.LogRecoveryManager.TerminationStatus.FAILURE;
 import static org.apache.hadoop.hbase.master.LogRecoveryManager.TerminationStatus.IN_PROGRESS;
 import static org.apache.hadoop.hbase.master.LogRecoveryManager.TerminationStatus.SUCCESS;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;

@@ -43,6 +47,7 @@
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.SplitLogTask.Type;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -91,9 +96,8 @@ public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails {
   private long zkretries;
   private long resubmitThreshold;
   private long timeout;
-  private TaskFinisher taskFinisher;
-
-  SplitLogManagerDetails details;
+  private Map<Type, TaskFinisher> taskFinishers;
+  private Map<Type, SplitLogManagerDetails> details;

   // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
   // whether to GC stale recovering znodes
@@ -110,6 +114,8 @@ public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
     super(watcher);
     this.server = manager.getServer();
     this.conf = server.getConfiguration();
+    this.taskFinishers = new HashMap<>();
+    this.details = new HashMap<>();
   }

   @Override
@@ -125,8 +131,8 @@ public void init() throws IOException {
   }

   @Override
-  public void setTaskFinisher(TaskFinisher taskFinisher) {
-    this.taskFinisher = taskFinisher;
+  public void addTaskFinisher(TaskFinisher taskFinisher) {
+    this.taskFinishers.put(taskFinisher.getTaskType(), taskFinisher);
   }

   @Override
@@ -182,6 +188,7 @@ public void deleteTask(String path) {
   @Override
   public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
     // its ok if this thread misses the update to task.deleted. It will fail later
+    Type taskType = SplitLogTask.getTaskType(path);
     if (task.status != IN_PROGRESS) {
       return false;
     }
@@ -193,9 +200,8 @@ public boolean resubmitTask(String path, Task task, ResubmitDirective directive)
       // finished the task. This allows to continue if the worker cannot actually handle it,
       // for any reason.
       final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
-      final boolean alive =
-          details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
-              .isServerOnline(task.cur_worker_name) : true;
+      final boolean alive = details.get(taskType).getMaster().getServerManager() != null ?
+          details.get(taskType).getMaster().getServerManager().isServerOnline(task.cur_worker_name) : true;
       if (alive && time < timeout) {
         LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
             + task.cur_worker_name + " is not marked as dead, we waited for " + time
@@ -220,7 +226,7 @@ public boolean resubmitTask(String path, Task task, ResubmitDirective directive)
       }
       LOG.info("resubmitting task " + path);
       task.incarnation.incrementAndGet();
-      boolean result = resubmit(this.details.getServerName(), path, version);
+      boolean result = resubmit(this.details.get(taskType).getServerName(), path, version);
       if (!result) {
         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
         return false;
@@ -252,7 +258,7 @@ private void rescan(long retries) {
     // might miss the watch-trigger that creation of RESCAN node provides.
     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
     // therefore this behavior is safe.
-    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode());
+    SplitLogTask slt = new SplitLogTask.Done(this.details.get(Type.HDFS).getServerName(), getRecoveryMode());
     this.watcher
         .getRecoverableZooKeeper()
         .getZooKeeper()
@@ -289,8 +295,8 @@ private void deleteNodeSuccess(String path) {
     if (ignoreZKDeleteForTesting) {
       return;
     }
-    Task task;
-    task = details.getTasks().remove(path);
+    Type taskType = SplitLogTask.getTaskType(path);
+    Task task = details.get(taskType).getTasks().remove(path);
     if (task == null) {
       if (ZKSplitLog.isRescanNode(watcher, path)) {
         SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
@@ -336,7 +342,8 @@ private boolean needAbandonRetries(int statusCode, String action) {
   }

   private void createNode(String path, Long retry_count) {
-    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
+    Type taskType = SplitLogTask.getTaskType(path);
+    SplitLogTask slt = new SplitLogTask.Unassigned(details.get(taskType).getServerName(), getRecoveryMode());
     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
         retry_count);
     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
@@ -385,7 +392,8 @@ private void getDataSetWatchSuccess(String path, byte[] data, int version)
         LOG.info("task " + path + " entered state: " + slt.toString());
         resubmitOrFail(path, FORCE);
       } else if (slt.isDone()) {
-        LOG.info("task " + path + " entered state: " + slt.toString());
+        TaskFinisher taskFinisher = taskFinishers.get(SplitLogTask.getTaskType(path));
+        LOG.info("task " + path + " entered state: " + slt.toString() + ", task type is : " + (taskFinisher == null ? "unknown" : taskFinisher.getTaskType()));
         if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
           if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
             setDone(path, SUCCESS);
@@ -417,7 +425,8 @@ private void getDataSetWatchFailure(String path) {
   }

   private void setDone(String path, TerminationStatus status) {
-    Task task = details.getTasks().get(path);
+    Type taskType = SplitLogTask.getTaskType(path);
+    Task task = details.get(taskType).getTasks().get(path);
     if (task == null) {
       if (!ZKSplitLog.isRescanNode(watcher, path)) {
         SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
@@ -458,8 +467,8 @@ private void setDone(String path, TerminationStatus status) {

   Task findOrCreateOrphanTask(String path) {
     Task orphanTask = new Task();
-    Task task;
-    task = details.getTasks().putIfAbsent(path, orphanTask);
+    Type taskType = SplitLogTask.getTaskType(path);
+    Task task = details.get(taskType).getTasks().putIfAbsent(path, orphanTask);
     if (task == null) {
       LOG.info("creating orphan task " + path);
       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
@@ -534,7 +543,7 @@ public void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo>
           long lastRecordedFlushedSequenceId = -1;
           try {
             long lastSequenceId =
-                this.details.getMaster().getServerManager()
+                this.details.get(Type.HDFS).getMaster().getServerManager()
                     .getLastFlushedSequenceId(regionEncodeName.getBytes()).getLastFlushedSequenceId();

             /*
@@ -587,8 +596,8 @@ public void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo>

   @Override
   public void nodeDataChanged(String path) {
-    Task task;
-    task = details.getTasks().get(path);
+    Type taskType = SplitLogTask.getTaskType(path);
+    Task task = details.get(taskType).getTasks().get(path);
     if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
       if (task != null) {
         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
@@ -800,8 +809,9 @@ private boolean isDistributedLogReplay(Configuration conf) {
   private boolean resubmit(ServerName serverName, String path, int version) {
     try {
       // blocking zk call but this is done from the timeout thread
+      Type taskType = SplitLogTask.getTaskType(path);
       SplitLogTask slt =
-          new SplitLogTask.Unassigned(this.details.getServerName(), getRecoveryMode());
+          new SplitLogTask.Unassigned(this.details.get(taskType).getServerName(), getRecoveryMode());
       if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
         LOG.debug("failed to resubmit task " + path + " version changed");
         return false;
@@ -860,6 +870,8 @@ enum Status {
      * @return DONE if task completed successfully, ERR otherwise
      */
     Status finish(ServerName workerName, String taskname);
+
+    Type getTaskType();
   }

   /**
@@ -960,9 +972,10 @@ public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
     @Override
     public void processResult(int rc, String path, Object ctx) {
       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
+      Type taskType = SplitLogTask.getTaskType(path);
       if (rc != 0) {
         if (needAbandonRetries(rc, "Delete znode " + path)) {
-          details.getFailedDeletions().add(path);
+          details.get(taskType).getFailedDeletions().add(path);
           return;
         }
         if (rc != KeeperException.Code.NONODE.intValue()) {
@@ -972,7 +985,7 @@ public void processResult(int rc, String path, Object ctx) {
             + " remaining retries=" + retry_count);
         if (retry_count == 0) {
           LOG.warn("delete failed " + path);
-          details.getFailedDeletions().add(path);
+          details.get(taskType).getFailedDeletions().add(path);
           deleteNodeFailure(path);
         } else {
           deleteNode(path, retry_count - 1);
@@ -1021,13 +1034,13 @@ public void processResult(int rc, String path, Object ctx, String name) {
   }

   @Override
-  public void setDetails(SplitLogManagerDetails details) {
-    this.details = details;
+  public void addDetails(Type type, SplitLogManagerDetails details) {
+    this.details.put(type, details);
   }

   @Override
-  public SplitLogManagerDetails getDetails() {
-    return details;
+  public Collection<SplitLogManagerDetails> getDetails() {
+    return details.values();
   }

   @Override
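Before the worker-side changes below, here is a compact model of who grabs what (illustrative toy code, not HBase): ZkSplitLogWorkerCoordination computes skipTask = isOnMaster ^ isNotKafka, so a split worker embedded in the HMaster only grabs HDFS tasks, while a region-server worker only grabs Kafka replay tasks. The task names in main are hypothetical:

    public class SkipRuleSketch {
      enum Type { HDFS, KAFKA }

      static Type taskType(String taskName) {
        // Kafka task names carry exactly 5 '_'-separated fields.
        return taskName.split("_").length == 5 ? Type.KAFKA : Type.HDFS;
      }

      static boolean skipTask(boolean isOnMaster, String taskName) {
        boolean isNotKafka = taskType(taskName) != Type.KAFKA;
        return isOnMaster ^ isNotKafka;
      }

      public static void main(String[] args) {
        String kafkaTask = "usertable_3_100_200_region-0100-0150";
        String hdfsTask = "host%2C16020%2C1577681262345.meta";
        System.out.println(skipTask(true, kafkaTask));  // true:  master skips Kafka replay
        System.out.println(skipTask(true, hdfsTask));   // false: master splits WALs
        System.out.println(skipTask(false, kafkaTask)); // false: region server replays Kafka
        System.out.println(skipTask(false, hdfsTask));  // true:  region server skips WALs
      }
    }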
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 2f739be..bc73d10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -19,11 +19,12 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CoordinatedStateException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -47,7 +48,8 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   public void initialize(Server server) {
     this.server = server;
     this.watcher = server.getZooKeeper();
-    splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher);
+    boolean isMaster = server instanceof HMaster;
+    splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher, isMaster);
     splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher);
     splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
     closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 0d88264..9ad467b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -90,12 +90,13 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
   private int maxConcurrentTasks = 0;

   private final ZkCoordinatedStateManager manager;
+  private final boolean isOnMaster;

   public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager,
-      ZooKeeperWatcher watcher) {
+      ZooKeeperWatcher watcher, boolean isOnMaster) {
     super(watcher);
     manager = zkCoordinatedStateManager;
-
+    this.isOnMaster = isOnMaster;
   }

   /**
@@ -428,11 +429,18 @@ public void taskLoop() throws InterruptedException {
         int idx = (i + offset) % paths.size();
         // don't call ZKSplitLog.getNodeName() because that will lead to
         // double encoding of the path name
-        if (this.calculateAvailableSplitters(numTasks) > 0) {
-          grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+        String taskName = paths.get(idx);
+        boolean isNotKafka = SplitLogTask.getTaskType(taskName) != SplitLogTask.Type.KAFKA;
+        boolean skipTask = isOnMaster ^ isNotKafka;
+        if (!skipTask && this.calculateAvailableSplitters(numTasks) > 0) {
+          grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, taskName));
         } else {
-          LOG.debug("Current region server " + server.getServerName() + " has "
-              + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+          if (skipTask) {
+            LOG.info("task " + taskName + " was skipped.");
+          } else {
+            LOG.debug("Current region server " + server.getServerName() + " has "
+                + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+          }
           break;
         }
         if (shouldStop) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/KafkaRecoveryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/KafkaRecoveryManager.java
index 2307581..98b3ada 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/KafkaRecoveryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/KafkaRecoveryManager.java
@@ -27,10 +27,12 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -39,7 +41,10 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.KafkaUtil;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;

@@ -54,6 +59,7 @@ public class KafkaRecoveryManager extends LogRecoveryManager {
   public static final String RECOVERY_MODE = "KAFKA";
   private final String kafkaServers;
   private KafkaConsumer TEST_CONSUMER;
+  private AdminClient TEST_ADMIN;
   private final long regionFlushInterval;

   public KafkaRecoveryManager(HMaster master) throws IOException {
@@ -62,7 +68,7 @@ public KafkaRecoveryManager(HMaster master) throws IOException {

   @VisibleForTesting
   public KafkaRecoveryManager(Server server, MasterServices masterService) throws IOException {
-    super(server, server.getConfiguration(), server, masterService, server.getServerName());
+    super(server, server.getConfiguration(), server, masterService, server.getServerName(), false);
     this.kafkaServers = server.getConfiguration().get(KafkaUtil.KAFKA_BROKER_SERVERS);
     this.regionFlushInterval = server.getConfiguration().getLong(
         HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
@@ -82,25 +88,36 @@ public void splitLogs(ServerName serverName, Set<HRegionInfo> regions, ServerMan
     long t = EnvironmentEdgeManager.currentTime();
     TaskBatch batch = new TaskBatch();
-    try (KafkaConsumer consumer = getKafkaConsumer()) {
+    try (KafkaConsumer consumer = getKafkaConsumer();
+        AdminClient admin = getKafkaAdminClient()) {
       Map<String, Integer> tablePartitions = new HashMap<>();
       Map<TopicPartition, Long> lastOffsets = new HashMap<>();
+      int totalRecords = 0;
       for (HRegionInfo region : regions) {
         String table = region.getTable().getNameAsString();
         String kafkaTopic = KafkaUtil.getTableTopic(table);
+        String consumerGroup = KafkaUtil.getConsumerGroup(table);
         if (tablePartitions.get(table) == null) {
           tablePartitions.put(table, consumer.partitionsFor(kafkaTopic).size());
         }
         int kafkaPartition = KafkaUtil.getTablePartition(Bytes.toString(region.getStartKey()),
             tablePartitions.get(table));
         TopicPartition topicPartition = new TopicPartition(kafkaTopic, kafkaPartition);
-        consumer.assign(Arrays.asList(topicPartition));
         if (lastOffsets.get(topicPartition) == null) {
-          consumer.seekToEnd(Collections.singletonList(topicPartition));
-          lastOffsets.put(topicPartition, consumer.position(topicPartition));
+          long lastOffset = HConstants.NO_SEQNUM;
+          try {
+            lastOffset = getConsumerLastOffset(admin, topicPartition, consumerGroup);
+          } catch (Throwable e) {
+            LOG.warn("error occurred when getting consumer's last offset.", e);
+            // fall back to partition's last offset
+            consumer.assign(Arrays.asList(topicPartition));
+            consumer.seekToEnd(Collections.singletonList(topicPartition));
+            lastOffset = consumer.position(topicPartition) - 1;
+          }
+          lastOffsets.put(topicPartition, lastOffset);
         }
         long startOffset = sm.getLastFlushedSequenceId(region.getEncodedNameAsBytes()).getLastFlushedSequenceId();
-        if (startOffset == -1) {
+        if (startOffset == HConstants.NO_SEQNUM) {
           long timestamp = System.currentTimeMillis() - regionFlushInterval;
           LOG.warn(region.getRegionName() + "'s target start offset is -1, seek with timestamp " + timestamp);
           OffsetAndTimestamp offset = consumer.offsetsForTimes(
@@ -110,6 +127,7 @@ public void splitLogs(ServerName serverName, Set<HRegionInfo> regions, ServerMan
         String startKey = Bytes.toString(region.getStartKey());
         String endKey = Bytes.toString(region.getEndKey());
         // taskName like : topic_partition_startOffset_endOffset_regionName-startKey-endKey
+        totalRecords += (lastOffsets.get(topicPartition) - startOffset);
         StringBuilder taskName = new StringBuilder(kafkaTopic).append("_").append(kafkaPartition).append("_")
             .append(startOffset)
             .append("_").append(lastOffsets.get(topicPartition))
@@ -121,6 +139,7 @@ public void splitLogs(ServerName serverName, Set<HRegionInfo> regions, ServerMan
         }
         LOG.info("enqueued split task : " + taskName);
       }
+      LOG.info("total kafka records to replay : " + totalRecords);
     }

     waitForSplittingCompletion(batch, status);
@@ -141,11 +160,26 @@ public void splitLogs(ServerName serverName, Set<HRegionInfo> regions, ServerMan
     LOG.info(msg);
   }

+  @VisibleForTesting
   void setKafkaConsumer(KafkaConsumer consumer) {
     this.TEST_CONSUMER = consumer;
   }

-  KafkaConsumer getKafkaConsumer() {
+  @VisibleForTesting
+  void setKafkaAdminClient(AdminClient adminClient) {
+    this.TEST_ADMIN = adminClient;
+  }
+
+  private AdminClient getKafkaAdminClient() {
+    if (TEST_ADMIN != null) {
+      return TEST_ADMIN;
+    }
+    Properties props = new Properties();
+    props.setProperty("bootstrap.servers", kafkaServers);
+    return AdminClient.create(props);
+  }
+
+  private KafkaConsumer getKafkaConsumer() {
     if (TEST_CONSUMER != null) {
       return TEST_CONSUMER;
     }
@@ -159,15 +193,29 @@ KafkaConsumer getKafkaConsumer() {
     return consumer;
   }

+  private long getConsumerLastOffset(AdminClient client, TopicPartition partition, String group)
+      throws Exception {
+    ListConsumerGroupOffsetsOptions opts = new ListConsumerGroupOffsetsOptions()
+        .topicPartitions(Collections.singletonList(partition)).timeoutMs(10000);
+    Map<TopicPartition, OffsetAndMetadata> offsets = client.listConsumerGroupOffsets(group, opts)
+        .partitionsToOffsetAndMetadata().get();
+    return offsets.get(partition).offset();
+  }
+
   @Override
   protected TaskFinisher getTaskFinisher() {
     return new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String taskName) {
         LOG.info("Log recovery Task has finished, taskName is : " + taskName);
-        // TODO do some cleanup
+        // old WALs cleanup is done in ServerCrashProcedure
         return Status.DONE;
       }
+
+      @Override
+      public SplitLogTask.Type getTaskType() {
+        return SplitLogTask.Type.KAFKA;
+      }
     };
   }
 }
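The AdminClient lookup that splitLogs() now prefers, extracted into a runnable sketch (broker address, topic, partition and group name below are placeholders; the options chain mirrors getConsumerLastOffset above). Falling back to seekToEnd() - 1 when the lookup fails mirrors the catch block in splitLogs():

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class GroupOffsetSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical broker
        TopicPartition tp = new TopicPartition("usertable", 3);
        try (AdminClient admin = AdminClient.create(props)) {
          ListConsumerGroupOffsetsOptions opts = new ListConsumerGroupOffsetsOptions()
              .topicPartitions(Collections.singletonList(tp)).timeoutMs(10000);
          Map<TopicPartition, OffsetAndMetadata> committed =
              admin.listConsumerGroupOffsets("connect-usertable-rz", opts)
                  .partitionsToOffsetAndMetadata().get();
          // Replay starts from the region's last flushed sequence id and stops at this
          // committed offset; a missing entry means the group never committed.
          OffsetAndMetadata om = committed.get(tp);
          System.out.println(om == null ? "no committed offset" : om.offset());
        }
      }
    }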
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LogRecoveryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LogRecoveryManager.java
index 482aada..d464ef3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LogRecoveryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LogRecoveryManager.java
@@ -63,7 +63,7 @@ public abstract class LogRecoveryManager {
   protected Server server;
   protected final Configuration conf;
   protected final Stoppable stopper;
-  protected final ChoreService choreService;
+  protected ChoreService choreService;
   protected final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
@@ -82,30 +82,37 @@ public abstract class LogRecoveryManager {
    * @param stopper the stoppable in case anything is wrong
    * @param master the master services
    * @param serverName the master server name
+   * @param initCoordination whether to init the coordination; the KafkaRecoveryManager does not
+   *          need to init the SplitLogManagerCoordination
    * @throws IOException
    */
   public LogRecoveryManager(Server server, Configuration conf, Stoppable stopper,
-      MasterServices master, ServerName serverName) throws IOException {
+      MasterServices master, ServerName serverName, boolean initCoordination) throws IOException {
     this.server = server;
     this.conf = conf;
     this.stopper = stopper;
-    this.choreService = new ChoreService(serverName.toString() + "_LogRecoveryManager_");
     if (server.getCoordinatedStateManager() != null) {
       SplitLogManagerCoordination coordination =
           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
               .getSplitLogManagerCoordination();
       Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
+      TaskFinisher taskFinisher = getTaskFinisher();
+      coordination.addTaskFinisher(taskFinisher);
       SplitLogManagerDetails details =
           new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
-      coordination.setDetails(details);
-      coordination.setTaskFinisher(getTaskFinisher());
-      coordination.init();
+      coordination.addDetails(taskFinisher.getTaskType(), details);
+      if (initCoordination) { // only the SplitLogManager does this
+        coordination.init();
+      }
+    }
+    if (initCoordination) {
+      this.choreService = new ChoreService(serverName.toString() + "_LogRecoveryManager_");
+      this.unassignedTimeout =
+          conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
+      this.timeoutMonitor =
+          new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
+      choreService.scheduleChore(timeoutMonitor);
+      LOG.info("TimeoutMonitor scheduled, unassigned task timeout is " + unassignedTimeout);
     }
-    this.unassignedTimeout =
-        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
-    this.timeoutMonitor =
-        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
-    choreService.scheduleChore(timeoutMonitor);
   }

   protected abstract TaskFinisher getTaskFinisher();
@@ -177,7 +184,7 @@ protected void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status)
           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
               .getSplitLogManagerCoordination().remainingTasksInCoordination();
       if (remainingTasks >= 0 && actual > remainingTasks) {
-        LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
+        LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
            + remainingTasks);
       }
       if (remainingTasks == 0 || actual == 0) {
@@ -369,9 +376,11 @@ protected void chore() {
         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
         LOG.debug("resubmitting unassigned task(s) after timeout");
       }
-      Set<String> failedDeletions =
-          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-              .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
+      Set<String> failedDeletions = new HashSet<>();
+      for (SplitLogManagerDetails detail : ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+          .getSplitLogManagerCoordination().getDetails()) {
+        failedDeletions.addAll(detail.getFailedDeletions());
+      }
       // Retry previously failed deletes
       if (failedDeletions.size() > 0) {
         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 9ea42ba..f02b961 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -346,7 +346,7 @@ private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
       if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
         flushedSequenceIdByRegion.put(encodedRegionName, l);
       } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
-        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
+        LOG.debug("RegionServer " + sn + " indicates a last flushed sequence id ("
             + l + ") that is less than the previous last flushed sequence id (" + existingValue
             + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 864fdc5..9a0248e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
@@ -49,6 +51,7 @@
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+
 import com.google.common.annotations.VisibleForTesting;

 /**
@@ -102,7 +105,7 @@ public class SplitLogManager extends LogRecoveryManager {
    */
   public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
       MasterServices master, ServerName serverName) throws IOException {
-    super(server, conf, stopper, master, serverName);
+    super(server, conf, stopper, master, serverName, true);
   }

   @Override
@@ -118,6 +121,11 @@ public Status finish(ServerName workerName, String logfile) {
       }
       return Status.DONE;
     }
+
+    @Override
+    public SplitLogTask.Type getTaskType() {
+      return SplitLogTask.Type.HDFS;
+    }
   };
 }
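A decision table for the guard the ServerCrashProcedure hunk below adds (toy code, not HBase): Kafka replay runs only when the crashed server was not hosting hbase:namespace — i.e. it was presumably a plain region server rather than a master carrying system regions — and the configured recovery mode is KAFKA; every other combination retains the stock HDFS WAL-splitting path.

    public class RecoveryPathSketch {
      static String path(boolean crashedServerHostedNamespace, String recoverModel) {
        boolean isMasterDown = crashedServerHostedNamespace;
        return (!isMasterDown && "KAFKA".equalsIgnoreCase(recoverModel))
            ? "replay from Kafka (KafkaRecoveryManager.splitLogs)"
            : "original HDFS WAL splitting (SplitLogManager)";
      }

      public static void main(String[] args) {
        System.out.println(path(false, "KAFKA")); // plain region server -> Kafka replay
        System.out.println(path(true, "KAFKA"));  // server hosting hbase:namespace -> HDFS path
        System.out.println(path(false, "HDFS"));  // Kafka mode off -> HDFS path
      }
    }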
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index d43a0a3..3b35d84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -30,9 +30,11 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.AssignmentManager;
@@ -49,6 +51,9 @@
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ServerCrashState;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -303,6 +308,16 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
       case SERVER_CRASH_FINISH:
         LOG.info("Finished processing of crashed " + serverName);
         services.getServerManager().getDeadServers().finish(serverName);
+        // should clean the WALs dir if recovery mode is kafka
+        if (this.recoverModel.equalsIgnoreCase(KafkaRecoveryManager.RECOVERY_MODE)) {
+          Path rootdir = FSUtils.getRootDir(env.getMasterConfiguration());
+          Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+          try {
+            WALSplitter.finishSplitLogFile(logDir.toString(), env.getMasterConfiguration());
+          } catch (IOException e) {
+            LOG.warn("Could not finish splitting of log file " + logDir.toString(), e);
+          }
+        }
         return Flow.NO_MORE_STATE;

       default:
@@ -438,7 +453,18 @@ private void splitLogs(final MasterProcedureEnv env) throws IOException {
           size(this.regionsOnCrashedServer));
     }
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    if (this.recoverModel.equalsIgnoreCase(KafkaRecoveryManager.RECOVERY_MODE)) {
+
+    boolean isMasterDown = false;
+    try {
+      HRegionInfo nsRegion = am.getRegionStates().getRegionsOfTable(TableName.valueOf("hbase", "namespace")).get(0);
+      ServerName nsServer = am.getRegionStates().getRegionServerOfRegion(nsRegion);
+      isMasterDown = nsServer.equals(this.serverName);
+    } catch (Throwable e) {
+      LOG.warn("couldn't find the server that hbase:namespace belongs to.", e);
+    }
+
+    // Retain the original recovery logic if serverName is a master server
+    if (!isMasterDown && this.recoverModel.equalsIgnoreCase(KafkaRecoveryManager.RECOVERY_MODE)) {
       KafkaRecoveryManager replayMgr = env.getMasterServices().getKafkaRecoveryManager();
       replayMgr.splitLogs(this.serverName, am.getRegionStates().getServerRegions(this.serverName),
           env.getMasterServices().getServerManager());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2e1744b..b69b16b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1783,9 +1783,7 @@ private void startServiceThreads() throws IOException {
         conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
-    if (!(this instanceof HMaster)) {
-      splitLogWorker.start();
-    }
+    splitLogWorker.start();
   }

   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 1340319..a8239ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.wal;

 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 043bc00..41b0192 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -26,6 +26,7 @@
 import static org.mockito.Mockito.spy;

 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -178,7 +179,7 @@ public CoordinatedStateManager getCoordinatedStateManager() {
       SplitLogManagerCoordination c = Mockito.mock(SplitLogManagerCoordination.class);
       Mockito.when(m.getSplitLogManagerCoordination()).thenReturn(c);
       SplitLogManagerDetails d = Mockito.mock(SplitLogManagerDetails.class);
-      Mockito.when(c.getDetails()).thenReturn(d);
+      Mockito.when(c.getDetails()).thenReturn(Collections.singleton(d));
       return m;
     }
diff --git a/pom.xml b/pom.xml
index 0f82c6e..9f7d3aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -491,7 +491,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
-        <version>2.5.1</version>
+        <version>3.2</version>
        <configuration>
          <source>${compileSource}</source>
          <target>${compileSource}</target>

From f11f8b4bf6d78a315d162c24093815c1ae3afed7 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Tue, 31 Dec 2019 04:54:25 +0000
Subject: [PATCH 2/2] Bump libthrift from 0.9.3 to 0.12.0

Bumps [libthrift](https://github.com/apache/thrift) from 0.9.3 to 0.12.0.
- [Release notes](https://github.com/apache/thrift/releases)
- [Changelog](https://github.com/apache/thrift/blob/master/CHANGES.md)
- [Commits](https://github.com/apache/thrift/compare/0.9.3...v0.12.0)

Signed-off-by: dependabot[bot]
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 9f7d3aa..ff0cb18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1214,7 +1214,7 @@
     1.10.8
     <protobuf.version>2.5.0</protobuf.version>
    <thrift.path>thrift</thrift.path>
-    <thrift.version>0.9.3</thrift.version>
+    <thrift.version>0.12.0</thrift.version>
     <zookeeper.version>3.4.6</zookeeper.version>
     <slf4j.version>1.7.7</slf4j.version>
     <clover.version>4.0.3</clover.version>