Skip to content

Commit

Permalink
Repo update advisory lock (#1230)
Browse files Browse the repository at this point in the history
* use for no key update

* use a tx advisory lock for repo updates

* skip tests for sqlite

* move check of commit swap on rebase

* do lock before formatting rebase

* hash schema in for lock id

* no tx lock in sqlite

* move rebase formatting to tx

* move dialect check

* rm log

* make the lock ids a bit safer

* change how we do lock id

* refactor id generator
  • Loading branch information
dholms authored Jun 29, 2023
1 parent 9263ddc commit 7cb8c62
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 29 deletions.
13 changes: 13 additions & 0 deletions packages/crypto/src/random.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as noble from '@noble/hashes/utils'
import * as uint8arrays from 'uint8arrays'
import { SupportedEncodings } from 'uint8arrays/to-string'
import { sha256 } from './sha'

export const randomBytes = noble.randomBytes

Expand All @@ -11,3 +12,15 @@ export const randomStr = (
const bytes = randomBytes(byteLength)
return uint8arrays.toString(bytes, encoding)
}

export const randomIntFromSeed = async (
seed: string,
high: number,
low = 0,
): Promise<number> => {
const hash = await sha256(seed)
const number = Buffer.from(hash).readUintBE(0, 6)
const range = high - low
const normalized = number % range
return normalized + low
}
4 changes: 3 additions & 1 deletion packages/dev-env/src/pds.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import getPort from 'get-port'
import * as ui8 from 'uint8arrays'
import * as pds from '@atproto/pds'
import { Secp256k1Keypair } from '@atproto/crypto'
import { Secp256k1Keypair, randomStr } from '@atproto/crypto'
import { MessageDispatcher } from '@atproto/pds/src/event-stream/message-queue'
import { AtpAgent } from '@atproto/api'
import { Client as PlcClient } from '@did-plc/lib'
Expand Down Expand Up @@ -63,6 +63,7 @@ export class TestPds {
labelerDid: 'did:example:labeler',
labelerKeywords: { label_me: 'test-label', label_me_2: 'test-label-2' },
feedGenDid: 'did:example:feedGen',
dbTxLockNonce: await randomStr(32, 'base32'),
...cfg,
})

Expand All @@ -71,6 +72,7 @@ export class TestPds {
? pds.Database.postgres({
url: config.dbPostgresUrl,
schema: config.dbPostgresSchema,
txLockNonce: config.dbTxLockNonce,
})
: pds.Database.memory()
await db.migrateToLatestOrThrow()
Expand Down
10 changes: 10 additions & 0 deletions packages/pds/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ export interface ServerConfigValues {
repoBackfillLimitMs: number
sequencerLeaderLockId?: number

// this is really only used in test environments
dbTxLockNonce?: string

bskyAppViewEndpoint?: string
bskyAppViewDid?: string

Expand Down Expand Up @@ -172,6 +175,8 @@ export class ServerConfig {
undefined,
)

const dbTxLockNonce = nonemptyString(process.env.DB_TX_LOCK_NONCE)

const bskyAppViewEndpoint = nonemptyString(
process.env.BSKY_APP_VIEW_ENDPOINT,
)
Expand Down Expand Up @@ -221,6 +226,7 @@ export class ServerConfig {
maxSubscriptionBuffer,
repoBackfillLimitMs,
sequencerLeaderLockId,
dbTxLockNonce,
bskyAppViewEndpoint,
bskyAppViewDid,
crawlersToNotify,
Expand Down Expand Up @@ -414,6 +420,10 @@ export class ServerConfig {
return this.cfg.sequencerLeaderLockId
}

get dbTxLockNonce() {
return this.cfg.dbTxLockNonce
}

get bskyAppViewEndpoint() {
return this.cfg.bskyAppViewEndpoint
}
Expand Down
30 changes: 28 additions & 2 deletions packages/pds/src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import { dummyDialect } from './util'
import * as migrations from './migrations'
import { CtxMigrationProvider } from './migrations/provider'
import { dbLogger as log } from '../logger'
import { randomIntFromSeed } from '@atproto/crypto'

export class Database {
txEvt = new EventEmitter() as TxnEmitter
txChannelEvts: ChannelEvt[] = []
txLockNonce: string | undefined
channels: Channels
migrator: Migrator
destroyed = false
Expand All @@ -46,6 +48,7 @@ export class Database {
new_repo_event: new EventEmitter() as ChannelEmitter,
outgoing_repo_seq: new EventEmitter() as ChannelEmitter,
}
this.txLockNonce = cfg.dialect === 'pg' ? cfg.txLockNonce : undefined
}

static sqlite(location: string): Database {
Expand All @@ -58,7 +61,7 @@ export class Database {
}

static postgres(opts: PgOptions): Database {
const { schema, url } = opts
const { schema, url, txLockNonce } = opts
const pool =
opts.pool ??
new PgPool({
Expand Down Expand Up @@ -89,7 +92,13 @@ export class Database {
dialect: new PostgresDialect({ pool }),
})

return new Database(db, { dialect: 'pg', pool, schema, url })
return new Database(db, {
dialect: 'pg',
pool,
schema,
url,
txLockNonce,
})
}

static memory(): Database {
Expand Down Expand Up @@ -182,6 +191,17 @@ export class Database {
return txRes
}

async txAdvisoryLock(name: string): Promise<boolean> {
this.assertTransaction()
assert(this.dialect === 'pg', 'Postgres required')
// any lock id < 10k is reserved for session locks
const id = await randomIntFromSeed(name, Number.MAX_SAFE_INTEGER, 10000)
const res = (await sql`SELECT pg_try_advisory_xact_lock(${sql.literal(
id,
)}) as acquired`.execute(this.db)) as TxLockRes
return res.rows[0]?.acquired === true
}

get schema(): string | undefined {
return this.cfg.dialect === 'pg' ? this.cfg.schema : undefined
}
Expand Down Expand Up @@ -299,6 +319,7 @@ export type PgConfig = {
pool: PgPool
url: string
schema?: string
txLockNonce?: string
}

export type SqliteConfig = {
Expand All @@ -315,6 +336,7 @@ type PgOptions = {
poolSize?: number
poolMaxUses?: number
poolIdleTimeoutMs?: number
txLockNonce?: string
}

type ChannelEvents = {
Expand Down Expand Up @@ -356,3 +378,7 @@ class LeakyTxPlugin implements KyselyPlugin {
return args.result
}
}

type TxLockRes = {
rows: { acquired: true | false }[]
}
25 changes: 13 additions & 12 deletions packages/pds/src/services/repo/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ export class RepoService {
) {
this.db.assertTransaction()
const storage = new SqlRepoStorage(this.db, did, now)
const locked = await storage.lockHead()
if (!locked || !locked.equals(commitData.prev)) {
const locked = await storage.lockRepo()
if (!locked) {
throw new ConcurrentWriteError()
}
await Promise.all([
Expand Down Expand Up @@ -245,23 +245,27 @@ export class RepoService {

async rebaseRepo(did: string, swapCommit?: CID) {
this.db.assertNotTransaction()
const rebaseData = await this.formatRebase(did, swapCommit)

// rebases are expensive & should be done rarely, we don't try to re-process on concurrent writes
await this.serviceTx(async (srvcTx) =>
srvcTx.processRebase(did, rebaseData),
)
await this.serviceTx(async (srvcTx) => {
const rebaseData = await srvcTx.formatRebase(did, swapCommit)
await srvcTx.processRebase(did, rebaseData)
})
}

async formatRebase(did: string, swapCommit?: CID): Promise<RebaseData> {
const storage = new SqlRepoStorage(this.db, did, new Date().toISOString())
const locked = await storage.lockRepo()
if (!locked) {
throw new ConcurrentWriteError()
}

const currRoot = await storage.getHead()
if (!currRoot) {
throw new InvalidRequestError(
`${did} is not a registered repo on this server`,
)
}
if (swapCommit && !currRoot.equals(swapCommit)) {
} else if (swapCommit && !currRoot.equals(swapCommit)) {
throw new BadCommitSwapError(currRoot)
}

Expand Down Expand Up @@ -303,11 +307,8 @@ export class RepoService {

async processRebase(did: string, rebaseData: RebaseData) {
this.db.assertTransaction()

const storage = new SqlRepoStorage(this.db, did)
const lockedHead = await storage.lockHead()
if (!rebaseData.rebased.equals(lockedHead)) {
throw new ConcurrentWriteError()
}

const recordCountBefore = await this.countRecordBlocks(did)
await Promise.all([
Expand Down
14 changes: 3 additions & 11 deletions packages/pds/src/sql-repo-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,9 @@ export class SqlRepoStorage extends RepoStorage {
}

// note this method will return null if the repo has a lock on it currently
async lockHead(): Promise<CID | null> {
let builder = this.db.db
.selectFrom('repo_root')
.selectAll()
.where('did', '=', this.did)
if (this.db.dialect !== 'sqlite') {
builder = builder.forUpdate().skipLocked()
}
const res = await builder.executeTakeFirst()
if (!res) return null
return CID.parse(res.root)
async lockRepo(): Promise<boolean> {
if (this.db.dialect === 'sqlite') return true
return this.db.txAdvisoryLock(this.did)
}

async getHead(): Promise<CID | null> {
Expand Down
3 changes: 3 additions & 0 deletions packages/pds/tests/_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export const runTestServer = async (
maxSubscriptionBuffer: 200,
repoBackfillLimitMs: HOUR,
sequencerLeaderLockId: uniqueLockId(),
dbTxLockNonce: await randomStr(32, 'base32'),
...params,
})

Expand All @@ -113,6 +114,7 @@ export const runTestServer = async (
? Database.postgres({
url: cfg.dbPostgresUrl,
schema: cfg.dbPostgresSchema,
txLockNonce: cfg.dbTxLockNonce,
})
: Database.memory()

Expand All @@ -123,6 +125,7 @@ export const runTestServer = async (
? Database.postgres({
url: cfg.dbPostgresUrl,
schema: cfg.dbPostgresSchema,
txLockNonce: cfg.dbTxLockNonce,
})
: db
if (opts.migration) {
Expand Down
35 changes: 34 additions & 1 deletion packages/pds/tests/db.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { sql } from 'kysely'
import { once } from 'events'
import { wait } from '@atproto/common'
import { createDeferrable, wait } from '@atproto/common'
import { Database } from '../src'
import { Leader, appMigration } from '../src/db/leader'
import { runTestServer, CloseFn } from './_util'
Expand Down Expand Up @@ -168,6 +168,39 @@ describe('db', () => {
})
})

describe('transaction advisory locks', () => {
it('allows locks in txs to run sequentially', async () => {
if (db.dialect !== 'pg') return
for (let i = 0; i < 100; i++) {
await db.transaction(async (dbTxn) => {
const locked = await dbTxn.txAdvisoryLock('asfd')
expect(locked).toBe(true)
})
}
})

it('locks block between txns', async () => {
if (db.dialect !== 'pg') return
const deferable = createDeferrable()
const tx1 = db.transaction(async (dbTxn) => {
const locked = await dbTxn.txAdvisoryLock('asdf')
expect(locked).toBe(true)
await deferable.complete
})
// give it just a second to ensure it gets the lock
await wait(10)
const tx2 = db.transaction(async (dbTxn) => {
const locked = await dbTxn.txAdvisoryLock('asdf')
expect(locked).toBe(false)
deferable.resolve()
await tx1
const locked2 = await dbTxn.txAdvisoryLock('asdf')
expect(locked2).toBe(true)
})
await tx2
})
})

describe('Leader', () => {
it('allows leaders to run sequentially.', async () => {
const task = async () => {
Expand Down
4 changes: 2 additions & 2 deletions packages/pds/tests/races.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ describe('crud operations', () => {
const now = new Date().toISOString()
await ctx.db.transaction(async (dbTxn) => {
const storage = new SqlRepoStorage(dbTxn, did, now)
const locked = await storage.lockHead()
if (!locked || !locked.equals(commitData.prev)) {
const locked = await storage.lockRepo()
if (!locked) {
throw new ConcurrentWriteError()
}
await wait(waitMs)
Expand Down

0 comments on commit 7cb8c62

Please sign in to comment.