From 106a2a0debdcf83981b9074b8e7cb0c9afa7cdfc Mon Sep 17 00:00:00 2001
From: Yao Qing
Date: Fri, 3 Nov 2023 09:26:08 +0800
Subject: [PATCH] [v1.2][ISSUE-306]Refactor rouge_score_dedup to support more compute functions (#431)

* optimize rouge-score impl spark version

Signed-off-by: Xue, Chendi

* update rouge-score method

Signed-off-by: Xue, Chendi

* update

Signed-off-by: Xue, Chendi

* optimize by using local file

Signed-off-by: Xue, Chendi

* Format codes and remove unnecessary comments.

* Refactor rouge_score_dedup to support more compute functions.

* Remove debug codes.

* Update text_compare_dedup.py

---------

Signed-off-by: Xue, Chendi
Co-authored-by: Xue, Chendi
---
 .../primitives/llmutils/rouge_score_dedup.py  |   1 +
 .../pyrecdp/primitives/operations/__init__.py |   2 +-
 .../operations/text_compare_dedup.py          | 181 ++++++++++++++++++
 .../operations/text_rouge_score_dedup.py      | 142 --------------
 4 files changed, 183 insertions(+), 143 deletions(-)
 create mode 100644 RecDP/pyrecdp/primitives/operations/text_compare_dedup.py
 delete mode 100644 RecDP/pyrecdp/primitives/operations/text_rouge_score_dedup.py

diff --git a/RecDP/pyrecdp/primitives/llmutils/rouge_score_dedup.py b/RecDP/pyrecdp/primitives/llmutils/rouge_score_dedup.py
index 636a40f40..dc3d9be12 100644
--- a/RecDP/pyrecdp/primitives/llmutils/rouge_score_dedup.py
+++ b/RecDP/pyrecdp/primitives/llmutils/rouge_score_dedup.py
@@ -46,5 +46,6 @@ def rouge_score_dedup(data_dir, out_dir, data_file_type="jsonl", max_ratio=0.7,
     output_dir = args.output_dir
     max_ratio = args.max_ratio
     batch_size = args.batch_size
+
     with Timer(f"Remove duplicate item by rouge score for {data_dir}"):
         rouge_score_dedup(data_dir, output_dir, data_file_type, max_ratio, batch_size)
diff --git a/RecDP/pyrecdp/primitives/operations/__init__.py b/RecDP/pyrecdp/primitives/operations/__init__.py
index 666a6f48f..7024a8419 100644
--- a/RecDP/pyrecdp/primitives/operations/__init__.py
+++ b/RecDP/pyrecdp/primitives/operations/__init__.py
@@ -16,5 +16,5 @@
 from .text_diversityindicate import TextDiversityIndicate
 from .text_custom import TextCustomerMap, TextCustomerFilter
 from .text_toxicity import TextToxicity
-from .text_rouge_score_dedup import RougeScoreDedup
+from .text_compare_dedup import RougeScoreDedup
 from .text_perplexity_score import TextPerplexityScore
diff --git a/RecDP/pyrecdp/primitives/operations/text_compare_dedup.py b/RecDP/pyrecdp/primitives/operations/text_compare_dedup.py
new file mode 100644
index 000000000..f003506e3
--- /dev/null
+++ b/RecDP/pyrecdp/primitives/operations/text_compare_dedup.py
@@ -0,0 +1,181 @@
+from .base import BaseLLMOperation, LLMOPERATORS, statistics_decorator
+from ray.data import Dataset
+from pyspark.sql import DataFrame
+
+import pyspark.sql.functions as F
+from pyspark.sql import types as T
+from pyspark.sql import Row
+from rouge_score import rouge_scorer
+from pyrecdp.primitives.llmutils.third_party import generate_connected_components
+
+from .logging_utils import logger
+from pyrecdp.core.utils import Timer
+from tqdm import tqdm
+import pandas as pd
+
+
+class BaseCompareDedup(BaseLLMOperation):
+    def __init__(self, text_key='text', max_ratio=0.7, batch_size=100, score_store_path='RougeScorefiltered.parquet',
+                 args_dict={}):
+        settings = {'text_key': text_key, 'max_ratio': max_ratio, 'batch_size': batch_size,
+                    'score_store_path': score_store_path}
+        settings.update(args_dict)
+        super().__init__(settings)
+        self.text_key = text_key
+        self.max_ratio = max_ratio
+        self.batch_size = batch_size
+        self.score_store_path = score_store_path
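+        # pairs whose similarity exceeds max_ratio are persisted to this parquet path for inspection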
+        self.support_spark = True
+        self.support_ray = False
+        self.new_column_name = "score"
+
+    def process_rayds(self, ds=None):
+        total_rows = ds.count()
+        line_num = []
+        for i in range(1, total_rows):
+
+            d1, d2, d3 = ds.split_at_indices([i, i + 1])
+            target_sample = d2.take(1)[0]
+            instruction = target_sample[self.text_key]
+
+            compute_func = self.get_compute_func()
+
+            # ds = d2.filter(lambda x: True if rouge_scorer._score_lcs(new_instruction_token, scorer._tokenizer.tokenize(
+            #     x["instruction"])).fmeasure < 0.7 else False)
+            def process_row(sample):
+                sample[self.new_column_name] = compute_func(instruction, sample[self.text_key])
+                return sample
+
+            ds_score: Dataset = d1.map(lambda x: process_row(x))
+            if i == 1:
+                filtered_ds = d1
+            if ds_score.max(self.new_column_name) < self.max_ratio:
+                filtered_ds = filtered_ds.union(d2)
+
+        return filtered_ds
+
+    @statistics_decorator
+    def process_spark(self, spark, spark_df: DataFrame) -> DataFrame:
+        max_ratio = self.max_ratio
+        spark_df = spark_df.withColumn('id_1', F.monotonically_increasing_id())
+        instruction_df_1 = spark_df.withColumnRenamed(self.text_key, "similarity_left")
+        instruction_df_2 = (spark_df.withColumnRenamed("id_1", "id_2")
+                            .withColumnRenamed(self.text_key, "similarity_right"))
+
+        monotonically_increasing_id_list = spark_df.rdd.map(lambda x: x.id_1).collect()
+        batches = [monotonically_increasing_id_list[i: i + self.batch_size] for i in
+                   range(0, len(monotonically_increasing_id_list), self.batch_size)]
+
+        def gen_id(id_1, id_2):
+            if id_1 == id_2:
+                return -1
+            if id_1 < id_2:
+                return f"{id_1} :: {id_2}"
+            else:
+                return f"{id_2} :: {id_1}"
+
+        gen_id_udf = F.udf(gen_id, T.StringType())
+        compare_rouge_score_udf = F.udf(self.get_compute_func(), T.FloatType())
+        history_pair_df = None
+        score_df_list = []
+
+        for batch_count, to_process_ids in tqdm(enumerate(batches), total=len(batches)):
+            with Timer(f"Round {batch_count}"):
+                # prepare matrix for one batch calculation
+                # 1. cross join to get n*n pairs
+                # 2. use id_pair to reduce calculated pairs: if we have done i_j, then skip j_i
+                # 3. skip i_i
+                R = Row('id_2')
+                tmp_id_df = spark.createDataFrame([R(i) for i in to_process_ids])
+                batch_df = instruction_df_2.join(tmp_id_df, on='id_2', how='inner')
+                dupli_score_matrix = instruction_df_1.crossJoin(batch_df)
+                dupli_score_matrix = dupli_score_matrix.withColumn("id_pair",
+                                                                   gen_id_udf(F.column("id_1"), F.column("id_2")))
+                dupli_score_matrix = dupli_score_matrix.dropDuplicates(["id_pair"])
+                dupli_score_matrix = dupli_score_matrix.filter(F.column("id_1") != F.column("id_2"))
+                dupli_score_matrix = dupli_score_matrix.cache()
+
+                # Now we have the minimum set of pairs, start to calculate the rouge score
+                remove_df = dupli_score_matrix.withColumn(self.new_column_name,
+                                                          compare_rouge_score_udf(F.column("similarity_left"),
+                                                                                  F.column("similarity_right")))
+
+                # find out sample pairs whose similarity > threshold
+                remove_df = remove_df.filter(F.column(self.new_column_name) > max_ratio).cache()
+                logger.info(
+                    f"Round {batch_count}: total processing num_samples is {dupli_score_matrix.count()}, detected high score num_samples is {remove_df.count()}")
+                # materialize one round
+
+                score_df = remove_df.select('id_1', 'id_2', 'id_pair', 'similarity_left', 'similarity_right',
+                                            self.new_column_name).toPandas()
+                score_df_list.append(score_df)
+
+                instruction_df_1.join(tmp_id_df.withColumnRenamed('id_2', 'id_1'), on='id_1', how='anti').write.parquet(
+                    f"{self.score_store_path}.tmp_df", mode='overwrite')
+                instruction_df_1 = spark.read.parquet(f"{self.score_store_path}.tmp_df")
+
+        # Final pass: connect duplicated pairs and remove duplicates from the original dataframe
+        with Timer("generate_connected_components => duplicates"):
+            results = []
+            [results.extend(df_['id_pair'].to_list()) for df_ in score_df_list]
+            components = generate_connected_components.generate_connected_components_py(results)
+            duplicates = [c for c_list in components for c in c_list[1:]]
+            R = Row('id_1')
+            total_dup = len(duplicates)
+            if total_dup != 0:
+                duplicates_sdf = spark.createDataFrame([R(dup) for dup in duplicates]).cache()
+                total_dup = duplicates_sdf.count()
+                spark_df = spark_df.join(duplicates_sdf,
+                                         on='id_1', how="left_anti").drop("id_1")
+                logger.info(f"Finally detected duplicated num_samples is {total_dup}")
+            else:
+                spark_df = spark_df.drop("id_1")
+            score_df = pd.concat(score_df_list, ignore_index=True).reset_index(drop=True)
+            if self.score_store_path:
+                import os
+                if os.path.exists(self.score_store_path):
+                    os.remove(self.score_store_path)
+                score_df.to_parquet(self.score_store_path)
+            if self.statistics_flag:
+                self.statistics.example = score_df
+
+        return spark_df
+
+    def get_compute_func(self, *args, **kwargs):
+        raise NotImplementedError("Abstract func")
+
+    def summarize(self) -> str:
+        return (
+            f"A total of {self.statistics.total_in} rows of data were processed in {self.statistics.used_time} seconds; "
+            f"a duplication list containing {self.statistics.total_out} entries was found. "
+            f"Duplication preview (first 50 pairs): {self.statistics.example.head(50)}")
+
+
+LLMOPERATORS.register(BaseCompareDedup)
+
+
+class RougeScoreDedup(BaseCompareDedup):
+    def __init__(self, text_key='text', max_ratio=0.7, batch_size=100, score_store_path='RougeScorefiltered.parquet'):
+        settings = {'text_key': text_key, 'max_ratio': max_ratio, 'batch_size': batch_size,
+                    "score_store_path": score_store_path}
+        super().__init__(args_dict=settings)
+        self.text_key = text_key
+        self.max_ratio = max_ratio
+        self.batch_size = batch_size
+        self.score_store_path = score_store_path
+        self.rouge_type = 'rougeL'
+        self.support_spark = True
+        self.support_ray = False
+
+    def get_compute_func(self, *args, **kwargs):
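+        # Build the rougeL scorer once and return a closure; BaseCompareDedup wraps it in a Spark UDF.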
+        from rouge_score import rouge_scorer
+        scorer = rouge_scorer.RougeScorer([self.rouge_type], use_stemmer=False)
+
+        def compare_rouge_score(str_1, str_2):
+            scores = scorer.score(str_1, str_2)
+            return scores['rougeL'].fmeasure
+
+        return compare_rouge_score
+
+
+LLMOPERATORS.register(RougeScoreDedup)
diff --git a/RecDP/pyrecdp/primitives/operations/text_rouge_score_dedup.py b/RecDP/pyrecdp/primitives/operations/text_rouge_score_dedup.py
deleted file mode 100644
index d44d68e39..000000000
--- a/RecDP/pyrecdp/primitives/operations/text_rouge_score_dedup.py
+++ /dev/null
@@ -1,142 +0,0 @@
-from .base import BaseLLMOperation, LLMOPERATORS
-from ray.data import Dataset
-from pyspark.sql import DataFrame
-
-import pyspark.sql.functions as F
-from pyspark.sql import types as T
-from rouge_score import rouge_scorer
-
-from .logging_utils import logger
-
-
-def split2df(prod_df, limit_size, spark):
-    # Create a copy of original dataframe
-
-    row = prod_df.head(1)
-    first_id = row[0]["id_2"]
-    copy_df = prod_df
-
-    prod_df_len = prod_df.count()
-    if limit_size >= prod_df_len:
-        return prod_df, None
-
-    part_1_rdd = copy_df.limit(limit_size).collect()
-    part_1_df = spark.createDataFrame(part_1_rdd, prod_df.columns)
-
-    left_id = first_id - 1 + limit_size
-    part_2_df = copy_df.filter(F.column("id_2") > left_id)
-
-    return part_1_df, part_2_df
-
-
-class RougeScoreDedup(BaseLLMOperation):
-    def __init__(self, text_key='text', max_ratio=0.7, batch_size=1000, score_store_path='/root/qyao/gitspace/e2eAIOK/RecDP/tests/data/filter_out/filtered'):
-        settings = {'text_key': text_key, 'max_ratio': max_ratio, 'batch_size': batch_size, "score_store_path": score_store_path}
-        super().__init__(settings)
-        self.text_key = text_key
-        self.max_ratio = max_ratio
-        self.batch_size = batch_size
-        self.score_store_path = score_store_path
-        self.rouge_type = 'rougeL'
-        self.support_spark = True
-        self.support_ray = False
-
-    def process_rayds(self, ds=None):
-        total_rows = ds.count()
-        line_num = []
-        scorer = rouge_scorer.RougeScorer([self.rouge_type], use_stemmer=False)
-        for i in range(1, total_rows):
-
-            d1, d2, d3 = ds.split_at_indices([i, i + 1])
-            target_sample = d2.take(1)[0]
-            instruction = target_sample[self.text_key]
-            instruction_token = scorer._tokenizer.tokenize(instruction)
-
-            def process_row(sample, target_token):
-                sample['rouge_score'] = rouge_scorer._score_lcs(target_token,
-                                                                scorer._tokenizer.tokenize(
-                                                                    sample[self.text_key])).fmeasure
-                return sample
-
-            # ds = d2.filter(lambda x: True if rouge_scorer._score_lcs(new_instruction_token, scorer._tokenizer.tokenize(
-            #     x["instruction"])).fmeasure < 0.7 else False)
-
-            ds_score: Dataset = d1.map(lambda x: process_row(x, instruction_token))
-            if i == 1:
-                filterd_ds = d1
-            if ds_score.max("rouge_score") < self.max_ratio:
-                filterd_ds = filterd_ds.union(d2)
-
-        return filterd_ds
-
-    def process_spark(self, spark, spark_df: DataFrame) -> DataFrame:
-        rouge_type = self.rouge_type
-        rouge_score_column_name = "rouge_score"
-        max_ratio = self.max_ratio
-
-        instruction_df_1 = (spark_df.select(self.text_key).rdd.zipWithIndex().toDF()
-                            .select("_1.*", "_2").withColumnRenamed("_2", "id_1"))
-        instruction_df_1 = instruction_df_1.withColumnRenamed(self.text_key, "instruction")
-        instruction_df_2 = (instruction_df_1.withColumnRenamed("id_1", "id_2")
-                            .withColumnRenamed("instruction", "instruction_2"))
-
-        scorer = rouge_scorer.RougeScorer([rouge_type], use_stemmer=False)
-
-        def gen_id(id_1, id_2):
-            if id_1 == id_2:
-                return -1
-            if id_1 < id_2:
-                return f"{id_1}_{id_2}"
-            else:
-                return f"{id_2}_{id_1}"
-
-        def compare_rouge_score(str_1, str_2):
-            scores = scorer.score(str_1, str_2)
-            return scores[rouge_type].fmeasure
-
-        compare_rouge_score_udf = F.udf(compare_rouge_score, T.FloatType())
-
-        batch_count = 0
-        while instruction_df_2 is not None:
-            logger.info(
-                f"Round {batch_count}: processing {batch_count * self.batch_size} - {(batch_count + 1) * self.batch_size}")
-
-            batch_df, instruction_df_2 = split2df(instruction_df_2, self.batch_size, spark)
-            dupli_score_matrix = instruction_df_1.crossJoin(batch_df)
-            gen_id_udf = F.udf(gen_id, T.StringType())
-            dupli_score_matrix = dupli_score_matrix.withColumn("id_pair",
-                                                               gen_id_udf(F.column("id_1"), F.column("id_2")))
-            dupli_score_matrix = dupli_score_matrix.dropDuplicates(["id_pair"])
-            dupli_score_matrix = dupli_score_matrix.filter(F.column("id_1") != F.column("id_2"))
-
-            remove_df = dupli_score_matrix.withColumn(rouge_score_column_name,
-                                                      compare_rouge_score_udf(F.column("instruction"),
-                                                                              F.column("instruction_2")))
-            remove_df.show()
-            remove_df = remove_df.filter(F.column(rouge_score_column_name) > max_ratio)
-            if self.score_store_path:
-                if batch_count == 0:
-                    score_df = remove_df.select('id_pair', 'rouge_score')
-                else:
-                    score_df = score_df.union(remove_df.select('id_pair', 'rouge_score'))
-
-            remove_df = remove_df.select(
-                "instruction",
-                "id_1")
-            remove_df = remove_df.dropDuplicates(["id_1"])
-            remove_count = remove_df.count()
-
-            if remove_count > 0:
-                instruction_df_1 = instruction_df_1.subtract(remove_df)
-                instruction_df_1_rdd = instruction_df_1.collect()
-                instruction_df_1 = spark.createDataFrame(instruction_df_1_rdd, remove_df.columns)
-            batch_count += 1
-        instruction_df_1 = instruction_df_1.withColumnRenamed("instruction", self.text_key)
-        spark_df = spark_df.join(instruction_df_1,
-                                 on=self.text_key, how="inner").select(spark_df.columns)
-        if self.score_store_path:
-            score_df.write.parquet(self.score_store_path, mode='overwrite')
-        return spark_df
-
-
-LLMOPERATORS.register(RougeScoreDedup)
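
Below is a minimal usage sketch (not part of the patch) of the refactored operator added by this change. It assumes a local SparkSession and a toy DataFrame with a 'text' column, and it calls process_spark directly rather than through a RecDP pipeline; the app name and sample rows are illustrative only.

# Usage sketch only; exercises RougeScoreDedup.process_spark on a tiny in-memory dataset.
from pyspark.sql import SparkSession
from pyrecdp.primitives.operations import RougeScoreDedup

spark = SparkSession.builder.master("local[*]").appName("rouge_dedup_demo").getOrCreate()
df = spark.createDataFrame(
    [("The quick brown fox jumps over the lazy dog.",),
     ("The quick brown fox jumped over the lazy dog.",),
     ("A completely unrelated sentence.",)],
    ["text"])

# max_ratio is the rougeL fmeasure threshold; pairs scoring above it are treated as duplicates.
op = RougeScoreDedup(text_key="text", max_ratio=0.7, batch_size=100,
                     score_store_path="RougeScorefiltered.parquet")
deduped = op.process_spark(spark, df)
deduped.show(truncate=False)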