forked from HillZhang1999/MuCGEC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathm2convertor.py
113 lines (101 loc) · 3.92 KB
/
m2convertor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding:UTF-8 -*-
# @Author: Xuezhi Fang
# @Date: 2020-06-19
# @Email: [email protected]
import argparse
import re
class M2Processor():
    """Apply the edits of one M2 record to its source sentence.

    One corrected sentence is produced per annotator that contributed at
    least one real (non-noop) edit.

    Args:
        src_sent: Source sentence as space-separated tokens (the ``S`` line
            without its ``S `` prefix).
        edit_lines: Edit lines of the record without their ``A `` prefix.
            Each line holds ``|||``-separated fields; the first is the token
            span ``"start end"``, the third is the replacement text and the
            last is the annotator id.
    """

    def __init__(self, src_sent, edit_lines):
        self.src_sent = src_sent
        self.edit_lines = edit_lines
        # annotator id -> list of {"span", "op", "res"} dicts, in input order
        self.edits = {}
        # corrected sentences, one per annotator; filled by get_para()
        self.trg_sents = []

    def conv_edit(self, line):
        """Parse a single edit line.

        Returns:
            ``(editor, edit_tag, edit_span, edit_res)`` where ``edit_tag`` is
            ``"ADD"`` (empty span), ``"DEL"`` (replacement is ``-NONE-`` or
            empty) or ``"REP"``; ``None`` when the span starts at -1, which
            is how M2 marks a "no edit" annotation.
        """
        line = line.strip().split("|||")
        edit_span = line[0].split(" ")
        edit_span = (int(edit_span[0]), int(edit_span[1]))
        edit_res = line[2]
        editor = line[-1]
        if edit_span[0] == -1:
            return None
        if edit_span[0] == edit_span[1]:
            edit_tag = "ADD"
        elif edit_res == "-NONE-" or edit_res == "":
            edit_tag = "DEL"
        else:
            edit_tag = "REP"
        return editor, edit_tag, edit_span, edit_res

    def get_edits(self):
        """Parse ``self.edit_lines`` into ``self.edits``, grouped by annotator."""
        # Rebuild from scratch so repeated calls do not duplicate edits.
        self.edits = {}
        for line in self.edit_lines:
            if not line:
                continue
            edit_item = self.conv_edit(line)
            if not edit_item:
                continue
            editor, edit_tag, edit_span, edit_res = edit_item
            self.edits.setdefault(editor, []).append(
                {"span": edit_span, "op": edit_tag, "res": edit_res}
            )

    def get_para(self):
        """Return the corrected sentence(s) for this record.

        Returns:
            A list with one corrected sentence per annotator, in the order
            the annotators first appear in the edit lines. When there is no
            real edit at all, ``[self.src_sent]`` is returned.
        """
        self.get_edits()
        # Start from a fresh list so calling get_para() twice is harmless.
        self.trg_sents = []
        if not self.edits:
            return [self.src_sent]
        for editor in self.edits:
            self.trg_sents.append(self._apply_edits(self.edits[editor]))
        return self.trg_sents

    def _apply_edits(self, edits):
        """Apply one annotator's edits to the source and return the sentence."""
        # Token indices must stay aligned with the source while edits are
        # applied, so removed tokens become " " placeholders instead of
        # being dropped; the surplus spaces are collapsed at the end.
        sent = self.src_sent.split(" ")
        # Insertions before the first token are kept apart from sent[0]:
        # a later REP/DEL covering token 0 must not wipe them out, and
        # several of them must keep their input order.
        leading = []
        for edit_item in edits:
            start, end = edit_item["span"]
            edit_tag, trg_tokens = edit_item["op"], edit_item["res"]
            if edit_tag == "DEL":
                sent[start:end] = [" "] * (end - start)
            elif edit_tag == "ADD":
                if start != 0:
                    # Attach to the preceding slot so indices do not shift.
                    sent[start - 1] += " " + trg_tokens
                else:
                    leading.append(trg_tokens)
            elif edit_tag == "REP":
                src_tokens_len = len(sent[start:end])
                # The replacement occupies the first slot of the span; the
                # remaining slots become placeholders.
                sent[start:end] = [trg_tokens] + [" "] * (src_tokens_len - 1)
        joined = " ".join(leading + sent).strip()
        return re.sub(" +", " ", joined)
def read_file(path=None):
    """Yield ``(src_sent, edit_lines)`` for every record of an M2 file.

    A record is an ``S `` line followed by its ``A `` lines. It ends at a
    blank line, at the next ``S `` line, or at end of file, so a file that
    lacks the trailing blank line still yields its last record. Blank lines
    with no pending record are ignored.

    Args:
        path: M2 file to read. When omitted, the module-level ``args.f``
            (the ``-f`` command-line option) is used.

    Yields:
        Tuples of the source sentence (``S `` prefix removed) and a list of
        its edit lines (``A `` prefix removed). Every record gets its own
        list object, so results may safely be collected by the caller.
    """
    if path is None:
        path = args.f
    src_sent = None
    edit_lines = []
    with open(path, "r", encoding="utf8") as fr:
        for line in fr:
            line = line.strip()
            if line.startswith("S "):
                if src_sent is not None:
                    # Missing blank separator: close the previous record
                    # rather than merging its edits into the new sentence.
                    yield src_sent, edit_lines
                src_sent = line[2:]
                edit_lines = []
            elif line.startswith("A "):
                edit_lines.append(line[2:])
            elif line == "" and src_sent is not None:
                yield src_sent, edit_lines
                src_sent, edit_lines = None, []
    if src_sent is not None:
        # Last record of a file without a trailing blank line.
        yield src_sent, edit_lines
def main():
    """Convert the M2 file into plain-text source/target files.

    Reads records through ``read_file()`` and writes three files:

    * ``<prefix>.src``  - one source sentence per line;
    * ``args.o``        - the first corrected sentence of each record;
    * ``<prefix>.para`` - per record an ``S`` line, one ``T<i>`` line per
      corrected sentence, and a blank line.

    ``<prefix>`` is ``args.p`` when that attribute exists and is non-empty;
    otherwise the output path ``args.o`` is used as the prefix.
    """
    # Read args.p defensively: the option may be absent from the namespace.
    prefix = getattr(args, "p", None) or args.o
    # Context managers close all three files even if a record fails.
    with open(f"{prefix}.src", "w", encoding="utf8") as fw_src, \
            open(args.o, "w", encoding="utf8") as fw_trg, \
            open(f"{prefix}.para", "w", encoding="utf8") as fw_para:
        for src_sent, edit_lines in read_file():
            trg_sents = M2Processor(src_sent, edit_lines).get_para()
            fw_para.write(f"S {src_sent}\n")
            for idx, sent in enumerate(trg_sents):
                fw_para.write(f"T{idx} {sent}\n")
            fw_para.write("\n")
            fw_src.write(src_sent + "\n")
            fw_trg.write(trg_sents[0] + "\n")
if __name__ == "__main__":
    # Command-line entry point. `args` is deliberately module-level: the
    # functions above read it as a global.
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", required=True, help="m2 file")
    parser.add_argument("-o", required=True, help="output file")
    # main() writes "<prefix>.src" and "<prefix>.para"; without this option
    # the attribute args.p would not exist at all.
    parser.add_argument(
        "-p",
        default=None,
        help="prefix for the .src and .para files (default: the -o path)",
    )
    args = parser.parse_args()
    if args.p is None:
        args.p = args.o
    main()