Skip to content

Commit

Permalink
Clone value array when insert non aligned tablet with null (#14643)
Browse files Browse the repository at this point in the history
* Clone value array when insert non aligned tablet with null

* Fix UT

* Add recover wal IT
  • Loading branch information
HTHou authored Jan 10, 2025
1 parent 937df76 commit 5bd1ead
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,11 @@ public void shutdownAllDataNodes() {
dataNodeWrapperList.forEach(AbstractNodeWrapper::stop);
}

@Override
public void shutdownForciblyAllDataNodes() {
dataNodeWrapperList.forEach(AbstractNodeWrapper::stopForcibly);
}

@Override
public void ensureNodeStatus(
final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,11 @@ public void shutdownAllDataNodes() {
throw new UnsupportedOperationException();
}

@Override
public void shutdownForciblyAllDataNodes() {
throw new UnsupportedOperationException();
}

@Override
public int getMqttPort() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> targetStatus
/** Shutdown all existed DataNodes. */
void shutdownAllDataNodes();

/** Shutdown forcibly all existed DataNodes. */
void shutdownForciblyAllDataNodes();

int getMqttPort();

String getIP();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1318,4 +1318,27 @@ public static void restartDataNodes() {
}
}
}

public static void stopForciblyAndRestartDataNodes() {
EnvFactory.getEnv().shutdownForciblyAllDataNodes();
EnvFactory.getEnv().startAllDataNodes();
long waitStartMS = System.currentTimeMillis();
long maxWaitMS = 60_000L;
long retryIntervalMS = 1000;
while (true) {
try (Connection connection = EnvFactory.getEnv().getConnection()) {
break;
} catch (Exception e) {
try {
Thread.sleep(retryIntervalMS);
} catch (InterruptedException ex) {
break;
}
}
long waited = System.currentTimeMillis() - waitStartMS;
if (waited > maxWaitMS) {
fail("Timeout while waiting for datanodes restart");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionDataSet;
Expand Down Expand Up @@ -64,6 +65,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -479,7 +481,7 @@ public void insertTabletWithAlignedTimeseriesTest() {

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertTabletWithNullValuesTest() {
public void insertTabletWithNullValuesTest() throws InterruptedException {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
Expand Down Expand Up @@ -527,6 +529,38 @@ public void insertTabletWithNullValuesTest() {
assertEquals(9L, field.getLongV());
}
}
dataSet = session.executeQueryStatement("select s3 from root.sg1.d1");
int result = 0;
assertTrue(dataSet.hasNext());
while (dataSet.hasNext()) {
RowRecord rowRecord = dataSet.next();
Field field = rowRecord.getFields().get(0);
// skip null value
if (result == 3) {
result++;
}
assertEquals(result++, field.getIntV());
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
TimeUnit.MILLISECONDS.sleep(2000);

TestUtils.stopForciblyAndRestartDataNodes();

try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
SessionDataSet dataSet = session.executeQueryStatement("select s3 from root.sg1.d1");
int result = 0;
while (dataSet.hasNext()) {
RowRecord rowRecord = dataSet.next();
Field field = rowRecord.getFields().get(0);
// skip null value
if (result == 3) {
result++;
}
assertEquals(result++, field.getIntV());
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,15 @@ public void putBinaries(long[] time, Binary[] value, BitMap bitMap, int start, i
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
// time array is a reference, should clone necessary time values
// time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
// value array is a reference, should clone necessary value array
Binary[] clonedValue = new Binary[value.length];
System.arraycopy(value, 0, clonedValue, 0, value.length);
value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,15 @@ public void putBooleans(long[] time, boolean[] value, BitMap bitMap, int start,
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
// time array is a reference, should clone necessary time values
// time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
// value array is a reference, should clone necessary value array
boolean[] clonedValue = new boolean[value.length];
System.arraycopy(value, 0, clonedValue, 0, value.length);
value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,15 @@ public void putDoubles(long[] time, double[] value, BitMap bitMap, int start, in
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
// time array is a reference, should clone necessary time values
// time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
// value array is a reference, should clone necessary value array
double[] clonedValue = new double[value.length];
System.arraycopy(value, 0, clonedValue, 0, value.length);
value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,15 @@ public void putFloats(long[] time, float[] value, BitMap bitMap, int start, int
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
// time array is a reference, should clone necessary time values
// time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
// value array is a reference, should clone necessary value array
float[] clonedValue = new float[value.length];
System.arraycopy(value, 0, clonedValue, 0, value.length);
value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,15 @@ public void putInts(long[] time, int[] value, BitMap bitMap, int start, int end)
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
// time array is a reference, should clone necessary time values
// time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
// value array is a reference, should clone necessary value array
int[] clonedValue = new int[value.length];
System.arraycopy(value, 0, clonedValue, 0, value.length);
value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,15 @@ public void putLongs(long[] time, long[] value, BitMap bitMap, int start, int en
// constraint: time.length + timeIdxOffset == value.length
int timeIdxOffset = 0;
if (bitMap != null && !bitMap.isAllUnmarked()) {
// time array is a reference, should clone necessary time values
// time array is a reference, should clone necessary time array
long[] clonedTime = new long[end - start];
System.arraycopy(time, start, clonedTime, 0, end - start);
time = clonedTime;
timeIdxOffset = start;
// value array is a reference, should clone necessary value array
long[] clonedValue = new long[value.length];
System.arraycopy(value, 0, clonedValue, 0, value.length);
value = clonedValue;
// drop null at the end of value array
int nullCnt =
dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -385,8 +386,19 @@ public void testIoTDBTabletWriteAndSyncClose()

for (int r = 0; r < 100; r++) {
times[r] = r;
((int[]) columns[0])[r] = 1;
((long[]) columns[1])[r] = 1;
((int[]) columns[0])[r] = r;
((long[]) columns[1])[r] = r;
}

BitMap[] bitMaps = new BitMap[2];
bitMaps[0] = new BitMap(100);
bitMaps[1] = new BitMap(100);
for (int r = 0; r < 100; r++) {
if (r % 2 == 0) {
bitMaps[0].mark(r);
} else {
bitMaps[1].mark(r);
}
}

InsertTabletNode insertTabletNode1 =
Expand All @@ -398,11 +410,15 @@ public void testIoTDBTabletWriteAndSyncClose()
dataTypes,
measurementSchemas,
times,
null,
bitMaps,
columns,
times.length);

int hashCode1 = Arrays.hashCode((int[]) columns[0]);
int hashCode2 = Arrays.hashCode((long[]) columns[1]);
dataRegion.insertTablet(insertTabletNode1);
// the hashCode should not be changed when insert
Assert.assertEquals(hashCode1, Arrays.hashCode((int[]) columns[0]));
Assert.assertEquals(hashCode2, Arrays.hashCode((long[]) columns[1]));
dataRegion.syncCloseAllWorkingTsFileProcessors();

for (int r = 50; r < 149; r++) {
Expand Down

0 comments on commit 5bd1ead

Please sign in to comment.