diff --git a/controllers/storagecluster/reconcile.go b/controllers/storagecluster/reconcile.go index 5ecd8d8350..06d04a6b40 100644 --- a/controllers/storagecluster/reconcile.go +++ b/controllers/storagecluster/reconcile.go @@ -408,6 +408,10 @@ func (r *StorageClusterReconciler) reconcilePhases( return reconcile.Result{}, err } + if res, err := r.ownStorageConsumersInNamespace(instance); err != nil || !res.IsZero() { + return reconcile.Result{}, err + } + // in-memory conditions should start off empty. It will only ever hold // negative conditions (!Available, Degraded, Progressing) r.conditions = nil @@ -816,6 +820,29 @@ func (r *StorageClusterReconciler) ownStorageClusterPeersInNamespace(instance *o return reconcile.Result{}, nil } +func (r *StorageClusterReconciler) ownStorageConsumersInNamespace(instance *ocsv1.StorageCluster) (reconcile.Result, error) { + storageConsumerList := &ocsv1alpha1.StorageConsumerList{} + err := r.Client.List(r.ctx, storageConsumerList, client.InNamespace(instance.Namespace)) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to list storageConsumer: %w", err) + } + for i := range storageConsumerList.Items { + scp := &storageConsumerList.Items[i] + lenOwners := len(scp.OwnerReferences) + err := controllerutil.SetOwnerReference(instance, scp, r.Scheme) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to set owner reference on storageConsumer %v: %w", scp.Name, err) + } + if lenOwners != len(scp.OwnerReferences) { + err = r.Client.Update(r.ctx, scp) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to persist StorageCluster owner ref on storageConsumer %v: %w", scp.Name, err) + } + } + } + return reconcile.Result{}, nil +} + // Checks whether a string is contained within a slice func contains(slice []string, s string) bool { for _, item := range slice { diff --git a/services/provider/server/consumer.go b/services/provider/server/consumer.go index 63423fb620..ccf8e994dc 100644 --- a/services/provider/server/consumer.go +++ b/services/provider/server/consumer.go @@ -270,3 +270,18 @@ func (c *ocsConsumerManager) RemoveAnnotation(ctx context.Context, id string, an } return nil } + +func (c *ocsConsumerManager) GetByClientID(ctx context.Context, clientID string) (*ocsv1alpha1.StorageConsumer, error) { + consumerObjList := &ocsv1alpha1.StorageConsumerList{} + err := c.client.List(ctx, consumerObjList) + if err != nil { + return nil, fmt.Errorf("failed to list storageConsumer objects: %v", err) + } + for i := range consumerObjList.Items { + consumer := consumerObjList.Items[i] + if consumer.Status.Client.ID == clientID { + return &consumer, nil + } + } + return nil, nil +} diff --git a/services/provider/server/server.go b/services/provider/server/server.go index f222009778..c816370153 100644 --- a/services/provider/server/server.go +++ b/services/provider/server/server.go @@ -64,6 +64,7 @@ const ( oneGibInBytes = 1024 * 1024 * 1024 monConfigMap = "rook-ceph-mon-endpoints" monSecret = "rook-ceph-mon" + mirroringTokenKey = "rbdMirrorBootstrap1eerSecretName" ) type OCSProviderServer struct { @@ -1123,3 +1124,131 @@ func (s *OCSProviderServer) isSystemInMaintenanceMode(ctx context.Context) (bool } return kerrors.IsNotFound(err), nil } + +func (s *OCSProviderServer) GetStorageClientsInfo(ctx context.Context, req *pb.StorageClientsInfoRequest) (*pb.StorageClientsInfoResponse, error) { + response := &pb.StorageClientsInfoResponse{} + for i := range req.ClientIDs { + consumer, err := s.consumerManager.GetByClientID(ctx, req.ClientIDs[i]) + if err != nil { + klog.Errorf("failed to get consumer with client id %v: %v", req.ClientIDs[i], err) + response.Errors = append(response.Errors, + &pb.StorageClientInfoError{ + ClientID: req.ClientIDs[i], + Code: pb.ErrorCode_Internal, + Message: err.Error(), + }, + ) + } + if consumer == nil { + klog.Infof("no consumer found with id %v", req.ClientIDs[i]) + continue + } + + owner := util.FindOwnerRefByKind(consumer, "StorageCluster") + if owner == nil { + klog.Infof("no owner found for consumer %v", req.ClientIDs[i]) + continue + } + + if owner.UID != types.UID(req.StorageClusterUID) { + klog.Infof("storageCluster specified on the req does not own the client %v", req.ClientIDs[i]) + continue + } + + rnsList := &rookCephv1.CephBlockPoolRadosNamespaceList{} + err = s.client.List( + ctx, + rnsList, + client.InNamespace(s.namespace), + client.MatchingLabels{controllers.StorageConsumerNameLabel: consumer.Name}, + client.Limit(2), + ) + if err != nil { + response.Errors = append(response.Errors, + &pb.StorageClientInfoError{ + ClientID: req.ClientIDs[i], + Code: pb.ErrorCode_Internal, + Message: "failed to get radosnamespaces", + }, + ) + klog.Error(err) + continue + } + if len(rnsList.Items) > 1 { + response.Errors = append(response.Errors, + &pb.StorageClientInfoError{ + ClientID: req.ClientIDs[i], + Code: pb.ErrorCode_Internal, + Message: "invalid number of radosnamespace found for the Client", + }, + ) + klog.Errorf("invalid number of radosnamespace found for the Client %v", req.ClientIDs[i]) + continue + } + clientInfo := &pb.ClientInfo{ClientID: req.ClientIDs[i]} + if len(rnsList.Items) == 1 { + clientInfo.RadosNamespace = rnsList.Items[0].Name + } + response.ClientsInfo = append(response.ClientsInfo, &pb.ClientInfo{ClientID: req.ClientIDs[i]}) + } + + return response, nil +} + +func (s *OCSProviderServer) GetBlockPoolsInfo(ctx context.Context, req *pb.BlockPoolsInfoRequest) (*pb.BlockPoolsInfoResponse, error) { + response := &pb.BlockPoolsInfoResponse{} + + for i := range req.BlockPoolNames { + cephBlockPool := &rookCephv1.CephBlockPool{} + cephBlockPool.Name = req.BlockPoolNames[i] + cephBlockPool.Namespace = s.namespace + err := s.client.Get(ctx, client.ObjectKeyFromObject(cephBlockPool), cephBlockPool) + if kerrors.IsNotFound(err) { + klog.Infof("blockpool %v not found", cephBlockPool.Name) + continue + } else if err != nil { + klog.Errorf("failed to get blockpool %v: %v", cephBlockPool.Name, err) + response.Errors = append(response.Errors, + &pb.BlockPoolInfoError{ + BlockPoolName: cephBlockPool.Name, + Code: pb.ErrorCode_Internal, + Message: err.Error(), + }, + ) + } + + var mirroringToken string + + if cephBlockPool.Spec.Mirroring.Enabled && + cephBlockPool.Status.Info != nil && + cephBlockPool.Status.Info[mirroringTokenKey] != "" { + secret := &corev1.Secret{} + secret.Name = cephBlockPool.Status.Info[mirroringTokenKey] + secret.Namespace = s.namespace + err := s.client.Get(ctx, client.ObjectKeyFromObject(secret), secret) + if kerrors.IsNotFound(err) { + klog.Infof("bootstrap secret %v for blockpool %v not found", secret.Name, cephBlockPool.Name) + continue + } else if err != nil { + errMsg := fmt.Sprintf( + "failed to get bootstrap secret %s for CephBlockPool %s: %v", + cephBlockPool.Status.Info[mirroringTokenKey], + cephBlockPool.Name, + err, + ) + klog.Error(errMsg) + continue + } + mirroringToken = string(secret.Data["token"]) + } + + response.BlockPoolsInfo = append(response.BlockPoolsInfo, &pb.BlockPoolInfo{ + BlockPoolName: cephBlockPool.Name, + MirroringToken: mirroringToken, + BlockPoolID: strconv.Itoa(cephBlockPool.Status.PoolID), + }) + + } + + return response, nil +}