Output non-pipelined sharded modules from rewrite + enhance pipelinability test (#2579)

Summary:

Adds non-pipelined sharded modules to the `_rewrite_model` output, for testing and logging purposes.

Differential Revision: D63445036
che-sh authored and facebook-github-bot committed Nov 22, 2024
1 parent 2962be0 commit ec6de90
Showing 2 changed files with 36 additions and 10 deletions.
7 changes: 7 additions & 0 deletions torchrec/distributed/train_pipeline/train_pipelines.py

@@ -531,6 +531,7 @@ def _pipeline_model(
             self._model,
             self._original_forwards,
             self._pipelined_preprocs,
+            non_pipelined_sharded_modules,
         ) = _rewrite_model(
             model=self._model,
             context=context,
@@ -540,6 +541,12 @@ def _pipeline_model(
             pipelined_forward=pipelined_forward,
             pipeline_preproc=self._pipeline_preproc,
         )
+        if non_pipelined_sharded_modules:
+            logger.warn(
+                "Sharded modules were not pipelined: %s. "
+                + "This needs to be fixed for pipelining to work to the full extent.",
+                ", ".join(non_pipelined_sharded_modules),
+            )
         # initializes input dist, so we can override input dist forwards
         self.start_sparse_data_dist(batch, context)
         self._original_kjt_dist_forwards = _override_input_dist_forwards(
39 changes: 29 additions & 10 deletions torchrec/distributed/train_pipeline/utils.py

@@ -1196,6 +1196,7 @@ def _rewrite_model( # noqa C901
     torch.nn.Module,
     List[Callable[..., Any]],
     List[PipelinedPreproc],
+    List[str],
 ]:
     input_model = model
     # Get underlying nn.Module
@@ -1235,6 +1236,7 @@ def _rewrite_model( # noqa C901
     original_forwards = []

     pipelined_preprocs: Set[PipelinedPreproc] = set()
+    non_pipelined_sharded_modules = []

     for node in graph.nodes:
         if node.op == "call_module" and node.target in sharded_modules:
@@ -1265,6 +1267,7 @@ def _rewrite_model( # noqa C901
                 logger.warning(
                     f"Module '{node.target}'' will not be pipelined, due to input modifications"
                 )
+                non_pipelined_sharded_modules.append(node.target)

     # JIT script unsharded modules if applicable.
     if apply_jit:
@@ -1273,7 +1276,13 @@ def _rewrite_model( # noqa C901
     if isinstance(input_model, DistributedModelParallel):
         input_model.module = graph_model

-    return pipelined_forwards, input_model, original_forwards, list(pipelined_preprocs)
+    return (
+        pipelined_forwards,
+        input_model,
+        original_forwards,
+        list(pipelined_preprocs),
+        non_pipelined_sharded_modules,
+    )


 def _override_input_dist_forwards(
@@ -1559,16 +1568,26 @@ def start_sparse_data_dist(self, batch: In) -> In:
         if not self.initialized:
             # Step 1: Pipeline input dist in trec sharded modules
             # TODO (yhshin): support preproc modules for `StagedTrainPipeline`
-            self._pipelined_modules, self.model, self._original_forwards, _ = (
-                _rewrite_model(
-                    model=self.model,
-                    context=self.context,
-                    dist_stream=self.data_dist_stream,
-                    batch=batch,
-                    apply_jit=self.apply_jit,
-                    pipelined_forward=self._pipelined_forward,
-                )
-            )
+            (
+                self._pipelined_modules,
+                self.model,
+                self._original_forwards,
+                _,
+                non_pipelined_sharded_modules,
+            ) = _rewrite_model(
+                model=self.model,
+                context=self.context,
+                dist_stream=self.data_dist_stream,
+                batch=batch,
+                apply_jit=self.apply_jit,
+                pipelined_forward=self._pipelined_forward,
+            )
+            if non_pipelined_sharded_modules:
+                logger.warn(
+                    "Sharded modules were not pipelined: %s. "
+                    + "Fixing this is needed for pipelining to work correctly.",
+                    ", ".join(non_pipelined_sharded_modules),
+                )
             # initializes input dist, so we can override input dist forwards
             _start_data_dist(self._pipelined_modules, batch, self.context)
             self._original_kjt_dist_forwards = _override_input_dist_forwards(
