Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
added support for heartbeat_interval definition in reservation manager
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Sep 28, 2021
1 parent 83b8149 commit f682886
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
2 changes: 1 addition & 1 deletion datacatalog_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ datacatalog:
metrics-scope: "datacatalog"
profiler-port: 10254
heartbeat-grace-period-multiplier: 3
heartbeat-interval-sec: 5
max-reservation-heartbeat-sec: 10
storage:
connection:
access-key: minio
Expand Down
26 changes: 17 additions & 9 deletions pkg/manager/impl/reservation_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ type NowFunc func() time.Time
type reservationManager struct {
repo repositories.RepositoryInterface
heartbeatGracePeriodMultiplier time.Duration
heartbeatInterval time.Duration
maxHeartbeatInterval time.Duration
now NowFunc
systemMetrics reservationMetrics
}

func NewReservationManager(
repo repositories.RepositoryInterface,
heartbeatGracePeriodMultiplier time.Duration,
heartbeatInterval time.Duration,
maxHeartbeatInterval time.Duration,
nowFunc NowFunc, // Easier to mock time.Time for testing
reservationScope promutils.Scope,
) interfaces.ReservationManager {
Expand Down Expand Up @@ -75,15 +75,23 @@ func NewReservationManager(
return &reservationManager{
repo: repo,
heartbeatGracePeriodMultiplier: heartbeatGracePeriodMultiplier,
heartbeatInterval: heartbeatInterval,
maxHeartbeatInterval: maxHeartbeatInterval,
now: nowFunc,
systemMetrics: systemMetrics,
}
}

func (r *reservationManager) GetOrExtendReservation(ctx context.Context, request *datacatalog.GetOrExtendReservationRequest) (*datacatalog.GetOrExtendReservationResponse, error) {
reservationID := request.ReservationId
reservation, err := r.tryAcquireReservation(ctx, reservationID, request.OwnerId)

// Use minimum of maxHeartbeatInterval and requested heartbeat interval
heartbeatInterval := r.maxHeartbeatInterval
requestHeartbeatInterval := request.GetHeartbeatInterval()
if requestHeartbeatInterval != nil && requestHeartbeatInterval.AsDuration() < heartbeatInterval {
heartbeatInterval = requestHeartbeatInterval.AsDuration()
}

reservation, err := r.tryAcquireReservation(ctx, reservationID, request.OwnerId, heartbeatInterval)
if err != nil {
r.systemMetrics.acquireReservationFailure.Inc(ctx)
return nil, err
Expand All @@ -100,7 +108,7 @@ func (r *reservationManager) GetOrExtendReservation(ctx context.Context, request
// to do a GET here because we want to know who owns the reservation
// and show it to users on the UI. However, the reservation is held by a single
// task most of the times and there is no need to do a write.
func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservationID *datacatalog.ReservationID, ownerID string) (datacatalog.Reservation, error) {
func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservationID *datacatalog.ReservationID, ownerID string, heartbeatInterval time.Duration) (datacatalog.Reservation, error) {
repo := r.repo.ReservationRepo()
reservationKey := transformers.FromReservationID(reservationID)
repoReservation, err := repo.Get(ctx, reservationKey)
Expand All @@ -119,7 +127,7 @@ func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservat
newRepoReservation := models.Reservation{
ReservationKey: reservationKey,
OwnerID: ownerID,
ExpiresAt: now.Add(r.heartbeatInterval * r.heartbeatGracePeriodMultiplier),
ExpiresAt: now.Add(heartbeatInterval * r.heartbeatGracePeriodMultiplier),
}

// Conditional upsert on reservation. Race conditions are handled
Expand All @@ -132,7 +140,7 @@ func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservat
} else {
logger.Debugf(ctx, "Reservation: %+v is held by %s", reservationKey, repoReservation.OwnerID)

reservation, err := transformers.CreateReservation(&repoReservation, r.heartbeatInterval)
reservation, err := transformers.CreateReservation(&repoReservation, heartbeatInterval)
if err != nil {
return reservation, err
}
Expand All @@ -150,7 +158,7 @@ func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservat
return datacatalog.Reservation{}, err
}

reservation, err := transformers.CreateReservation(&rsv1, r.heartbeatInterval)
reservation, err := transformers.CreateReservation(&rsv1, heartbeatInterval)
if err != nil {
return reservation, err
}
Expand All @@ -163,7 +171,7 @@ func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservat
}

// Reservation has been acquired or extended without error
reservation, err := transformers.CreateReservation(&newRepoReservation, r.heartbeatInterval)
reservation, err := transformers.CreateReservation(&newRepoReservation, heartbeatInterval)
if err != nil {
return reservation, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/datacatalogservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewDataCatalogService() *DataCatalogService {
DatasetManager: impl.NewDatasetManager(repos, dataStorageClient, catalogScope.NewSubScope("dataset")),
ArtifactManager: impl.NewArtifactManager(repos, dataStorageClient, storagePrefix, catalogScope.NewSubScope("artifact")),
TagManager: impl.NewTagManager(repos, dataStorageClient, catalogScope.NewSubScope("tag")),
ReservationManager: impl.NewReservationManager(repos, time.Duration(dataCatalogConfig.HeartbeatGracePeriodMultiplier), time.Second*time.Duration(dataCatalogConfig.HeartbeatIntervalSec), time.Now,
ReservationManager: impl.NewReservationManager(repos, time.Duration(dataCatalogConfig.HeartbeatGracePeriodMultiplier), time.Second*time.Duration(dataCatalogConfig.MaxReservationHeartbeatSec), time.Now,
catalogScope.NewSubScope("reservation")),
}
}
2 changes: 1 addition & 1 deletion pkg/runtime/configs/data_catalog_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ type DataCatalogConfig struct {
MetricsScope string `json:"metrics-scope" pflag:",Scope that the metrics will record under."`
ProfilerPort int `json:"profiler-port" pflag:",Port that the profiling service is listening on."`
HeartbeatGracePeriodMultiplier int `json:"heartbeat-grace-period-multiplier" pflag:",Number of heartbeats before a reservation expires without an extension."`
HeartbeatIntervalSec int `json:"heartbeat-interval-sec" pflag:",Recommended reservation extension heartbeat interval."`
MaxReservationHeartbeatSec int `json:"max-reservation-heartbeat-sec" pflag:",The maximum available reservation extension heartbeat interval."`
}

0 comments on commit f682886

Please sign in to comment.