Skip to content

Commit

Permalink
add server side implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 19, 2024
1 parent 18aa06c commit 0a3a397
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 0 deletions.
27 changes: 27 additions & 0 deletions controllers/storagecluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ func (r *StorageClusterReconciler) reconcilePhases(
return reconcile.Result{}, err
}

if res, err := r.ownStorageConsumersInNamespace(instance); err != nil || !res.IsZero() {
return reconcile.Result{}, err
}

// in-memory conditions should start off empty. It will only ever hold
// negative conditions (!Available, Degraded, Progressing)
r.conditions = nil
Expand Down Expand Up @@ -816,6 +820,29 @@ func (r *StorageClusterReconciler) ownStorageClusterPeersInNamespace(instance *o
return reconcile.Result{}, nil
}

func (r *StorageClusterReconciler) ownStorageConsumersInNamespace(instance *ocsv1.StorageCluster) (reconcile.Result, error) {
storageConsumerList := &ocsv1alpha1.StorageConsumerList{}
err := r.Client.List(r.ctx, storageConsumerList, client.InNamespace(instance.Namespace))
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to list storageConsumer: %w", err)
}
for i := range storageConsumerList.Items {
scp := &storageConsumerList.Items[i]
lenOwners := len(scp.OwnerReferences)
err := controllerutil.SetOwnerReference(instance, scp, r.Scheme)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set owner reference on storageConsumer %v: %w", scp.Name, err)
}
if lenOwners != len(scp.OwnerReferences) {
err = r.Client.Update(r.ctx, scp)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to persist StorageCluster owner ref on storageConsumer %v: %w", scp.Name, err)
}
}
}
return reconcile.Result{}, nil
}

// Checks whether a string is contained within a slice
func contains(slice []string, s string) bool {
for _, item := range slice {
Expand Down
1 change: 1 addition & 0 deletions deploy/ocs-operator/manifests/provider-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rules:
resources:
- cephfilesystemsubvolumegroups
- cephblockpoolradosnamespaces
- cephblockpools
verbs:
- get
- list
Expand Down
1 change: 1 addition & 0 deletions rbac/provider-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rules:
resources:
- cephfilesystemsubvolumegroups
- cephblockpoolradosnamespaces
- cephblockpools
verbs:
- get
- list
Expand Down
15 changes: 15 additions & 0 deletions services/provider/server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,18 @@ func (c *ocsConsumerManager) UpdateConsumerStatus(ctx context.Context, id string
klog.Infof("successfully updated Status for StorageConsumer %v", consumerObj.Name)
return nil
}

func (c *ocsConsumerManager) GetByClientID(ctx context.Context, clientID string) (*ocsv1alpha1.StorageConsumer, error) {
consumerObjList := &ocsv1alpha1.StorageConsumerList{}
err := c.client.List(ctx, consumerObjList)
if err != nil {
return nil, fmt.Errorf("failed to list storageConsumer objects: %v", err)
}
for i := range consumerObjList.Items {
consumer := consumerObjList.Items[i]
if consumer.Status.Client.ID == clientID {
return &consumer, nil
}
}
return nil, nil
}
129 changes: 129 additions & 0 deletions services/provider/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
ramenDRReplicationIDKey = "ramendr.openshift.io/replicationid"
ramenDRFlattenModeKey = "replication.storage.openshift.io/flatten-mode"
oneGibInBytes = 1024 * 1024 * 1024
mirroringTokenKey = "rbdMirrorBootstrap1eerSecretName"
)

const (
Expand Down Expand Up @@ -1043,3 +1044,131 @@ func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.Peer

return &pb.PeerStorageClusterResponse{}, nil
}

func (s *OCSProviderServer) GetStorageClientsInfo(ctx context.Context, req *pb.StorageClientsInfoRequest) (*pb.StorageClientsInfoResponse, error) {
response := &pb.StorageClientsInfoResponse{}
for i := range req.ClientIDs {
consumer, err := s.consumerManager.GetByClientID(ctx, req.ClientIDs[i])
if err != nil {
klog.Errorf("failed to get consumer with client id %v: %v", req.ClientIDs[i], err)
response.Errors = append(response.Errors,
&pb.StorageClientInfoError{
ClientID: req.ClientIDs[i],
Code: pb.ErrorCode_Internal,
Message: err.Error(),
},
)
}
if consumer == nil {
klog.Infof("no consumer found with id %v", req.ClientIDs[i])
continue
}

owner := util.FindOwnerRefByKind(consumer, "StorageCluster")
if owner == nil {
klog.Infof("no owner found for consumer %v", req.ClientIDs[i])
continue
}

if owner.UID != types.UID(req.StorageClusterUID) {
klog.Infof("storageCluster specified on the req does not own the client %v", req.ClientIDs[i])
continue
}

rnsList := &rookCephv1.CephBlockPoolRadosNamespaceList{}
err = s.client.List(
ctx,
rnsList,
client.InNamespace(s.namespace),
client.MatchingLabels{controllers.StorageConsumerNameLabel: consumer.Name},
client.Limit(2),
)
if err != nil {
response.Errors = append(response.Errors,
&pb.StorageClientInfoError{
ClientID: req.ClientIDs[i],
Code: pb.ErrorCode_Internal,
Message: "failed to get radosnamespaces",
},
)
klog.Error(err)
continue
}
if len(rnsList.Items) > 1 {
response.Errors = append(response.Errors,
&pb.StorageClientInfoError{
ClientID: req.ClientIDs[i],
Code: pb.ErrorCode_Internal,
Message: "invalid number of radosnamespace found for the Client",
},
)
klog.Errorf("invalid number of radosnamespace found for the Client %v", req.ClientIDs[i])
continue
}
clientInfo := &pb.ClientInfo{ClientID: req.ClientIDs[i]}
if len(rnsList.Items) == 1 {
clientInfo.RadosNamespace = rnsList.Items[0].Name
}
response.ClientsInfo = append(response.ClientsInfo, &pb.ClientInfo{ClientID: req.ClientIDs[i]})
}

return response, nil
}

func (s *OCSProviderServer) GetBlockPoolsInfo(ctx context.Context, req *pb.BlockPoolsInfoRequest) (*pb.BlockPoolsInfoResponse, error) {
response := &pb.BlockPoolsInfoResponse{}

for i := range req.BlockPoolNames {
cephBlockPool := &rookCephv1.CephBlockPool{}
cephBlockPool.Name = req.BlockPoolNames[i]
cephBlockPool.Namespace = s.namespace
err := s.client.Get(ctx, client.ObjectKeyFromObject(cephBlockPool), cephBlockPool)
if kerrors.IsNotFound(err) {
klog.Infof("blockpool %v not found", cephBlockPool.Name)
continue
} else if err != nil {
klog.Errorf("failed to get blockpool %v: %v", cephBlockPool.Name, err)
response.Errors = append(response.Errors,
&pb.BlockPoolInfoError{
BlockPoolName: cephBlockPool.Name,
Code: pb.ErrorCode_Internal,
Message: err.Error(),
},
)
}

var mirroringToken string

if cephBlockPool.Spec.Mirroring.Enabled &&
cephBlockPool.Status.Info != nil &&
cephBlockPool.Status.Info[mirroringTokenKey] != "" {
secret := &corev1.Secret{}
secret.Name = cephBlockPool.Status.Info[mirroringTokenKey]
secret.Namespace = s.namespace
err := s.client.Get(ctx, client.ObjectKeyFromObject(secret), secret)
if kerrors.IsNotFound(err) {
klog.Infof("bootstrap secret %v for blockpool %v not found", secret.Name, cephBlockPool.Name)
continue
} else if err != nil {
errMsg := fmt.Sprintf(
"failed to get bootstrap secret %s for CephBlockPool %s: %v",
cephBlockPool.Status.Info[mirroringTokenKey],
cephBlockPool.Name,
err,
)
klog.Error(errMsg)
continue
}
mirroringToken = string(secret.Data["token"])
}

response.BlockPoolsInfo = append(response.BlockPoolsInfo, &pb.BlockPoolInfo{
BlockPoolName: cephBlockPool.Name,
MirroringToken: mirroringToken,
BlockPoolID: strconv.Itoa(cephBlockPool.Status.PoolID),
})

}

return response, nil
}

0 comments on commit 0a3a397

Please sign in to comment.