diff --git a/CMakeLists.txt b/CMakeLists.txt index 1640cfa..9341468 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ project(dramhit VERSION 0.1.0) # Declare options option(SANITIZE "Enable ASAN." OFF) -option(SANITIZE_TESTING "Enable ASAN for tests." ON) +option(SANITIZE_TESTING "Enable ASAN for tests." OFF) option(BUILD_APP "Build the main application, dramhit." ON) option(BUILD_TESTING "Build tests." OFF) option(BUILD_EXAMPLE "Build examples." OFF) @@ -13,10 +13,10 @@ option(VTUNE "Use VTUNE to do performance monitering." OFF) option(AGGR "Use aggregation hashtable (histogram)" ON) option(BQUEUE "Enable bqueue tests" OFF) option(XORWOW "Xorwow" OFF) -option(BQ_ZIPFIAN "Enable global zipfian distribution generation" ON) +option(BQ_ZIPFIAN "Enable global zipfian distribution generation" OFF) option(BQ_ZIPFIAN_LOCAL "Enable local zipfian distribution generation" OFF) option(CALC_STATS "Enable hashtable statistics" ON) -option(ZIPF_FAST "Enable faster zipfian distribution generation" ON) +option(ZIPF_FAST "Enable faster zipfian distribution generation" OFF) option(LATENCY_COLLECTION "Enable latency data collection" OFF) option(BQ_KMER_TEST "Bqueue kmer test" OFF) diff --git a/include/Application.hpp b/include/Application.hpp index d386843..96d50bc 100644 --- a/include/Application.hpp +++ b/include/Application.hpp @@ -19,7 +19,7 @@ class Application { std::vector threads; Shard *shards; MsrHandler *msr_ctrl; - RadixContext radixContext; + RadixContext *radixContext; public: std::vector nodes; diff --git a/include/input_reader/fastq.hpp b/include/input_reader/fastq.hpp index aa9e523..310f830 100644 --- a/include/input_reader/fastq.hpp +++ b/include/input_reader/fastq.hpp @@ -111,6 +111,7 @@ class FastqReader : public FileReader { } }; + /// Reads KMers from a Fastq file. 
template class FastqKMerReader : public InputReaderU64 { diff --git a/include/tests/KmerTest.hpp b/include/tests/KmerTest.hpp index 158d50f..25ee806 100644 --- a/include/tests/KmerTest.hpp +++ b/include/tests/KmerTest.hpp @@ -16,16 +16,21 @@ class KmerTest { void count_kmer(Shard *sh, const Configuration &config, BaseHashTable *ht, std::barrier *barrier); - void count_kmer_radix(Shard *sh, const Configuration &config, - std::barrier *barrier, RadixContext &context); +// void count_kmer_radix(Shard *sh, const Configuration &config, +// std::barrier *barrier, RadixContext &context); - void count_kmer_radix_custom(Shard *sh, const Configuration &config, - std::barrier *barrier, - RadixContext &context); +// void count_kmer_radix_custom(Shard *sh, const Configuration &config, +// std::barrier *barrier, +// RadixContext &context); - void count_kmer_radix_jerry(Shard *sh, const Configuration &config, - std::barrier *barrier, - RadixContext &context, BaseHashTable *ht); +// void count_kmer_radix_partition(Shard *sh, const Configuration &config, +// std::barrier *barrier, +// RadixContext &context, BaseHashTable *ht); + + + void count_kmer_radix_partition_global(Shard* sh, const Configuration& config, + std::barrier* barrier, + RadixContext* context, BaseHashTable* ht); }; } // namespace kmercounter diff --git a/include/types.hpp b/include/types.hpp index 4d33634..ff64efb 100644 --- a/include/types.hpp +++ b/include/types.hpp @@ -102,158 +102,352 @@ struct alignas(64) cacheline { typedef uint64_t Kmer; -typedef struct KmerChunk { - size_t count; - Kmer* kmers; -} KmerChunk_t; +/** + * Makes a non-temporal write of 64 bytes from src to dst. + * Uses vectorized non-temporal stores if available, falls + * back to assignment copy. 
+ * + * @param dst + * @param src + * + * @return + */ +static inline void store_nontemp_64B(void* dst, void* src) { +#ifdef __AVX__ + register __m256i* d1 = (__m256i*)dst; + register __m256i s1 = *((__m256i*)src); + register __m256i* d2 = d1 + 1; + register __m256i s2 = *(((__m256i*)src) + 1); + + _mm256_stream_si256(d1, s1); + _mm256_stream_si256(d2, s2); + +#elif defined(__SSE2__) + + register __m128i* d1 = (__m128i*)dst; + register __m128i* d2 = d1 + 1; + register __m128i* d3 = d1 + 2; + register __m128i* d4 = d1 + 3; + register __m128i s1 = *(__m128i*)src; + register __m128i s2 = *((__m128i*)src + 1); + register __m128i s3 = *((__m128i*)src + 2); + register __m128i s4 = *((__m128i*)src + 3); + + _mm_stream_si128(d1, s1); + _mm_stream_si128(d2, s2); + _mm_stream_si128(d3, s3); + _mm_stream_si128(d4, s4); -const uint64_t KMERSPERCACHELINE1 = (CACHELINE_SIZE / sizeof(Kmer)); -const uint64_t MAX_CHUCKS = 5; +#else + /* just copy with assignment */ + *(cacheline_t*)dst = *(cacheline_t*)src; + +#endif +} -class PartitionChunks { +#define KMERSPERCACHELINE (CACHE_LINE_SIZE / sizeof(Kmer)) +#define CACHELINEPERPAGE (PAGE_SIZE / CACHE_LINE_SIZE) + +// typedef union { +// struct { +// Kmer kmers[KMERSPERCACHELINE]; +// } kmers; +// struct { +// Kmer kmers[KMERSPERCACHELINE - 1]; +// uint32_t slot; +// } data; +// } cacheline_t; + +typedef struct { + Kmer kmers[KMERSPERCACHELINE]; +} cacheline_t; + +typedef struct cacheblock { + cacheline_t* lines; + uint64_t count; + uint64_t max_count; + struct cacheblock* next; +} cacheblock_t; + +class BufferedPartition { public: - size_t chunk_size; // # bytes for a KmerChunk_t - size_t chunk_count; // # kmer in a KmerChunk_t - KmerChunk_t chunks[MAX_CHUCKS]; - uint64_t chunks_len = 0; // # length of chunk array - - PartitionChunks() = default; - - PartitionChunks(size_t size_hint) { - // chuck size must be a multiple of CACHELINE_SIZE - chunk_size = - (size_hint + CACHELINE_SIZE - 1) / CACHELINE_SIZE * CACHELINE_SIZE * 3; - 
chunk_count = chunk_size / sizeof(Kmer) - - 1; // subtract 1 because we keep track of count - alloc_kmerchunk(); + uint64_t hint_max_line; + uint64_t hint_max_kmer; + + uint64_t total_kmer_count; // how many kmer is inserted + uint64_t total_line_count; // how many line is occupied + + uint8_t buffer_size; + uint8_t buffer_count; + cacheline_t buffer; + + uint8_t blocks_count; + cacheblock_t* headblock_ptr; + cacheblock_t* tailblock_ptr; + bool non_temp; + + BufferedPartition(size_t size_hint) { + buffer_size = KMERSPERCACHELINE; + non_temp = false; + hint_max_line = (size_hint + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE; + hint_max_kmer = (size_hint + sizeof(Kmer) - 1) / sizeof(Kmer); + total_kmer_count = 0; + total_line_count = 0; + buffer_count = 0; + headblock_ptr = (cacheblock_t*)malloc(sizeof(cacheblock_t)); + headblock_ptr->lines = + (cacheline_t*)malloc(sizeof(cacheline_t) * hint_max_line); + headblock_ptr->count = 0; + headblock_ptr->max_count = hint_max_line; + headblock_ptr->next = NULL; + tailblock_ptr = headblock_ptr; + blocks_count = 1; } - void write_one(Kmer k) { - KmerChunk last = chunks[chunks_len - 1]; - if (last.count == chunk_count) { - if (!alloc_kmerchunk()) { - PLOG_FATAL << "Allocation of KmerChunk failed \n"; - } + ~BufferedPartition() { + cacheblock_t* tmp; + int free_num = 0; + while (headblock_ptr != NULL) { + tmp = headblock_ptr; + headblock_ptr = headblock_ptr->next; + free(tmp->lines); + free(tmp); + free_num++; + } + + if (free_num != blocks_count) { + PLOG_FATAL.printf("free num %d, != blocks count %d\n", free_num, blocks_count); } - last.kmers[last.count] = k; - last.count++; } - Kmer* get_next() { - KmerChunk last = chunks[chunks_len - 1]; - if (last.count >= chunk_count) { - if (!alloc_kmerchunk()) { - PLOG_FATAL << "Allocation of KmerChunk failed \n"; - }else - { - return chunks[chunks_len - 1].kmers; - } + void alloc_block(uint64_t max_line) { + if (headblock_ptr == NULL) return; + + cacheblock_t* curr = headblock_ptr; + int i = 
0; + while (i < (blocks_count - 1)) { + curr = curr->next; + i++; } - return &last.kmers[last.count]; + cacheblock_t* new_block = (cacheblock_t*)malloc(sizeof(cacheblock_t)); + new_block->lines = (cacheline_t*)malloc(sizeof(cacheline_t) * max_line); + new_block->count = 0; + new_block->max_count = max_line; + new_block->next = NULL; + curr->next = new_block; + tailblock_ptr = new_block; + blocks_count++; } - bool alloc_kmerchunk() { - if (chunks_len >= MAX_CHUCKS) return false; + cacheblock_t* get_block(uint64_t block_idx) { + cacheblock_t* curr = headblock_ptr; + while (block_idx > 0) { + curr = curr->next; + block_idx--; + } + return curr; + } - KmerChunk_t kc; - kc.count = 0; - kc.kmers = (Kmer*)alloc(); + Kmer get_kmer(uint64_t block_idx, uint64_t line_idx, uint64_t index) { + if (index >= KMERSPERCACHELINE || line_idx >= hint_max_line || + block_idx >= blocks_count) + return 0; - chunks[chunks_len] = std::move(kc); - chunks_len++; + cacheblock_t* curr = headblock_ptr; + while (block_idx > 0) { + curr = curr->next; + block_idx--; + } - return true; + if (line_idx >= curr->count) return 0; + return curr->lines[line_idx].kmers[index]; } - Kmer* alloc() { -#define MAP_HUGE_2MB (21 << MAP_HUGE_SHIFT) - uint huge = chunk_size < (1 << 20) ? 
0 : MAP_HUGE_2MB; - auto addr = (Kmer*)aligned_alloc(CACHELINE_SIZE, chunk_size); - if (addr == MAP_FAILED) { - perror("mmap"); - exit(1); + void write_kmer(Kmer kmer) { + if (buffer_count >= buffer_size) { + write_line(buffer_size); + buffer_count = 0; } - return addr; + buffer.kmers[buffer_count] = kmer; + buffer_count++; } - void advance() { chunks[chunks_len - 1].count += KMERSPERCACHELINE1; } + void flush_buffer() { + if (buffer_count > 0) write_line(buffer_count); + } + + void write_line(uint64_t num_kmer) { + if (tailblock_ptr->count >= tailblock_ptr->max_count) { + alloc_block(PAGE_SIZE / CACHE_LINE_SIZE); + } + + tailblock_ptr->lines[tailblock_ptr->count] = buffer; + tailblock_ptr->count++; + total_kmer_count += num_kmer; + total_line_count++; + } }; +// typedef struct KmerChunk { +// size_t count; +// Kmer* kmers; +// } KmerChunk_t; + +// const uint64_t MAX_CHUCKS = 2; +// #define DEFAULT_CACHELINE_IN_CHUNK 3 + +// class PartitionChunks { +// public: +// size_t chunk_size; // max bytes in a KmerChunk_t.kmer, multiple of +// cacheline size size_t chunk_count; // max kmers in a KmerChunk_t.kmer +// KmerChunk_t chunks[MAX_CHUCKS]; +// uint64_t chunks_len = 0; // number of chunks in chunk array. 
+ +// PartitionChunks() = default; + +// PartitionChunks(size_t size_hint) { +// // chuck size must be a multiple of CACHELINE_SIZE +// chunk_size = (size_hint + CACHELINE_SIZE - 1) / CACHELINE_SIZE * +// CACHELINE_SIZE; chunk_count = chunk_size / sizeof(Kmer); // subtract 1 +// because we keep track of count alloc_kmerchunk(); +// } + +// void write_one(Kmer k) { +// KmerChunk last = chunks[chunks_len - 1]; +// if (last.count == chunk_count) { +// if (!alloc_kmerchunk()) { +// PLOG_FATAL << "Allocation of KmerChunk failed \n"; +// } +// } +// last.kmers[last.count] = k; +// last.count++; +// } + +// Kmer* get_next() { +// KmerChunk last = chunks[chunks_len - 1]; +// if (last.count >= chunk_count) { +// if (!alloc_kmerchunk()) { +// PLOG_FATAL << "Allocation of KmerChunk failed \n"; +// }else +// { +// return chunks[chunks_len - 1].kmers; +// } +// } +// return &last.kmers[last.count]; +// } + +// /** +// * allocate a new kmerChunk and put into chunk array. +// */ +// bool alloc_kmerchunk() { +// if (chunks_len >= MAX_CHUCKS) return false; + +// KmerChunk_t kc; +// kc.count = 0; +// kc.kmers = (Kmer*)alloc_kmer(); + +// chunks[chunks_len] = std::move(kc); +// chunks_len++; + +// return true; +// } + +// Kmer* alloc_kmer() { + +// // #define MAP_HUGE_2MB (21 << MAP_HUGE_SHIFT) +// // uint huge = chunk_size < (1 << 20) ? 
0 : MAP_HUGE_2MB; + +// auto addr = (Kmer*)aligned_alloc(CACHELINE_SIZE, chunk_size); +// if (addr == MAP_FAILED) { +// PLOG_FATAL << "mmap failed"; +// exit(1); +// } +// return addr; +// } + +// void advance() { chunks[chunks_len - 1].count += KMERSPERCACHELINE1; } +// }; + + + class RadixContext { public: - uint64_t** hists; - uint64_t** parts; - std::vector partitions; - // Radix shift + // uint64_t** hists; + // uint64_t** parts; + // std::vector partitions; + // uint32_t hashmaps_per_thread; + // uint32_t nthreads_d; + // std::vector>> hashmaps; + + BufferedPartition*** partitions; // partitions[thread_id][radix_bin_num] uint8_t R; - // Radix bits uint8_t D; uint32_t fanOut; uint64_t mask; - // How many hash map does a thread have - uint32_t hashmaps_per_thread; - // floor(log(num_threads)) - uint32_t nthreads_d; uint64_t global_time; - std::vector>> hashmaps; + uint64_t threads_num; + uint64_t size_hint; + - RadixContext(uint8_t d, uint8_t r, uint32_t num_threads) + ~RadixContext() { + free(partitions); + } + + RadixContext(uint8_t d, uint8_t r, uint32_t num_threads, uint64_t filesize) : R(r), D(d), fanOut(1 << d), mask(((1 << d) - 1) << r), global_time(_rdtsc()) { - hists = (uint64_t**)std::aligned_alloc(CACHELINE_SIZE, - fanOut * sizeof(uint64_t*)); - parts = (uint64_t**)std::aligned_alloc(CACHELINE_SIZE, - fanOut * sizeof(uint64_t*)); - - partitions = std::vector(num_threads, nullptr); - // for (auto& p : partitions) { - // p.reserve(fanOut); + threads_num = num_threads; + partitions = (BufferedPartition***) malloc(sizeof(void*) * num_threads); + size_hint = filesize / num_threads; + // hists = (uint64_t**)std::aligned_alloc(CACHELINE_SIZE, + // fanOut * sizeof(uint64_t*)); + // parts = (uint64_t**)std::aligned_alloc(CACHELINE_SIZE, + // fanOut * sizeof(uint64_t*)); + + // partitions = std::vector(num_threads, nullptr); + + // nthreads_d = 0; + // while ((1 << (1 + nthreads_d)) <= num_threads) { + // nthreads_d++; + // } + // auto gather_threads = 1 << 
nthreads_d; + // if (fanOut <= num_threads) { + // hashmaps_per_thread = 1; + // } else { + // hashmaps_per_thread = 1 << (d - nthreads_d); + // } + // PLOGI.printf("hashmaps_per_thread: %d", hashmaps_per_thread); + // std::vector>> maps( + // gather_threads); + // for (int i = 0; i < gather_threads; i++) { + // maps[i].reserve(hashmaps_per_thread); + // } + // hashmaps = maps; // } - nthreads_d = 0; - while ((1 << (1 + nthreads_d)) <= num_threads) { - nthreads_d++; - } - auto gather_threads = 1 << nthreads_d; - if (fanOut <= num_threads) { - hashmaps_per_thread = 1; - } else { - hashmaps_per_thread = 1 << (d - nthreads_d); - } - PLOGI.printf("hashmaps_per_thread: %d", hashmaps_per_thread); - std::vector>> maps( - gather_threads); - for (int i = 0; i < gather_threads; i++) { - maps[i].reserve(hashmaps_per_thread); - } - hashmaps = maps; - // for (uint32_t i = 0; i < num_threads; i++) { - // hists[i] = (uint32_t*) std::aligned_alloc(CACHE_LINE_SIZE, fanOut * - // sizeof(uint32_t)); partitions[i] = - // (uint64_t*)std::aligned_alloc(CACHE_LINE_SIZE, fanOut * - // sizeof(uint64_t*)); - // } + // absl::flat_hash_map aggregate() const { + // absl::flat_hash_map aggregation; + // for (int i = 0; i < (1 << nthreads_d); i++) { + // for (int j = 0; j < hashmaps_per_thread; j++) { + // auto map = hashmaps[i][j]; + // for (const auto& entry : map) { + // auto key = entry.first; + // auto val = entry.second; + // assert(!aggregation.contains(key)); + // aggregation[key] = val; + // } + // } + // } + // return aggregation; } - absl::flat_hash_map aggregate() const { - absl::flat_hash_map aggregation; - for (int i = 0; i < (1 << nthreads_d); i++) { - for (int j = 0; j < hashmaps_per_thread; j++) { - auto map = hashmaps[i][j]; - for (const auto& entry : map) { - auto key = entry.first; - auto val = entry.second; - assert(!aggregation.contains(key)); - aggregation[key] = val; - } - } - } - return aggregation; - } + + + RadixContext() = default; }; diff --git a/jerry-benchmark.sh 
b/jerry-benchmark.sh index a350c1a..ba3370d 100755 --- a/jerry-benchmark.sh +++ b/jerry-benchmark.sh @@ -1,17 +1,17 @@ #!/bin/bash # ./build/dramhit --help to see flags - +# 8589934592 function bench_regular() { sudo ./build/dramhit \ --mode 4 \ --ht-type 3 \ --numa-split 1 \ - --num-threads 64 \ - --ht-size 8589934592 \ + --num-threads 1 \ + --ht-size 1000 \ --in-file /opt/dramhit/kmer_dataset/SRR1513870.fastq \ - --k 6 > jerry_benchmark.log + --k 6 > jerry_benchmark_regular.log } function bench_radix() @@ -20,10 +20,28 @@ function bench_radix() --mode 14 \ --ht-type 3 \ --numa-split 1 \ - --num-threads 64 \ + --num-threads 1 \ --ht-size 8589934592 \ --in-file /opt/dramhit/kmer_dataset/SRR1513870.fastq \ - --k 6 --d 8 > jerry_benchmark.log + --k 8 --d 6 > jerry_benchmark_radix.log +} + +function debug() +{ + sudo gdb --args ./build/dramhit "--mode" "14" "--k" "6" "--d" "2" "--ht-type" "3" "--numa-split" "1" "--num-threads" "1" "--ht-size" "100" "--in-file" "/opt/dramhit/kmer_dataset/SRR1513870.fastq" + +} + +function mem_debug() +{ + valgrind -s --leak-check=full ./build/dramhit \ + --mode 14 \ + --ht-type 3 \ + --numa-split 1 \ + --num-threads 1 \ + --ht-size 1000 \ + --in-file /opt/dramhit/kmer_dataset/SRR1513870.fastq \ + --k 6 --d 2 } function bench() { @@ -39,6 +57,8 @@ if [ "$1" == "build" ]; then build elif [ "$1" == "bench" ]; then bench +elif [ "$1" == "debug" ]; then + debug else echo "Invalid option" fi \ No newline at end of file diff --git a/src/Application.cpp b/src/Application.cpp index ad14785..9c5b4f7 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -165,6 +165,7 @@ void free_ht(BaseHashTable *kmer_ht) { void Application::shard_thread(int tid, std::barrier> *barrier) { + Shard *sh = &this->shards[tid]; BaseHashTable *kmer_ht = NULL; @@ -210,6 +211,7 @@ void Application::shard_thread(int tid, while (num_entered != config.num_threads) _mm_pause(); + switch (config.mode) { case SYNTH: this->test.st.synth_run_exec(sh, kmer_ht); @@ -232,11 
+234,14 @@ void Application::shard_thread(int tid, this->test.hj.join_relations_generated(sh, config, kmer_ht, config.materialize, barrier); break; - case FASTQ_WITH_INSERT_RADIX: - this->test.kmer.count_kmer_radix_jerry(sh, config, barrier, - this->radixContext, kmer_ht); + case FASTQ_WITH_INSERT_RADIX: + PLOG_INFO << "Running radix "; + this->test.kmer.count_kmer_radix_partition_global(sh, config, barrier, + this->radixContext, kmer_ht); + break; case FASTQ_WITH_INSERT: + PLOG_INFO << "Running Kmer "; this->test.kmer.count_kmer(sh, config, kmer_ht, barrier); break; default: @@ -244,18 +249,18 @@ void Application::shard_thread(int tid, } // Write to file - if (!config.ht_file.empty()) { - // for CAS hashtable, not every thread has to write to file - if (config.ht_type == CASHTPP && (sh->shard_idx > 0)) { - goto done; - } - std::string outfile = config.ht_file + std::to_string(sh->shard_idx); - PLOG_INFO.printf("Shard %u: Printing to file: %s", sh->shard_idx, - outfile.c_str()); - kmer_ht->print_to_file(outfile); - } - - // free_ht(kmer_ht); + // if (!config.ht_file.empty()) { + // // for CAS hashtable, not every thread has to write to file + // if (config.ht_type == CASHTPP && (sh->shard_idx > 0)) { + // goto done; + // } + // std::string outfile = config.ht_file + std::to_string(sh->shard_idx); + // PLOG_INFO.printf("Shard %u: Printing to file: %s", sh->shard_idx, + // outfile.c_str()); + // kmer_ht->print_to_file(outfile); + // } + + free_ht(kmer_ht); done: --num_entered; @@ -331,7 +336,7 @@ int Application::spawn_shard_threads() { CPU_ZERO(&cpuset); CPU_SET(assigned_cpu, &cpuset); pthread_setaffinity_np(_thread.native_handle(), sizeof(cpu_set_t), &cpuset); - PLOGV.printf("Thread %u: affinity: %u", sh->shard_idx, assigned_cpu); + PLOG_INFO.printf("Thread %u: affinity: %u", sh->shard_idx, assigned_cpu); this->threads.push_back(std::move(_thread)); i += 1; } @@ -357,7 +362,7 @@ int Application::spawn_shard_threads() { } } if ((config.mode != CACHE_MISS) && 
(config.mode != HASHJOIN)) { - print_stats(this->shards, config); + print_stats(this->shards, config); } std::free(this->shards); @@ -554,16 +559,12 @@ int Application::process(int argc, char *argv[]) { exit(-1); } } else if (config.mode == FASTQ_WITH_INSERT_RADIX) { - PLOG_INFO.printf("Mode : FASTQ_WITH_INSERT_RADIX"); auto nthreads = config.num_threads; - this->radixContext = RadixContext(config.D, 0, nthreads); + this->radixContext = new RadixContext(config.D, 0, nthreads, get_file_size(config.in_file.c_str())); PLOG_INFO.printf( - "Mode : FASTQ_WITH_INSERT_RADIX D:%u, fanout: %u, multiplier: %u, " - "nthreads_d: %u", - config.D, this->radixContext.fanOut, - this->radixContext.hashmaps_per_thread, - this->radixContext.nthreads_d); + "Mode : FASTQ_WITH_INSERT_RADIX D:%u, fanout: %u", + config.D, this->radixContext->fanOut); if (config.in_file.empty()) { PLOG_ERROR.printf("Please provide input fasta file."); exit(-1); @@ -626,16 +627,16 @@ int Application::process(int argc, char *argv[]) { } } - // Dump hwprefetchers msr - Needs msr-safe driver - // (use scripts/enable_msr_safe.sh) - auto rdmsr_set = this->msr_ctrl->read_msr(0x1a4); - printf("MSR 0x1a4 has: { "); - for (const auto &e : rdmsr_set) { - printf("0x%lx ", e); - } - printf("}\n"); + // // Dump hwprefetchers msr - Needs msr-safe driver + // // (use scripts/enable_msr_safe.sh) + // auto rdmsr_set = this->msr_ctrl->read_msr(0x1a4); + // printf("MSR 0x1a4 has: { "); + // for (const auto &e : rdmsr_set) { + // printf("0x%lx ", e); + // } + // printf("}\n"); - config.dump_configuration(); + // config.dump_configuration(); if ((config.mode == BQ_TESTS_YES_BQ) || (config.mode == FASTQ_WITH_INSERT) || (config.mode == FASTQ_WITH_INSERT_RADIX)) { diff --git a/src/tests/kmer_radix_tests.cpp b/src/tests/kmer_radix_tests.cpp index 4e15c79..91dfc0d 100644 --- a/src/tests/kmer_radix_tests.cpp +++ b/src/tests/kmer_radix_tests.cpp @@ -17,7 +17,7 @@ #include "constants.hpp" #include "hashtables/base_kht.hpp" #include 
"hashtables/batch_runner/batch_runner.hpp" -//#include "hashtables/cas_kht_single.hpp" +// #include "hashtables/cas_kht_single.hpp" #include "hashtables/kvtypes.hpp" #include "input_reader/counter.hpp" #include "input_reader/fastq.hpp" @@ -25,251 +25,499 @@ #include "sync.h" #include "tests/KmerTest.hpp" #include "types.hpp" - using namespace std; -uint64_t crc_hash64(const void* buff, uint64_t len) { - // assert(len == 64); - return _mm_crc32_u64(0xffffffff, *static_cast(buff)); -} + namespace kmercounter { // CRC3 -#define HASHER crc_hash64 + + // #define HASHER CityHash64 -absl::flat_hash_map check_count( - const absl::flat_hash_map& reference, - const absl::flat_hash_map& aggregation) { - absl::flat_hash_map diff; - for (const auto& entry : reference) { - auto key = entry.first; - auto val = entry.second; - if (aggregation.contains(key)) { - auto agg_val = aggregation.at(key); - if (val != agg_val) { - diff[key] = (long)agg_val - (long)val; - } - } else { - diff[key] = val; - } - } - return diff; -} +// absl::flat_hash_map check_count( +// const absl::flat_hash_map& reference, +// const absl::flat_hash_map& aggregation) { +// absl::flat_hash_map diff; +// for (const auto& entry : reference) { +// auto key = entry.first; +// auto val = entry.second; +// if (aggregation.contains(key)) { +// auto agg_val = aggregation.at(key); +// if (val != agg_val) { +// diff[key] = (long)agg_val - (long)val; +// } +// } else { +// diff[key] = val; +// } +// } +// return diff; +// } -absl::flat_hash_map build_ref(const Configuration& config) { - PLOGI.printf("Building reference HT reader"); - // Be care of the `K` here; it's a compile time constant. 
- auto reader = - input_reader::MakeFastqKMerPreloadReader(config.K, config.in_file, 0, 1); - - PLOGI.printf("Alloc reference HT"); - absl::flat_hash_map counter(1 << 20); - PLOGI.printf("Count reference HT"); - counter.reserve(1 << 20); - for (uint64_t kmer; reader->next(&kmer);) { - counter[kmer]++; - } - return counter; +// absl::flat_hash_map build_ref(const Configuration& config) { +// PLOGI.printf("Building reference HT reader"); +// // Be care of the `K` here; it's a compile time constant. +// auto reader = +// input_reader::MakeFastqKMerPreloadReader(config.K, config.in_file, 0, +// 1); + +// PLOGI.printf("Alloc reference HT"); +// absl::flat_hash_map counter(1 << 20); +// PLOGI.printf("Count reference HT"); +// counter.reserve(1 << 20); +// for (uint64_t kmer; reader->next(&kmer);) { +// counter[kmer]++; +// } +// return counter; +// } + +// void check_functionality(const Configuration& config, +// const RadixContext& context) { +// auto reference = build_ref(config); +// auto aggregation = context.aggregate(); +// auto diff = check_count(reference, aggregation); +// auto rev_diff = check_count(aggregation, reference); +// for (auto& entry : rev_diff) { +// assert(entry.second < 0); +// } +// uint64_t max_diff = 0; +// Kmer max_diff_kmer = 0; +// for (auto& entry : diff) { +// auto abs_diff = std::abs(entry.second); + +// assert(entry.second > 0); +// if (abs_diff > max_diff) { +// max_diff = abs_diff; +// max_diff_kmer = entry.first; +// } +// } +// PLOGI.printf( +// "Diff kmer: %llu(rev: %llu); total distinct kmer: (ref: %llu, aggr: " +// "%llu); max diff: %llu(ref: %llu);", +// diff.size(), rev_diff.size(), reference.size(), aggregation.size(), +// max_diff, reference[max_diff_kmer]); +// } +/* #define RADIX_HASH(V) ((V>>7)^(V>>13)^(V>>21)^V) */ + +// uint64_t partitioning(Shard* sh, const Configuration& config, +// RadixContext& context, +// std::unique_ptr reader, +// PartitionChunks* local_chunks) { +// auto shard_idx = sh->shard_idx; +// auto R = 
context.R; +// auto fanOut = context.fanOut; +// auto sz = sh->f_end - sh->f_start; + +// // Two ways: +// // 1: rel tmp +// // 2: file cache tmp +// uint32_t MASK = context.mask; + +// // uint64_t* hist = (uint64_t*)calloc(fanOut, sizeof(int64_t)); +// // hists[shard_idx] = hist; +// // for (uint64_t kmer; reader->next(&kmer);) { +// // auto hash_val = HASHER((char*)&kmer, sizeof(Kmer)); +// // uint32_t idx = HASH_BIT_MODULO(hash_val, MASK, R); +// // hist[idx]++; +// // num_kmers++; +// // } +// // +// // // Need paddding so that the size of each partition is an integer +// multiple +// // of the cache line size for (uint32_t i = 0; i < fanOut; i++) { +// // auto hist_i = hist[i]; +// // auto mod = hist_i % KMERSPERCACHELINE; +// // hist[i] = mod == 0 ? hist_i : (hist_i + KMERSPERCACHELINE - mod); +// // } +// // +// // uint32_t sum = 0; +// // /* compute local prefix sum on hist so that we can get the start and end +// // position of each partition */ for (uint32_t i = 0; i < fanOut; i++) { +// // sum += hist[i]; +// // hist[i] = sum; +// // } + +// PLOGI.printf("IDX: %u, sz: %llu", shard_idx, sz / fanOut); +// auto start_alloc = _rdtsc(); +// // uint6Please provide a short explanation (a few sentences) of why you +// need +// // more time. 
4_t* locals = +// // (uint64_t*)std::aligned_alloc(PAGESIZE, hist[fanOut - 1] * +// // sizeof(Kmer)); +// // cacheline_t* buffers = (cacheline_t*)std::aligned_alloc( +// // CACHELINE_SIZE, sizeof(cacheline_t) * fanOut); + +// cacheline_t* buffers = (cacheline_t*)mmap( +// nullptr, /* 256*1024*1024*/ sizeof(cacheline_t) * fanOut, +// PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); +// // PLOGI.printf("end mmap"); +// if (buffers == MAP_FAILED) { +// perror("mmap"); +// exit(1); +// } +// // cacheline_t buffers[fanOut]; +// auto end_alloc = _rdtsc() - start_alloc; +// // PLOGI.printf("IDX: %u, Partition alloc: %llu cycles", shard_idx, +// // end_alloc); partitions[shard_idx] = locals; + +// for (uint32_t i = 0; i < fanOut; i++) { +// buffers[i].data.slot = 0; +// } +// uint64_t total_num_part = 0; +// auto start_swb = _rdtsc(); +// uint64_t sum = 0; +// uint64_t xo = 0; +// for (uint64_t kmer; reader->next(&kmer);) { +// xo += kmer; +// // sum += kmer; +// total_num_part += 1; +// // auto hash_val = (kmer * 3) >> (64 - context.D); +// auto hash_val = HASHER((const char*)&kmer, sizeof(Kmer)); +// uint32_t idx = HASH_BIT_MODULO(hash_val, MASK, R); +// uint32_t slot = buffers[idx].data.slot; +// Kmer* part = (Kmer*)(buffers + idx); +// // Only works if KMERSPERCACHELINE is a power of 2 +// uint32_t slotMod = (slot) & (KMERSPERCACHELINE - 1); +// part[slotMod] = kmer; + +// if (slotMod == (KMERSPERCACHELINE - 1)) { +// PartitionChunks& partitionChunk = (local_chunks[idx]); +// // PLOGI.printf("partitions size: %llu, partition array: %llu, IDX: %u, +// // idx: %u", context.partitions.size(), +// // context.partitions[shard_idx].size(), shard_idx, idx); +// auto next_loc = partitionChunk.get_next(); +// // xo += (uint64_t) next_loc; +// partitionChunk.advance(); +// // partitionChunk.chunk_size++; +// /* write out 64-Bytes with non-temporal store */ +// store_nontemp_64B(next_loc, (buffers + idx)); +// /* writes += TUPLESPERCACHELINE; */ +// } +// 
buffers[idx].data.slot = slot + 1; +// } +// context.partitions[shard_idx] = local_chunks; +// // +// context.partitions[shard_idx].insert(context.partitions[shard_idx].end(), +// // local_chunks.begin(), local_chunks.end()); +// // context.partition_ready[shard_idx] = true; +// auto swb_end = _rdtsc(); +// auto swb_diff = swb_end - start_swb; +// PLOGI.printf( +// "IDX: %u;SWB: %llu cycles; Timestamp: %llu; Partition_alloc: %llu; " +// "SWB_per_kmer: %llu, sum: %llu", +// shard_idx, swb_diff, swb_end - context.global_time, end_alloc, +// swb_diff / total_num_part, xo); +// return total_num_part; +// } + +// void KmerTest::count_kmer_radix_partition(Shard* sh, const Configuration& +// config, +// std::barrier* barrier, +// RadixContext& context, BaseHashTable* ht) { +// auto nthreads_d = context.nthreads_d; +// auto gathering_threads = 1 << nthreads_d; +// auto shard_idx = sh->shard_idx; +// auto fanOut = context.fanOut; + +// HTBatchRunner batch_runner(ht); + +// // PartitionChunks local_chunks[fanOut]; +// PartitionChunks* local_chunks = (PartitionChunks*)mmap( +// nullptr, /* 256*1024*1024*/ sizeof(PartitionChunks) * fanOut, +// PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + +// for (int i = 0; i < fanOut; i++) { +// local_chunks[i] = PartitionChunks((sh->f_end - sh->f_start) / fanOut); +// } + +// auto reader = input_reader::MakeFastqKMerPreloadReader( +// config.K, config.in_file, shard_idx, config.num_threads); + +// // wait for read initialization +// barrier->arrive_and_wait(); + +// auto start_partition_cycle = _rdtsc(); + +// uint64_t total_kmers_partition = +// partitioning(sh, config, context, move(reader), local_chunks); + +// // wait for partitions +// barrier->arrive_and_wait(); + +// // Redistribution ? 
+ +// auto start_insertion_cycle = _rdtsc(); +// uint64_t total_kmers_insertion = 0; + +// for (int i = 0; i < fanOut; i++) { +// PartitionChunks pc = local_chunks[i]; +// for (int j = 0; j < pc.chunks_len; j++) { +// KmerChunk kc = pc.chunks[j]; +// for (int k = 0; k < kc.count; k++) { +// batch_runner.insert(kc.kmers[k], 0); +// total_kmers_insertion++; +// } +// } +// } +// batch_runner.flush_insert(); +// // wait for insertion +// barrier->arrive_and_wait(); + +// // stats printing +// sh->stats->insertions.duration = _rdtsc() - start_insertion_cycle; +// sh->stats->insertions.op_count = total_kmers_insertion; + +// get_ht_stats(sh, ht); +// } + +// void KmerTest::count_kmer_radix_partition_global(Shard* sh, +// const Configuration& config, +// std::barrier* barrier, +// RadixContext& context, +// BaseHashTable* ht) { +// auto nthreads_d = context.nthreads_d; +// auto gathering_threads = 1 << nthreads_d; +// auto shard_idx = sh->shard_idx; +// auto fanOut = context.fanOut; + +// HTBatchRunner batch_runner(ht); + +// auto reader = input_reader::MakeFastqKMerPreloadReader( +// config.K, config.in_file, shard_idx, config.num_threads); + +// // wait for read initialization +// barrier->arrive_and_wait(); + +// uint64_t total_kmers_read = radix_partition(context, move(reader)); +// PLOG_INFO.printf("Total kmer read %d\n", total_kmers_read); +// // wait for partitions +// barrier->arrive_and_wait(); + +// auto start_insertion_cycle = _rdtsc(); + +// uint32_t part_count = config.num_threads / fanOut; +// uint64_t total_insertion = 0; + +// for (int i = 0; i < part_count; i++) { +// BufferedPartition& partition = context.buffer_partitions[sh->shard_idx + i]; + +// uint64_t total_kmers_insertion = 0; +// uint64_t total_kmer_partition = partition.total_kmer_count; +// uint64_t num_insert = 0; +// for (cacheblock_t& b : partition.blocks) { +// for (int j = 0; j < b.count; j++) { +// num_insert = total_kmer_partition > KMERSPERCACHELINE +// ? 
KMERSPERCACHELINE +// : total_kmer_partition; +// for (int k = 0; k < num_insert; k++) { +// batch_runner.insert(b.lines[j].kmers[k], 0); +// total_kmers_insertion++; +// } +// total_kmer_partition -= num_insert; +// } +// } + +// total_insertion += total_kmers_insertion; + +// if (total_kmers_insertion != partition.total_kmer_count) { +// PLOG_FATAL +// << "insertion amount is not equal to partition amount insertion: " +// << total_kmers_insertion +// << " vs partition: " << partition.total_kmer_count; +// } +// } + +// batch_runner.flush_insert(); +// // wait for insertion +// barrier->arrive_and_wait(); + +// // stats printing +// sh->stats->insertions.duration = _rdtsc() - start_insertion_cycle; +// sh->stats->insertions.op_count = total_insertion; +// get_ht_stats(sh, ht); +// } + +uint64_t crc_hash64(const void* buff, uint64_t len) { + return _mm_crc32_u64(0xffffffff, *static_cast(buff)); } +#define HASHER crc_hash64 +#define HASH_BIT_MODULO(K, MASK, NBITS) (((K) & MASK) >> NBITS) -void check_functionality(const Configuration& config, - const RadixContext& context) { - auto reference = build_ref(config); - auto aggregation = context.aggregate(); - auto diff = check_count(reference, aggregation); - auto rev_diff = check_count(aggregation, reference); - for (auto& entry : rev_diff) { - assert(entry.second < 0); - } - uint64_t max_diff = 0; - Kmer max_diff_kmer = 0; - for (auto& entry : diff) { - auto abs_diff = std::abs(entry.second); - - assert(entry.second > 0); - if (abs_diff > max_diff) { - max_diff = abs_diff; - max_diff_kmer = entry.first; + +Kmer packkmer(const char* s, int k) +{ + uint8_t nt; + Kmer kmer; + for(int i=0; i>7)^(V>>13)^(V>>21)^V) */ -#define HASH_BIT_MODULO(K, MASK, NBITS) (((K) & MASK) >> NBITS) -// Should be power of 2? 
#define PBSTR "||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||"
#define PBWIDTH 60

/**
 * Redraws a single-line textual progress bar on stdout.
 *
 * The line starts with a carriage return so repeated calls overwrite the
 * previous bar, and stdout is flushed so the update is visible immediately.
 *
 * @param percentage completed fraction; values outside [0, 1] (and NaN) are
 *                   clamped so the bar is always exactly PBWIDTH cells wide
 */
void printProgress(double percentage) {
  // Without clamping, a fraction above 1 makes the right padding negative
  // (printf then left-justifies and pads anyway) and a negative fraction
  // makes the precision negative (printf then prints the whole bar).
  if (!(percentage > 0.0)) {
    percentage = 0.0;  // also catches NaN
  } else if (percentage > 1.0) {
    percentage = 1.0;
  }

  const int val = static_cast<int>(percentage * 100);
  const int lpad = static_cast<int>(percentage * PBWIDTH);
  const int rpad = PBWIDTH - lpad;
  printf("\r%3d%% [%.*s%*s]", val, lpad, PBSTR, rpad, "");
  fflush(stdout);
}
uint64_t* hist = (uint64_t*)calloc(fanOut, sizeof(int64_t)); - // hists[shard_idx] = hist; - // for (uint64_t kmer; reader->next(&kmer);) { - // auto hash_val = HASHER((char*)&kmer, sizeof(Kmer)); - // uint32_t idx = HASH_BIT_MODULO(hash_val, MASK, R); - // hist[idx]++; - // num_kmers++; - // } - // - // // Need paddding so that the size of each partition is an integer multiple - // of the cache line size for (uint32_t i = 0; i < fanOut; i++) { - // auto hist_i = hist[i]; - // auto mod = hist_i % KMERSPERCACHELINE; - // hist[i] = mod == 0 ? hist_i : (hist_i + KMERSPERCACHELINE - mod); - // } - // - // uint32_t sum = 0; - // /* compute local prefix sum on hist so that we can get the start and end - // position of each partition */ for (uint32_t i = 0; i < fanOut; i++) { - // sum += hist[i]; - // hist[i] = sum; - // } - - PLOGI.printf("IDX: %u, sz: %llu", shard_idx, sz / fanOut); - auto start_alloc = _rdtsc(); - // uint6Please provide a short explanation (a few sentences) of why you need - // more time. 
4_t* locals = - // (uint64_t*)std::aligned_alloc(PAGESIZE, hist[fanOut - 1] * - // sizeof(Kmer)); - // cacheline_t* buffers = (cacheline_t*)std::aligned_alloc( - // CACHELINE_SIZE, sizeof(cacheline_t) * fanOut); - - cacheline_t* buffers = (cacheline_t*)mmap( - nullptr, /* 256*1024*1024*/ sizeof(cacheline_t) * fanOut, - PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - // PLOGI.printf("end mmap"); - if (buffers == MAP_FAILED) { - perror("mmap"); - exit(1); - } - // cacheline_t buffers[fanOut]; - auto end_alloc = _rdtsc() - start_alloc; - // PLOGI.printf("IDX: %u, Partition alloc: %llu cycles", shard_idx, - // end_alloc); partitions[shard_idx] = locals; +uint64_t radix_partition( RadixContext* context, + BufferedPartition** local_partitions, + input_reader::FastqReader& reader, + int id, int k) { + auto R = context->R; + auto mask = context->mask; + auto fanOut = context->fanOut; + uint64_t totol_kmer_part = 0; + + string_view sv; + const char* data; + int max = 100; + uint64_t hash_val = 0; + uint32_t partition_idx = 0; + + while(reader.next(&sv) && max >= 0) + { + size_t len = sv.size(); + + for(int i=0; i<(len-k+1); i++) { + data = sv.substr(i, k).data(); + hash_val = crc_hash64(data, k); + partition_idx = HASH_BIT_MODULO(hash_val, mask, R); + if (partition_idx < fanOut) + { + local_partitions[partition_idx]->write_kmer(packkmer(data, k)); + totol_kmer_part++; + } + else + PLOG_FATAL << "partition index too big"; + } + } + + // flush the cacheline buffer. + for(int i=0; iflush_buffer(); + + // pass the partitions into global lookup table, so other threads can gather. 
+ context->partitions[id] = local_partitions; - for (uint32_t i = 0; i < fanOut; i++) { - buffers[i].data.slot = 0; + uint64_t tt_count = 0; + uint64_t count = 0; + for(int i=0; itotal_kmer_count; + tt_count += count; + PLOG_INFO.printf("Thread id: %d partition [%d], kmer count [%d]", id, i, count); + } + + if(tt_count != totol_kmer_part) + PLOG_FATAL.printf("total count %d doesn match total partition count %d ", tt_count, totol_kmer_part); + return totol_kmer_part; } - uint64_t total_num_part = 0; - auto start_swb = _rdtsc(); - uint64_t sum = 0; - uint64_t xo = 0; - for (uint64_t kmer; reader->next(&kmer);) { - xo += kmer; - // sum += kmer; - total_num_part += 1; - // auto hash_val = (kmer * 3) >> (64 - context.D); - auto hash_val = HASHER((const char*)&kmer, sizeof(Kmer)); - uint32_t idx = HASH_BIT_MODULO(hash_val, MASK, R); - uint32_t slot = buffers[idx].data.slot; - Kmer* part = (Kmer*)(buffers + idx); - // Only works if KMERSPERCACHELINE is a power of 2 - uint32_t slotMod = (slot) & (KMERSPERCACHELINE - 1); - part[slotMod] = kmer; - - if (slotMod == (KMERSPERCACHELINE - 1)) { - PartitionChunks& partitionChunk = (local_chunks[idx]); - // PLOGI.printf("partitions size: %llu, partition array: %llu, IDX: %u, - // idx: %u", context.partitions.size(), - // context.partitions[shard_idx].size(), shard_idx, idx); - auto next_loc = partitionChunk.get_next(); - // xo += (uint64_t) next_loc; - partitionChunk.advance(); - // partitionChunk.chunk_size++; - /* write out 64-Bytes with non-temporal store */ - store_nontemp_64B(next_loc, (buffers + idx)); - /* writes += TUPLESPERCACHELINE; */ + +void KmerTest::count_kmer_radix_partition_global(Shard* sh, + const Configuration& config, + std::barrier* barrier, + RadixContext* context, + BaseHashTable* ht) { + + uint8_t shard_idx = sh->shard_idx; + PLOG_INFO.printf("Radix FASTQ INSERT Initialization started %d", shard_idx); + + uint32_t fanOut = context->fanOut; + HTBatchRunner batch_runner(ht); + auto seq_reader = 
input_reader::FastqReader(config.in_file, shard_idx, config.num_threads); + size_t array_size = fanOut * sizeof(BufferedPartition*); + BufferedPartition** local_partitions = (BufferedPartition**)malloc(array_size); + for (int i = 0; i < fanOut; i++) + local_partitions[i] = new BufferedPartition(context->size_hint); + PLOG_INFO.printf("LRadix FASTQ INSERT Initialization finished %d", shard_idx); + + barrier->arrive_and_wait(); + + PLOG_INFO.printf("Radix Partition started %d", shard_idx); + uint64_t total_kmers_part = radix_partition(context, local_partitions, seq_reader, shard_idx, config.K); + PLOG_INFO.printf("Radix Partition finished: %d total parititon: %d", shard_idx, total_kmers_part); + + barrier->arrive_and_wait(); + + auto start_insertion_cycle = _rdtsc(); + uint64_t total_insertion = 0; + uint stride = fanOut / context->threads_num; + BufferedPartition* workload; + + PLOG_INFO.printf("Insertion started %d", shard_idx); + for (uint i = 0; i < context->threads_num; i++) { + for (uint j = stride * i; j < stride * (i + 1); j++) { + workload = context->partitions[i][j]; + + // Insert into a block + uint64_t workload_inserted = 0; + for (uint b = 0; b < workload->blocks_count; b++) { + cacheblock_t* block = workload->get_block(b); + for (uint l = 0; l < block->count; l++) { + for (uint k = 0; k < KMERSPERCACHELINE; k++) { + batch_runner.insert(block->lines[l].kmers[k], 0); + workload_inserted++; + if(workload_inserted >= workload->total_kmer_count) + break; + } + } + } + + total_insertion += workload_inserted; } - buffers[idx].data.slot = slot + 1; } - context.partitions[shard_idx] = local_chunks; - // context.partitions[shard_idx].insert(context.partitions[shard_idx].end(), - // local_chunks.begin(), local_chunks.end()); - // context.partition_ready[shard_idx] = true; - auto swb_end = _rdtsc(); - auto swb_diff = swb_end - start_swb; - PLOGI.printf( - "IDX: %u;SWB: %llu cycles; Timestamp: %llu; Partition_alloc: %llu; " - "SWB_per_kmer: %llu, sum: %llu", - 
shard_idx, swb_diff, swb_end - context.global_time, end_alloc, - swb_diff / total_num_part, xo); - return total_num_part; + batch_runner.flush_insert(); + + PLOG_INFO.printf("Insertion finished: %d total insertion: %d", shard_idx , total_insertion); + + barrier->arrive_and_wait(); + + sh->stats->insertions.duration = _rdtsc() - start_insertion_cycle; + sh->stats->insertions.op_count = total_insertion; + get_ht_stats(sh, ht); + + //free + // PLOG_INFO.printf("freeing memory allocated: %d", shard_idx); + for(int i=0; i < fanOut; i++) + delete local_partitions[i]; + free(local_partitions); } -void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, - std::barrier* barrier, - RadixContext& context) { +// void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& +// config, +// std::barrier* barrier, +// RadixContext& context) { // auto hashmaps_per_thread = context.hashmaps_per_thread; // auto nthreads_d = context.nthreads_d; // auto gathering_threads = 1 << nthreads_d; @@ -311,7 +559,8 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // for (int i = 0; i < hashmaps_per_thread; i++) { // auto ht = new CASHashTableSingle( -// (1 << 26) / (hashmaps_per_thread == 1 ? 1 : hashmaps_per_thread + 0)); +// (1 << 26) / (hashmaps_per_thread == 1 ? 
1 : hashmaps_per_thread + +// 0)); // prealloc_maps.push_back(ht); // } @@ -339,15 +588,9 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // barrier->arrive_and_wait(); // std::uint64_t start_shard = _rdtsc(); -// // start_partition_ts = std::chrono::steady_clock::now(); // start_partition_cycle = _rdtsc(); // auto total_kmers_part = 0; -// // if (shard_idx == 0) { -// // if (shard_idx != 0) { -// // barrier->arrive_and_wait(); -// // return; -// // } // #ifdef WITH_VTUNE_LIB // static const auto event = // __itt_event_create("partitioning", strlen("partitioning")); @@ -362,10 +605,6 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // PLOGI.printf("IDX: %d, total_num: %llu, per_kmer: %llu, p_cycles: %llu", // shard_idx, total_kmers_part, time / total_kmers_part, time); -// // } -// // start_shard = part_alloc; -// // start_partition_cycle += part_alloc; -// // end_partition_ts = std::chrono::steady_clock::now(); // end_partition_cycle = _rdtsc(); // #ifdef WITH_VTUNE_LIB @@ -388,52 +627,14 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // "at: %llu", // shard_idx, after_first_barrier, // end_partition_cycle - context.global_time); -// // start_shard += after_first_barrier; // auto num_threads = config.num_threads; -// // if (shard_idx == 0) { -// // PLOGI.printf("=== Hists after paddding:"); -// // PLOGI.printf("Partition time: %u", _rdtsc() - start); -// // uint64_t total_mem = 0; -// // for (uint32_t ti = 0; ti < num_threads; ti++) { -// // auto count = hists[ti][fanOut - 1]; -// // total_mem += count; -// // PLOGI.printf("IDX: %u, Count: %u, size: %u M", ti, count, count * -// // sizeof(Kmer) / (1024 * 1024)); -// // } -// // PLOGI.printf("Total mem: %u M", total_mem * sizeof(Kmer) / (1024 * -// // 1024)); for (uint32_t ti = 0; ti < num_threads; ti++) { -// // PLOGI.printf("Shard IDX: %u", ti); -// // for (uint32_t i = 0; i < fanOut; i++) { -// // auto count = 
hists[ti][i]; -// // PLOGI.printf("Partition: %u: %u, size: %u M", i, count, count * -// // sizeof(Kmer) / (1024 * 1024)); -// // } -// // } -// // } - // if (shard_idx >= gathering_threads) { // PLOGW.printf("Thread %u goes idle after partitioning.", shard_idx); // return; // } // std::vector maps; - -// // maps.reserve(32); -// // if (shard_idx == 0) { -// // size_t num = (1llu << 26) * 32; -// // for (int i = 0; i < 1; i++) { -// // auto start_ht = _rdtsc(); -// // auto ht = aligned_alloc(PAGE_SIZE, num * 16); -// // memset(ht, 0, num * 16); -// // // auto ht = new CASHashTableSingle( - 100); -// // auto end_ht = _rdtsc() - start_ht; -// // PLOGI.printf("Iter: %u, alloc cycles: %llu, cycles per elem: %llu", -// // i, end_ht, end_ht / num); -// // maps.push_back(std::move((BaseHashTable*)ht)); -// // } -// // } -// // return; // maps.reserve(hashmaps_per_thread); // uint64_t total_insertions = 0; @@ -446,39 +647,9 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // // start_insertions_ts = std::chrono::steady_clock::now(); // start_insertions_cycle = _rdtsc(); -// // std::vector gather_hist(hashmaps_per_thread); -// // for (uint32_t k = 0; k < hashmaps_per_thread; k++) { -// // uint32_t partition_idx = hashmaps_per_thread * shard_idx + k; -// // uint64_t total_num_kmers = 0; -// // for (uint32_t i = 0; i < num_threads; i++) { -// // uint64_t start = partition_idx == 0u ? 
0u : hists[i][partition_idx - -// // 1]; uint64_t end = hists[i][partition_idx]; total_num_kmers += end - -// // start; -// // } -// // gather_hist.push_back(total_num_kmers); -// // } -// // for (uint32_t k = 1; k < hashmaps_per_thread; k++) { -// // gather_hist[k] += gather_hist[k - 1]; -// // } - -// // auto total_capacity = gather_hist[hashmaps_per_thread - 1] * -// // sizeof(KVType); auto shared_hash_array = (KVType*) -// // std::aligned_alloc(PAGESIZE, total_capacity); auto shared_insert_queue = -// // (ItemQueue *)(aligned_alloc(CACHELINE_SIZE, PREFETCH_QUEUE_SIZE * -// // sizeof(ItemQueue))); memset(shared_hash_array, 0, total_capacity); - -// // if (shard_idx != 0) { -// // barrier->arrive_and_wait(); -// // return; -// // } // for (uint32_t k = 0; k < hashmaps_per_thread; k++) { // uint32_t partition_idx = hashmaps_per_thread * shard_idx + k; -// // std::queue ready_queue; -// // for (int i = 0; i < num_threads; i++) { -// // ready_queue.push(i); -// // } - // uint64_t total_num_kmers = 0; // for (uint32_t i = 0; i < num_threads; i++) { // for (auto& chunk : context.partitions[i][partition_idx].chunks) { @@ -487,32 +658,15 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // } // total_insertions += total_num_kmers; -// // auto ht_alloc_start = _rdtsc(); -// // auto ht = new CASHashTableSingle(total_num_kmers); -// // auto ht_alloc_time = _rdtsc() - ht_alloc_start; -// // start_shard += ht_alloc_time; -// // start_insertions_cycle += ht_alloc_time; // auto ht = prealloc_maps[k]; -// // PLOGI.printf("IDX: %u, HT alloc cycle per Kmer: %llu, total cycles: -// // %llu", shard_idx, ht_alloc_time / total_num_kmers, ht_alloc_time); -// // counter.reserve(total >> 6); // auto count_inner = 0; // auto start_insertions_cycle_inner = _rdtsc(); // for (uint32_t i = 0; i < num_threads; i++) { // size_t next_t = (shard_idx + i) % num_threads; -// // uint64_t start = partition_idx == 0u ? 
0u : hists[i][partition_idx - -// // 1]; uint64_t end = hists[i][partition_idx]; // auto& chunks = context.partitions[next_t][partition_idx]; -// // if (i == 1) { -// // PLOGI.printf("IDX: %u, remote: %u, start: %u end: %u", shard_idx, -// // i, start, end); -// // } -// // auto count_innest = chunks.total_count; -// // count_inner += count_innest; - // auto start_insertions_cycle_innest = _rdtsc(); // for (auto chunk : chunks.chunks) { // count_inner += chunk.count; @@ -521,33 +675,9 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // ht->insert_one(&arg, nullptr); // } // } - -// // for (size_t k = start; k < end; k++) { -// // // xori = xori + 1; -// // // __asm(""); -// // // xori ^= partitions[i][k]; -// // // batch_runner.insert(partitions[i][k], 0 /* we use the aggr -// // tables so no value */); arg.key = partitions[i][k]; -// // ht->insert_one(&arg, nullptr); -// // } -// // auto diff = _rdtsc() - start_insertions_cycle_innest; -// // PLOGI.printf("IDX:%u; remote: %u; Innest: cycles: %llu, -// // cycles_per_in: %llu , start: %llu; end: %llu; total: %llu.", -// // shard_idx, i, diff, diff / count_innest, start, end, -// // count_innest); // } // auto diff = _rdtsc() - start_insertions_cycle_inner; -// // PLOGI.printf("Inner: cycles: %llu, cycles_per_in: %llu", diff, -// // diff / count_inner); -// // batch_runner.insert(xori, 0 /* we use the aggr tables so no value */); -// // batch_runner.flush_insert(); - -// // absl::flat_hash_map counter( -// // total_num_kmers); // 1GB initial size. 
-// // ht->aggregate(counter); -// // context.hashmaps[shard_idx].push_back(std::move(counter)); - // maps.push_back(ht); // } @@ -565,7 +695,8 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // sh->stats->insertions.duration = _rdtsc() - start_shard; // sh->stats->insertions.op_count = total_insertions; -// // PLOG_INFO.printf("IDX: %u, num_kmers: %u, fill: %u", shard_idx, num_kmers, +// // PLOG_INFO.printf("IDX: %u, num_kmers: %u, fill: %u", shard_idx, +// num_kmers, // // ht->get_fill()); // for (uint32_t i = 0; i < hashmaps_per_thread; i++) { // PLOG_INFO.printf("IDX: %u, cap: %u, fill: %u", shard_idx, @@ -576,8 +707,8 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // end_cycles = _rdtsc(); // PLOG_INFO.printf( // "Kmer insertion took %llu us (%llu cycles)", -// chrono::duration_cast(end_ts - start_ts).count(), -// end_cycles - start_cycles); +// chrono::duration_cast(end_ts - +// start_ts).count(), end_cycles - start_cycles); // // check_functionality(config, context); // } // auto partition_time = chrono::duration_cast( @@ -589,82 +720,25 @@ void KmerTest::count_kmer_radix_custom(Shard* sh, const Configuration& config, // .count(); // auto insertion_cycles = end_insertions_cycle - start_insertions_cycle; // auto total_time = partition_time + insertion_time; -// auto time_per_insertion = (double)insertion_time / (double)total_insertions; -// auto cycles_per_insertion = insertion_cycles / total_insertions; -// auto cycles_per_partition = partition_cycles / total_kmers_part; -// PLOG_INFO.printf( -// "IDX: %u, radix: %u partition_time: %llu us(%llu %%), partition_cycles: " +// auto time_per_insertion = (double)insertion_time / +// (double)total_insertions; auto cycles_per_insertion = insertion_cycles / +// total_insertions; auto cycles_per_partition = partition_cycles / +// total_kmers_part; PLOG_INFO.printf( +// "IDX: %u, radix: %u partition_time: %llu us(%llu %%), partition_cycles: +// " // 
"%llu, total_kmer_partition: %llu, cycles_per_partition: %llu, " // "first_barrier: %llu, insertion " // "time: %llu us(%llu %%), insertion_cycles: %llu, time_per_insertion: " // "%.4f us" // ", cycles_per_insertion: %llu, total_kmer_insertion: %llu, " // "second_barrier: %llu", -// shard_idx, context.D, partition_time, partition_time * 100 / total_time, -// partition_cycles, total_kmers_part, cycles_per_partition, +// shard_idx, context.D, partition_time, partition_time * 100 / +// total_time, partition_cycles, total_kmers_part, cycles_per_partition, // after_first_barrier, insertion_time, insertion_time * 100 / total_time, // insertion_cycles, time_per_insertion, cycles_per_insertion, // total_insertions, second_barrier); // // PLOGV.printf("[%d] Num kmers %llu", sh->shard_idx, total_insertions); // // get_ht_stats(sh, ht); -} - -void KmerTest::count_kmer_radix_jerry(Shard* sh, const Configuration& config, - std::barrier* barrier, - RadixContext& context, BaseHashTable* ht) { - auto nthreads_d = context.nthreads_d; - auto gathering_threads = 1 << nthreads_d; - auto shard_idx = sh->shard_idx; - auto fanOut = context.fanOut; - - HTBatchRunner batch_runner(ht); +// } - // PartitionChunks local_chunks[fanOut]; - PartitionChunks* local_chunks = (PartitionChunks*)mmap( - nullptr, /* 256*1024*1024*/ sizeof(PartitionChunks) * fanOut, - PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - - for (int i = 0; i < fanOut; i++) { - local_chunks[i] = PartitionChunks((sh->f_end - sh->f_start) / fanOut); - } - - auto reader = input_reader::MakeFastqKMerPreloadReader( - config.K, config.in_file, shard_idx, config.num_threads); - - // wait for read initialization - barrier->arrive_and_wait(); - - auto start_partition_cycle = _rdtsc(); - - uint64_t total_kmers_partition = - partitioning(sh, config, context, move(reader), local_chunks); - - // wait for partitions - barrier->arrive_and_wait(); - - // Redistribution ? 
- - auto start_insertion_cycle = _rdtsc(); - uint64_t total_kmers_insertion = 0; - - for (int i = 0; i < fanOut; i++) { - PartitionChunks pc = local_chunks[i]; - for (int j = 0; j < pc.chunks_len; j++) { - KmerChunk kc = pc.chunks[j]; - for (int k = 0; k < kc.count; k++) { - batch_runner.insert(kc.kmers[k], 0); - total_kmers_insertion++; - } - } - } - batch_runner.flush_insert(); - // wait for insertion - barrier->arrive_and_wait(); - - // stats printing - sh->stats->insertions.duration = _rdtsc() - start_insertion_cycle; - sh->stats->insertions.op_count = total_kmers_insertion; - - get_ht_stats(sh, ht); -} } // namespace kmercounter