Specialized 1 worker indexing function
softwaredoug committed Jul 26, 2024
1 parent 4dbb876 commit 4e39538
Showing 2 changed files with 43 additions and 5 deletions.
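This commit adds a dedicated single-worker path, build_index_no_workers, and has build_index_from_tokenizer dispatch to it when workers=1, skipping the multi-worker machinery. A rough usage sketch follows; the import path and sample documents are assumptions, while the workers and batch_size keyword arguments mirror the updated tests below.

# Hypothetical usage sketch (not part of this commit): routing indexing
# through the new single-worker path by passing workers=1.
from searcharray import SearchArray  # assumed import path

docs = ["the quick brown fox", "jumps over the lazy dog"]
indexed = SearchArray.index(docs, workers=1, batch_size=10000)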
41 changes: 38 additions & 3 deletions searcharray/indexing.py
@@ -1,6 +1,5 @@
 import numpy as np
 import math
-import gc
 import os
 import sys
 from typing import Iterable, List, Optional
@@ -92,8 +91,8 @@ def _gather_tokens(array, tokenizer,

logger.info("Tokenization -- vstacking")
terms_w_posns = np.vstack([all_terms, all_docs, all_posns])
del all_terms, all_docs, all_posns
gc.collect()
# del all_terms, all_docs, all_posns
# gc.collect()
logger.info("Tokenization -- DONE")
return terms_w_posns, term_doc
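As a side note on the stacked arrays above, a toy illustration with invented values: the three per-token arrays (term ids, doc ids, positions) are parallel, so vstack yields a (3, N) matrix with one column per token occurrence.

# Toy illustration only (values invented); shows the shape produced by the
# np.vstack call in _gather_tokens.
import numpy as np

all_terms = np.array([0, 1, 1, 2])   # term id of each token occurrence
all_docs = np.array([0, 0, 1, 1])    # doc id each occurrence belongs to
all_posns = np.array([0, 1, 0, 1])   # position of the token within its doc

terms_w_posns = np.vstack([all_terms, all_docs, all_posns])
print(terms_w_posns.shape)  # (3, 4)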

@@ -196,10 +195,46 @@ def _process_batches(term_doc, batch_size,
     return bit_posns


+def build_index_no_workers(array: Iterable, tokenizer, batch_size=10000,
+                           data_dir: Optional[str] = None,
+                           truncate=False):
+    term_dict = TermDict()
+    term_doc = SparseMatSetBuilder()
+    doc_lens: List[np.ndarray] = []
+    bit_posns = None
+
+    logger.info("Indexing begins w/ NO workers")
+    for batch_beg, batch in batch_iterator(array, batch_size):
+        batch_beg, batch_term_doc, batch_bit_posns, batch_doc_lens = _tokenize_batch(batch, tokenizer, term_dict, batch_size, batch_beg, truncate=truncate)
+        term_doc.concat(batch_term_doc)
+        if bit_posns is None:
+            bit_posns = batch_bit_posns
+        else:
+            bit_posns.concat(batch_bit_posns)
+        doc_lens.append(batch_doc_lens)
+
+    doc_lens = np.concatenate(doc_lens)
+
+    avg_doc_length = np.mean(doc_lens)
+
+    term_doc_built = RowViewableMatrix(term_doc.build())
+    logger.info("Indexing from tokenization complete")
+    assert bit_posns is not None
+    # if data_dir is None:
+    #     data_dir = searcharray_home()
+    if data_dir is not None:
+        logger.info(f"Memmapping bit positions to {data_dir}")
+        bit_posns.memmap(data_dir)
+    return term_doc_built, bit_posns, term_dict, avg_doc_length, np.array(doc_lens)
+
+
 def build_index_from_tokenizer(array: Iterable, tokenizer, batch_size=10000,
                                data_dir: Optional[str] = None,
                                truncate=False, workers=4):
     """Build index directly from tokenizing docs (array of string)."""
+    if workers == 1:
+        return build_index_no_workers(array, tokenizer, batch_size=batch_size,
+                                      data_dir=data_dir, truncate=truncate)
     term_dict = TermDict()
     term_doc = SparseMatSetBuilder()
     doc_lens: List[np.ndarray] = []
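The loop in the new build_index_no_workers consumes (batch_beg, batch) pairs from batch_iterator. Below is a minimal sketch of a compatible iterator, assuming it yields the starting document offset alongside each batch; the actual helper in searcharray may differ.

# Sketch only: a batch iterator compatible with the loop in
# build_index_no_workers, assumed to yield (start_offset, batch) pairs.
from typing import Iterable, Iterator, List, Tuple

def batch_iterator(array: Iterable, batch_size: int) -> Iterator[Tuple[int, List]]:
    batch: List = []
    batch_beg = 0
    for idx, doc in enumerate(array):
        batch.append(doc)
        if len(batch) == batch_size:
            yield batch_beg, batch
            batch_beg = idx + 1
            batch = []
    if batch:  # trailing partial batch
        yield batch_beg, batch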
7 changes: 5 additions & 2 deletions test/test_tmdb.py
@@ -66,12 +66,14 @@ def tmdb_pd_data(tmdb_raw_data):


 @pytest.fixture(scope="session", params=["full", "ends_empty", "memmap", "small_batch",
-                                         "smallbatch_memmap"])
+                                         "smallbatch_memmap", "one_worker"])
 def tmdb_data(tmdb_pd_data, request):
     ensure_data_dir_exists()
     print(f"Rebuilding index with {request.param}")
     df = tmdb_pd_data
+    workers = 4 if request.param != "one_worker" else 1
     indexed = SearchArray.index(df['title'],
+                                workers=workers,
                                 batch_size=5000 if request.param in ["small_batch", "smallbatch_memmap"] else 100000,
                                 data_dir=DATA_DIR if request.param == "memmap" else None)
     df['title_tokens'] = indexed
@@ -309,7 +311,8 @@ def test_index_benchmark_warmed(benchmark, tmdb_pd_data):
 def test_index_benchmark_1k_random(benchmark, tmdb_pd_data):
     prof = Profiler(benchmark)
     thousand_random = np.random.choice(tmdb_pd_data['overview'], size=1000)
-    results = prof.run(SearchArray.index, thousand_random, autowarm=False)
+    results = prof.run(SearchArray.index, thousand_random, autowarm=False,
+                       workers=1)
     assert len(results) == 1000

