Skip to content

Commit

Permalink
Mock model registry service
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Lamparelli <[email protected]>
  • Loading branch information
lampajr committed Dec 12, 2023
1 parent ca2d6a3 commit 1064261
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 344 deletions.
121 changes: 63 additions & 58 deletions controllers/modelregistry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-logr/logr"
kservev1alpha1 "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
"github.com/opendatahub-io/model-registry/pkg/api"
"github.com/opendatahub-io/model-registry/pkg/core"
"github.com/opendatahub-io/odh-model-controller/controllers/constants"
"github.com/opendatahub-io/odh-model-controller/controllers/reconcilers"
Expand All @@ -29,6 +30,7 @@ type ModelRegistryReconciler struct {
Scheme *runtime.Scheme
Log logr.Logger
Period time.Duration
mrService api.ModelRegistryApi
mrISReconciler *reconcilers.ModelRegistryInferenceServiceReconciler
mrSEReconciler *reconcilers.ModelRegistryServingEnvironmentReconciler
}
Expand All @@ -49,77 +51,80 @@ func (r *ModelRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
log := r.Log.WithValues("ServingRuntime", req.Name, "namespace", req.Namespace)
log.Info("Reconciling ModelRegistry serving for ServingRuntime: " + req.Name)

mlmdAddr := os.Getenv(constants.MLMDAddressEnv)
if mlmdAddr == "" {
// Env variable not set, look for existing model registry service
opts := []client.ListOption{client.InNamespace(req.Namespace), client.MatchingLabels{
"component": "model-registry",
}}
mrServiceList := &corev1.ServiceList{}
err := r.Client.List(ctx, mrServiceList, opts...)
if err != nil && apierrs.IsNotFound(err) {
// No model registry deployed in the provided namespace, skipping serving reconciliation
log.Info("Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}
mr := r.mrService
if mr == nil {
mlmdAddr := os.Getenv(constants.MLMDAddressEnv)
if mlmdAddr == "" {
// Env variable not set, look for existing model registry service
opts := []client.ListOption{client.InNamespace(req.Namespace), client.MatchingLabels{
"component": "model-registry",
}}
mrServiceList := &corev1.ServiceList{}
err := r.Client.List(ctx, mrServiceList, opts...)
if err != nil && apierrs.IsNotFound(err) {
// No model registry deployed in the provided namespace, skipping serving reconciliation
log.Info("Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}

if len(mrServiceList.Items) == 0 {
log.Info("No Model Registry service found for Namespace: " + req.Namespace)
log.Info("Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}
if len(mrServiceList.Items) == 0 {
log.Info("No Model Registry service found for Namespace: " + req.Namespace)
log.Info("Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}

// Actually we could iterate over every mrService, as nothing prevents to setup multiple MR in the same namespace
if len(mrServiceList.Items) > 1 {
log.Error(fmt.Errorf("multiple services with component=model-registry for Namespace %s", req.Namespace), "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}
// Actually we could iterate over every mrService, as nothing prevents to setup multiple MR in the same namespace
if len(mrServiceList.Items) > 1 {
log.Error(fmt.Errorf("multiple services with component=model-registry for Namespace %s", req.Namespace), "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}

mrService := mrServiceList.Items[0]

mrService := mrServiceList.Items[0]
var grpcPort *int32
for _, port := range mrService.Spec.Ports {
if port.Name == "grpc-api" {
grpcPort = &port.Port
break
}
}

var grpcPort *int32
for _, port := range mrService.Spec.Ports {
if port.Name == "grpc-api" {
grpcPort = &port.Port
break
if grpcPort == nil {
log.Error(fmt.Errorf("cannot find grpc-api port for service %s", mrService.Name), "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}

mlmdAddr = fmt.Sprintf("%s.%s.svc.cluster.local:%d", mrService.Name, req.Namespace, *grpcPort)
}

if grpcPort == nil {
log.Error(fmt.Errorf("cannot find grpc-api port for service %s", mrService.Name), "Stop ModelRegistry serving reconciliation")
// setup grpc connection to ml-metadata
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Setup model registry service
log.Info("Connecting to " + mlmdAddr)
conn, err := grpc.DialContext(
ctxTimeout,
mlmdAddr,
grpc.WithReturnConnectionError(),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Error(err, "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}
defer conn.Close()

mlmdAddr = fmt.Sprintf("%s.%s.svc.cluster.local:%d", mrService.Name, req.Namespace, *grpcPort)
}

// setup grpc connection to ml-metadata
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Setup model registry service
log.Info("Connecting to " + mlmdAddr)
conn, err := grpc.DialContext(
ctxTimeout,
mlmdAddr,
grpc.WithReturnConnectionError(),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Error(err, "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}
defer conn.Close()

mr, err := core.NewModelRegistryService(conn)
if err != nil {
log.Error(err, "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
mr, err = core.NewModelRegistryService(conn)
if err != nil {
log.Error(err, "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
}
}

// Reconcile the ServingEnvironment from Model Registry
err = r.mrSEReconciler.Reconcile(ctx, log, mr, req.Namespace)
err := r.mrSEReconciler.Reconcile(ctx, log, mr, req.Namespace)
if err != nil {
log.Error(err, "Stop ModelRegistry serving reconciliation")
return ctrl.Result{}, nil
Expand Down
Loading

0 comments on commit 1064261

Please sign in to comment.