From 4e395381fbe4b3815e443d8a83645a5b6bfa89b3 Mon Sep 17 00:00:00 2001
From: Doug Turnbull
Date: Fri, 26 Jul 2024 11:06:57 -0400
Subject: [PATCH] Specialized 1 worker indexing function

---
 searcharray/indexing.py | 41 ++++++++++++++++++++++++++++++++++++++---
 test/test_tmdb.py       |  7 +++++--
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/searcharray/indexing.py b/searcharray/indexing.py
index 1f78c35..d543fde 100644
--- a/searcharray/indexing.py
+++ b/searcharray/indexing.py
@@ -1,6 +1,5 @@
 import numpy as np
 import math
-import gc
 import os
 import sys
 from typing import Iterable, List, Optional
@@ -92,8 +91,8 @@ def _gather_tokens(array, tokenizer,
     logger.info("Tokenization -- vstacking")
     terms_w_posns = np.vstack([all_terms, all_docs, all_posns])
-    del all_terms, all_docs, all_posns
-    gc.collect()
+    # del all_terms, all_docs, all_posns
+    # gc.collect()
 
     logger.info("Tokenization -- DONE")
 
     return terms_w_posns, term_doc
@@ -196,10 +195,46 @@ def _process_batches(term_doc, batch_size,
     return bit_posns
 
 
+def build_index_no_workers(array: Iterable, tokenizer, batch_size=10000,
+                           data_dir: Optional[str] = None,
+                           truncate=False):
+    term_dict = TermDict()
+    term_doc = SparseMatSetBuilder()
+    doc_lens: List[np.ndarray] = []
+    bit_posns = None
+
+    logger.info("Indexing begins w/ NO workers")
+    for batch_beg, batch in batch_iterator(array, batch_size):
+        batch_beg, batch_term_doc, batch_bit_posns, batch_doc_lens = _tokenize_batch(batch, tokenizer, term_dict, batch_size, batch_beg, truncate=truncate)
+        term_doc.concat(batch_term_doc)
+        if bit_posns is None:
+            bit_posns = batch_bit_posns
+        else:
+            bit_posns.concat(batch_bit_posns)
+        doc_lens.append(batch_doc_lens)
+
+    doc_lens = np.concatenate(doc_lens)
+
+    avg_doc_length = np.mean(doc_lens)
+
+    term_doc_built = RowViewableMatrix(term_doc.build())
+    logger.info("Indexing from tokenization complete")
+    assert bit_posns is not None
+    # if data_dir is None:
+    #     data_dir = searcharray_home()
+    if data_dir is not None:
+        logger.info(f"Memmapping bit positions to {data_dir}")
+        bit_posns.memmap(data_dir)
+    return term_doc_built, bit_posns, term_dict, avg_doc_length, np.array(doc_lens)
+
+
 def build_index_from_tokenizer(array: Iterable, tokenizer, batch_size=10000,
                                data_dir: Optional[str] = None,
                                truncate=False, workers=4):
     """Build index directly from tokenizing docs (array of string)."""
+    if workers == 1:
+        return build_index_no_workers(array, tokenizer, batch_size=batch_size,
+                                      data_dir=data_dir, truncate=truncate)
     term_dict = TermDict()
     term_doc = SparseMatSetBuilder()
     doc_lens: List[np.ndarray] = []
diff --git a/test/test_tmdb.py b/test/test_tmdb.py
index 5b84e70..2999357 100644
--- a/test/test_tmdb.py
+++ b/test/test_tmdb.py
@@ -66,12 +66,14 @@ def tmdb_pd_data(tmdb_raw_data):
 
 @pytest.fixture(scope="session", params=["full", "ends_empty", "memmap",
                                          "small_batch",
-                                         "smallbatch_memmap"])
+                                         "smallbatch_memmap", "one_worker"])
 def tmdb_data(tmdb_pd_data, request):
     ensure_data_dir_exists()
     print(f"Rebuilding index with {request.param}")
     df = tmdb_pd_data
+    workers = 4 if request.param != "one_worker" else 1
     indexed = SearchArray.index(df['title'],
+                                workers=workers,
                                 batch_size=5000 if request.param in ["small_batch", "smallbatch_memmap"] else 100000,
                                 data_dir=DATA_DIR if request.param == "memmap" else None)
     df['title_tokens'] = indexed
@@ -309,7 +311,8 @@ def test_index_benchmark_warmed(benchmark, tmdb_pd_data):
 
 
 def test_index_benchmark_1k_random(benchmark, tmdb_pd_data):
     prof = Profiler(benchmark)
     thousand_random = np.random.choice(tmdb_pd_data['overview'], size=1000)
-    results = prof.run(SearchArray.index, thousand_random, autowarm=False)
+    results = prof.run(SearchArray.index, thousand_random, autowarm=False,
+                       workers=1)
     assert len(results) == 1000
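
The test changes above call SearchArray.index with a workers argument. A minimal
sketch of exercising the new single-worker path from the public API, assuming
workers is forwarded down to build_index_from_tokenizer as the fixture change
implies; the sample documents are made up for illustration:

    import pandas as pd
    from searcharray import SearchArray

    # workers=1 dispatches to the new build_index_no_workers path instead of
    # the multi-worker batching used by default (workers=4).
    docs = pd.Series(["the quick brown fox", "jumps over the lazy dog"])
    indexed = SearchArray.index(docs, workers=1)
    assert len(indexed) == 2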