Skip to content

Commit

Permalink
Lock the table of affected entities during update operations. (#720)
Browse files Browse the repository at this point in the history
  • Loading branch information
timburks authored Sep 1, 2022
1 parent f641fbe commit f1e9153
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 15 deletions.
4 changes: 1 addition & 3 deletions server/registry/actions_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (s *RegistryServer) UpdateApi(ctx context.Context, req *rpc.UpdateApiReques
}
var response *rpc.Api
if err := s.runInTransaction(ctx, func(ctx context.Context, db *storage.Client) error {
db.LockApis(ctx)
api, err := db.GetApi(ctx, name)
if err == nil {
if err := api.Update(req.GetApi(), models.ExpandMask(req.GetApi(), req.GetUpdateMask())); err != nil {
Expand All @@ -191,9 +192,6 @@ func (s *RegistryServer) UpdateApi(ctx context.Context, req *rpc.UpdateApiReques
return err
} else if status.Code(err) == codes.NotFound && req.GetAllowMissing() {
response, err = s.createApi(ctx, db, name, req.GetApi())
if storage.AlreadyExists(err) { // The API was created between the get and the create.
return status.Errorf(codes.Aborted, "update conflict, please retry")
}
return err
} else {
return err
Expand Down
1 change: 1 addition & 0 deletions server/registry/actions_artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (s *RegistryServer) ReplaceArtifact(ctx context.Context, req *rpc.ReplaceAr

var artifact *models.Artifact
err = db.Transaction(ctx, func(ctx context.Context, db *storage.Client) error {
db.LockArtifacts(ctx)
// Replacement should only succeed on artifacts that currently exist.
if _, err = db.GetArtifact(ctx, name); err != nil {
return err
Expand Down
4 changes: 1 addition & 3 deletions server/registry/actions_deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (s *RegistryServer) UpdateApiDeployment(ctx context.Context, req *rpc.Updat
}
var response *rpc.ApiDeployment
if err = s.runInTransaction(ctx, func(ctx context.Context, db *storage.Client) error {
db.LockDeployments(ctx)
deployment, err := db.GetDeployment(ctx, name)
if err == nil {
// Apply the update to the deployment - possibly changing the revision ID.
Expand All @@ -226,9 +227,6 @@ func (s *RegistryServer) UpdateApiDeployment(ctx context.Context, req *rpc.Updat
return err
} else if status.Code(err) == codes.NotFound && req.GetAllowMissing() {
response, err = s.createDeployment(ctx, db, name, req.GetApiDeployment())
if storage.AlreadyExists(err) { // The deployment was created between the get and the create.
return status.Errorf(codes.Aborted, "update conflict, please retry")
}
return err
} else {
return err
Expand Down
4 changes: 1 addition & 3 deletions server/registry/actions_projects.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (s *RegistryServer) UpdateProject(ctx context.Context, req *rpc.UpdateProje
}
var response *rpc.Project
if err := s.runInTransaction(ctx, func(ctx context.Context, db *storage.Client) error {
db.LockProjects(ctx)
project, err := db.GetProject(ctx, name)
if err == nil {
project.Update(req.GetProject(), models.ExpandMask(req.GetProject(), req.GetUpdateMask()))
Expand All @@ -163,9 +164,6 @@ func (s *RegistryServer) UpdateProject(ctx context.Context, req *rpc.UpdateProje
return nil
} else if status.Code(err) == codes.NotFound && req.GetAllowMissing() {
response, err = s.createProject(ctx, db, name, req.GetProject())
if storage.AlreadyExists(err) { // the project was created between the get and the create.
return status.Errorf(codes.Aborted, "update conflict, please retry")
}
return err
} else {
return err
Expand Down
4 changes: 1 addition & 3 deletions server/registry/actions_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func (s *RegistryServer) UpdateApiSpec(ctx context.Context, req *rpc.UpdateApiSp
}
var response *rpc.ApiSpec
if err = s.runInTransaction(ctx, func(ctx context.Context, db *storage.Client) error {
db.LockSpecs(ctx)
spec, err := db.GetSpec(ctx, name)
if err == nil {
// Apply the update to the spec - possibly changing the revision ID.
Expand All @@ -284,9 +285,6 @@ func (s *RegistryServer) UpdateApiSpec(ctx context.Context, req *rpc.UpdateApiSp
return err
} else if status.Code(err) == codes.NotFound && req.GetAllowMissing() {
response, err = s.createSpec(ctx, db, name, req.GetApiSpec())
if storage.AlreadyExists(err) { // the spec was created between the get and the create.
return status.Errorf(codes.Aborted, "update conflict, please retry")
}
return err
} else {
return err
Expand Down
4 changes: 1 addition & 3 deletions server/registry/actions_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (s *RegistryServer) UpdateApiVersion(ctx context.Context, req *rpc.UpdateAp
}
var response *rpc.ApiVersion
if err = s.runInTransaction(ctx, func(ctx context.Context, db *storage.Client) error {
db.LockVersions(ctx)
version, err := db.GetVersion(ctx, name)
if err == nil {
if err := version.Update(req.GetApiVersion(), models.ExpandMask(req.GetApiVersion(), req.GetUpdateMask())); err != nil {
Expand All @@ -191,9 +192,6 @@ func (s *RegistryServer) UpdateApiVersion(ctx context.Context, req *rpc.UpdateAp
return err
} else if status.Code(err) == codes.NotFound && req.GetAllowMissing() {
response, err = s.createApiVersion(ctx, db, name, req.GetApiVersion())
if storage.AlreadyExists(err) { // The version was created between the get and the create.
return status.Errorf(codes.Aborted, "update conflict, please retry")
}
return err
} else {
return err
Expand Down
53 changes: 53 additions & 0 deletions server/registry/internal/storage/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2022 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"context"
"fmt"

_ "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/dialers/postgres"
)

func (c *Client) lockTable(ctx context.Context, name string) *Client {
if c.db.WithContext(ctx).Name() != "postgres" {
return c
}
return &Client{db: c.db.Exec(fmt.Sprintf("LOCK TABLE %s IN ACCESS EXCLUSIVE MODE", name))}
}

func (c *Client) LockProjects(ctx context.Context) *Client {
return c.lockTable(ctx, "projects")
}

func (c *Client) LockApis(ctx context.Context) *Client {
return c.lockTable(ctx, "apis")
}

func (c *Client) LockVersions(ctx context.Context) *Client {
return c.lockTable(ctx, "versions")
}

func (c *Client) LockDeployments(ctx context.Context) *Client {
return c.lockTable(ctx, "deployments")
}

func (c *Client) LockSpecs(ctx context.Context) *Client {
return c.lockTable(ctx, "specs")
}

func (c *Client) LockArtifacts(ctx context.Context) *Client {
return c.lockTable(ctx, "artifacts")
}

0 comments on commit f1e9153

Please sign in to comment.