Skip to content

Commit

Permalink
enh(query)[TD-33268]. add unit tests to increase test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
JinqingKuang committed Dec 22, 2024
1 parent dcdce61 commit e6d00b4
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 176 deletions.
1 change: 0 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbRead2.c
Original file line number Diff line number Diff line change
Expand Up @@ -5791,7 +5791,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {

// make sure only release once
void* p = pReader->pReadSnap;
TSDB_CHECK_NULL(p, code, lino, _end, TSDB_CODE_INVALID_PARA);
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
tsdbUntakeReadSnap2(pReader, p, false);
pReader->pReadSnap = NULL;
Expand Down
12 changes: 12 additions & 0 deletions source/libs/executor/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,15 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC "${TD_SOURCE_DIR}/include/common"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)

ADD_EXECUTABLE(execUtilTests execUtilTests.cpp)
TARGET_LINK_LIBRARIES(
execUtilTests
PRIVATE os util common executor gtest_main qcom function planner scalar nodes vnode
)

TARGET_INCLUDE_DIRECTORIES(
execUtilTests
PUBLIC "${TD_SOURCE_DIR}/include/common"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
34 changes: 34 additions & 0 deletions source/libs/executor/test/execUtilTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "gtest/gtest.h"

#include "executil.h"

TEST(execUtilTest, resRowTest) {
SDiskbasedBuf *pBuf = nullptr;
int32_t pageSize = 32;
int32_t numPages = 3;
int32_t code = createDiskbasedBuf(&pBuf, pageSize, pageSize * numPages, "test_buf", "/");
EXPECT_EQ(code, TSDB_CODE_SUCCESS);

std::vector<void *> pages(numPages);
std::vector<int32_t> pageIds(numPages);
for (int32_t i = 0; i < numPages; ++i) {
pages[i] = getNewBufPage(pBuf, &pageIds[i]);
EXPECT_NE(pages[i], nullptr);
EXPECT_EQ(pageIds[i], i);
}

EXPECT_EQ(getNewBufPage(pBuf, nullptr), nullptr);

SResultRowPosition pos = {.pageId = 0};
for (int32_t i = 0; i < numPages; ++i) {
pos.pageId = pageIds[i];
bool forUpdate = i & 0x1;
SResultRow *row = getResultRowByPos(pBuf, &pos, forUpdate);
EXPECT_EQ((void *)row, pages[i]);
}

pos.pageId = numPages + 1;
EXPECT_EQ(getResultRowByPos(pBuf, &pos, true), nullptr);

destroyDiskbasedBuf(pBuf);
}
246 changes: 71 additions & 175 deletions source/util/src/tdecompressavx.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ char tsSIMDEnable = 0;
#endif

int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) {
#ifdef __AVX2__
#ifdef __AVX512F__
int32_t word_length = getWordLength(type);

// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Expand Down Expand Up @@ -53,183 +53,79 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int32_t gRemainder = (nelements - _pos);
int32_t num = (gRemainder > elems) ? elems : gRemainder;

int32_t batch = 0;
int32_t remain = 0;
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#ifdef __AVX512F__
batch = num >> 3;
remain = num & 0x07;
#endif
} else if (tsSIMDEnable && tsAVX2Supported) {
#ifdef __AVX2__
batch = num >> 2;
remain = num & 0x03;
#endif
}
int32_t batch = num >> 3;
int32_t remain = num & 0x07;

if (selector == 0 || selector == 1) {
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#ifdef __AVX512F__
for (int32_t i = 0; i < batch; ++i) {
__m512i prev = _mm512_set1_epi64(prevValue);
_mm512_storeu_si512((__m512i *)&p[_pos], prev);
_pos += 8; // handle 64bit x 8 = 512bit
}
for (int32_t i = 0; i < remain; ++i) {
p[_pos++] = prevValue;
}
#endif
} else if (tsSIMDEnable && tsAVX2Supported) {
for (int32_t i = 0; i < batch; ++i) {
__m256i prev = _mm256_set1_epi64x(prevValue);
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
_pos += 4;
}

for (int32_t i = 0; i < remain; ++i) {
p[_pos++] = prevValue;
}

} else { // alternative implementation without SIMD instructions.
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = prevValue;
v += bit;
}
for (int32_t i = 0; i < batch; ++i) {
__m512i prev = _mm512_set1_epi64(prevValue);
_mm512_storeu_si512((__m512i *)&p[_pos], prev);
_pos += 8; // handle 64bit x 8 = 512bit
}
for (int32_t i = 0; i < remain; ++i) {
p[_pos++] = prevValue;
}
} else {
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#ifdef __AVX512F__
__m512i sum_mask1 = _mm512_set_epi64(6, 6, 4, 4, 2, 2, 0, 0);
__m512i sum_mask2 = _mm512_set_epi64(5, 5, 5, 5, 1, 1, 1, 1);
__m512i sum_mask3 = _mm512_set_epi64(3, 3, 3, 3, 3, 3, 3, 3);
__m512i base = _mm512_set1_epi64(w);
__m512i maskVal = _mm512_set1_epi64(mask);
__m512i shiftBits = _mm512_set_epi64(bit * 7 + 4, bit * 6 + 4, bit * 5 + 4, bit * 4 + 4, bit * 3 + 4,
bit * 2 + 4, bit + 4, 4);
__m512i inc = _mm512_set1_epi64(bit << 3);

for (int32_t i = 0; i < batch; ++i) {
__m512i after = _mm512_srlv_epi64(base, shiftBits);
__m512i zigzagVal = _mm512_and_si512(after, maskVal);

// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m512i signmask = _mm512_and_si512(_mm512_set1_epi64(1), zigzagVal);
signmask = _mm512_sub_epi64(_mm512_setzero_si512(), signmask);
__m512i delta = _mm512_xor_si512(_mm512_srli_epi64(zigzagVal, 1), signmask);

// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prevValue + final[0]
// decode[1] = decode[0] + final[1] -----> prevValue + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prevValue + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prevValue + final[0] + final[1] + final[2] + final[3]

// 7 6 5 4 3 2 1
// 0 D7 D6 D5 D4 D3 D2 D1
// D0 D6 0 D4 0 D2 0 D0
// 0 D7+D6 D6 D5+D4 D4 D3+D2 D2
// D1+D0 D0 13 6 9 4 5 2
// 1 0
__m512i prev = _mm512_set1_epi64(prevValue);
__m512i cum_sum = _mm512_add_epi64(delta, _mm512_maskz_permutexvar_epi64(0xaa, sum_mask1, delta));
cum_sum = _mm512_add_epi64(cum_sum, _mm512_maskz_permutexvar_epi64(0xcc, sum_mask2, cum_sum));
cum_sum = _mm512_add_epi64(cum_sum, _mm512_maskz_permutexvar_epi64(0xf0, sum_mask3, cum_sum));

// 13 6 9 4 5 2 1
// 0 D7,D6 D6 D5,D4 D4 D3,D2 D2
// D1,D0 D0 +D5,D4 D5,D4, 0 0 D1,D0 D1,D0
// 0 0 D7~D4 D6~D4 D5~D4 D4 D3~D0 D2~D0
// D1~D0 D0 22 15 9 4 6 3
// 1 0
//
// D3~D0 D3~D0 D3~D0 D3~D0 0 0 0
// 0 28 21 15 10 6 3 1
// 0

cum_sum = _mm512_add_epi64(cum_sum, prev);
_mm512_storeu_si512((__m512i *)&p[_pos], cum_sum);

shiftBits = _mm512_add_epi64(shiftBits, inc);
prevValue = p[_pos + 7];
_pos += 8;
}
// handle the remain value
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit * 8))) & mask);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);

p[_pos++] = prevValue;
v += bit;
}
#endif
} else if (tsSIMDEnable && tsAVX2Supported) {
__m256i base = _mm256_set1_epi64x(w);
__m256i maskVal = _mm256_set1_epi64x(mask);

__m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4);
__m256i inc = _mm256_set1_epi64x(bit << 2);

for (int32_t i = 0; i < batch; ++i) {
__m256i after = _mm256_srlv_epi64(base, shiftBits);
__m256i zigzagVal = _mm256_and_si256(after, maskVal);

// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);

// get four zigzag values here
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);

// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prevValue + final[0]
// decode[1] = decode[0] + final[1] -----> prevValue + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prevValue + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prevValue + final[0] + final[1] + final[2] + final[3]

// 1, 2, 3, 4
//+ 0, 1, 0, 3
// 1, 3, 3, 7
// shift and add for the first round
__m128i prev = _mm_set1_epi64x(prevValue);
__m256i x = _mm256_slli_si256(delta, 8);

delta = _mm256_add_epi64(delta, x);
_mm256_storeu_si256((__m256i *)&p[_pos], delta);

// 1, 3, 3, 7
//+ 0, 0, 3, 3
// 1, 3, 6, 10
// shift and add operation for the second round
__m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
__m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
firstPart = _mm_add_epi64(firstPart, prev);
secPart = _mm_add_epi64(secPart, prev);

// save it in the memory
_mm_storeu_si128((__m128i *)&p[_pos], firstPart);
_mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);

shiftBits = _mm256_add_epi64(shiftBits, inc);
prevValue = p[_pos + 3];
_pos += 4;
}

// handle the remain value
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);

p[_pos++] = prevValue;
v += bit;
}
} else { // alternative implementation without SIMD instructions.
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);

p[_pos++] = prevValue;
v += bit;
}
__m512i sum_mask1 = _mm512_set_epi64(6, 6, 4, 4, 2, 2, 0, 0);
__m512i sum_mask2 = _mm512_set_epi64(5, 5, 5, 5, 1, 1, 1, 1);
__m512i sum_mask3 = _mm512_set_epi64(3, 3, 3, 3, 3, 3, 3, 3);
__m512i base = _mm512_set1_epi64(w);
__m512i maskVal = _mm512_set1_epi64(mask);
__m512i shiftBits = _mm512_set_epi64(bit * 7 + 4, bit * 6 + 4, bit * 5 + 4, bit * 4 + 4, bit * 3 + 4,
bit * 2 + 4, bit + 4, 4);
__m512i inc = _mm512_set1_epi64(bit << 3);

for (int32_t i = 0; i < batch; ++i) {
__m512i after = _mm512_srlv_epi64(base, shiftBits);
__m512i zigzagVal = _mm512_and_si512(after, maskVal);

// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m512i signmask = _mm512_and_si512(_mm512_set1_epi64(1), zigzagVal);
signmask = _mm512_sub_epi64(_mm512_setzero_si512(), signmask);
__m512i delta = _mm512_xor_si512(_mm512_srli_epi64(zigzagVal, 1), signmask);

// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prevValue + final[0]
// decode[1] = decode[0] + final[1] -----> prevValue + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prevValue + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prevValue + final[0] + final[1] + final[2] + final[3]

// 7 6 5 4 3 2 1
// 0 D7 D6 D5 D4 D3 D2 D1
// D0 D6 0 D4 0 D2 0 D0
// 0 D7+D6 D6 D5+D4 D4 D3+D2 D2
// D1+D0 D0 13 6 9 4 5 2
// 1 0
__m512i prev = _mm512_set1_epi64(prevValue);
__m512i cum_sum = _mm512_add_epi64(delta, _mm512_maskz_permutexvar_epi64(0xaa, sum_mask1, delta));
cum_sum = _mm512_add_epi64(cum_sum, _mm512_maskz_permutexvar_epi64(0xcc, sum_mask2, cum_sum));
cum_sum = _mm512_add_epi64(cum_sum, _mm512_maskz_permutexvar_epi64(0xf0, sum_mask3, cum_sum));

// 13 6 9 4 5 2 1
// 0 D7,D6 D6 D5,D4 D4 D3,D2 D2
// D1,D0 D0 +D5,D4 D5,D4, 0 0 D1,D0 D1,D0
// 0 0 D7~D4 D6~D4 D5~D4 D4 D3~D0 D2~D0
// D1~D0 D0 22 15 9 4 6 3
// 1 0
//
// D3~D0 D3~D0 D3~D0 D3~D0 0 0 0
// 0 28 21 15 10 6 3 1
// 0

cum_sum = _mm512_add_epi64(cum_sum, prev);
_mm512_storeu_si512((__m512i *)&p[_pos], cum_sum);

shiftBits = _mm512_add_epi64(shiftBits, inc);
prevValue = p[_pos + 7];
_pos += 8;
}
// handle the remain value
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit * 8))) & mask);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);

p[_pos++] = prevValue;
v += bit;
}
}
} break;
Expand Down Expand Up @@ -292,7 +188,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,

return nelements * word_length;
#else
uError("unable run %s without avx2 instructions", __func__);
uError("unable run %s without avx512 instructions", __func__);
return -1;
#endif
}
Expand Down
14 changes: 14 additions & 0 deletions source/util/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,27 @@ add_test(
COMMAND logTest
)

IF(COMPILER_SUPPORT_AVX2)
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")
set_source_files_properties(decompressTest.cpp PROPERTIES COMPILE_FLAGS -mavx2)
ENDIF()
add_executable(decompressTest "decompressTest.cpp")
target_link_libraries(decompressTest os util common gtest_main)
add_test(
NAME decompressTest
COMMAND decompressTest
)


IF($TD_LINUX)
add_executable(utilTests "utilTests.cpp")
target_link_libraries(utilTests os util common gtest_main)
add_test(
NAME utilTests
COMMAND utilTests
)
ENDIF()

if(${TD_LINUX})
# terrorTest
add_executable(terrorTest "terrorTest.cpp")
Expand Down
Loading

0 comments on commit e6d00b4

Please sign in to comment.