Skip to content

Commit

Permalink
feat: introduced process controller
Browse files Browse the repository at this point in the history
  • Loading branch information
le1nux committed Jan 5, 2025
1 parent e876c97 commit 3fe7690
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import multiprocessing as mp
from dataclasses import dataclass
from typing import Callable

from modalities.dataloader.preprocessing.queued_processing.queued_processing import Processor
from modalities.utils.logging import get_logger


@dataclass
class PipelineStep:
name: str
input_queue: mp.Queue
processors: list[Processor]


class ProcessController:
def __init__(self, pipeline_steps: list[PipelineStep], populate_jobs: Callable):
"""Initializes the ProcessController
Each pipeline step contains a list of processors that retrieve the data from the input queue,
process it and if necessary put it into the output queue of the next step.
"""
self._pipeline_steps = pipeline_steps
self._populate_jobs = populate_jobs

def run(self):
# add the jobs to the input queues
get_logger().info("Populating jobs")
self._populate_jobs()

# start the processors
for step in self._pipeline_steps:
get_logger().info(f"Starting processors for step {step.name}")
for processor in step.processors:
processor.start()

# wait for the processors to finish
for step in self._pipeline_steps:
for _ in step.processors:
step.input_queue.put(None)
get_logger().info(f"Waiting for processors in step {step.name} to finish")

for processor in step.processors:
processor.join()

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
from dataclasses import dataclass
import mmap
import pickle
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional
from modalities.exceptions import ReaderIndexationError

import numpy as np
from enum import Enum

from modalities.exceptions import ReaderIndexationError


@dataclass
class Sample:
# If the index is not shuffled, then the incrementeal_line_id
# points to the position in the dataset
# If the index is shuffled, then the incremental_line_id
# points to the position in the shuffled index and the
# If the index is shuffled, then the incremental_line_id
# points to the position in the shuffled index and the
# shuffled_line_id points to the position in the original index
incremental_line_id: int
raw_data_path: Path
offset: int
sample_length_in_bytes: int
content_raw: str | bytes
content_tokenized: Optional[bytes] = None
token_size_in_bytes: Optional[int] = None
shuffled_line_id: Optional[int] = None


Expand Down Expand Up @@ -142,7 +146,7 @@ def __getitem__(self, key: int) -> Sample:
return Sample(
raw_data_path=self.raw_data_path,
incremental_line_id=key,
shuffled_line_id=key, # TODO so far we don't support shuffling here!
shuffled_line_id=key, # TODO so far we don't support shuffling here!
offset=offset,
sample_length_in_bytes=sample_length_in_bytes,
content_raw=content,
Expand Down Expand Up @@ -229,7 +233,7 @@ def __getitem__(self, key: int) -> Sample:
"""
try:
if self.global_shuffle_index is not None:
if self.global_shuffle_index is not None:
mapped_key = self.global_shuffle_index[key]
else:
mapped_key = key
Expand Down Expand Up @@ -260,7 +264,6 @@ class LargeFileLinesReaderTypes(Enum):


class LargeFileLinesReaderFactory:

@staticmethod
def get_local_reader(
raw_data_path: Path,
Expand Down

0 comments on commit 3fe7690

Please sign in to comment.