Skip to content

Commit

Permalink
Restructured pseudonymize_ne
Browse files Browse the repository at this point in the history
  • Loading branch information
fexfl committed Feb 15, 2025
1 parent d24d170 commit 556217a
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 148 deletions.
122 changes: 60 additions & 62 deletions mailcom/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,8 @@ def __init__(self):
],
}

# records the already replaced names in an email
self.used_first_names = {}

# records NEs in the last email
self.per_list = []
self.org_list = []
self.loc_list = []
self.misc_list = []
self.ne_list = []

def init_spacy(self, language: str, model="default"):
if model == "default":
Expand Down Expand Up @@ -99,13 +93,8 @@ def init_transformers(
)

def reset(self):
    """Forget the named entities recorded while processing an email.

    ``ne_list`` is the single store of processed entities (and their
    pseudonyms), so clearing it is all that is needed before a new email
    is processed. The list is cleared in place, so existing references to
    it observe the emptied state.
    """
    self.ne_list.clear()

def get_sentences(self, input_text):
doc = self.nlp_spacy(input_text)
Expand All @@ -118,71 +107,80 @@ def get_ner(self, sentence):
ner = self.ner_recognizer(sentence)
return ner

def pseudonymize_per(self, new_sentence):
unique_ne_list = list(dict.fromkeys(self.per_list))
for ne in unique_ne_list:
# choose the pseudonym
nm_list = self.used_first_names
pseudo_list = self.pseudo_first_names
pseudonym = ""
name_variations = [
ne,
ne.lower(),
ne.title(),
]
# if this name has been replaced before, choose the same pseudonym
for nm_var in name_variations:
pseudonym = nm_list.get(nm_var, "")
if pseudonym != "":
break
def choose_per_pseudonym(self, name):
    """Return the pseudonym to use for the person name ``name``.

    If ``name`` (as given, lower-cased or title-cased) is already recorded
    in ``self.ne_list`` as a ``PER`` entity with a pseudonym, that
    pseudonym is reused so that one name keeps one replacement. Otherwise
    the next unused entry of ``self.pseudo_first_names["fr"]`` is chosen;
    once every entry has been handed out, the first entry is returned.

    Args:
        name: Person name as reported by the NER step.

    Returns:
        The pseudonym string for ``name``.
    """
    # Collect what has already been handed out to persons. Only PER
    # entities are considered: placeholders such as "[location]" that are
    # stored for other entity types must never be reused as a first name.
    pseudonym_by_name = {}
    pseudonyms_in_use = set()
    for ne in self.ne_list:
        recorded = ne.get("pseudonym", "")
        if ne.get("entity_group") != "PER" or not recorded:
            continue
        # keep the first pseudonym recorded for a given spelling
        pseudonym_by_name.setdefault(ne["word"], recorded)
        pseudonyms_in_use.add(recorded)

    # if this name has been replaced before, choose the same pseudonym
    for variation in (name, name.lower(), name.title()):
        if variation in pseudonym_by_name:
            return pseudonym_by_name[variation]

    # Otherwise pick a new one. The index is the number of distinct
    # pseudonyms in use, not the number of PER mentions, so repeated
    # mentions of one person do not skip entries of the list.
    candidates = self.pseudo_first_names["fr"]
    next_index = len(pseudonyms_in_use)
    if next_index < len(candidates):
        return candidates[next_index]
    # reached the end of the list: fall back to its first entry
    return candidates[0]

def pseudonymize_ne(self, ner, sentence):
    """Replace the named entities of ``sentence`` by pseudonyms.

    ``PER`` entities get a first name from ``self.choose_per_pseudonym``;
    ``LOC``, ``ORG`` and ``MISC`` entities get a fixed placeholder.
    Entities of any other group are left untouched and are not recorded.
    Every replaced entity dict gains a ``"pseudonym"`` key and is appended
    to ``self.ne_list``.

    Args:
        ner: Entity dicts with the keys ``"entity_group"``, ``"word"``,
            ``"start"`` and ``"end"``; ``start``/``end`` are character
            positions in ``sentence``. The offset bookkeeping assumes the
            entities are ordered by ``start`` and do not overlap
            (NOTE(review): confirm this holds for the NER pipeline used).
        sentence: The text the entity positions refer to.

    Returns:
        A one-element list containing the pseudonymized sentence.
    """
    placeholders = {
        "LOC": "[location]",
        "ORG": "[organization]",
        "MISC": "[misc]",
    }
    new_sentence = sentence
    # shift accumulated because pseudonyms and replaced spans differ in length
    offset = 0
    for entity in ner:
        ent_group = entity["entity_group"]
        start, end = entity["start"], entity["end"]
        # choose the pseudonym of the current NE based on its type
        if ent_group == "PER":
            pseudonym = self.choose_per_pseudonym(entity["word"])
        elif ent_group in placeholders:
            pseudonym = placeholders[ent_group]
        else:
            # unknown entity type: nothing to substitute, and no pseudonym
            # from a previous iteration may leak into this one
            continue

        # remember the pseudonym on the entity and record the entity
        entity["pseudonym"] = pseudonym
        self.ne_list.append(entity)

        # replace only this occurrence, located by its character span
        new_sentence = (
            new_sentence[: start + offset]
            + pseudonym
            + new_sentence[end + offset :]  # noqa
        )
        # The removed text is the span, which is not always as long as the
        # reported "word", so the span length drives the offset.
        offset += len(pseudonym) - (end - start)

    return [new_sentence]

Expand Down Expand Up @@ -259,8 +257,8 @@ def make_dir(path: str):
print("Parsing input file {}".format(file))
text = io.get_text(file)
text = io.get_html_text(text)
xml = io.data_to_xml(text)
io.write_file(xml, path_output / output_filename)
# xml = io.data_to_xml(text)
# io.write_file(xml, path_output / output_filename)
if not text:
continue
# Test functionality of Pseudonymize class
Expand Down
177 changes: 91 additions & 86 deletions mailcom/test/test_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_reset(get_default_fr):
get_default_fr.reset()
# Test that used names lists are empty now
# They should be cleared after every email
assert len(get_default_fr.used_first_names) == 0
assert len(get_default_fr.ne_list) == 0


def test_get_ner(get_default_fr):
Expand Down Expand Up @@ -102,19 +102,6 @@ def test_get_sentences_with_punctuation(get_default_fr):
assert sentences[2] == "Très bien, merci."


def test_pseudonymize_per(get_default_fr):
sentence = "Francois and Agathe are friends."
nelist = ["Francois", "Agathe"]
get_default_fr.per_list = nelist
pseudonymized_sentence = get_default_fr.pseudonymize_per(sentence)
assert "Francois" not in pseudonymized_sentence
assert "Agathe" not in pseudonymized_sentence
assert any(
pseudo in pseudonymized_sentence
for pseudo in get_default_fr.pseudo_first_names["fr"]
)


def test_pseudonymize_numbers(get_default_fr):
sentence = "My phone number is 123-456-7890."
pseudonymized_sentence = get_default_fr.pseudonymize_numbers(sentence)
Expand Down Expand Up @@ -198,92 +185,110 @@ def test_pseudonymize_email_addresses(get_default_fr):
assert pseudonymized_sentence == ""


def test_pseudonymize_ne_with_person_entities(get_default_fr):
sentence = "Francois et Agathe sont amis."
def test_choose_per_pseudonym_new_name(get_default_fr):
    """A name passed for the first time gets a pseudonym from the "fr" list."""
    chosen = get_default_fr.choose_per_pseudonym("Jean")
    available = get_default_fr.pseudo_first_names["fr"]
    assert chosen in available


def test_choose_per_pseudonym_existing_name(get_default_fr):
    """A name already present in ne_list is mapped to its stored pseudonym."""
    recorded = {"word": "Claude", "entity_group": "PER", "pseudonym": "Dominique"}
    get_default_fr.ne_list = [recorded]
    assert get_default_fr.choose_per_pseudonym("Claude") == "Dominique"


def test_choose_per_pseudonym_case_insensitive(get_default_fr):
    """A lower-cased spelling of a recorded name reuses the same pseudonym."""
    recorded = {"word": "Claude", "entity_group": "PER", "pseudonym": "Dominique"}
    get_default_fr.ne_list = [recorded]
    assert get_default_fr.choose_per_pseudonym("claude") == "Dominique"


def test_choose_per_pseudonym_exhausted_list(get_default_fr):
    """Once every pseudonym is in use, a new name falls back to the first one."""
    all_pseudonyms = get_default_fr.pseudo_first_names["fr"]
    recorded = []
    for pseudo in all_pseudonyms:
        recorded.append(
            {"word": "Claude", "entity_group": "PER", "pseudonym": pseudo}
        )
    get_default_fr.ne_list = recorded
    assert get_default_fr.choose_per_pseudonym("Jean") == all_pseudonyms[0]


def test_pseudonymize_ne_person(get_default_fr):
    """Both person names are removed and a pseudonym from the "fr" list appears."""
    sentence = "Mehdi et Théo sont amis."
    ner = [
        {"entity_group": "PER", "word": "Mehdi", "start": 0, "end": 5},
        {"entity_group": "PER", "word": "Théo", "start": 9, "end": 13},
    ]
    # pseudonymize_ne returns a list of strings; join it for the checks
    pseudonymized_sentence = " ".join(get_default_fr.pseudonymize_ne(ner, sentence))
    assert "Mehdi" not in pseudonymized_sentence
    assert "Théo" not in pseudonymized_sentence
    assert any(
        pseudo in pseudonymized_sentence
        for pseudo in get_default_fr.pseudo_first_names["fr"]
    )


def test_pseudonymize_ne_with_location_entities(get_default_fr):
sentence = "Paris et New York sont des villes."
def test_pseudonymize_ne_location(get_default_fr):
    """A LOC entity is replaced by the "[location]" placeholder."""
    paris = {"entity_group": "LOC", "word": "Paris", "start": 0, "end": 5}
    result = get_default_fr.pseudonymize_ne([paris], "Paris est une belle ville.")
    assert " ".join(result) == "[location] est une belle ville."


def test_pseudonymize_ne_organization(get_default_fr):
    """An ORG entity is replaced by the "[organization]" placeholder."""
    sentence = "Microsoft est une grande entreprise."
    ner = [
        {"entity_group": "ORG", "word": "Microsoft", "start": 0, "end": 9},
    ]
    pseudonymized_sentence = " ".join(get_default_fr.pseudonymize_ne(ner, sentence))
    assert pseudonymized_sentence == "[organization] est une grande entreprise."


def test_pseudonymize_ne_with_organization_entities(get_default_fr):
sentence = "Google et Microsoft sont des géants de la technologie."
def test_pseudonymize_ne_misc(get_default_fr):
    """A MISC entity is replaced by the "[misc]" placeholder."""
    sentence = "Le Tour de France est un événement célèbre."
    ner = [
        {"entity_group": "MISC", "word": "Tour de France", "start": 3, "end": 17},
    ]
    pseudonymized_sentence = " ".join(get_default_fr.pseudonymize_ne(ner, sentence))
    assert pseudonymized_sentence == "Le [misc] est un événement célèbre."


def test_pseudonymize_ne_with_misc_entities(get_default_fr):
sentence = "La tour Eiffel est un monument célèbre."
def test_pseudonymize_ne_multiple_entities(get_default_fr):
    """PER, ORG and LOC entities in one sentence are all replaced in place."""
    sentence = "Thomas travaille chez Microsoft à Paris."
    # start/end are character positions in `sentence`:
    # "Microsoft" spans 22-31 and "Paris" spans 34-39.
    ner = [
        {"entity_group": "PER", "word": "Thomas", "start": 0, "end": 6},
        {"entity_group": "ORG", "word": "Microsoft", "start": 22, "end": 31},
        {"entity_group": "LOC", "word": "Paris", "start": 34, "end": 39},
    ]
    pseudonymized_sentence = " ".join(get_default_fr.pseudonymize_ne(ner, sentence))
    assert "Thomas" not in pseudonymized_sentence
    assert "Microsoft" not in pseudonymized_sentence
    assert "Paris" not in pseudonymized_sentence
    assert any(
        pseudo in pseudonymized_sentence
        for pseudo in get_default_fr.pseudo_first_names["fr"]
    )
    assert "[organization]" in pseudonymized_sentence
    assert "[location]" in pseudonymized_sentence
    # the text around the entities must survive untouched
    assert pseudonymized_sentence.endswith(
        " travaille chez [organization] à [location]."
    )


def test_pseudonymize_ne_no_entities(get_default_fr):
    """Without any entity the sentence comes back unchanged."""
    sentence = "Ceci est une phrase sans entités nommées."
    result = get_default_fr.pseudonymize_ne([], sentence)
    assert " ".join(result) == sentence


def test_pseudonymize_ne_empty_sentence(get_default_fr):
    """An empty sentence with no entities yields an empty result."""
    sentence = ""
    ner = []
    pseudonymized_sentence = " ".join(get_default_fr.pseudonymize_ne(ner, sentence))
    assert pseudonymized_sentence == sentence

0 comments on commit 556217a

Please sign in to comment.