Skip to content

Commit

Permalink
fix flush bug and fix empty file bug (apache#310)
Browse files Browse the repository at this point in the history
* fix flush bug and fix empty file bug

* add ut
  • Loading branch information
zwhzzz0821 authored Nov 25, 2024
1 parent aff4007 commit 3117ec0
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 18 deletions.
3 changes: 0 additions & 3 deletions cpp/src/common/tsfile_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ int TSMIterator::init() {

// FIXME empty list
chunk_group_meta_iter_ = chunk_group_meta_list_.begin();
if (chunk_group_meta_iter_ == chunk_group_meta_list_.end()) {
return E_NOT_EXIST;
}
while (chunk_group_meta_iter_ != chunk_group_meta_list_.end()) {
chunk_meta_iter_ =
chunk_group_meta_iter_.get()->chunk_meta_list_.begin();
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/file/tsfile_io_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ int TsFileIOWriter::write_file_index() {
ret = E_OK;
}
ASSERT(ret == E_OK);

if (IS_SUCC(ret)) { // iter finish
if (IS_SUCC(ret) && cur_index_node != nullptr &&
cur_index_node_queue != nullptr) { // iter finish
ASSERT(cur_index_node != nullptr);
ASSERT(cur_index_node_queue != nullptr);
if (RET_FAIL(add_cur_index_node_to_queue(cur_index_node,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/reader/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ int BloomFilter::serialize_to(ByteStream &out) {
uint8_t *filter_data_bytes = nullptr;
int32_t filter_data_bytes_len = 0;
bitset_.to_bytes(filter_data_bytes, filter_data_bytes_len);
ASSERT(filter_data_bytes_len > 0);

if (RET_FAIL(
SerializationUtil::write_var_uint(filter_data_bytes_len, out))) {
} else if (RET_FAIL(
Expand All @@ -233,7 +231,9 @@ int BloomFilter::serialize_to(ByteStream &out) {
} else if (RET_FAIL(
SerializationUtil::write_var_uint(hash_func_count_, out))) {
}
bitset_.revert_bytes(filter_data_bytes);
if (filter_data_bytes_len > 0) {
bitset_.revert_bytes(filter_data_bytes);
}
return ret;
}

Expand Down
26 changes: 19 additions & 7 deletions cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,12 @@ int TsFileWriter::flush() {
return ret;
}
}
if (check_chunk_group_empty(device_iter->second,
device_iter->second->is_aligned_)) {
continue;
}
bool is_aligned = device_iter->second->is_aligned_;

if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first,
is_aligned))) {
} else if (RET_FAIL(
Expand All @@ -759,17 +764,24 @@ int TsFileWriter::flush() {
return ret;
}

bool TsFileWriter::check_chunk_group_empty(
MeasurementSchemaGroup *chunk_group) {
bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
bool is_aligned) {
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
MeasurementSchema *m_schema = ms_iter->second;
if (m_schema->chunk_writer_ != NULL &&
m_schema->chunk_writer_->hasData()) {
// first condition is to avoid first flush empty chunk group
// second condition is to avoid repeated flush
return false;
if (is_aligned) {
if (m_schema->value_chunk_writer_ != NULL &&
m_schema->value_chunk_writer_->hasData()) {
return false;
}
} else {
if (m_schema->chunk_writer_ != NULL &&
m_schema->chunk_writer_->hasData()) {
// first condition is to avoid first flush empty chunk group
// second condition is to avoid repeated flush
return false;
}
}
}
return true;
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/writer/tsfile_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class TsFileWriter {
int write_tablet(const Tablet &tablet);
int write_record_aligned(const TsRecord &record);
int write_tablet_aligned(const Tablet &tablet);
std::map<std::string, MeasurementSchemaGroup *> *get_schema_group_map() {
return &schemas_;
}
int64_t calculate_mem_size_for_all_group();
int check_memory_size_and_may_flush_chunks();
/*
Expand All @@ -86,7 +89,8 @@ class TsFileWriter {
private:
int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp,
const DataPoint &point);
bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group);
bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
bool is_aligned);
int write_point_aligned(ValueChunkWriter *value_chunk_writer,
int64_t timestamp, const DataPoint &point);
int flush_chunk_group(MeasurementSchemaGroup *chunk_group, bool is_aligned);
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/writer/value_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,10 @@ int64_t ValueChunkWriter::estimate_max_series_mem_size() {
value_page_writer_.get_statistic()->get_type());
}

bool ValueChunkWriter::hasData() {
return num_of_pages_ > 0 ||
(value_page_writer_.get_statistic() != nullptr &&
value_page_writer_.get_statistic()->count_ > 0);
}

} // end namespace storage
2 changes: 2 additions & 0 deletions cpp/src/writer/value_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class ValueChunkWriter {

int64_t estimate_max_series_mem_size();

bool hasData();

private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/common/tsfile_common_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ TEST_F(TSMIteratorTest, InitEmptyList) {
common::PageArena arena;
common::SimpleList<ChunkGroupMeta*> empty_list(&arena);
TSMIterator iter(empty_list);
ASSERT_EQ(iter.init(), common::E_NOT_EXIST);
ASSERT_EQ(iter.init(), common::E_OK);
}

TEST_F(TSMIteratorTest, HasNext) {
Expand Down
124 changes: 123 additions & 1 deletion cpp/test/writer/tsfile_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include "file/write_file.h"
#include "reader/qds_without_timegenerator.h"
#include "reader/tsfile_reader.h"

#include "writer/chunk_writer.h"
using namespace storage;
using namespace common;

Expand Down Expand Up @@ -381,6 +381,128 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsDouble) {
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}


TEST_F(TsFileWriterTest, FlushMultipleDevice) {
const int device_num = 50;
const int measurement_num = 50;
const int max_rows = 100;
std::vector<MeasurementSchema> schema_vec[50];

for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vec[i].push_back(
MeasurementSchema(measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
tsfile_writer_->register_timeseries(
device_name, measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED);
}
}

for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
Tablet tablet(device_name, &schema_vec[i], max_rows);
tablet.init();
for (int j = 0; j < measurement_num; j++) {
for (int row = 0; row < max_rows; row++) {
tablet.set_timestamp(row, 16225600 + row);
}
for (int row = 0; row < max_rows; row++) {
tablet.set_value(row, j, static_cast<int64_t>(row));
}
}
ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
// flush after write tablet to check whether write empty chunk
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
}
ASSERT_EQ(tsfile_writer_->close(), E_OK);

std::vector<storage::Path> select_list;
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measurement_name = "measurement" + std::to_string(j);
storage::Path path(device_name, measurement_name);
select_list.push_back(path);
}
}
storage::QueryExpression *query_expr =
storage::QueryExpression::create(select_list, nullptr);

storage::TsFileReader reader;
int ret = reader.open(file_name_);
ASSERT_EQ(ret, common::E_OK);
storage::QueryDataSet *tmp_qds = nullptr;

ret = reader.query(query_expr, tmp_qds);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;

storage::RowRecord *record;
int64_t cur_record_num = 0;
do {
record = qds->get_next();
// if empty chunk is writen, the timestamp should be NULL
if (!record) {
break;
}
EXPECT_EQ(record->get_timestamp(), 16225600 + cur_record_num);
cur_record_num++;
} while (true);
EXPECT_EQ(cur_record_num, max_rows);
storage::QueryExpression::destory(query_expr);
reader.destroy_query_data_set(qds);
}

TEST_F(TsFileWriterTest, AnalyzeTsfileForload) {
const int device_num = 50;
const int measurement_num = 50;
const int max_rows = 100;
std::vector<MeasurementSchema> schema_vec[50];

for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vec[i].push_back(
MeasurementSchema(measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
tsfile_writer_->register_timeseries(
device_name, measure_name, common::TSDataType::INT64,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED);
}
}

for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
Tablet tablet(device_name, &schema_vec[i], max_rows);
tablet.init();
for (int j = 0; j < measurement_num; j++) {
for (int row = 0; row < max_rows; row++) {
tablet.set_timestamp(row, 16225600 + row);
}
for (int row = 0; row < max_rows; row++) {
tablet.set_value(row, j, static_cast<int64_t>(row));
}
}
ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
}
auto schemas = tsfile_writer_->get_schema_group_map();
ASSERT_EQ(schemas->size(), 50);
for (const auto& device_iter : *schemas) {
for (const auto& chunk_iter : device_iter.second->measurement_schema_map_) {
ASSERT_NE(chunk_iter.second->chunk_writer_, nullptr);
ASSERT_TRUE(chunk_iter.second->chunk_writer_->hasData());
}
}
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
TEST_F(TsFileWriterTest, FlushWithoutWriteAfterRegisterTS) {
std::string device_path = "device1";
std::string measurement_name = "temperature";
Expand Down

0 comments on commit 3117ec0

Please sign in to comment.