Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Cache Serailize API #47

Merged
merged 60 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
27ca6d6
WIP: add reservation apis
milton0825 Apr 22, 2021
03fce3d
add missing files
milton0825 Apr 22, 2021
bce5b9c
added create DAO
milton0825 Apr 23, 2021
ca1b6dd
Add get dao
milton0825 Apr 24, 2021
20d1568
wip
milton0825 Apr 27, 2021
e80137e
fix tests
milton0825 Apr 27, 2021
0211470
wired reservation manager
milton0825 Apr 28, 2021
3c16c66
add todos
milton0825 Apr 28, 2021
9783b00
add more tests
milton0825 Apr 28, 2021
0b6135a
add more tests
milton0825 Apr 29, 2021
4457b5b
add more logging
milton0825 Apr 29, 2021
60b2c8d
add logging and stats
milton0825 Apr 29, 2021
acccd0a
fix lint
milton0825 Apr 29, 2021
e2c6b40
add more comments
milton0825 Apr 29, 2021
bf3011e
First -> Take
milton0825 May 10, 2021
eb5a4e7
WIP: add createOrupdate API
milton0825 May 13, 2021
f979633
Added boilerplate automation (#41)
yindia May 7, 2021
5d93425
update boilerplate code (#40)
samhita-alla May 7, 2021
7892458
Upgrade gorm to v1.21.9 (#42)
milton0825 May 19, 2021
c0fbbca
Added boilerplate automation (#43)
yindia May 27, 2021
c600d56
add more instructinos
milton0825 Jun 2, 2021
8e1b319
add more comments
milton0825 Jun 2, 2021
84a7ecb
refactor a bit
milton0825 Jun 2, 2021
6a1e6d2
add timer
milton0825 Jun 2, 2021
95b8716
fix lint / tests
milton0825 Jun 2, 2021
8fa9951
add comments & tests
milton0825 Jun 8, 2021
bf8405c
fix lint
milton0825 Jun 8, 2021
931b1d3
add docs
milton0825 Jun 8, 2021
571315c
Fix connection error handling (#45)
milton0825 Jun 8, 2021
9cb4472
Update code of conduct (#46)
samhita-alla Jul 29, 2021
803f4cb
separated ReservationManager CreateOrUpdate function into individual …
hamersaw Aug 24, 2021
a1fbc14
fixed race condition on Reservation repository Create function
hamersaw Aug 24, 2021
2afa4bc
changed reservation expiration to use heartbeatInterval and heartbeat…
hamersaw Aug 24, 2021
c72a6b6
fixed lint and unit test errors
hamersaw Aug 24, 2021
1bc36e4
added unit tests for extending reservation and update failure
hamersaw Aug 25, 2021
7bc155c
removed ExtendReservation API mocks
hamersaw Aug 25, 2021
080ff5c
added ExpiresAt and HeartbeatInterval fiedls to ReservationStatus ret…
hamersaw Aug 25, 2021
62fcc7f
implemented ReleaseReservation
hamersaw Aug 25, 2021
f5efc5b
added unit tests for reservation transformer
hamersaw Aug 25, 2021
89f41cd
fixed lint errors
hamersaw Aug 25, 2021
81e46da
implemented unit tests for ReleaseReservation API call
hamersaw Aug 26, 2021
e65570a
updated reservation API to only work with reservations - not actual a…
hamersaw Sep 20, 2021
06fcf90
Fix error type check to detect uniqueConstraintViolation
EngHabu Sep 1, 2021
c21349c
Revert "Fix error type check to detect uniqueConstraintViolation"
EngHabu Sep 1, 2021
2c968ee
Fix gorm wrong error type cast (#48)
EngHabu Sep 2, 2021
bb05d0f
added support for heartbeat_interval definition in reservation manager
hamersaw Sep 23, 2021
5b46ec6
updated test and fixed lint errors
hamersaw Sep 23, 2021
d20ffe4
removed unnecessary dependencies from go.mod
hamersaw Sep 28, 2021
125a1bc
fixing merge conflicts
hamersaw Nov 3, 2021
0da1bf2
updated flyteidl version - change before merging
hamersaw Nov 3, 2021
c058b55
adding reservation model to migration
hamersaw Nov 3, 2021
36e723c
Merge branch 'master' into feature/artifact-reservation
hamersaw Nov 16, 2021
2faa715
udpated dockerfile go template to reflect current master fixing rebas…
hamersaw Nov 16, 2021
0a956dc
and again .. with a space
hamersaw Nov 16, 2021
c61fef9
add docs on exports
hamersaw Nov 16, 2021
efab0f2
changed configuration to use config.Duration from flytestdlib instead…
hamersaw Nov 16, 2021
2ee1f41
added owner id to reservation gorm impl delete function
hamersaw Nov 16, 2021
f52a8cf
if reservation is missing on release reservation (meaning another ent…
hamersaw Nov 16, 2021
394e92a
updated flyteidl version
hamersaw Nov 18, 2021
df8dba4
remove flyteidl replace in go.mod and updating to latest version
hamersaw Nov 24, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datacatalog_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ datacatalog:
storage-prefix: "metadata"
metrics-scope: "datacatalog"
profiler-port: 10254
heartbeat-grace-period-multiplier: 3
max-reservation-heartbeat: 10s
storage:
connection:
access-key: minio
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.13

require (
github.com/Selvatico/go-mocket v1.0.7
github.com/flyteorg/flyteidl v0.18.17
github.com/flyteorg/flyteidl v0.18.38
github.com/flyteorg/flytestdlib v0.3.13
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
Expand All @@ -18,3 +18,5 @@ require (
gorm.io/driver/postgres v1.1.0
gorm.io/gorm v1.21.9
)

replace github.com/flyteorg/flyteidl => github.com/hamersaw/flyteidl v0.19.26-0.20211103115633-100abab11c51
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.17 h1:74pPZ9PzITuzq+CgjMPb9EcFI5bVkf8mM5m4xmmlTmY=
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.38 h1:XgAw9d2Q/UjWQyXbnZz/j4N6OVGDxr7jceden6PdCgY=
github.com/flyteorg/flyteidl v0.18.38/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down Expand Up @@ -308,6 +308,9 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.12.2/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hamersaw/flyteidl v0.19.26-0.20211103115633-100abab11c51 h1:e9zvtfNKr+K84a7du4wJgC+MXYcEsA13yYGMsEGsjQs=
github.com/hamersaw/flyteidl v0.19.26-0.20211103115633-100abab11c51/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down Expand Up @@ -523,6 +526,7 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -989,6 +993,7 @@ google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
Expand Down Expand Up @@ -1026,6 +1031,7 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
Expand Down
5 changes: 3 additions & 2 deletions pkg/manager/impl/artifact_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func getTestArtifact() *datacatalog.Artifact {

func newMockDataCatalogRepo() *mocks.DataCatalogRepo {
return &mocks.DataCatalogRepo{
MockDatasetRepo: &mocks.DatasetRepo{},
MockArtifactRepo: &mocks.ArtifactRepo{},
MockDatasetRepo: &mocks.DatasetRepo{},
MockArtifactRepo: &mocks.ArtifactRepo{},
MockReservationRepo: &mocks.ReservationRepo{},
}
}

Expand Down
212 changes: 212 additions & 0 deletions pkg/manager/impl/reservation_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package impl

import (
"context"
"time"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/datacatalog/pkg/errors"
"github.com/flyteorg/datacatalog/pkg/repositories"
repo_errors "github.com/flyteorg/datacatalog/pkg/repositories/errors"
"github.com/flyteorg/datacatalog/pkg/repositories/models"
"github.com/flyteorg/datacatalog/pkg/repositories/transformers"

"github.com/flyteorg/datacatalog/pkg/manager/interfaces"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
)

type reservationMetrics struct {
scope promutils.Scope
reservationAcquired labeled.Counter
reservationReleased labeled.Counter
reservationAlreadyInProgress labeled.Counter
acquireReservationFailure labeled.Counter
releaseReservationFailure labeled.Counter
reservationDoesNotExist labeled.Counter
}

type NowFunc func() time.Time

type reservationManager struct {
repo repositories.RepositoryInterface
heartbeatGracePeriodMultiplier time.Duration
maxHeartbeatInterval time.Duration
now NowFunc
systemMetrics reservationMetrics
}

// Creates a new reservation manager with the specified properties
func NewReservationManager(
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
repo repositories.RepositoryInterface,
heartbeatGracePeriodMultiplier time.Duration,
maxHeartbeatInterval time.Duration,
nowFunc NowFunc, // Easier to mock time.Time for testing
reservationScope promutils.Scope,
) interfaces.ReservationManager {
systemMetrics := reservationMetrics{
scope: reservationScope,
reservationAcquired: labeled.NewCounter(
"reservation_acquired",
"Number of times a reservation was acquired",
reservationScope),
reservationReleased: labeled.NewCounter(
"reservation_released",
"Number of times a reservation was released",
reservationScope),
reservationAlreadyInProgress: labeled.NewCounter(
"reservation_already_in_progress",
"Number of times we try of acquire a reservation but the reservation is in progress",
reservationScope,
),
acquireReservationFailure: labeled.NewCounter(
"acquire_reservation_failure",
"Number of times we failed to acquire reservation",
reservationScope,
),
releaseReservationFailure: labeled.NewCounter(
"release_reservation_failure",
"Number of times we failed to release a reservation",
reservationScope,
),
reservationDoesNotExist: labeled.NewCounter(
"reservation_does_not_exist",
"Number of times we attempt to modify a reservation that does not exist",
reservationScope,
),
}

return &reservationManager{
repo: repo,
heartbeatGracePeriodMultiplier: heartbeatGracePeriodMultiplier,
maxHeartbeatInterval: maxHeartbeatInterval,
now: nowFunc,
systemMetrics: systemMetrics,
}
}

// Attempt to acquire a reservation for the specified artifact. If there is not active reservation, successfully
// acquire it. If you are the owner of the active reservation, extend it. If another owner, return the existing reservation.
func (r *reservationManager) GetOrExtendReservation(ctx context.Context, request *datacatalog.GetOrExtendReservationRequest) (*datacatalog.GetOrExtendReservationResponse, error) {
reservationID := request.ReservationId

// Use minimum of maxHeartbeatInterval and requested heartbeat interval
heartbeatInterval := r.maxHeartbeatInterval
requestHeartbeatInterval := request.GetHeartbeatInterval()
if requestHeartbeatInterval != nil && requestHeartbeatInterval.AsDuration() < heartbeatInterval {
heartbeatInterval = requestHeartbeatInterval.AsDuration()
}

reservation, err := r.tryAcquireReservation(ctx, reservationID, request.OwnerId, heartbeatInterval)
if err != nil {
r.systemMetrics.acquireReservationFailure.Inc(ctx)
return nil, err
}

return &datacatalog.GetOrExtendReservationResponse{
Reservation: &reservation,
}, nil
}

// tryAcquireReservation will fetch the reservation first and only create/update
// the reservation if it does not exist or has expired.
// This is an optimization to reduce the number of writes to db. We always need
// to do a GET here because we want to know who owns the reservation
// and show it to users on the UI. However, the reservation is held by a single
// task most of the times and there is no need to do a write.
func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservationID *datacatalog.ReservationID, ownerID string, heartbeatInterval time.Duration) (datacatalog.Reservation, error) {
repo := r.repo.ReservationRepo()
reservationKey := transformers.FromReservationID(reservationID)
repoReservation, err := repo.Get(ctx, reservationKey)

reservationExists := true
if err != nil {
if errors.IsDoesNotExistError(err) {
// Reservation does not exist yet so let's create one
reservationExists = false
} else {
return datacatalog.Reservation{}, err
}
}

now := r.now()
newRepoReservation := models.Reservation{
ReservationKey: reservationKey,
OwnerID: ownerID,
ExpiresAt: now.Add(heartbeatInterval * r.heartbeatGracePeriodMultiplier),
}

// Conditional upsert on reservation. Race conditions are handled
// within the reservation repository Create and Update function calls.
var repoErr error
if !reservationExists {
repoErr = repo.Create(ctx, newRepoReservation, now)
} else if repoReservation.ExpiresAt.Before(now) || repoReservation.OwnerID == ownerID {
repoErr = repo.Update(ctx, newRepoReservation, now)
} else {
logger.Debugf(ctx, "Reservation: %+v is held by %s", reservationKey, repoReservation.OwnerID)

reservation, err := transformers.CreateReservation(&repoReservation, heartbeatInterval)
if err != nil {
return reservation, err
}

r.systemMetrics.reservationAlreadyInProgress.Inc(ctx)
return reservation, nil
}

if repoErr != nil {
if repoErr.Error() == repo_errors.AlreadyExists {
// Looks like someone else tried to obtain the reservation
// at the same time and they won. Let's find out who won.
rsv1, err := repo.Get(ctx, reservationKey)
if err != nil {
return datacatalog.Reservation{}, err
}

reservation, err := transformers.CreateReservation(&rsv1, heartbeatInterval)
if err != nil {
return reservation, err
}

r.systemMetrics.reservationAlreadyInProgress.Inc(ctx)
return reservation, nil
}

return datacatalog.Reservation{}, repoErr
}

// Reservation has been acquired or extended without error
reservation, err := transformers.CreateReservation(&newRepoReservation, heartbeatInterval)
if err != nil {
return reservation, err
}

r.systemMetrics.reservationAlreadyInProgress.Inc(ctx)
return reservation, nil
}

// Release an active reservation with the specified owner. If one does not exist, gracefully return.
func (r *reservationManager) ReleaseReservation(ctx context.Context, request *datacatalog.ReleaseReservationRequest) (*datacatalog.ReleaseReservationResponse, error) {
repo := r.repo.ReservationRepo()
reservationKey := transformers.FromReservationID(request.ReservationId)

err := repo.Delete(ctx, reservationKey, request.OwnerId)
if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Reservation does not exist id: %+v, err %v", request.ReservationId, err)
r.systemMetrics.reservationDoesNotExist.Inc(ctx)
return &datacatalog.ReleaseReservationResponse{}, nil
}

logger.Errorf(ctx, "Failed to release reservation: %+v, err: %v", reservationKey, err)
r.systemMetrics.releaseReservationFailure.Inc(ctx)
return nil, err
}

r.systemMetrics.reservationReleased.Inc(ctx)
return &datacatalog.ReleaseReservationResponse{}, nil
}
Loading