# index_works.py
import os
import logging
import functools
import concurrent.futures

from lxml import etree as ET

from models.work import Work
from search import search
from processors.xml_processor import XMLProcessor  # the shared parser also used by the UI

logger = logging.getLogger(__name__)

def extract_text_for_indexing(work):
    """
    Extract text from an XML file using the same logic the UI uses,
    which keeps indexing consistent with display.
    Returns the extracted text as a single space-joined string,
    or "" if the file is missing or cannot be parsed.
    """
    processor = XMLProcessor()
    xml_path = work.file_path
    if not os.path.exists(xml_path):
        logger.error(f"File not found: {xml_path}")
        return ""
    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()
        if work.collection == 'EEBO-TCP':
            # TEI document: flatten each section title plus its content items.
            sections = processor.process_eebo_content(root)
            text_parts = []
            for section_title, section_content in sections:
                text_parts.append(section_title)
                for item_type, text in section_content:
                    text_parts.append(text)
            return " ".join(text_parts)
        else:
            # Shakespeare play: walk acts -> scenes -> speeches and stage directions.
            play_data = processor.process_play_content(root)
            text_parts = [play_data['title']]
            for act in play_data['acts']:
                text_parts.append(act['act_title'])
                for scene in act['scenes']:
                    text_parts.append(scene['scene_title'])
                    for item in scene['content']:
                        if item['type'] == 'speech':
                            text_parts.append(item['speaker'])
                            text_parts.extend(item['lines'])
                        elif item['type'] == 'stagedir':
                            text_parts.append(item['text'])
            return " ".join(text_parts)
    except Exception as e:
        logger.error(f"Error processing {xml_path}: {e}")
        return ""
def index_work_with_context(app, work):
    """
    Index a single work into Elasticsearch inside an application context,
    using the shared XML parser. Returns True on success.
    """
    with app.app_context():
        content = extract_text_for_indexing(work)
        if not content.strip():
            logger.warning(f"No content extracted from {work.file_path}")
            return False
        success = search.index_work(work, content)
        if success:
            logger.info(f"Successfully indexed work {work.id}: {work.title}")
        else:
            logger.warning(f"Indexing failed for work {work.id}: {work.title}")
        return success
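
# Single-work reindex sketch (hypothetical ad-hoc use; the id is a placeholder):
#
#     from app import app
#     with app.app_context():
#         work = Work.query.get(1)
#     ok = index_work_with_context(app, work)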
def index_all_works(app):
    """
    Index all works in the database into Elasticsearch, in batches,
    fanning each batch out to a small thread pool (indexing is I/O-bound,
    so threads are a good fit).
    """
    with app.app_context():
        total_works = Work.query.count()
        logger.info(f"Starting indexing of {total_works} works")

        batch_size = 100
        successful = 0
        failed = 0
        index_work_partial = functools.partial(index_work_with_context, app)

        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            for offset in range(0, total_works, batch_size):
                # Each batch is loaded in the main thread; workers only read
                # plain column attributes, so they do not touch the session.
                works = Work.query.offset(offset).limit(batch_size).all()
                future_to_work = {
                    executor.submit(index_work_partial, w): w
                    for w in works
                }
                for future in concurrent.futures.as_completed(future_to_work):
                    w = future_to_work[future]
                    try:
                        if future.result():
                            successful += 1
                        else:
                            failed += 1
                    except Exception as exc:
                        logger.error(f"Error indexing work {w.id}: {exc}")
                        failed += 1
                progress = (offset + len(works)) / total_works * 100
                logger.info(f"Progress: {progress:.1f}% "
                            f"({successful} succeeded, {failed} failed)")

    logger.info(f"Indexing complete. {successful} works indexed successfully, {failed} failed")
if __name__ == "__main__":
    from app import app

    # When run as a script, configure logging so progress is visible.
    logging.basicConfig(level=logging.INFO)
    logger.info("Starting indexing process")
    index_all_works(app)
    logger.info("Indexing process finished")