cleanup_gutenberg_headers.py
"""
Remove the Project Gutenberg headers and footers from the raw text files and
create clean versions of the texts.

Code copied/adapted from the Standardized Project Gutenberg Corpus repository:
https://github.com/pgcorpus/gutenberg
"""
from __future__ import unicode_literals
import os
import sys
import logging
import gzip
from pathlib import Path
import utils_general as utils_g


def parse_args(args, config):
    parser = utils_g.create_parser(config)
    parser.add_argument(
        'src_dir',
        type=str,
        help='source data directory containing *.txt* files in *any* subdirectory'
    )
    parser.add_argument(
        'target_dir',
        type=str,
        help='target directory to write all clean files'
    )
    parser.add_argument(
        '--suffix',
        default='.txt',
        type=str,
        help='file suffix used to select source files and derive the PG number')
    return parser.parse_args(args)


def cleanup(path, text_dir, suffix='.txt', overwrite=False):
    """
    Remove the Project Gutenberg header/footer from a raw txt file.

    Parameters
    ----------
    path : str or Path
        Path to the PG****_raw.txt file.
    text_dir : str or Path
        Target directory for the cleaned PG****_text.txt file.
    suffix : str
        Suffix used to split the PG number out of the file name.
    overwrite : bool
        Overwrite an existing output file.
    """
    # PG_number = path.split("/")[-1].split("_")[0][2:]
    if not isinstance(path, Path):
        path = Path(path)
    PG_number = path.name.split(suffix)[0][2:]
    source_file = os.path.join(text_dir, "PG%s_text.txt" % PG_number)
    if not overwrite:
        if Path(source_file).exists():
            logging.warning(f'File {source_file} exists. Set overwrite=True.')
            return
    # Raw files may be gzip-compressed; read them transparently in that case.
    if path.suffix in ('.gz', '.gzip'):
        with gzip.open(path, 'rt') as f:
            text = f.read()
    else:
        with open(path) as f:
            text = f.read()
    clean = strip_headers(text)
    with open(source_file, "w") as f:
        f.write(clean)
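

# Usage sketch for cleanup() outside the command-line flow (the paths below
# are hypothetical, purely for illustration):
#
#   cleanup('raw/PG12345.txt', 'clean', suffix='.txt', overwrite=True)
#
# This would strip the header/footer from raw/PG12345.txt and write the result
# to clean/PG12345_text.txt.
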
############
TEXT_START_MARKERS = frozenset((
"*END*THE SMALL PRINT",
"*** START OF THE PROJECT GUTENBERG",
"*** START OF THIS PROJECT GUTENBERG",
"This etext was prepared by",
"E-text prepared by",
"Produced by",
"Distributed Proofreading Team",
"Proofreading Team at http://www.pgdp.net",
"http://gallica.bnf.fr)",
" http://archive.org/details/",
"http://www.pgdp.net",
"by The Internet Archive)",
"by The Internet Archive/Canadian Libraries",
"by The Internet Archive/American Libraries",
"public domain material from the Internet Archive",
"Internet Archive)",
"Internet Archive/Canadian Libraries",
"Internet Archive/American Libraries",
"material from the Google Print project",
"*END THE SMALL PRINT",
"***START OF THE PROJECT GUTENBERG",
"This etext was produced by",
"*** START OF THE COPYRIGHTED",
"The Project Gutenberg",
"http://gutenberg.spiegel.de/ erreichbar.",
"Project Runeberg publishes",
"Beginning of this Project Gutenberg",
"Project Gutenberg Online Distributed",
"Gutenberg Online Distributed",
"the Project Gutenberg Online Distributed",
"Project Gutenberg TEI",
"This eBook was prepared by",
"http://gutenberg2000.de erreichbar.",
"This Etext was prepared by",
"This Project Gutenberg Etext was prepared by",
"Gutenberg Distributed Proofreaders",
"Project Gutenberg Distributed Proofreaders",
"the Project Gutenberg Online Distributed Proofreading Team",
"**The Project Gutenberg",
"*SMALL PRINT!",
"More information about this book is at the top of this file.",
"tells you about restrictions in how the file may be used.",
"l'authorization à les utilizer pour preparer ce texte.",
"of the etext through OCR.",
"*****These eBooks Were Prepared By Thousands of Volunteers!*****",
"We need your donations more than ever!",
" *** START OF THIS PROJECT GUTENBERG",
"**** SMALL PRINT!",
'["Small Print" V.',
' (http://www.ibiblio.org/gutenberg/',
'and the Project Gutenberg Online Distributed Proofreading Team',
'Mary Meehan, and the Project Gutenberg Online Distributed Proofreading',
' this Project Gutenberg edition.',
))
TEXT_END_MARKERS = frozenset((
"*** END OF THE PROJECT GUTENBERG",
"*** END OF THIS PROJECT GUTENBERG",
"***END OF THE PROJECT GUTENBERG",
"End of the Project Gutenberg",
"End of The Project Gutenberg",
"Ende dieses Project Gutenberg",
"by Project Gutenberg",
"End of Project Gutenberg",
"End of this Project Gutenberg",
"Ende dieses Projekt Gutenberg",
" ***END OF THE PROJECT GUTENBERG",
"*** END OF THE COPYRIGHTED",
"End of this is COPYRIGHTED",
"Ende dieses Etextes ",
"Ende dieses Project Gutenber",
"Ende diese Project Gutenberg",
"**This is a COPYRIGHTED Project Gutenberg Etext, Details Above**",
"Fin de Project Gutenberg",
"The Project Gutenberg Etext of ",
"Ce document fut presente en lecture",
"Ce document fut présenté en lecture",
"More information about this book is at the top of this file.",
"We need your donations more than ever!",
"END OF PROJECT GUTENBERG",
" End of the Project Gutenberg",
" *** END OF THIS PROJECT GUTENBERG",
))
LEGALESE_START_MARKERS = frozenset(("<<THIS ELECTRONIC VERSION OF",))
LEGALESE_END_MARKERS = frozenset(("SERVICE THAT CHARGES FOR DOWNLOAD",))
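
# Note: the marker tuples above are carried over from the pgcorpus/gutenberg
# code referenced in the module docstring; strip_headers() matches them as
# line *prefixes* via str.startswith, so the entries must keep their exact
# spelling and leading whitespace.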


def strip_headers(text):
    """
    Remove lines that are part of the Project Gutenberg header or footer.

    Note: this function is a port of the C++ utility by Johannes Krugel. The
    original version of the code can be found at:
    http://www14.in.tum.de/spp1307/src/strip_headers.cpp

    Args:
        text (unicode): The body of the text to clean up.

    Returns:
        unicode: The text with any non-text content removed.
    """
    lines = text.splitlines()
    sep = str(os.linesep)
    out = []
    i = 0
    footer_found = False
    ignore_section = False
    for line in lines:
        reset = False
        if i <= 600:
            # Check if the header ends here
            if any(line.startswith(token) for token in TEXT_START_MARKERS):
                reset = True
            # If it's the end of the header, delete the output produced so far.
            # May be done several times, if multiple lines occur indicating the
            # end of the header
            if reset:
                out = []
                continue
        if i >= 100:
            # Check if the footer begins here
            if any(line.startswith(token) for token in TEXT_END_MARKERS):
                footer_found = True
            # If it's the beginning of the footer, stop output
            if footer_found:
                break
        if any(line.startswith(token) for token in LEGALESE_START_MARKERS):
            ignore_section = True
            continue
        elif any(line.startswith(token) for token in LEGALESE_END_MARKERS):
            ignore_section = False
            continue
        if not ignore_section:
            out.append(line.rstrip(sep))
        i += 1
    return sep.join(out)
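

# Behaviour sketch: the header markers are only checked within roughly the
# first 600 lines and the footer markers only after the first 100, so
#
#   clean = strip_headers(raw_text)
#
# drops everything up to the last start marker near the top of the file,
# everything from the first end marker near the bottom onwards, and any
# legalese block between the LEGALESE_START/END markers in between.
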


def main(args=None):
    if args is None:
        args = sys.argv[1:]
    config = {
        'logdir': Path.cwd() / 'logs',
        'logfile': False,
        'logscreen': True,
        'overwrite': False,
    }
    args = parse_args(args, config)
    utils_g.makedir_if_needed(config['logdir'])
    if args.logname is None:
        logname = Path(config['logdir']) / f'{Path(sys.argv[0]).stem}'
    else:
        logname = Path(config['logdir']) / args.logname
    utils_g.log_sysargs(logname, args, script=sys.argv[0],
                        to_file=args.logfile,
                        to_screen=args.logscreen
                        )
    logging.info('Initialized logging...')
    logging.info(args)
    src_dir = Path(args.src_dir)
    target_dir = Path(args.target_dir)
    suffix = args.suffix
    utils_g.makedir_if_needed(target_dir)
    srcfiles = [f for f in utils_g.get_all_files(src_dir) if suffix in f.name]
    for path in srcfiles:
        logging.info(f'Cleaning up {path}')
        cleanup(path, target_dir, suffix=suffix, overwrite=args.overwrite)
    logging.info('DONE.')


if __name__ == '__main__':
    main()