-
Notifications
You must be signed in to change notification settings - Fork 667
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added writer.wait_merging_threads
call
#628
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hopefully this doesn't hurt performance
I hope it's ok me stopping by, I just wanted to do a quick check on the merging_threads addition.
In your code, it will definitely hurt async concurrency, but probably not data throughput for indexing. I don't know whether your application is sensitive to the concurrency impact or not. Handwaving some numbers, if your application usually serves say 10k concurrent connections, a little blocking code like the writer interactions you have could drop that to, say 1k if it was in a hot path. It probably isn't, but using these made up numbers that's the kind of impact blocking code has inside async event loops. (OTOH if you have very few concurrent IO-bound tasks/users then it probably doesn't matter)
async def _add_document() -> None:
if not await self.filecheck(index_doc["file_location"], index_doc["body"]):
try:
writer: IndexWriter = (await self.index).writer()
writer.add_document(Document.from_dict(index_doc)) # type: ignore[call-arg]
writer.commit()
writer.wait_merging_threads()
filehash = self.filehash(index_doc["body"])
(await self.index_files)[index_doc["file_location"]] = filehash
if document:
docs_index_dir = await self.docs_index_directory
async with await anyio.open_file(
docs_index_dir / f"{filehash}.{self.storage.extension()}",
"wb",
) as f:
await f.write(self.storage.write_to_string(document))
self.changed = True
except ValueError as e:
if "Failed to acquire Lockfile: LockBusy." in str(e):
raise AsyncRetryError("Failed to acquire lock.") from e
raise The issue is that all interactions with the It might look something like this (pseudocode): def writer_stuff(index_path, index_docs):
writer: IndexWriter = Index.open(index_path).writer(heap_size=1000 * 1000 * 1000)
for index_doc in index_docs:
writer.add_document(Document.from_dict(index_doc)) # type: ignore[call-arg]
writer.commit()
writer.wait_merging_threads() And then at the call site async def _add():
...
index_docs = ...
exe = concurrent.futures.ProcessPoolExecutor()
loop = asyncio.get_running_loop()
await loop.run_in_executor(exe, writer_stuff, index_path, index_docs)
... This kind of code is annoying to write, because any parameters that cross the interprocess boundary do so in a pickle and that's why I pass the |
(fyi @mskarlin @jamesbraza ) |
Hello @cjrh sorry we had some stuff that delayed me a bit on a response. Thanks for your thoughts/time here, it's much appreciated, and actually this PR turns out to be a massive win:
So it turns out by not calling I might suggest to tantivy-py to somehow tweak the interface on Per your comment on multiprocessing/GIL, that is a great suggestion. I opened #646 to make sure we track this possibility |
good to hear 👍🏼 22 files sounds much more like what I was expecting. You should no longer be seeing the memory error. |
Per advice in quickwit-oss/tantivy-py#359 (comment)