language.py

# Copyright 2016 Douglas Bagnall <[email protected]>
# -*- coding: utf-8 -*-
import os
import unicodedata
import re
from collections import Counter
from hashlib import sha1
import mappings  # local module: per-language character remapping (get_charmap)
import meta      # local module: per-language problem lists (read_lang_info)
HERE = os.path.dirname(__file__)

TRAINING_CORPUS = os.path.join(
    HERE, 'corpus/pan16-author-clustering-training-dataset-2016-02-17/')
CONTROL_CORPUS = os.path.join(HERE, 'corpus/control/')
VALIDATION_CORPUS = os.path.join(HERE, 'corpus/validation/')

def always(x):
    return True

def read_file(fn):
    with open(fn) as f:
        return f.read()

def get_text_and_id(fn, lang, raw=False):
    """Return the text of fn and its sha1 hex digest.

    Unless raw is true, the text is first run through the language's
    character remapping.
    """
    text = read_file(fn)
    if not raw:
        remap = mappings.get_charmap(lang)
        text = remap(text)
    tid = sha1(text).hexdigest()
    return text, tid
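
# The tid doubles as a deduplication key: two files whose (remapped) text
# is byte-identical get the same digest, so the dict insertions in the
# loaders below silently drop duplicates. A sketch (the path here is
# hypothetical):
#
#     text, tid = get_text_and_id('corpus/en/example.txt', 'en')
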
def load_control_texts(srcdir, lang, ext='.txt'):
    """Load the texts, deduplicating along the way."""
    texts = {}
    records = []
    srcdir = os.path.join(srcdir, lang)
    for d, dirnames, filenames in os.walk(srcdir, followlinks=True):
        for fn in filenames:
            if fn.endswith(ext):
                ffn = os.path.join(d, fn)
                text, tid = get_text_and_id(ffn, lang)
                texts[tid] = text
                records.append((fn, ffn, tid))
    return texts, records

def load_problem_texts(srcdir, lang, raw=False, ext='.txt'):
    """Load the texts, remapping and deduplicating along the way."""
    texts = {}
    records = []
    for fn in sorted(os.listdir(srcdir)):
        if fn.endswith(ext):
            ffn = os.path.join(srcdir, fn)
            text, tid = get_text_and_id(ffn, lang, raw)
            texts[tid] = text
            records.append((fn, ffn, tid))
    return texts, records

def load_corpus(srcdir, lang, raw=False):
    dirs = meta.read_lang_info(srcdir, lang)
    texts = {}
    problems = {}
    for d in dirs:
        fulldir = os.path.join(srcdir, d)
        d_texts, records = load_problem_texts(fulldir, lang, raw)
        problems[d] = records
        texts.update(d_texts)
    return texts, problems

def concat_corpus(srcdir, lang, raw=False):
    texts, problems = load_corpus(srcdir, lang, raw)
    return '\n'.join(texts.values())

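
# A usage sketch, assuming the PAN16 training data has been unpacked at
# TRAINING_CORPUS and contains English problems:
#
#     texts, problems = load_corpus(TRAINING_CORPUS, 'en')
#     blob = concat_corpus(TRAINING_CORPUS, 'en')
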
def count_chars(text, decompose=False):
    text = text.decode('utf-8')
    if decompose:
        text = unicodedata.normalize('NFKD', text)
    else:
        text = unicodedata.normalize('NFKC', text)
    c = Counter(text)
    return c.most_common()

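
# For example, NFKC keeps 'é' as a single codepoint while NFKD splits it
# into 'e' plus a combining accent, so the two modes count differently:
#
#     count_chars('caf\xc3\xa9')                  # u'\xe9' is one entry
#     count_chars('caf\xc3\xa9', decompose=True)  # u'e' and u'\u0301' count separately
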
def word_df_filter(texts, threshold):
    """Replace rare words with '°', in place.

    A word survives only if it occurs in more than (threshold * number
    of texts) of the texts.
    """
    nonalpha = u'!"%&\'()*,./7`:;?¹—\s«¹»€-'
    wsplit = re.compile(u'[%s]+' % nonalpha, re.UNICODE).split
    c = Counter()
    for v in texts.values():
        v = v.replace('¹', '')
        v = v.decode('utf-8')
        words = set(wsplit(v))
        c.update(words)
    t = len(texts) * threshold
    good_words = set(x for x in c if c[x] > t)

    def accept_word(m):
        w = m.group(0)
        if w in good_words:
            return w
        return u'°'

    for k in texts.keys():
        v = texts[k].decode('utf-8')
        v = re.sub(u'[^%s]+' % nonalpha, accept_word, v, flags=re.UNICODE)
        texts[k] = v.encode('utf-8')

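
# A sketch (the 0.3 threshold is an arbitrary illustration): words found
# in 30% or fewer of the texts are replaced with '°':
#
#     texts, problems = load_corpus(TRAINING_CORPUS, 'en')
#     word_df_filter(texts, 0.3)
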
# Charmaps always discard these
dispensible_chars = set('\x0b\x0c\r'.decode('utf8') + u'\ufeff\xad\x85' +
                        u'\u2028\\_')

# Charmaps discard these unless really common.
discountable_chars = {k: 0.25 for k in '+=<>|%'}

single_quotes = set("'‘’‘‘".decode('utf8') + u'\u2018')
double_quotes = set('‟"„“”'.decode('utf8'))

def unify_case(text):
    # trickier than .lower() because of the decomposed case marker
    text = text.lower()
    return text.replace("¹".decode('utf8'), "")
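
# For instance, assuming the charmap marks an original capital with a
# preceding ¹ (as the .replace('¹', '') in word_df_filter also suggests):
#
#     unify_case(u'¹hello World')  # -> u'hello world'
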
def split_words(text, ignore_case=False):
    if not isinstance(text, unicode):
        text = text.decode('utf8')
    if ignore_case:
        text = unify_case(text)
    # protect intra-word apostrophes as ³ so contractions survive the split
    text = re.sub(r"(?<=\w)'(?=\w)", r"³".decode('utf8'),
                  text, flags=re.U)
    words = re.split(r"[^\w³-]+".decode('utf8'), text, flags=re.U)
    # '--' acts as a separator even though a single '-' is a word character
    words = sum((w.split('--') for w in words), [])
    words = [x.strip('-_') for x in words]
    return [x for x in words if x]

def decode_split_word(w):
    return w.replace('³'.decode('utf8'), "'")

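
# Round-trip example: split_words() protects the apostrophe in a
# contraction as ³; decode_split_word() restores it:
#
#     split_words(u"Don't stop--ever")
#     # -> [u'Don\u00b3t', u'stop', u'ever']
#     decode_split_word(u'Don\u00b3t')
#     # -> u"Don't"
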
def print_word_counts(c):
    prev_n = None
    for w, n in c.most_common():
        if n != prev_n:
            print "\n------------- %s --------------" % n
            prev_n = n
        w = decode_split_word(w)
        print w,
    print
    print len(c)
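
# An end-to-end sketch, assuming the training corpus is in place: count
# words across all English texts and print them grouped by frequency.
#
#     texts, problems = load_corpus(TRAINING_CORPUS, 'en')
#     c = Counter()
#     for text in texts.values():
#         c.update(split_words(text, ignore_case=True))
#     print_word_counts(c)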