[WIP] [#62] add refactored method for Japanese MinHashLSH-based near-deduplication #63

Closed
21 changes: 21 additions & 0 deletions configs/japanese_job_v2_oscar-2301.yaml
@@ -0,0 +1,21 @@
# pre-processing:
base_dir: '/fsx/polyglot_data/major/ja/0_raw/oscar-2301'
targets:
- ""
output_dir: '/fsx/polyglot_data/major/ja/2_quality_filter/v2/oscar-2301'

n_dist: 64
n_output: 100
is_cluster: False
is_local: False

min_doc_len: 50
max_doc_len: 100000
min_mean_word_len: 1
max_mean_word_len: 10
symbol_to_word_ratio: 0.1
bullet_point_ratio: 0.9
ellipsis_ratio: 0.3
japanese_word_ratio: 0.8
freq_char_cnt: 1
separator_ratio: 0.1
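For reference, a minimal sketch of how these keys are consumed; the loading pattern mirrors `japanese_job.py` in this PR, and the inline comments just restate the values defined above:

```python
import yaml

# Load the job config the same way japanese_job() does and inspect a few
# of the filter thresholds it defines.
with open("configs/japanese_job_v2_oscar-2301.yaml") as f:
    conf = yaml.load(f, Loader=yaml.FullLoader)

input_paths = ",".join(f'{conf["base_dir"]}/{t}' for t in conf["targets"])
print(input_paths)              # /fsx/polyglot_data/major/ja/0_raw/oscar-2301/
print(conf["min_doc_len"])      # 50, used by doc_len_filter
print(conf["separator_ratio"])  # 0.1, used by many_separators_filter
```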
13 changes: 11 additions & 2 deletions dps/spark/jobs/japanese_job.py
@@ -19,7 +19,9 @@
#japanese_remove_repeated_text,
japanese_symbol_to_word_ratio_filter,
japanese_frequent_char_existence_filter,
reduce_japanese_emoticon
reduce_japanese_emoticon,
many_separators_filter,
remove_symbols,
)


@@ -46,7 +48,11 @@ def japanese_job(config_path: str):
conf = yaml.load(f, Loader=yaml.FullLoader)

input_paths = ",".join([f'{conf["base_dir"]}/{t}' for t in conf["targets"]])
session_fn = spark_session_for_cluster if conf["is_cluster"] else spark_session
if conf["is_local"]:
from dps.spark.spark_session import spark_session_local
        session_fn = spark_session_local
else:
session_fn = spark_session_for_cluster if conf["is_cluster"] else spark_session

    with session_fn("Japanese text processing job") as spark:
sc: SparkContext = spark.sparkContext
@@ -63,6 +69,9 @@
            .map(lambda x: dict(text=preprocess_text(x["text"])))
.filter(lambda x: doc_len_filter(x["text"], conf["min_doc_len"], conf["max_doc_len"]))
.filter(lambda x: japanese_frequent_char_existence_filter(x["text"], conf["freq_char_cnt"]))
            .map(lambda x: dict(text=reduce_japanese_emoticon(x["text"])))
            .filter(lambda x: many_separators_filter(x["text"], conf["separator_ratio"]))
            .map(lambda x: dict(text=remove_symbols(x["text"])))
)
proc_rdd.repartition(conf["n_output"]).flatMap(to_json).saveAsTextFile(conf["output_dir"])
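A hypothetical invocation sketch for the updated job; the repo's actual launcher or CLI may differ, so treat the entry point below as an assumption:

```python
# Hypothetical direct call; dps's own job launcher may wrap this differently.
from dps.spark.jobs.japanese_job import japanese_job

japanese_job("configs/japanese_job_v2_oscar-2301.yaml")
```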

108 changes: 108 additions & 0 deletions dps/spark/jobs/japanese_minhash_dedup_job.py
@@ -0,0 +1,108 @@
"""inspired from `depup_job_romance_minhash.py `"""
import argparse

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, MinHashLSH
from pyspark import SparkContext
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col

import sparknlp
from sparknlp.annotator import WordSegmenterModel
from sparknlp.base import DocumentAssembler


def create_spark_session() -> SparkSession:
spark = SparkSession.builder \
.appName("Spark NLP Enterprise") \
.master("local[*]") \
.config("spark.driver.memory","16") \
.config("spark.driver.maxResultSize", "2G") \
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:${version_public}") \
.config("spark.jars", "https://pypi.johnsnowlabs.com/${secret.code}/spark-nlp-jsl-${version}.jar") \
.getOrCreate()
return spark


def create_feature_pipeline() -> Pipeline:
hashing_tf = HashingTF(inputCol="result", outputCol="features", numFeatures=2**14)
idf = IDF(inputCol="features", outputCol="idf_features")
return Pipeline(stages=[hashing_tf, idf])


def create_minhash_lsh() -> MinHashLSH:
return MinHashLSH(inputCol="idf_features", outputCol="hashes", numHashTables=5)


def deduplicate_dataset(df: DataFrame, threshold: float) -> DataFrame:
# Fit and transform the DataFrame
pipeline = create_feature_pipeline()
model = pipeline.fit(df)
transformed_df = model.transform(df)

# Configure and fit the MinHashLSH model
minhash_lsh = create_minhash_lsh()
minhash_lsh_model = minhash_lsh.fit(transformed_df)

# Perform self-join and filter based on the Jaccard distance threshold
similar_pairs = minhash_lsh_model.approxSimilarityJoin(transformed_df, transformed_df, threshold)
filtered_pairs = similar_pairs.filter("datasetA.id < datasetB.id")
# Deduplicate the dataset
deduplicated_ids = filtered_pairs.select(col("datasetA.id").alias("id")).distinct()
deduplicated_df = df.join(deduplicated_ids, "id", "leftanti")
return deduplicated_df
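Note that `deduplicate_dataset` expects the input DataFrame to already carry an `id` column and a tokenized `result` column (the `HashingTF` input). A hedged caller sketch, reusing the column preparation done in `main()` below:

```python
# Sketch only: segmented_df is assumed to come from the Spark NLP word
# segmentation pipeline shown in main() below.
df = segmented_df.select("text", "token.result")
df = df.withColumn("id", F.monotonically_increasing_id())

deduplicated_df = deduplicate_dataset(df, threshold=0.8)
```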


def main() -> None:
# arguments
threshold = 0.8

spark = create_spark_session()
sc: SparkContext = spark.sparkContext

example = spark.createDataFrame([
['清代は湖北省が置かれ、そのまま現代の行政区分になっている。'],
['データブリックスは、学術界とオープンソースコミュニティをルーツとするデータ+AIの企業です。'],
['データブリックスは、学術界とオープンソースコミュニティをルーツとするデータの企業です。'],
['ジョンスノーラボからこんにちは! ']
], ["text"])

# Word segmenter or Tokenizer
document_assembler = DocumentAssembler()\
.setInputCol("text")\
.setOutputCol("document")

word_segmenter = WordSegmenterModel.pretrained("wordseg_gsd_ud", "ja")\
.setInputCols("document")\
.setOutputCol("token")

pipeline = Pipeline(stages=[
document_assembler,
word_segmenter,
])
ws_model = pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))
segmented_df = ws_model.transform(example)

    # deduplication
df = segmented_df.select(["text", "token.result"])
df = df.withColumn("id", F.monotonically_increasing_id())

feature_pipeline = create_feature_pipeline()
model = feature_pipeline.fit(df)
transformed_df = model.transform(df)

minhash_lsh = MinHashLSH(inputCol="idf_features", outputCol="hashes", numHashTables=5)
minhash_lsh_model = minhash_lsh.fit(transformed_df)
similar_pairs = minhash_lsh_model.approxSimilarityJoin(
transformed_df, transformed_df, threshold)
filtered_pairs = similar_pairs.filter("datasetA.id < datasetB.id")

deduplicated_ids = filtered_pairs.select(F.col("datasetA.id").alias("id")).distinct()
deduplicated_df = df.join(deduplicated_ids, "id", "leftanti")

print(deduplicated_df.toPandas())
spark.stop()


if __name__ == "__main__":
main()
21 changes: 19 additions & 2 deletions dps/spark/prep/japanese_prep.py
@@ -44,19 +44,36 @@ def japanese_mean_word_len_filter(


def japanese_symbol_to_word_ratio_filter(text: str, symbol_to_word_ratio: float) -> bool:
symbols = ["…", "...", "#"]
symbols = [
"...", "…", "[…]",
"#",
]
words = word_tokenize(text)
return symbol_to_word_ratio >= (
len([word for word in words if any([symbol in word for symbol in symbols])])
/ (len(words) + 1e-12)
)


def japanese_frequent_char_existence_filter(text: str, freq_char_cnt: int) -> bool:
return freq_char_ratio <= (
return freq_char_cnt <= (
        sum(re.search(char, text) is not None for char in JAPANESE_FREQ_CHAR_LIST)
)


def reduce_japanese_emoticon(text: str) -> str:
text = re.sub("w{3,}", "www", text)
text = re.sub("笑{2,}", "笑", text)
return text


def many_separators_filter(text: str, separator_ratio: float) -> bool:
    if not text:
        return False
    whitespace_ratio = (len(text.split()) - 1) / len(text)
    touten_ratio = (len(text.split("、")) - 1) / len(text)
    return (whitespace_ratio <= separator_ratio) and (touten_ratio <= separator_ratio)
# NOTE: test and check the filter with the opposite condition
# return (whitespace_ratio > 0.1) or (touten_ratio > 0.1)


def remove_symbols(text: str) -> str:
return text.replace("[…]", "")
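A few illustrative sanity checks for the new helpers (plain Python, no Spark session needed); the expected values follow directly from the regexes and ratios above:

```python
from dps.spark.prep.japanese_prep import (
    many_separators_filter,
    reduce_japanese_emoticon,
    remove_symbols,
)

assert reduce_japanese_emoticon("wwwww") == "www"            # long "w" runs collapsed
assert reduce_japanese_emoticon("笑笑笑") == "笑"             # repeated 笑 collapsed
assert remove_symbols("本文[…]続き") == "本文続き"             # "[…]" stripped
assert many_separators_filter("これはテストです。", 0.1)        # few separators -> keep
assert not many_separators_filter("あ、い、う、え、お", 0.1)    # touten-heavy -> drop
```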
7 changes: 6 additions & 1 deletion dps/spark/utils/japanese_utils.py
@@ -170,4 +170,9 @@
]
BAD_WORD_LIST = BAD_WORD_LIST_1 + BAD_WORD_LIST_2

JAPANESE_FREQ_CHAR_LIST = ["は","を","に","と","の", "て", "へ", "です", "だ"]
JAPANESE_FREQ_CHAR_LIST = [
"て", "に", "を", "は",
# "と","の",
"へ",
"です", "だ",
]
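For context, a small example of how `japanese_frequent_char_existence_filter` uses this list: the text below contains "は" and "です", so two entries match and any `freq_char_cnt` up to 2 lets it pass.

```python
from dps.spark.prep.japanese_prep import japanese_frequent_char_existence_filter

# "私は学生です" matches "は" and "です" from JAPANESE_FREQ_CHAR_LIST.
assert japanese_frequent_char_existence_filter("私は学生です", freq_char_cnt=2)
# A digits-only string matches nothing, so it is filtered out.
assert not japanese_frequent_char_existence_filter("12345", freq_char_cnt=1)
```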