diff --git a/datacatalog_config.yaml b/datacatalog_config.yaml index e0f9c3d1..6065f482 100644 --- a/datacatalog_config.yaml +++ b/datacatalog_config.yaml @@ -11,6 +11,8 @@ datacatalog: storage-prefix: "metadata" metrics-scope: "datacatalog" profiler-port: 10254 + heartbeat-grace-period-multiplier: 3 + max-reservation-heartbeat: 10s storage: connection: access-key: minio diff --git a/go.mod b/go.mod index f4e3ba63..75b47c72 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/Selvatico/go-mocket v1.0.7 - github.com/flyteorg/flyteidl v0.18.17 + github.com/flyteorg/flyteidl v0.21.11 github.com/flyteorg/flytestdlib v0.3.13 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.4.3 diff --git a/go.sum b/go.sum index 55dc41b7..60e520a0 100644 --- a/go.sum +++ b/go.sum @@ -88,7 +88,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -135,7 +134,6 @@ github.com/coocood/freecache v1.1.1 h1:uukNF7QKCZEdZ9gAV7WQzvh0SbjwdMF6m3x3rxEka github.com/coocood/freecache v1.1.1/go.mod h1:OKrEjkGVoxZhyWAJoeFi5BMLUJm2Tit0kpGkIr7NGYY= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= -github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -175,8 +173,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= -github.com/flyteorg/flyteidl v0.18.17 h1:74pPZ9PzITuzq+CgjMPb9EcFI5bVkf8mM5m4xmmlTmY= -github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= +github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUBRz90= +github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= @@ -307,7 +305,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xC github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= -github.com/grpc-ecosystem/grpc-gateway v1.12.2/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= @@ -523,6 +521,7 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -533,7 +532,6 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= -github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA= github.com/pquerna/ffjson v0.0.0-20190813045741-dac163c6c0a9/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= @@ -744,7 +742,6 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -973,7 +970,6 @@ google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dT google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= -google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= @@ -989,6 +985,7 @@ google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= @@ -1015,7 +1012,6 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= @@ -1026,6 +1022,7 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= @@ -1058,7 +1055,6 @@ gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/kothar/go-backblaze.v0 v0.0.0-20190520213052-702d4e7eb465/go.mod h1:zJ2QpyDCYo1KvLXlmdnFlQAyF/Qfth0fB8239Qg7BIE= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/pkg/manager/impl/artifact_manager_test.go b/pkg/manager/impl/artifact_manager_test.go index eada92dd..386e15a5 100644 --- a/pkg/manager/impl/artifact_manager_test.go +++ b/pkg/manager/impl/artifact_manager_test.go @@ -90,8 +90,9 @@ func getTestArtifact() *datacatalog.Artifact { func newMockDataCatalogRepo() *mocks.DataCatalogRepo { return &mocks.DataCatalogRepo{ - MockDatasetRepo: &mocks.DatasetRepo{}, - MockArtifactRepo: &mocks.ArtifactRepo{}, + MockDatasetRepo: &mocks.DatasetRepo{}, + MockArtifactRepo: &mocks.ArtifactRepo{}, + MockReservationRepo: &mocks.ReservationRepo{}, } } diff --git a/pkg/manager/impl/reservation_manager.go b/pkg/manager/impl/reservation_manager.go new file mode 100644 index 00000000..3adee114 --- /dev/null +++ b/pkg/manager/impl/reservation_manager.go @@ -0,0 +1,212 @@ +package impl + +import ( + "context" + "time" + + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" + + "github.com/flyteorg/datacatalog/pkg/errors" + "github.com/flyteorg/datacatalog/pkg/repositories" + repo_errors "github.com/flyteorg/datacatalog/pkg/repositories/errors" + "github.com/flyteorg/datacatalog/pkg/repositories/models" + "github.com/flyteorg/datacatalog/pkg/repositories/transformers" + + "github.com/flyteorg/datacatalog/pkg/manager/interfaces" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" +) + +type reservationMetrics struct { + scope promutils.Scope + reservationAcquired labeled.Counter + reservationReleased labeled.Counter + reservationAlreadyInProgress labeled.Counter + acquireReservationFailure labeled.Counter + releaseReservationFailure labeled.Counter + reservationDoesNotExist labeled.Counter +} + +type NowFunc func() time.Time + +type reservationManager struct { + repo repositories.RepositoryInterface + heartbeatGracePeriodMultiplier time.Duration + maxHeartbeatInterval time.Duration + now NowFunc + systemMetrics reservationMetrics +} + +// Creates a new reservation manager with the specified properties +func NewReservationManager( + repo repositories.RepositoryInterface, + heartbeatGracePeriodMultiplier time.Duration, + maxHeartbeatInterval time.Duration, + nowFunc NowFunc, // Easier to mock time.Time for testing + reservationScope promutils.Scope, +) interfaces.ReservationManager { + systemMetrics := reservationMetrics{ + scope: reservationScope, + reservationAcquired: labeled.NewCounter( + "reservation_acquired", + "Number of times a reservation was acquired", + reservationScope), + reservationReleased: labeled.NewCounter( + "reservation_released", + "Number of times a reservation was released", + reservationScope), + reservationAlreadyInProgress: labeled.NewCounter( + "reservation_already_in_progress", + "Number of times we try of acquire a reservation but the reservation is in progress", + reservationScope, + ), + acquireReservationFailure: labeled.NewCounter( + "acquire_reservation_failure", + "Number of times we failed to acquire reservation", + reservationScope, + ), + releaseReservationFailure: labeled.NewCounter( + "release_reservation_failure", + "Number of times we failed to release a reservation", + reservationScope, + ), + reservationDoesNotExist: labeled.NewCounter( + "reservation_does_not_exist", + "Number of times we attempt to modify a reservation that does not exist", + reservationScope, + ), + } + + return &reservationManager{ + repo: repo, + heartbeatGracePeriodMultiplier: heartbeatGracePeriodMultiplier, + maxHeartbeatInterval: maxHeartbeatInterval, + now: nowFunc, + systemMetrics: systemMetrics, + } +} + +// Attempt to acquire a reservation for the specified artifact. If there is not active reservation, successfully +// acquire it. If you are the owner of the active reservation, extend it. If another owner, return the existing reservation. +func (r *reservationManager) GetOrExtendReservation(ctx context.Context, request *datacatalog.GetOrExtendReservationRequest) (*datacatalog.GetOrExtendReservationResponse, error) { + reservationID := request.ReservationId + + // Use minimum of maxHeartbeatInterval and requested heartbeat interval + heartbeatInterval := r.maxHeartbeatInterval + requestHeartbeatInterval := request.GetHeartbeatInterval() + if requestHeartbeatInterval != nil && requestHeartbeatInterval.AsDuration() < heartbeatInterval { + heartbeatInterval = requestHeartbeatInterval.AsDuration() + } + + reservation, err := r.tryAcquireReservation(ctx, reservationID, request.OwnerId, heartbeatInterval) + if err != nil { + r.systemMetrics.acquireReservationFailure.Inc(ctx) + return nil, err + } + + return &datacatalog.GetOrExtendReservationResponse{ + Reservation: &reservation, + }, nil +} + +// tryAcquireReservation will fetch the reservation first and only create/update +// the reservation if it does not exist or has expired. +// This is an optimization to reduce the number of writes to db. We always need +// to do a GET here because we want to know who owns the reservation +// and show it to users on the UI. However, the reservation is held by a single +// task most of the times and there is no need to do a write. +func (r *reservationManager) tryAcquireReservation(ctx context.Context, reservationID *datacatalog.ReservationID, ownerID string, heartbeatInterval time.Duration) (datacatalog.Reservation, error) { + repo := r.repo.ReservationRepo() + reservationKey := transformers.FromReservationID(reservationID) + repoReservation, err := repo.Get(ctx, reservationKey) + + reservationExists := true + if err != nil { + if errors.IsDoesNotExistError(err) { + // Reservation does not exist yet so let's create one + reservationExists = false + } else { + return datacatalog.Reservation{}, err + } + } + + now := r.now() + newRepoReservation := models.Reservation{ + ReservationKey: reservationKey, + OwnerID: ownerID, + ExpiresAt: now.Add(heartbeatInterval * r.heartbeatGracePeriodMultiplier), + } + + // Conditional upsert on reservation. Race conditions are handled + // within the reservation repository Create and Update function calls. + var repoErr error + if !reservationExists { + repoErr = repo.Create(ctx, newRepoReservation, now) + } else if repoReservation.ExpiresAt.Before(now) || repoReservation.OwnerID == ownerID { + repoErr = repo.Update(ctx, newRepoReservation, now) + } else { + logger.Debugf(ctx, "Reservation: %+v is held by %s", reservationKey, repoReservation.OwnerID) + + reservation, err := transformers.CreateReservation(&repoReservation, heartbeatInterval) + if err != nil { + return reservation, err + } + + r.systemMetrics.reservationAlreadyInProgress.Inc(ctx) + return reservation, nil + } + + if repoErr != nil { + if repoErr.Error() == repo_errors.AlreadyExists { + // Looks like someone else tried to obtain the reservation + // at the same time and they won. Let's find out who won. + rsv1, err := repo.Get(ctx, reservationKey) + if err != nil { + return datacatalog.Reservation{}, err + } + + reservation, err := transformers.CreateReservation(&rsv1, heartbeatInterval) + if err != nil { + return reservation, err + } + + r.systemMetrics.reservationAlreadyInProgress.Inc(ctx) + return reservation, nil + } + + return datacatalog.Reservation{}, repoErr + } + + // Reservation has been acquired or extended without error + reservation, err := transformers.CreateReservation(&newRepoReservation, heartbeatInterval) + if err != nil { + return reservation, err + } + + r.systemMetrics.reservationAlreadyInProgress.Inc(ctx) + return reservation, nil +} + +// Release an active reservation with the specified owner. If one does not exist, gracefully return. +func (r *reservationManager) ReleaseReservation(ctx context.Context, request *datacatalog.ReleaseReservationRequest) (*datacatalog.ReleaseReservationResponse, error) { + repo := r.repo.ReservationRepo() + reservationKey := transformers.FromReservationID(request.ReservationId) + + err := repo.Delete(ctx, reservationKey, request.OwnerId) + if err != nil { + if errors.IsDoesNotExistError(err) { + logger.Warnf(ctx, "Reservation does not exist id: %+v, err %v", request.ReservationId, err) + r.systemMetrics.reservationDoesNotExist.Inc(ctx) + return &datacatalog.ReleaseReservationResponse{}, nil + } + + logger.Errorf(ctx, "Failed to release reservation: %+v, err: %v", reservationKey, err) + r.systemMetrics.releaseReservationFailure.Inc(ctx) + return nil, err + } + + r.systemMetrics.reservationReleased.Inc(ctx) + return &datacatalog.ReleaseReservationResponse{}, nil +} diff --git a/pkg/manager/impl/reservation_manager_test.go b/pkg/manager/impl/reservation_manager_test.go new file mode 100644 index 00000000..207ce813 --- /dev/null +++ b/pkg/manager/impl/reservation_manager_test.go @@ -0,0 +1,393 @@ +package impl + +import ( + "context" + "fmt" + + mockScope "github.com/flyteorg/flytestdlib/promutils" + + "testing" + "time" + + errors2 "github.com/flyteorg/datacatalog/pkg/errors" + errors3 "github.com/flyteorg/datacatalog/pkg/repositories/errors" + "github.com/flyteorg/datacatalog/pkg/repositories/mocks" + "github.com/flyteorg/datacatalog/pkg/repositories/models" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" + "github.com/golang/protobuf/ptypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/codes" +) + +var tagName = "tag" +var project = "p" +var name = "n" +var domain = "d" +var version = "v" +var datasetID = datacatalog.DatasetID{ + Project: project, + Name: name, + Domain: domain, + Version: version, +} +var reservationID = datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: tagName, +} +var heartbeatInterval = time.Second * 5 +var heartbeatIntervalPb = ptypes.DurationProto(heartbeatInterval) +var maxHeartbeatInterval = time.Second * 10 +var maxHeartbeatIntervalPb = ptypes.DurationProto(maxHeartbeatInterval) +var heartbeatGracePeriodMultiplier = time.Second * 3 +var prevOwner = "prevOwner" +var currentOwner = "currentOwner" + +func TestGetOrExtendReservation_CreateReservation(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + key.TagName == tagName + })).Return(models.Reservation{}, errors2.NewDataCatalogErrorf(codes.NotFound, "entry not found")) + + now := time.Now() + + dcRepo.MockReservationRepo.On("Create", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + reservation.TagName == tagName && + reservation.OwnerID == currentOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.GetOrExtendReservationRequest{ + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + } + + resp, err := reservationManager.GetOrExtendReservation(context.Background(), &req) + + assert.Nil(t, err) + assert.Equal(t, currentOwner, resp.GetReservation().OwnerId) + assert.Equal(t, heartbeatIntervalPb, resp.GetReservation().HeartbeatInterval) +} + +func TestGetOrExtendReservation_MaxHeartbeatInterval(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + key.TagName == tagName + })).Return(models.Reservation{}, errors2.NewDataCatalogErrorf(codes.NotFound, "entry not found")) + + now := time.Now() + + dcRepo.MockReservationRepo.On("Create", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + reservation.TagName == tagName && + reservation.OwnerID == currentOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, heartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.GetOrExtendReservationRequest{ + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: maxHeartbeatIntervalPb, + } + + resp, err := reservationManager.GetOrExtendReservation(context.Background(), &req) + + assert.Nil(t, err) + assert.Equal(t, currentOwner, resp.GetReservation().OwnerId) + assert.Equal(t, heartbeatIntervalPb, resp.GetReservation().HeartbeatInterval) +} + +func TestGetOrExtendReservation_ExtendReservation(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + now := time.Now() + prevExpiresAt := now.Add(time.Second * 10) + + setUpReservationRepoGet(&dcRepo, prevExpiresAt) + + dcRepo.MockReservationRepo.On("Update", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + reservation.TagName == tagName && + reservation.OwnerID == prevOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.GetOrExtendReservationRequest{ + ReservationId: &reservationID, + OwnerId: prevOwner, + HeartbeatInterval: heartbeatIntervalPb, + } + + resp, err := reservationManager.GetOrExtendReservation(context.Background(), &req) + + assert.Nil(t, err) + assert.Equal(t, prevOwner, resp.GetReservation().OwnerId) +} + +func TestGetOrExtendReservation_TakeOverReservation(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + now := time.Now() + prevExpiresAt := now.Add(time.Second * 10 * time.Duration(-1)) + + setUpReservationRepoGet(&dcRepo, prevExpiresAt) + + dcRepo.MockReservationRepo.On("Update", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservation models.Reservation) bool { + return reservation.DatasetProject == datasetID.Project && + reservation.DatasetDomain == datasetID.Domain && + reservation.DatasetName == datasetID.Name && + reservation.DatasetVersion == datasetID.Version && + reservation.TagName == tagName && + reservation.OwnerID == currentOwner && + reservation.ExpiresAt == now.Add(heartbeatInterval*heartbeatGracePeriodMultiplier) + }), + mock.MatchedBy(func(now time.Time) bool { return true }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.GetOrExtendReservationRequest{ + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + } + + resp, err := reservationManager.GetOrExtendReservation(context.Background(), &req) + + assert.Nil(t, err) + assert.Equal(t, currentOwner, resp.GetReservation().OwnerId) +} + +func TestGetOrExtendReservation_ReservationExists(t *testing.T) { + dcRepo := getDatacatalogRepo() + + setUpTagRepoGetNotFound(&dcRepo) + + now := time.Now() + prevExpiresAt := now.Add(time.Second * 10) + + setUpReservationRepoGet(&dcRepo, prevExpiresAt) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.GetOrExtendReservationRequest{ + ReservationId: &reservationID, + OwnerId: currentOwner, + HeartbeatInterval: heartbeatIntervalPb, + } + + resp, err := reservationManager.GetOrExtendReservation(context.Background(), &req) + + assert.Nil(t, err) + assert.Equal(t, prevOwner, resp.GetReservation().OwnerId) +} + +func TestReleaseReservation(t *testing.T) { + dcRepo := getDatacatalogRepo() + + now := time.Now() + + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + reservationKey.TagName == tagName + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(nil) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.ReleaseReservationRequest{ + ReservationId: &reservationID, + OwnerId: currentOwner, + } + + _, err := reservationManager.ReleaseReservation(context.Background(), &req) + + assert.Nil(t, err) +} + +func TestReleaseReservation_Failure(t *testing.T) { + dcRepo := getDatacatalogRepo() + + now := time.Now() + reservationErr := fmt.Errorf("unknown error") + + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + reservationKey.TagName == tagName + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(reservationErr) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.ReleaseReservationRequest{ + ReservationId: &reservationID, + OwnerId: currentOwner, + } + + _, err := reservationManager.ReleaseReservation(context.Background(), &req) + + assert.Equal(t, reservationErr, err) +} + +func TestReleaseReservation_GracefulFailure(t *testing.T) { + dcRepo := getDatacatalogRepo() + + now := time.Now() + reservationErr := errors3.GetMissingEntityError("Reservation", + &datacatalog.ReservationID{ + DatasetId: &datasetID, + TagName: tagName, + }) + + dcRepo.MockReservationRepo.On("Delete", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(reservationKey models.ReservationKey) bool { + return reservationKey.DatasetProject == datasetID.Project && + reservationKey.DatasetDomain == datasetID.Domain && + reservationKey.DatasetName == datasetID.Name && + reservationKey.DatasetVersion == datasetID.Version && + reservationKey.TagName == tagName + }), + mock.MatchedBy(func(ownerID string) bool { + return ownerID == currentOwner + }), + ).Return(reservationErr) + + reservationManager := NewReservationManager(&dcRepo, + heartbeatGracePeriodMultiplier, maxHeartbeatInterval, + func() time.Time { return now }, mockScope.NewTestScope()) + + req := datacatalog.ReleaseReservationRequest{ + ReservationId: &reservationID, + OwnerId: currentOwner, + } + + _, err := reservationManager.ReleaseReservation(context.Background(), &req) + + assert.Nil(t, err) +} + +func getDatacatalogRepo() mocks.DataCatalogRepo { + return mocks.DataCatalogRepo{ + MockReservationRepo: &mocks.ReservationRepo{}, + MockTagRepo: &mocks.TagRepo{}, + } +} + +func setUpReservationRepoGet(dcRepo *mocks.DataCatalogRepo, prevExpiresAt time.Time) { + dcRepo.MockReservationRepo.On("Get", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + mock.MatchedBy(func(key models.ReservationKey) bool { + return key.DatasetProject == datasetID.Project && + key.DatasetDomain == datasetID.Domain && + key.DatasetVersion == datasetID.Version && + key.DatasetName == datasetID.Name && + key.TagName == tagName + })).Return( + models.Reservation{ + ReservationKey: getReservationKey(), + OwnerID: prevOwner, + ExpiresAt: prevExpiresAt, + }, nil, + ) +} + +func setUpTagRepoGetNotFound(dcRepo *mocks.DataCatalogRepo) { + dcRepo.MockTagRepo.On("Get", + mock.Anything, + mock.Anything, + ).Return(models.Tag{}, errors2.NewDataCatalogErrorf(codes.NotFound, "entry not found")) +} + +func getReservationKey() models.ReservationKey { + return models.ReservationKey{ + DatasetProject: project, + DatasetName: name, + DatasetDomain: domain, + DatasetVersion: version, + TagName: tagName, + } +} diff --git a/pkg/manager/interfaces/reservation.go b/pkg/manager/interfaces/reservation.go new file mode 100644 index 00000000..42d95b8c --- /dev/null +++ b/pkg/manager/interfaces/reservation.go @@ -0,0 +1,15 @@ +package interfaces + +import ( + "context" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" +) + +// ReservationManager is the interface to handle reservation requests. +// You can find more details about the APIs in datacatalog service proto +// in flyteidl +type ReservationManager interface { + GetOrExtendReservation(context.Context, *datacatalog.GetOrExtendReservationRequest) (*datacatalog.GetOrExtendReservationResponse, error) + ReleaseReservation(context.Context, *datacatalog.ReleaseReservationRequest) (*datacatalog.ReleaseReservationResponse, error) +} diff --git a/pkg/repositories/errors/errors.go b/pkg/repositories/errors/errors.go index c497192e..1c2a29fc 100644 --- a/pkg/repositories/errors/errors.go +++ b/pkg/repositories/errors/errors.go @@ -9,6 +9,7 @@ import ( ) const ( + AlreadyExists = "entity already exists" notFound = "missing entity of type %s with identifier %v" invalidJoin = "cannot relate entity %s with entity %s" invalidEntity = "no such entity %s" diff --git a/pkg/repositories/factory.go b/pkg/repositories/factory.go index 46dacc69..ae234a18 100644 --- a/pkg/repositories/factory.go +++ b/pkg/repositories/factory.go @@ -26,6 +26,7 @@ type RepositoryInterface interface { DatasetRepo() interfaces.DatasetRepo ArtifactRepo() interfaces.ArtifactRepo TagRepo() interfaces.TagRepo + ReservationRepo() interfaces.ReservationRepo } func GetRepository(repoType RepoConfig, dbConfig config.DbConfig, scope promutils.Scope) RepositoryInterface { diff --git a/pkg/repositories/gormimpl/metrics.go b/pkg/repositories/gormimpl/metrics.go index 1c154230..42fccd3e 100644 --- a/pkg/repositories/gormimpl/metrics.go +++ b/pkg/repositories/gormimpl/metrics.go @@ -11,8 +11,10 @@ import ( type gormMetrics struct { Scope promutils.Scope CreateDuration labeled.StopWatch + DeleteDuration labeled.StopWatch GetDuration labeled.StopWatch ListDuration labeled.StopWatch + UpdateDuration labeled.StopWatch } func newGormMetrics(scope promutils.Scope) gormMetrics { @@ -20,9 +22,13 @@ func newGormMetrics(scope promutils.Scope) gormMetrics { Scope: scope, CreateDuration: labeled.NewStopWatch( "create", "Duration for creating a new entity", time.Millisecond, scope), + DeleteDuration: labeled.NewStopWatch( + "delete", "Duration for deleting a new entity", time.Millisecond, scope), GetDuration: labeled.NewStopWatch( "get", "Duration for retrieving an entity ", time.Millisecond, scope), ListDuration: labeled.NewStopWatch( "list", "Duration for listing entities ", time.Millisecond, scope), + UpdateDuration: labeled.NewStopWatch( + "update", "Duration for updating entities ", time.Millisecond, scope), } } diff --git a/pkg/repositories/gormimpl/reservation.go b/pkg/repositories/gormimpl/reservation.go new file mode 100644 index 00000000..fffeba36 --- /dev/null +++ b/pkg/repositories/gormimpl/reservation.go @@ -0,0 +1,115 @@ +package gormimpl + +import ( + "context" + + datacatalog_error "github.com/flyteorg/datacatalog/pkg/errors" + "google.golang.org/grpc/codes" + + "time" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" + + errors2 "github.com/flyteorg/datacatalog/pkg/repositories/errors" + "github.com/flyteorg/datacatalog/pkg/repositories/interfaces" + "github.com/flyteorg/datacatalog/pkg/repositories/models" + "github.com/flyteorg/flytestdlib/promutils" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type reservationRepo struct { + db *gorm.DB + repoMetrics gormMetrics + errorTransformer errors2.ErrorTransformer +} + +// NewReservationRepo creates a reservationRepo +func NewReservationRepo(db *gorm.DB, errorTransformer errors2.ErrorTransformer, scope promutils.Scope) interfaces.ReservationRepo { + return &reservationRepo{ + db: db, + errorTransformer: errorTransformer, + repoMetrics: newGormMetrics(scope), + } +} + +func (r *reservationRepo) Create(ctx context.Context, reservation models.Reservation, now time.Time) error { + timer := r.repoMetrics.CreateDuration.Start(ctx) + defer timer.Stop() + + result := r.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&reservation) + if result.Error != nil { + return r.errorTransformer.ToDataCatalogError(result.Error) + } + + if result.RowsAffected == 0 { + return datacatalog_error.NewDataCatalogError(codes.FailedPrecondition, errors2.AlreadyExists) + } + + return nil +} + +func (r *reservationRepo) Delete(ctx context.Context, reservationKey models.ReservationKey, ownerID string) error { + timer := r.repoMetrics.DeleteDuration.Start(ctx) + defer timer.Stop() + + var reservation models.Reservation + + result := r.db.Where(&models.Reservation{ + ReservationKey: reservationKey, + OwnerID: ownerID, + }).Delete(&reservation) + if result.Error != nil { + return r.errorTransformer.ToDataCatalogError(result.Error) + } + + if result.RowsAffected == 0 { + return errors2.GetMissingEntityError("Reservation", + &datacatalog.ReservationID{ + DatasetId: &datacatalog.DatasetID{ + Project: reservationKey.DatasetProject, + Domain: reservationKey.DatasetDomain, + Name: reservationKey.DatasetName, + Version: reservationKey.DatasetVersion, + }, + TagName: reservationKey.TagName, + }) + } + + return nil +} + +func (r *reservationRepo) Get(ctx context.Context, reservationKey models.ReservationKey) (models.Reservation, error) { + timer := r.repoMetrics.GetDuration.Start(ctx) + defer timer.Stop() + + var reservation models.Reservation + + result := r.db.Where(&models.Reservation{ + ReservationKey: reservationKey, + }).Take(&reservation) + + if result.Error != nil { + return reservation, r.errorTransformer.ToDataCatalogError(result.Error) + } + + return reservation, nil +} + +func (r *reservationRepo) Update(ctx context.Context, reservation models.Reservation, now time.Time) error { + timer := r.repoMetrics.UpdateDuration.Start(ctx) + defer timer.Stop() + + result := r.db.Model(&models.Reservation{ + ReservationKey: reservation.ReservationKey, + }).Where("expires_at<=? OR owner_id=?", now, reservation.OwnerID).Updates(reservation) + if result.Error != nil { + return r.errorTransformer.ToDataCatalogError(result.Error) + } + + if result.RowsAffected == 0 { + return datacatalog_error.NewDataCatalogError(codes.FailedPrecondition, errors2.AlreadyExists) + } + + return nil +} diff --git a/pkg/repositories/gormimpl/reservation_test.go b/pkg/repositories/gormimpl/reservation_test.go new file mode 100644 index 00000000..6368e90b --- /dev/null +++ b/pkg/repositories/gormimpl/reservation_test.go @@ -0,0 +1,185 @@ +package gormimpl + +import ( + "context" + "database/sql" + "fmt" + + "testing" + "time" + + "gorm.io/driver/postgres" + "gorm.io/gorm" + + "github.com/flyteorg/datacatalog/pkg/repositories/interfaces" + + apiErrors "github.com/flyteorg/datacatalog/pkg/errors" + "google.golang.org/grpc/codes" + + mocket "github.com/Selvatico/go-mocket" + "github.com/flyteorg/datacatalog/pkg/repositories/errors" + "github.com/flyteorg/datacatalog/pkg/repositories/models" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/stretchr/testify/assert" +) + +func TestCreate(t *testing.T) { + GlobalMock := mocket.Catcher.Reset() + GlobalMock.Logging = true + expectedReservation := GetReservation() + + GlobalMock.NewMock().WithQuery( + `INSERT INTO "reservations" ("created_at","updated_at","deleted_at","dataset_project","dataset_name","dataset_domain","dataset_version","tag_name","owner_id","expires_at","serialized_metadata") VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT DO NOTHING`, + ).WithRowsNum(1) + + reservationRepo := getReservationRepo(t) + + err := reservationRepo.Create(context.Background(), expectedReservation, time.Now()) + assert.NoError(t, err) +} + +func TestDelete(t *testing.T) { + GlobalMock := mocket.Catcher.Reset() + GlobalMock.Logging = true + expectedReservation := GetReservation() + + GlobalMock.NewMock().WithQuery( + `DELETE FROM "reservations" WHERE "reservations"."dataset_project" = $1 AND "reservations"."dataset_name" = $2 AND "reservations"."dataset_domain" = $3 AND "reservations"."dataset_version" = $4 AND "reservations"."tag_name" = $5 AND "reservations"."owner_id" = $6`, + ).WithRowsNum(1) + + reservationRepo := getReservationRepo(t) + + err := reservationRepo.Delete(context.Background(), expectedReservation.ReservationKey, expectedReservation.OwnerID) + assert.NoError(t, err) +} + +func TestDeleteFailure(t *testing.T) { + GlobalMock := mocket.Catcher.Reset() + GlobalMock.Logging = true + expectedReservation := GetReservation() + + GlobalMock.NewMock().WithQuery( + `DELETE FROM "reservations" WHERE "reservations"."dataset_project" = $1 AND "reservations"."dataset_name" = $2 AND "reservations"."dataset_domain" = $3 AND "reservations"."dataset_version" = $4 AND "reservations"."tag_name" = $5 AND "reservations"."owner_id" = $6`, + ).WithRowsNum(0) + + reservationRepo := getReservationRepo(t) + + err := reservationRepo.Delete(context.Background(), expectedReservation.ReservationKey, expectedReservation.OwnerID) + assert.Error(t, err) + assert.Equal(t, "missing entity of type Reservation with identifier dataset_id: tag_name:\"testTag\" ", err.Error()) +} + +func TestGet(t *testing.T) { + expectedReservation := GetReservation() + + GlobalMock := mocket.Catcher.Reset() + GlobalMock.Logging = true + + GlobalMock.NewMock().WithQuery( + `SELECT * FROM "reservations" WHERE "reservations"."dataset_project" = $1 AND "reservations"."dataset_name" = $2 AND "reservations"."dataset_domain" = $3 AND "reservations"."dataset_version" = $4 AND "reservations"."tag_name" = $5 LIMIT 1%!!(string=testTag)!(string=testVersion)!(string=testDomain)!(string=testDataset)(EXTRA string=testProject)`, + ).WithReply(getDBResponse(expectedReservation)) + + reservationRepo := getReservationRepo(t) + reservation, err := reservationRepo.Get(context.Background(), expectedReservation.ReservationKey) + assert.Nil(t, err) + assert.Equal(t, expectedReservation.DatasetProject, reservation.DatasetProject) + assert.Equal(t, expectedReservation.DatasetDomain, reservation.DatasetDomain) + assert.Equal(t, expectedReservation.DatasetName, reservation.DatasetName) + assert.Equal(t, expectedReservation.DatasetVersion, reservation.DatasetVersion) + assert.Equal(t, expectedReservation.TagName, reservation.TagName) + assert.Equal(t, expectedReservation.ExpiresAt, reservation.ExpiresAt) +} + +func TestGetNotFound(t *testing.T) { + expectedReservation := GetReservation() + + GlobalMock := mocket.Catcher.Reset() + GlobalMock.Logging = true + + GlobalMock.NewMock().WithError(gorm.ErrRecordNotFound) + + reservationRepo := getReservationRepo(t) + _, err := reservationRepo.Get(context.Background(), expectedReservation.ReservationKey) + assert.Error(t, err) + dcErr, ok := err.(apiErrors.DataCatalogError) + assert.True(t, ok) + assert.Equal(t, dcErr.Code(), codes.NotFound) + +} + +func TestUpdate(t *testing.T) { + GlobalMock := mocket.Catcher.Reset() + GlobalMock.Logging = true + expectedReservation := GetReservation() + + GlobalMock.NewMock().WithQuery( + `UPDATE "reservations" SET "updated_at"=$1,"dataset_project"=$2,"dataset_name"=$3,"dataset_domain"=$4,"dataset_version"=$5,"tag_name"=$6,"owner_id"=$7,"expires_at"=$8 WHERE (expires_at<=$9 OR owner_id=$10) AND "dataset_project" = $11 AND "dataset_name" = $12 AND "dataset_domain" = $13 AND "dataset_version" = $14 AND "tag_name" = $15`, + ).WithRowsNum(1) + + reservationRepo := getReservationRepo(t) + + err := reservationRepo.Update(context.Background(), expectedReservation, time.Now()) + assert.NoError(t, err) +} + +func TestUpdateFailure(t *testing.T) { + GlobalMock := mocket.Catcher.Reset() + GlobalMock.Logging = true + expectedReservation := GetReservation() + + GlobalMock.NewMock().WithQuery( + `UPDATE "reservations" SET "updated_at"=$1,"dataset_project"=$2,"dataset_name"=$3,"dataset_domain"=$4,"dataset_version"=$5,"tag_name"=$6,"owner_id"=$7,"expires_at"=$8 WHERE (expires_at<=$9 OR owner_id=$10) AND "dataset_project" = $11 AND "dataset_name" = $12 AND "dataset_domain" = $13 AND "dataset_version" = $14 AND "tag_name" = $15`, + ).WithRowsNum(0) + + reservationRepo := getReservationRepo(t) + + err := reservationRepo.Update(context.Background(), expectedReservation, time.Now()) + assert.Error(t, err) + assert.Equal(t, "entity already exists", err.Error()) +} + +func getReservationRepo(t *testing.T) interfaces.ReservationRepo { + mocket.Catcher.Register() + sqlDB, err := sql.Open(mocket.DriverName, "blah") + assert.Nil(t, err) + + db, err := gorm.Open(postgres.New(postgres.Config{Conn: sqlDB})) + if err != nil { + t.Fatal(fmt.Sprintf("Failed to open mock db with err %v", err)) + } + + return NewReservationRepo(db, errors.NewPostgresErrorTransformer(), promutils.NewTestScope()) +} + +func getDBResponse(reservation models.Reservation) []map[string]interface{} { + return []map[string]interface{}{ + { + "dataset_project": reservation.DatasetProject, + "dataset_name": reservation.DatasetName, + "dataset_domain": reservation.DatasetDomain, + "dataset_version": reservation.DatasetVersion, + "tag_name": reservation.TagName, + "owner_id": reservation.OwnerID, + "expires_at": reservation.ExpiresAt, + }, + } +} + +func GetReservationKey() models.ReservationKey { + return models.ReservationKey{ + DatasetProject: "testProject", + DatasetName: "testDataset", + DatasetDomain: "testDomain", + DatasetVersion: "testVersion", + TagName: "testTag", + } +} + +func GetReservation() models.Reservation { + reservation := models.Reservation{ + ReservationKey: GetReservationKey(), + OwnerID: "batman", + ExpiresAt: time.Unix(1, 1), + } + return reservation +} diff --git a/pkg/repositories/handle.go b/pkg/repositories/handle.go index 3d5bafa2..bb0d8cd9 100644 --- a/pkg/repositories/handle.go +++ b/pkg/repositories/handle.go @@ -93,5 +93,9 @@ func (h *DBHandle) Migrate(ctx context.Context) error { return err } + if err := h.db.AutoMigrate(&models.Reservation{}); err != nil { + return err + } + return nil } diff --git a/pkg/repositories/interfaces/base.go b/pkg/repositories/interfaces/base.go index 864d724d..4aad6955 100644 --- a/pkg/repositories/interfaces/base.go +++ b/pkg/repositories/interfaces/base.go @@ -4,4 +4,5 @@ type DataCatalogRepo interface { DatasetRepo() DatasetRepo ArtifactRepo() ArtifactRepo TagRepo() TagRepo + ReservationRepo() ReservationRepo } diff --git a/pkg/repositories/interfaces/reservation_repo.go b/pkg/repositories/interfaces/reservation_repo.go new file mode 100644 index 00000000..950742b9 --- /dev/null +++ b/pkg/repositories/interfaces/reservation_repo.go @@ -0,0 +1,26 @@ +package interfaces + +import ( + "context" + "time" + + "github.com/flyteorg/datacatalog/pkg/repositories/models" +) + +// Interface to interact with Reservation Table +type ReservationRepo interface { + + // Create a new reservation if the reservation does not already exist + Create(ctx context.Context, reservation models.Reservation, now time.Time) error + + // Delete a reservation if it exists + Delete(ctx context.Context, reservation models.ReservationKey, ownerID string) error + + // Get reservation + Get(ctx context.Context, reservationKey models.ReservationKey) (models.Reservation, error) + + // Update an existing reservation. If called by the current owner, we update the + // expiresAt timestamp. If called by a new owner and the current reservation has + // expired, we attempt to take over the reservation. + Update(ctx context.Context, reservation models.Reservation, now time.Time) error +} diff --git a/pkg/repositories/mocks/base.go b/pkg/repositories/mocks/base.go index 827b69be..993fe79e 100644 --- a/pkg/repositories/mocks/base.go +++ b/pkg/repositories/mocks/base.go @@ -3,9 +3,10 @@ package mocks import "github.com/flyteorg/datacatalog/pkg/repositories/interfaces" type DataCatalogRepo struct { - MockDatasetRepo *DatasetRepo - MockArtifactRepo *ArtifactRepo - MockTagRepo *TagRepo + MockDatasetRepo *DatasetRepo + MockArtifactRepo *ArtifactRepo + MockTagRepo *TagRepo + MockReservationRepo *ReservationRepo } func (m *DataCatalogRepo) DatasetRepo() interfaces.DatasetRepo { @@ -19,3 +20,7 @@ func (m *DataCatalogRepo) ArtifactRepo() interfaces.ArtifactRepo { func (m *DataCatalogRepo) TagRepo() interfaces.TagRepo { return m.MockTagRepo } + +func (m *DataCatalogRepo) ReservationRepo() interfaces.ReservationRepo { + return m.MockReservationRepo +} diff --git a/pkg/repositories/mocks/reservation.go b/pkg/repositories/mocks/reservation.go new file mode 100644 index 00000000..4cabc8a2 --- /dev/null +++ b/pkg/repositories/mocks/reservation.go @@ -0,0 +1,153 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + models "github.com/flyteorg/datacatalog/pkg/repositories/models" + + time "time" +) + +// ReservationRepo is an autogenerated mock type for the ReservationRepo type +type ReservationRepo struct { + mock.Mock +} + +type ReservationRepo_Create struct { + *mock.Call +} + +func (_m ReservationRepo_Create) Return(_a0 error) *ReservationRepo_Create { + return &ReservationRepo_Create{Call: _m.Call.Return(_a0)} +} + +func (_m *ReservationRepo) OnCreate(ctx context.Context, reservation models.Reservation, now time.Time) *ReservationRepo_Create { + c := _m.On("Create", ctx, reservation, now) + return &ReservationRepo_Create{Call: c} +} + +func (_m *ReservationRepo) OnCreateMatch(matchers ...interface{}) *ReservationRepo_Create { + c := _m.On("Create", matchers...) + return &ReservationRepo_Create{Call: c} +} + +// Create provides a mock function with given fields: ctx, reservation, now +func (_m *ReservationRepo) Create(ctx context.Context, reservation models.Reservation, now time.Time) error { + ret := _m.Called(ctx, reservation, now) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, models.Reservation, time.Time) error); ok { + r0 = rf(ctx, reservation, now) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type ReservationRepo_Delete struct { + *mock.Call +} + +func (_m ReservationRepo_Delete) Return(_a0 error) *ReservationRepo_Delete { + return &ReservationRepo_Delete{Call: _m.Call.Return(_a0)} +} + +func (_m *ReservationRepo) OnDelete(ctx context.Context, reservation models.ReservationKey, ownerID string) *ReservationRepo_Delete { + c := _m.On("Delete", ctx, reservation, ownerID) + return &ReservationRepo_Delete{Call: c} +} + +func (_m *ReservationRepo) OnDeleteMatch(matchers ...interface{}) *ReservationRepo_Delete { + c := _m.On("Delete", matchers...) + return &ReservationRepo_Delete{Call: c} +} + +// Delete provides a mock function with given fields: ctx, reservation, ownerID +func (_m *ReservationRepo) Delete(ctx context.Context, reservation models.ReservationKey, ownerID string) error { + ret := _m.Called(ctx, reservation, ownerID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, models.ReservationKey, string) error); ok { + r0 = rf(ctx, reservation, ownerID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type ReservationRepo_Get struct { + *mock.Call +} + +func (_m ReservationRepo_Get) Return(_a0 models.Reservation, _a1 error) *ReservationRepo_Get { + return &ReservationRepo_Get{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *ReservationRepo) OnGet(ctx context.Context, reservationKey models.ReservationKey) *ReservationRepo_Get { + c := _m.On("Get", ctx, reservationKey) + return &ReservationRepo_Get{Call: c} +} + +func (_m *ReservationRepo) OnGetMatch(matchers ...interface{}) *ReservationRepo_Get { + c := _m.On("Get", matchers...) + return &ReservationRepo_Get{Call: c} +} + +// Get provides a mock function with given fields: ctx, reservationKey +func (_m *ReservationRepo) Get(ctx context.Context, reservationKey models.ReservationKey) (models.Reservation, error) { + ret := _m.Called(ctx, reservationKey) + + var r0 models.Reservation + if rf, ok := ret.Get(0).(func(context.Context, models.ReservationKey) models.Reservation); ok { + r0 = rf(ctx, reservationKey) + } else { + r0 = ret.Get(0).(models.Reservation) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, models.ReservationKey) error); ok { + r1 = rf(ctx, reservationKey) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type ReservationRepo_Update struct { + *mock.Call +} + +func (_m ReservationRepo_Update) Return(_a0 error) *ReservationRepo_Update { + return &ReservationRepo_Update{Call: _m.Call.Return(_a0)} +} + +func (_m *ReservationRepo) OnUpdate(ctx context.Context, reservation models.Reservation, now time.Time) *ReservationRepo_Update { + c := _m.On("Update", ctx, reservation, now) + return &ReservationRepo_Update{Call: c} +} + +func (_m *ReservationRepo) OnUpdateMatch(matchers ...interface{}) *ReservationRepo_Update { + c := _m.On("Update", matchers...) + return &ReservationRepo_Update{Call: c} +} + +// Update provides a mock function with given fields: ctx, reservation, now +func (_m *ReservationRepo) Update(ctx context.Context, reservation models.Reservation, now time.Time) error { + ret := _m.Called(ctx, reservation, now) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, models.Reservation, time.Time) error); ok { + r0 = rf(ctx, reservation, now) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/repositories/models/reservation.go b/pkg/repositories/models/reservation.go new file mode 100644 index 00000000..366801ff --- /dev/null +++ b/pkg/repositories/models/reservation.go @@ -0,0 +1,26 @@ +package models + +import "time" + +// ReservationKey uniquely identifies a reservation +type ReservationKey struct { + DatasetProject string `gorm:"primary_key"` + DatasetName string `gorm:"primary_key"` + DatasetDomain string `gorm:"primary_key"` + DatasetVersion string `gorm:"primary_key"` + TagName string `gorm:"primary_key"` +} + +// Reservation tracks the metadata needed to allow +// task cache serialization +type Reservation struct { + BaseModel + ReservationKey + + // Identifies who owns the reservation + OwnerID string + + // When the reservation will expire + ExpiresAt time.Time + SerializedMetadata []byte +} diff --git a/pkg/repositories/postgres_repo.go b/pkg/repositories/postgres_repo.go index 5da0dca7..c51d5db6 100644 --- a/pkg/repositories/postgres_repo.go +++ b/pkg/repositories/postgres_repo.go @@ -9,9 +9,10 @@ import ( ) type PostgresRepo struct { - datasetRepo interfaces.DatasetRepo - artifactRepo interfaces.ArtifactRepo - tagRepo interfaces.TagRepo + datasetRepo interfaces.DatasetRepo + artifactRepo interfaces.ArtifactRepo + tagRepo interfaces.TagRepo + reservationRepo interfaces.ReservationRepo } func (dc *PostgresRepo) DatasetRepo() interfaces.DatasetRepo { @@ -26,10 +27,15 @@ func (dc *PostgresRepo) TagRepo() interfaces.TagRepo { return dc.tagRepo } +func (dc *PostgresRepo) ReservationRepo() interfaces.ReservationRepo { + return dc.reservationRepo +} + func NewPostgresRepo(db *gorm.DB, errorTransformer errors.ErrorTransformer, scope promutils.Scope) interfaces.DataCatalogRepo { return &PostgresRepo{ - datasetRepo: gormimpl.NewDatasetRepo(db, errorTransformer, scope.NewSubScope("dataset")), - artifactRepo: gormimpl.NewArtifactRepo(db, errorTransformer, scope.NewSubScope("artifact")), - tagRepo: gormimpl.NewTagRepo(db, errorTransformer, scope.NewSubScope("tag")), + datasetRepo: gormimpl.NewDatasetRepo(db, errorTransformer, scope.NewSubScope("dataset")), + artifactRepo: gormimpl.NewArtifactRepo(db, errorTransformer, scope.NewSubScope("artifact")), + tagRepo: gormimpl.NewTagRepo(db, errorTransformer, scope.NewSubScope("tag")), + reservationRepo: gormimpl.NewReservationRepo(db, errorTransformer, scope.NewSubScope("reservation")), } } diff --git a/pkg/repositories/transformers/reservation.go b/pkg/repositories/transformers/reservation.go new file mode 100644 index 00000000..50e9133b --- /dev/null +++ b/pkg/repositories/transformers/reservation.go @@ -0,0 +1,48 @@ +package transformers + +import ( + "time" + + "github.com/flyteorg/datacatalog/pkg/errors" + "github.com/flyteorg/datacatalog/pkg/repositories/models" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" + + "github.com/golang/protobuf/ptypes" + + "google.golang.org/grpc/codes" +) + +func FromReservationID(reservationID *datacatalog.ReservationID) models.ReservationKey { + datasetID := reservationID.DatasetId + + return models.ReservationKey{ + DatasetProject: datasetID.Project, + DatasetDomain: datasetID.Domain, + DatasetName: datasetID.Name, + DatasetVersion: datasetID.Version, + TagName: reservationID.TagName, + } +} + +func CreateReservation(reservation *models.Reservation, heartbeatInterval time.Duration) (datacatalog.Reservation, error) { + expiresAtPb, err := ptypes.TimestampProto(reservation.ExpiresAt) + if err != nil { + return datacatalog.Reservation{}, errors.NewDataCatalogErrorf(codes.Internal, "failed to serialize expires at time") + } + + heartbeatIntervalPb := ptypes.DurationProto(heartbeatInterval) + return datacatalog.Reservation{ + ReservationId: &datacatalog.ReservationID{ + DatasetId: &datacatalog.DatasetID{ + Project: reservation.DatasetProject, + Domain: reservation.DatasetDomain, + Name: reservation.DatasetName, + Version: reservation.DatasetVersion, + }, + TagName: reservation.TagName, + }, + OwnerId: reservation.OwnerID, + HeartbeatInterval: heartbeatIntervalPb, + ExpiresAt: expiresAtPb, + }, nil +} diff --git a/pkg/repositories/transformers/reservation_test.go b/pkg/repositories/transformers/reservation_test.go new file mode 100644 index 00000000..190c153b --- /dev/null +++ b/pkg/repositories/transformers/reservation_test.go @@ -0,0 +1,61 @@ +package transformers + +import ( + "testing" + "time" + + "github.com/flyteorg/datacatalog/pkg/repositories/models" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" + "github.com/stretchr/testify/assert" +) + +func TestFromReservationID(t *testing.T) { + reservationID := datacatalog.ReservationID{ + DatasetId: &datacatalog.DatasetID{ + Project: "p", + Name: "n", + Domain: "d", + Version: "v", + }, + TagName: "t", + } + + reservationKey := FromReservationID(&reservationID) + assert.Equal(t, reservationKey.DatasetProject, reservationID.DatasetId.Project) + assert.Equal(t, reservationKey.DatasetName, reservationID.DatasetId.Name) + assert.Equal(t, reservationKey.DatasetDomain, reservationID.DatasetId.Domain) + assert.Equal(t, reservationKey.DatasetVersion, reservationID.DatasetId.Version) + assert.Equal(t, reservationKey.TagName, reservationID.TagName) +} + +func TestCreateReservation(t *testing.T) { + now := time.Now() + heartbeatInterval := time.Second * 5 + modelReservation := models.Reservation{ + ReservationKey: models.ReservationKey{ + DatasetProject: "p", + DatasetName: "n", + DatasetDomain: "d", + DatasetVersion: "v", + TagName: "t", + }, + OwnerID: "o", + ExpiresAt: now, + } + + reservation, err := CreateReservation(&modelReservation, heartbeatInterval) + + assert.Equal(t, err, nil) + assert.Equal(t, reservation.ExpiresAt.AsTime(), modelReservation.ExpiresAt.UTC()) + assert.Equal(t, reservation.HeartbeatInterval.AsDuration(), heartbeatInterval) + assert.Equal(t, reservation.OwnerId, modelReservation.OwnerID) + + reservationID := reservation.ReservationId + assert.Equal(t, reservationID.TagName, modelReservation.TagName) + + datasetID := reservationID.DatasetId + assert.Equal(t, datasetID.Project, modelReservation.DatasetProject) + assert.Equal(t, datasetID.Name, modelReservation.DatasetName) + assert.Equal(t, datasetID.Domain, modelReservation.DatasetDomain) + assert.Equal(t, datasetID.Version, modelReservation.DatasetVersion) +} diff --git a/pkg/rpc/datacatalogservice/service.go b/pkg/rpc/datacatalogservice/service.go index a8d1125c..53371d98 100644 --- a/pkg/rpc/datacatalogservice/service.go +++ b/pkg/rpc/datacatalogservice/service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/debug" + "time" "github.com/flyteorg/datacatalog/pkg/manager/impl" "github.com/flyteorg/datacatalog/pkg/manager/interfaces" @@ -20,9 +21,10 @@ import ( ) type DataCatalogService struct { - DatasetManager interfaces.DatasetManager - ArtifactManager interfaces.ArtifactManager - TagManager interfaces.TagManager + DatasetManager interfaces.DatasetManager + ArtifactManager interfaces.ArtifactManager + TagManager interfaces.TagManager + ReservationManager interfaces.ReservationManager } func (s *DataCatalogService) CreateDataset(ctx context.Context, request *catalog.CreateDatasetRequest) (*catalog.CreateDatasetResponse, error) { @@ -53,6 +55,14 @@ func (s *DataCatalogService) ListDatasets(ctx context.Context, request *catalog. return s.DatasetManager.ListDatasets(ctx, request) } +func (s *DataCatalogService) GetOrExtendReservation(ctx context.Context, request *catalog.GetOrExtendReservationRequest) (*catalog.GetOrExtendReservationResponse, error) { + return s.ReservationManager.GetOrExtendReservation(ctx, request) +} + +func (s *DataCatalogService) ReleaseReservation(ctx context.Context, request *catalog.ReleaseReservationRequest) (*catalog.ReleaseReservationResponse, error) { + return s.ReservationManager.ReleaseReservation(ctx, request) +} + func NewDataCatalogService() *DataCatalogService { configProvider := runtime.NewConfigurationProvider() dataCatalogConfig := configProvider.ApplicationConfiguration().GetDataCatalogConfig() @@ -110,5 +120,7 @@ func NewDataCatalogService() *DataCatalogService { DatasetManager: impl.NewDatasetManager(repos, dataStorageClient, catalogScope.NewSubScope("dataset")), ArtifactManager: impl.NewArtifactManager(repos, dataStorageClient, storagePrefix, catalogScope.NewSubScope("artifact")), TagManager: impl.NewTagManager(repos, dataStorageClient, catalogScope.NewSubScope("tag")), + ReservationManager: impl.NewReservationManager(repos, time.Duration(dataCatalogConfig.HeartbeatGracePeriodMultiplier), dataCatalogConfig.MaxReservationHeartbeat.Duration, time.Now, + catalogScope.NewSubScope("reservation")), } } diff --git a/pkg/runtime/configs/data_catalog_config.go b/pkg/runtime/configs/data_catalog_config.go index c7160dca..ec353a2c 100644 --- a/pkg/runtime/configs/data_catalog_config.go +++ b/pkg/runtime/configs/data_catalog_config.go @@ -1,10 +1,16 @@ package configs +import ( + "github.com/flyteorg/flytestdlib/config" +) + //go:generate pflags DataCatalogConfig // This configuration is the base configuration to start admin type DataCatalogConfig struct { - StoragePrefix string `json:"storage-prefix" pflag:",StoragePrefix specifies the prefix where DataCatalog stores offloaded ArtifactData in CloudStorage. If not specified, the data will be stored in the base container directly."` - MetricsScope string `json:"metrics-scope" pflag:",Scope that the metrics will record under."` - ProfilerPort int `json:"profiler-port" pflag:",Port that the profiling service is listening on."` + StoragePrefix string `json:"storage-prefix" pflag:",StoragePrefix specifies the prefix where DataCatalog stores offloaded ArtifactData in CloudStorage. If not specified, the data will be stored in the base container directly."` + MetricsScope string `json:"metrics-scope" pflag:",Scope that the metrics will record under."` + ProfilerPort int `json:"profiler-port" pflag:",Port that the profiling service is listening on."` + HeartbeatGracePeriodMultiplier int `json:"heartbeat-grace-period-multiplier" pflag:",Number of heartbeats before a reservation expires without an extension."` + MaxReservationHeartbeat config.Duration `json:"max-reservation-heartbeat" pflag:",The maximum available reservation extension heartbeat interval."` }