fix: data-centric scheduling.
Signed-off-by: Electronic-Waste <[email protected]>
Electronic-Waste committed Nov 21, 2024
1 parent 4ba8fb7 commit 8a6c5e7
Showing 2 changed files with 83 additions and 71 deletions.
Binary file added docs/proposals/images/fl-v2-architecture.png
154 changes: 83 additions & 71 deletions docs/proposals/sedna-federated-learning-v2.md
- [Proposal](#proposal)
  - [Use Cases](#use-cases)
- [Design Details](#design-details)
  - [Subnet Management](#subnet-management)
  - [FederatedLearningJob CRD](#federatedlearningjob-crd)
  - [DataLoader DaemonSet & Waiter Container](#dataloader-daemonset--waiter-container)
- [Controller Design](#controller-design)
  - [Federated Learning Controller](#federated-learning-controller)
  - [Downstream Controller](#downstream-controller)

The non-goals include:

- Integrate inference jobs (currently)

## Proposal

In the new design, we assume that:

1. Data can be transferred within a secure subnet.

2. All training workers have the same parameters.

Federated learning is a training task and is therefore data-driven. If we only schedule the training tasks without also scheduling the training data, the resulting model can suffer from unacceptable training bias. We therefore need to collect data and distribute it across the training workers before the federated learning job executes.

Based on the reasons above, we propose **Two-Phase Federated Learning** to enable distributed training with training-operator:

1. **Phase 1 - Data Preparation**: Collect training data on edge nodes and distribute it to the training workers. In this phase, the federated training tasks are scheduled to nodes but remain **blocked**, waiting for data.

2. **Phase 2 - Training**: Execute the federated learning job once the training data is ready.

The design details are described in the following chapter. The main ideas of the new design are:

1. Define subnets using `NodeGroup` in KubeEdge.

2. Introduce a new version of the `FederatedLearningJob` CRD.

3. Add a DataLoader DaemonSet to collect and distribute data in Phase 1.

4. Use the `PyTorchJob` CRD from training-operator as the training runtime for federated learning.

5. Add a waiter container to the `initContainers` of the training pods to check the readiness of the data.

![](./images/fl-v2-architecture.png)

### Use Cases

- Users can create a federated learning job by providing a training script, specifying the aggregation algorithm, configuring training hyperparameters, and configuring training datasets.

- Users can get the federated learning status, including the nodes participating in training, current training status, sample size of each node, current iteration times, and current aggregation times.

- Users can get the saved aggregated model. The model file can be stored on the cloud or edge node.

## Design Details

### Subnet Management

We will use [NodeGroup](https://github.com/kubeedge/kubeedge/blob/master/docs/proposals/node-group-management.md) in KubeEdge to define subnets of nodes within which data can be transferred. This preserves the privacy of the data and improves the efficiency of training.
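
As an illustrative sketch only: the exact schema follows the KubeEdge NodeGroup proposal, and the node names and label used here are assumptions. A subnet for the surface-defect-detection example could then be declared as:

```yaml
apiVersion: apps.kubeedge.io/v1alpha1
kind: NodeGroup
metadata:
  # Referenced later by trainingWorkers.targetNodeGroups in the FederatedLearningJob.
  name: surface-detect-nodes-group
spec:
  # Member edge nodes can be enumerated explicitly ...
  nodes:
    - edge-node-1
    - edge-node-2
  # ... and/or selected by label.
  matchLabels:
    location: factory-a
```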

### `FederatedLearningJob` CRD

We define the new `FederatedLearningJob` CRD as follows:

```Golang
// FLJobSpec is a description of a federated learning job
type FLJobSpec struct {
	AggregationWorker AggregationWorker `json:"aggregationWorker"`

	TrainingWorkers TrainingWorker `json:"trainingWorkers"`

	PretrainedModel PretrainedModel `json:"pretrainedModel,omitempty"`

	Transmitter Transmitter `json:"transmitter,omitempty"`
}

// TrainingWorker describes the data a training worker should have
type TrainingWorker struct {
	Replicas int `json:"replicas"`

	TargetNodeGroups []TargetNodeGroups `json:"targetNodeGroups"`

	Datasets []TrainDataset `json:"datasets"`

	TrainingPolicy TrainingPolicy `json:"trainingPolicy,omitempty"`

	Template v1.PodTemplateSpec `json:"template"`
}

// TrainingPolicy defines the policy we take in the training phase
type TrainingPolicy struct {
	// Mode defines the training mode, chosen from Sequential and Distributed.
	Mode string `json:"mode,omitempty"`

	// Framework indicates the training framework in use (e.g. PyTorch). When Mode is set to
	// Distributed, it determines the training runtime (i.e. the CRD in training-operator)
	// used to orchestrate the training tasks.
	Framework string `json:"framework,omitempty"`
}
```
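
For illustration, when `trainingPolicy.mode` is `Distributed` and `trainingPolicy.framework` is `PyTorch`, the controller is expected to render the training worker template into a `PyTorchJob`. The sketch below is only an assumption of what such a generated object could look like; the object name and replica layout are illustrative and not defined by this proposal:

```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  # Hypothetical name derived from the FederatedLearningJob.
  name: surface-defect-detection-train
spec:
  pytorchReplicaSpecs:
    Worker:
      # Taken from trainingWorkers.replicas.
      replicas: 2
      restartPolicy: OnFailure
      template:
        # Built from trainingWorkers.template; training-operator expects the
        # primary container to be named "pytorch".
        spec:
          containers:
            - name: pytorch
              image: $TRAIN_IMAGE
```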

### Sedna Federated Learning Example

The configuration of federated learning jobs should look like:

```YAML
kind: FederatedLearningJob
metadata:
  name: surface-defect-detection
spec:
  aggregationWorker:
    model:
      name: "surface-defect-detection-model"
    # ...
            resources: # user defined resources
              limits:
                memory: 2Gi
  trainingWorkers:
    datasets:
      - edge1-surface-defect-detection-dataset
      - edge2-surface-defect-detection-dataset
    replicas: 2
    targetNodeGroups:
      - surface-detect-nodes-group
    trainingPolicy:
      mode: Distributed
      framework: PyTorch
    template:
      spec:
        containers:
          - image: $TRAIN_IMAGE
            name: train-worker
            imagePullPolicy: IfNotPresent
            env: # user defined environments
              - name: "batch_size"
                value: "32"
              - name: "learning_rate"
                value: "0.001"
              - name: "epochs"
                value: "2"
            resources: # user defined resources
              limits:
                memory: 2Gi
```

### DataLoader DaemonSet & Waiter Container

> **TBD**: The DataLoader DaemonSet may be implemented based on [edgemesh](https://github.com/kubeedge/edgemesh).

The DataLoader DaemonSet watches for `FederatedLearningJob` events.

When a new federated learning job is created, its training pods are blocked by the waiter container and stay in `Pending` status. The DataLoader DaemonSet is notified of this event, reads the `.spec.trainingWorkers.datasets` field and the node placement of the training tasks, and transfers the training data to the target directory of each training worker.

A waiter container is placed in every training pod's `initContainers` field and blocks the training task until the training data is ready.

When the data is ready, the DataLoader DaemonSet notifies every waiter container. The waiter containers then reach `Completed` status and the training tasks start executing.
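
As an illustration only (the data-readiness protocol between the DataLoader DaemonSet and the waiter is not specified in this proposal), the waiter could be a lightweight init container that blocks until a marker file appears in the shared data volume:

```yaml
# Hypothetical waiter init container; the marker-file convention is an assumption.
initContainers:
  - name: data-waiter
    image: busybox:1.36
    command:
      - sh
      - -c
      - |
        until [ -f /data/.ready ]; do
          echo "waiting for training data..."
          sleep 5
        done
    # Volume shared with the DataLoader DaemonSet on the same node.
    volumeMounts:
      - name: train-data
        mountPath: /data
```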

## Controller Design

The new design **will not change the main architecture** of the original Federated Learning Controller, which starts three separate goroutines called the `federated-learning`, `upstream`, and `downstream` controllers.
