HIVE-27848: Refactor Initiator hierarchy into CompactorUtil and fix failure in TestCrudCompactorOnTez (Taraka Rama Rao Lethavadla reviewed by Stamatis Zampetakis)

The work initially started as a fix for the failing test TestCrudCompactorOnTez.secondCompactionShouldBeRefusedBeforeEnqueueing.

However, while changing the code to address the failure, the inheritance-based design for the Initiator chosen in HIVE-27598 revealed some weaknesses, briefly outlined below.

Due to inheritance, the InitiatorBase class becomes a Thread, which does not really make sense and adds overhead every time it is instantiated. Moreover, the only class that currently extends InitiatorBase is the Initiator, and it is difficult to imagine other extensions of InitiatorBase; the code becomes complex, and any subtle change in InitiatorBase may have unpredictable effects on Initiator. Having a "Base" class that is not really meant to be extended, with no instructions on how to do so, is problematic.

For the reasons above, the focus of the work shifted from just re-enabling the test to addressing the shortcomings of the inheritance-based design of the Initiator.
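To make the direction of the refactor concrete, here is a minimal before/after sketch (simplified from the diff below; conf, txnHandler, and partition stand in for values the caller already holds):

    // Before (HIVE-27598): enqueueing a compaction required instantiating
    // InitiatorBase, which is a Thread, and driving its lifecycle.
    InitiatorBase initiatorBase = new InitiatorBase();
    initiatorBase.setConf(conf);
    initiatorBase.init(new AtomicBoolean());
    initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);

    // After (this commit): a stateless static helper on CompactorUtil,
    // with no Thread subclass and no lifecycle to manage.
    CompactionResponse response = CompactorUtil.initiateCompactionForPartition(
        table.getTTable(), partition, compactionRequest, ServerUtils.hostname(), txnHandler, conf);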

Closes apache#4859
tarak271 authored and zabetak committed Mar 19, 2024
1 parent cf49f0e commit df45194
Showing 6 changed files with 357 additions and 344 deletions.
TestCrudCompactorOnTez.java
@@ -77,7 +77,6 @@
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.RecordReaderImpl;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -602,7 +601,6 @@ public void testCompactionShouldNotFailOnPartitionsWithBooleanField() throws Exception {
}

@Test
@Ignore("HIVE-27848")
public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception {
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);

@@ -648,6 +646,52 @@ public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception {
Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
}

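// Same scenario as secondCompactionShouldBeRefusedBeforeEnqueueing above, but against an explicit partition.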
@Test
public void secondCompactionShouldBeRefusedBeforeEnqueueingForPartition() throws Exception {
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);

final String dbName = "default";
final String tableName = "compaction_test";
executeStatementOnDriver("drop table if exists " + tableName, driver);
executeStatementOnDriver("CREATE TABLE " + tableName + "(id string, value string) partitioned by(pt string) CLUSTERED BY(id) "
+ "INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
executeStatementOnDriver("alter table " + tableName + " add partition(pt='test')", driver);
executeStatementOnDriver("INSERT INTO TABLE " + tableName + " partition(pt='test') values ('1','one'),('2','two'),('3','three'),"
+ "('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),"
+ "('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),"
+ "('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", driver);

executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('21', 'value21'),('84', 'value84'),"
+ "('66', 'value66'),('54', 'value54')", driver);
executeStatementOnDriver(
"insert into " + tableName + " partition(pt='test') values ('22', 'value22'),('34', 'value34')," + "('35', 'value35')", driver);
executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('75', 'value75'),('99', 'value99')", driver);

TxnStore txnHandler = TxnUtils.getTxnStore(conf);

//Do a compaction directly and wait for it to finish
CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
rqst.setPartitionname("pt=test");
CompactionResponse resp = txnHandler.compact(rqst);
runWorker(conf);

//Try to do a second compaction on the same partition before the cleaner runs.
try {
driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 'major'");
} catch (CommandProcessorException e) {
String errorMessage = ErrorMsg.COMPACTION_REFUSED.format(dbName, tableName, " partition(pt=test)",
"Compaction is already scheduled with state='ready for cleaning' and id=" + resp.getId());
Assert.assertEquals(errorMessage, e.getCauseMessage());
Assert.assertEquals(ErrorMsg.COMPACTION_REFUSED.getErrorCode(), e.getErrorCode());
}

//Check if the first compaction is in 'ready for cleaning'
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
}

@Test
public void testMinorCompactionShouldBeRefusedOnTablesWithOriginalFiles() throws Exception {
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.ddl.table.storage.compact;

import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

@@ -27,6 +28,7 @@
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.DDLOperation;
@@ -35,17 +37,21 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Optional;

import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;

/**
* Operation process of compacting a table.
*/
public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDesc> {

public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) {
super(context, desc);
}
@@ -56,6 +62,11 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) {
throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName());
}

Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
convertPartitionsFromThriftToDB(getPartitions(table, desc, context));

TxnStore txnHandler = TxnUtils.getTxnStore(context.getConf());

CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(),
compactionTypeStr2ThriftType(desc.getCompactionType()));

@@ -69,40 +80,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) {
compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
}

InitiatorBase initiatorBase = new InitiatorBase();
initiatorBase.setConf(context.getConf());
initiatorBase.init(new AtomicBoolean());

Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
convertPartitionsFromThriftToDB(getPartitions(table, desc, context));

if(desc.getPartitionSpec() != null){
Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
partitionName.ifPresent(compactionRequest::setPartitionname);
}
List<CompactionResponse> compactionResponses =
initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
for (CompactionResponse compactionResponse : compactionResponses) {
if (!compactionResponse.isAccepted()) {
String message;
if (compactionResponse.isSetErrormessage()) {
message = compactionResponse.getErrormessage();
throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
"CompactionId: " + compactionResponse.getId(), message);
}
context.getConsole().printInfo(
"Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
+ compactionResponse.getState());
continue;
//Directly enqueue the compaction request if the table is un-partitioned or a specific partition was requested
if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
if (desc.getPartitionSpec() != null) {
Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
partitionName.ifPresent(compactionRequest::setPartitionname);
}
context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
if (desc.isBlocking() && compactionResponse.isAccepted()) {
waitForCompactionToFinish(compactionResponse, context);
CompactionResponse compactionResponse = txnHandler.compact(compactionRequest);
parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname());
} else { // Check for eligible partitions and initiate compaction
for (Map.Entry<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMapEntry : partitionMap.entrySet()) {
compactionRequest.setPartitionname(partitionMapEntry.getKey());
CompactionResponse compactionResponse =
CompactorUtil.initiateCompactionForPartition(table.getTTable(), partitionMapEntry.getValue(),
compactionRequest, ServerUtils.hostname(), txnHandler, context.getConf());
parseCompactionResponse(compactionResponse, table, partitionMapEntry.getKey());
}
}
return 0;
}

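/**
 * Interprets the response to a compaction request: prints a notice when no
 * compaction was needed or one is already enqueued, throws when the request
 * was explicitly refused, and waits for completion for blocking requests.
 */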
private void parseCompactionResponse(CompactionResponse compactionResponse, Table table, String partitionName)
throws HiveException {
if (compactionResponse == null) {
context.getConsole().printInfo(
"Not enough deltas to initiate compaction for table=" + table.getTableName() + " partition=" + partitionName);
return;
}
if (!compactionResponse.isAccepted()) {
if (compactionResponse.isSetErrormessage()) {
throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
partitionName == null ? "" : " partition(" + partitionName + ")", compactionResponse.getErrormessage());
}
context.getConsole().printInfo(
"Compaction already enqueued with id " + compactionResponse.getId() + "; State is " + compactionResponse.getState());
return;
}
context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
if (desc.isBlocking() && compactionResponse.isAccepted()) {
waitForCompactionToFinish(compactionResponse, context);
}
}

private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, DDLOperationContext context)
throws HiveException {
List<Partition> partitions = new ArrayList<>();
Expand All @@ -117,7 +136,7 @@ private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, D
partitions = context.getDb().getPartitions(table, partitionSpec);
if (partitions.size() > 1) {
throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
} else if (partitions.size() == 0) {
} else if (partitions.isEmpty()) {
throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
}
}
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (7 additions & 0 deletions)
@@ -1417,6 +1417,13 @@ public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDirectory,
return directory;
}

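/**
 * Convenience overload that resolves the FileSystem and Path from the given
 * StorageDescriptor before delegating to the main getAcidState variant.
 */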
public static AcidDirectory getAcidState(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf)
throws IOException {
Path location = new Path(sd.getLocation());
FileSystem fs = location.getFileSystem(conf);
return getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
}

private static void findBestWorkingDeltas(ValidWriteIdList writeIdList, AcidDirectory directory) {
Collections.sort(directory.getCurrentDirectories());
//so now, 'current directories' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example
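The new overload lets callers that only hold a metastore StorageDescriptor resolve ACID state directly. A hedged usage sketch (the helper and its one-delta threshold are hypothetical; AcidDirectory and getCurrentDirectories come from the surrounding code):

    // Hypothetical caller: check whether a partition's location holds enough
    // delta directories to make compaction worthwhile.
    static boolean hasDeltasToCompact(StorageDescriptor sd, ValidWriteIdList writeIds,
        HiveConf conf) throws IOException {
      AcidDirectory dir = AcidUtils.getAcidState(sd, writeIds, conf);
      // More than one delta suggests there is something to merge.
      return dir.getCurrentDirectories().size() > 1;
    }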
