diff --git a/src/instructlab/sdg/configs/knowledge/spellcheck.yaml b/src/instructlab/sdg/configs/knowledge/spellcheck.yaml
new file mode 100644
index 00000000..daf1dafa
--- /dev/null
+++ b/src/instructlab/sdg/configs/knowledge/spellcheck.yaml
@@ -0,0 +1,17 @@
+system: You are an AI assistant that is an expert at fixing spelling errors in documents.
+
+introduction: |
+  Give me a copy of the below document with all spelling errors corrected.
+
+principles: |
+  Do not add any new information.
+  Do not leave out any information.
+
+examples: ""
+
+generation: |
+  Document:
+  {document}
+
+start_tags: [""]
+end_tags: [""]
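
For context, the fields above (system, introduction, principles, examples, generation, start/end tags) are the raw material an LLMBlock uses to build its teacher prompt. The sketch below only illustrates one plausible way those fields could be stitched together for a single sample; the actual templating inside instructlab.sdg's LLMBlock is not shown in this patch and may differ, and the render_prompt helper plus the PyYAML usage are assumptions made purely for illustration.

# Illustration only: approximate assembly of the spellcheck.yaml prompt config
# into a single prompt string for one sample. Field names come from the config
# above; the real LLMBlock templating may differ.
import yaml

def render_prompt(config_path: str, document: str) -> str:
    with open(config_path, encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    # Substitute the sample's document into the generation template.
    generation = cfg["generation"].format(document=document)
    parts = [cfg["system"], cfg["introduction"], cfg["principles"], cfg["examples"], generation]
    return "\n".join(p for p in parts if p)

print(render_prompt(
    "src/instructlab/sdg/configs/knowledge/spellcheck.yaml",
    "The qick brown fox jumps ovr the lazy dog.",
))
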
diff --git a/src/instructlab/sdg/datamixing.py b/src/instructlab/sdg/datamixing.py
index 9d5c10f0..26c86a1d 100644
--- a/src/instructlab/sdg/datamixing.py
+++ b/src/instructlab/sdg/datamixing.py
@@ -1,5 +1,5 @@
 # Standard
-from typing import Optional
+from typing import Dict, List, Optional
 import json
 import logging
 import os.path
@@ -12,6 +12,7 @@
 
 # First Party
 from instructlab.sdg.utils import GenerateException, pandas
+from instructlab.sdg.utils.pandas import dataset_from_pandas_dataframe
 
 ALLOWED_COLS = ["id", "messages", "metadata"]
 logger = logging.getLogger(__name__)
@@ -374,7 +375,68 @@ def _conv_pretrain(rec):
     return rec
 
 
-def _create_phase10_ds(generated_dataset: Dataset):
+def _create_auxiliary_dataset(
+    generated_dataset: Dataset, auxiliary_inst: Optional[Dict[str, List[str]]]
+):
+    # Samples that went through the auxiliary generation pipeline will
+    # have a dataset_type column created by that pipeline. If that's
+    # not present, then we may be running in a pipeline without any
+    # auxiliary dataset generation enabled.
+    if "dataset_type" not in generated_dataset.column_names:
+        return None
+    # If we didn't find any auxiliary instructions to load, then
+    # that's also another sign that we're not running with any
+    # auxiliary datasets enabled.
+    if auxiliary_inst is None:
+        return None
+    # This "base_document" dataset_type is set in the knowledge
+    # pipeline config, and represents samples that do not have the
+    # auxiliary generated document attached, so we filter those out.
+    auxiliary_ds = generated_dataset.filter(
+        lambda x: x["dataset_type"] != "base_document"
+    )
+    unique_document_auxiliary = auxiliary_ds.to_pandas().drop_duplicates(
+        subset=["document"]
+    )
+    unique_document_auxiliary = dataset_from_pandas_dataframe(unique_document_auxiliary)
+    unique_document_auxiliary = unique_document_auxiliary.select_columns(
+        [
+            "raw_document",
+            "document_outline",
+            "domain",
+            "dataset_type",
+            "document",
+        ]
+    )
+    unique_document_auxiliary = unique_document_auxiliary.rename_columns(
+        {"raw_document": "context", "document": "response"}
+    )
+
+    def __create_auxiliary_ds(rec):
+        instruction = random.choice(auxiliary_inst[rec["dataset_type"]])
+        messages = [
+            {"role": "user", "content": f"{rec['context']}\n\n{instruction}"},
+            {"role": "assistant", "content": rec["response"]},
+        ]
+        metadata = json.dumps(
+            {
+                "dataset_type": rec["dataset_type"],
+                "raw_document": rec["context"],
+                "dataset": f"document_{rec['dataset_type']}",
+                "domain": rec["domain"],
+            }
+        )
+        return {"messages": messages, "metadata": metadata, "id": str(uuid.uuid4())}
+
+    unique_document_auxiliary = unique_document_auxiliary.map(
+        __create_auxiliary_ds, remove_columns=unique_document_auxiliary.column_names
+    )
+    return unique_document_auxiliary
+
+
+def _create_phase10_ds(
+    generated_dataset: Dataset, auxiliary_inst: Optional[Dict[str, List[str]]]
+):
     """
     Create a dataset for Phase 1.0 of downstream training.
 
@@ -387,10 +449,17 @@ def _create_phase10_ds(generated_dataset: Dataset):
     )
     knowledge_ds = _add_extra_contexts_to_samples(knowledge_ds, p=0.4)
 
-    return knowledge_ds
+    auxiliary_dataset = _create_auxiliary_dataset(generated_dataset, auxiliary_inst)
+    if auxiliary_dataset is not None:
+        phase10 = concatenate_datasets([knowledge_ds, auxiliary_dataset])
+    else:
+        phase10 = knowledge_ds
+    return phase10
 
 
-def _create_phase07_ds(generated_dataset: Dataset):
+def _create_phase07_ds(
+    generated_dataset: Dataset, auxiliary_inst: Optional[Dict[str, List[str]]]
+):
     """
     Create a dataset for Phase 0.7 of downstream training.
 
@@ -404,7 +473,13 @@ def _create_phase07_ds(generated_dataset: Dataset):
     )
     knowledge_ds = knowledge_ds.map(_conv_pretrain)
 
-    return knowledge_ds
+    auxiliary_dataset = _create_auxiliary_dataset(generated_dataset, auxiliary_inst)
+    if auxiliary_dataset is not None:
+        auxiliary_dataset = auxiliary_dataset.map(_conv_pretrain)
+        phase07 = concatenate_datasets([knowledge_ds, auxiliary_dataset])
+    else:
+        phase07 = knowledge_ds
+    return phase07
 
 
 def _convert_to_leaf_node_messages(sample: dict, sys_prompt: str):
@@ -440,12 +515,21 @@ class DataMixer:
     # once.
     NUM_SYNTH_SKILLS = 30
 
-    def __init__(self, data_dirs, output_dir, date_suffix, sys_prompt, num_procs):
+    def __init__(
+        self,
+        data_dirs,
+        output_dir,
+        date_suffix,
+        sys_prompt,
+        num_procs,
+        auxiliary_inst=None,
+    ):
         self.data_dirs = data_dirs
         self.output_dir = output_dir
         self.sys_prompt = sys_prompt
         self.date_suffix = date_suffix
         self.num_procs = num_procs
+        self.auxiliary_inst = auxiliary_inst
 
         self.knowledge_recipe = self._load_default_recipe("knowledge.yaml")
         self.skills_recipe = self._load_default_recipe("skills.yaml")
@@ -482,7 +566,9 @@ def _gen_leaf_node_data(
 
     def collect(self, leaf_node_path, new_generated_data, is_knowledge):
         if is_knowledge:
-            knowledge_phase_data = _create_phase07_ds(new_generated_data)
+            knowledge_phase_data = _create_phase07_ds(
+                new_generated_data, self.auxiliary_inst
+            )
             output_file_leaf_knowledge = (
                 f"node_datasets_{self.date_suffix}/{leaf_node_path}_p07.jsonl"
             )
@@ -492,7 +578,9 @@ def collect(self, leaf_node_path, new_generated_data, is_knowledge):
                 output_file_leaf_knowledge,
             )
 
-            skills_phase_data = _create_phase10_ds(new_generated_data)
+            skills_phase_data = _create_phase10_ds(
+                new_generated_data, self.auxiliary_inst
+            )
             output_file_leaf_skills = (
                 f"node_datasets_{self.date_suffix}/{leaf_node_path}_p10.jsonl"
            )
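
To see what the new _create_auxiliary_dataset helper produces, here is a minimal usage sketch against an in-memory datasets.Dataset. It calls the private helper directly, which is only for illustration; the sample field values and the tiny auxiliary_inst mapping are made up, but the columns match the select_columns call in the patch.

# Sketch (not part of the patch): exercising _create_auxiliary_dataset with a
# tiny dataset to show the shape of the records it emits.
from datasets import Dataset
from instructlab.sdg.datamixing import _create_auxiliary_dataset

generated = Dataset.from_list([
    {   # auxiliary sample produced by the spellcheck path
        "dataset_type": "spellcheck",
        "raw_document": "The qick brown fox.",
        "document": "The quick brown fox.",
        "document_outline": "Example outline",
        "domain": "example",
    },
    {   # base_document sample; filtered out by _create_auxiliary_dataset
        "dataset_type": "base_document",
        "raw_document": "The qick brown fox.",
        "document": "The qick brown fox.",
        "document_outline": "Example outline",
        "domain": "example",
    },
])
auxiliary_inst = {
    "spellcheck": [
        "Correct any spelling errors in the document and output the corrected version."
    ]
}
aux_ds = _create_auxiliary_dataset(generated, auxiliary_inst)
print(aux_ds.column_names)              # something like ['messages', 'metadata', 'id']
print(aux_ds[0]["messages"][0]["content"])  # raw document followed by the chosen instruction
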
diff --git a/src/instructlab/sdg/generate_data.py b/src/instructlab/sdg/generate_data.py
index 3231f9ec..eeb0c016 100644
--- a/src/instructlab/sdg/generate_data.py
+++ b/src/instructlab/sdg/generate_data.py
@@ -247,7 +247,7 @@ def load_pipeline(yaml_basename):
     )
 
 
-def _mixer_init(ctx, output_dir, date_suffix):
+def _mixer_init(ctx, output_dir, date_suffix, knowledge_auxiliary_inst):
     pd = platformdirs.PlatformDirs(
         appname=os.path.join("instructlab", "sdg"), multipath=True
     )
@@ -258,6 +258,7 @@ def _mixer_init(ctx, output_dir, date_suffix):
         date_suffix,
         _SYS_PROMPT,
         ctx.dataset_num_procs,
+        knowledge_auxiliary_inst,
     )
 
 
@@ -367,7 +368,10 @@ def generate_data(
         mmlu_ctx = dataclasses.replace(ctx, checkpoint_dir=None)
         mmlu_bench_pipe = mmlubench_pipe_init(mmlu_ctx)
 
-    mixer = _mixer_init(ctx, output_dir, date_suffix)
+    # FIXME: remove SDG https://github.com/instructlab/sdg/pull/64
+    mixer = _mixer_init(
+        ctx, output_dir, date_suffix, sdg_knowledge.pipelines[0].auxiliary_inst
+    )
 
     if console_output:
         logger.info(
diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py
index c4206f7c..77931ca3 100644
--- a/src/instructlab/sdg/pipeline.py
+++ b/src/instructlab/sdg/pipeline.py
@@ -3,7 +3,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from importlib import resources
-from typing import Iterable, Optional
+from typing import Dict, Iterable, List, Optional
 import logging
 import math
 import os.path
@@ -109,6 +109,7 @@ def __init__(
         ctx: PipelineContext,
         config_path: str,
         chained_blocks: list[dict],
+        auxiliary_inst: Optional[Dict[str, List[str]]] = None,
     ) -> None:
         """
         Initialize the Pipeline class with a configuration dictionary.
@@ -120,12 +121,14 @@ def __init__(
         self.config_path = config_path
         # pipeline config is the run configuration that consists of the pipeline steps
         self.chained_blocks = chained_blocks
+        # datamixing instructions for auxiliary data generated by this pipeline
+        self.auxiliary_inst = auxiliary_inst
 
     @classmethod
     def from_file(cls, ctx, pipeline_yaml):
         if not os.path.isabs(pipeline_yaml):
             pipeline_yaml = os.path.join(resources.files(__package__), pipeline_yaml)
-        return cls(ctx, pipeline_yaml, _parse_pipeline_config_file(pipeline_yaml))
+        return cls(ctx, pipeline_yaml, *_parse_pipeline_config_file(pipeline_yaml))
 
     def generate(self, dataset) -> Dataset:
         """
@@ -296,7 +299,11 @@ def _parse_pipeline_config_file(pipeline_yaml):
             "The pipeline config file contains no 'blocks' section"
         )
 
-    return content["blocks"]
+    auxiliary_inst = None
+    if "datamixing" in content and "auxiliary_instructions" in content["datamixing"]:
+        auxiliary_inst = content["datamixing"]["auxiliary_instructions"]
+
+    return content["blocks"], auxiliary_inst
 
 
 # This is part of the public API.
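
A quick sketch of the new return shape: _parse_pipeline_config_file now yields a (blocks, auxiliary_inst) tuple, which Pipeline.from_file unpacks with the * operator into the extended constructor. This assumes an environment with this patch installed; the path construction mirrors the from_file logic above and the printed values are indicative only.

# Sketch: inspecting the parsed pipeline config directly.
from importlib import resources
import os.path

from instructlab.sdg.pipeline import _parse_pipeline_config_file

yaml_path = os.path.join(
    resources.files("instructlab.sdg"), "pipelines", "full", "knowledge.yaml"
)
blocks, auxiliary_inst = _parse_pipeline_config_file(yaml_path)
print(len(blocks))      # number of block definitions in the pipeline
print(auxiliary_inst)   # {'spellcheck': ['Correct any spelling errors ...', ...]}
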
diff --git a/src/instructlab/sdg/pipelines/full/knowledge.yaml b/src/instructlab/sdg/pipelines/full/knowledge.yaml
index 79d7d6e4..9f7b5383 100644
--- a/src/instructlab/sdg/pipelines/full/knowledge.yaml
+++ b/src/instructlab/sdg/pipelines/full/knowledge.yaml
@@ -1,5 +1,36 @@
 version: "1.0"
 blocks:
+  - name: duplicate_document_col
+    type: DuplicateColumnsBlock
+    config:
+      columns_map:
+        document: base_document
+
+  - name: gen_spellcheck
+    type: LLMBlock
+    config:
+      config_path: ../../configs/knowledge/spellcheck.yaml
+      output_cols:
+        - spellcheck
+      gen_kwargs:
+        max_tokens: 2048
+
+  - name: flatten_auxiliary_columns
+    type: FlattenColumnsBlock
+    config:
+      var_cols:
+        - spellcheck
+        - base_document
+      value_name: corrected_document
+      var_name: dataset_type
+
+  - name: rename_to_document_column
+    type: RenameColumnsBlock
+    config:
+      columns_map:
+        document: raw_document
+        corrected_document: document
+
   - name: gen_knowledge
     type: LLMBlock
     config:
@@ -73,3 +104,9 @@ blocks:
       - explanation
       - rating
       - __index_level_0__
+
+datamixing:
+  auxiliary_instructions:
+    spellcheck:
+      - Correct any spelling errors in the document and output the corrected version.
+      - Rewrite the document to remove any spelling errors.
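
Conceptually, the four new blocks duplicate the source document, spellcheck it with the teacher model, melt the spellcheck/base_document columns into one row per dataset_type, and then rename columns so the downstream knowledge blocks keep operating on a "document" column while the original text is preserved as "raw_document". The pandas sketch below is only an analogy for the flatten/rename steps, assuming FlattenColumnsBlock behaves roughly like pandas melt; the real block implementations in instructlab.sdg.utilblocks are not shown in this patch and may differ.

# Illustration only: an approximation of flatten_auxiliary_columns and
# rename_to_document_column using pandas.
import pandas as pd

df = pd.DataFrame([{
    "document": "The qick brown fox.",       # original text at this point in the pipeline
    "base_document": "The qick brown fox.",  # copy made by duplicate_document_col
    "spellcheck": "The quick brown fox.",    # gen_spellcheck output
    "domain": "example",
}])

# flatten_auxiliary_columns: one row per (sample, dataset_type) pair
melted = df.melt(
    id_vars=[c for c in df.columns if c not in ("spellcheck", "base_document")],
    value_vars=["spellcheck", "base_document"],
    var_name="dataset_type",
    value_name="corrected_document",
)

# rename_to_document_column: keep the original as raw_document and promote the
# per-dataset_type text to 'document' for the downstream knowledge blocks
melted = melted.rename(columns={"document": "raw_document", "corrected_document": "document"})
print(melted[["dataset_type", "raw_document", "document"]])
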
diff --git a/src/instructlab/sdg/pipelines/schema/v1.json b/src/instructlab/sdg/pipelines/schema/v1.json
index c97844ed..64b9c477 100644
--- a/src/instructlab/sdg/pipelines/schema/v1.json
+++ b/src/instructlab/sdg/pipelines/schema/v1.json
@@ -364,6 +364,23 @@
                     }
                 }
             }
+        },
+        "datamixing": {
+            "type": "object",
+            "additionalProperties": false,
+            "properties": {
+                "auxiliary_instructions": {
+                    "type": "object",
+                    "patternProperties": {
+                        ".*": {
+                            "type": "array",
+                            "items": {
+                                "type": "string"
+                            }
+                        }
+                    }
+                }
+            }
+        }
     }
 }
diff --git a/tests/test_default_pipeline_configs.py b/tests/test_default_pipeline_configs.py
index a56e22bd..8dbdec16 100644
--- a/tests/test_default_pipeline_configs.py
+++ b/tests/test_default_pipeline_configs.py
@@ -12,6 +12,9 @@
 from instructlab.sdg.pipeline import Pipeline, PipelineContext
 from instructlab.sdg.utilblocks import (
     CombineColumnsBlock,
+    DuplicateColumnsBlock,
+    FlattenColumnsBlock,
+    RenameColumnsBlock,
     SamplePopulatorBlock,
     SelectorBlock,
 )
@@ -23,8 +26,11 @@ def _noop_generate(self, samples):
 
 @patch.object(CombineColumnsBlock, "generate", _noop_generate)
 @patch.object(ConditionalLLMBlock, "generate", _noop_generate)
+@patch.object(DuplicateColumnsBlock, "generate", _noop_generate)
 @patch.object(FilterByValueBlock, "generate", _noop_generate)
+@patch.object(FlattenColumnsBlock, "generate", _noop_generate)
 @patch.object(LLMBlock, "generate", _noop_generate)
+@patch.object(RenameColumnsBlock, "generate", _noop_generate)
 @patch.object(SamplePopulatorBlock, "generate", _noop_generate)
 @patch.object(SelectorBlock, "generate", _noop_generate)
 @patch("instructlab.sdg.llmblock.server_supports_batched", lambda c, m: True)
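
The schema addition constrains a pipeline's datamixing section to a map of dataset_type names to lists of instruction strings. As a rough standalone check, that fragment can be exercised with the jsonschema package; this is an assumption for illustration, since the patch does not show how the project itself invokes schema validation, and the snippet assumes it is run from the repository root.

# Sketch: validating the knowledge pipeline's datamixing section against the
# schema fragment added above.
import yaml
from jsonschema import validate

datamixing_schema = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "auxiliary_instructions": {
            "type": "object",
            "patternProperties": {
                ".*": {"type": "array", "items": {"type": "string"}}
            },
        }
    },
}

with open("src/instructlab/sdg/pipelines/full/knowledge.yaml", encoding="utf-8") as f:
    pipeline = yaml.safe_load(f)

validate(pipeline.get("datamixing", {}), datamixing_schema)
print("datamixing section is valid")
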