Commit

feat: updated query to return correct data
joshghent committed Sep 23, 2022
1 parent 906a7d5 commit 4edf2bf
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions packages/cron/src/jobs/pins-backup.js
@@ -8,6 +8,8 @@ import { pipe } from 'it-pipe'
import { CID } from 'multiformats'
import * as raw from 'multiformats/codecs/raw'
import { Readable } from 'stream'
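// Dagula exports a DAG directly from the IPFS peer that holds the pin, over libp2p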
import { Dagula } from 'dagula'
import { getLibp2p } from 'dagula/p2p.js'

export default class Backup {
constructor (env) {
@@ -27,11 +29,19 @@ export default class Backup {
this.env = env
this.LIMIT = env.QUERY_LIMIT !== undefined ? env.QUERY_LIMIT : 10000
this.GET_PINNED_PINS_QUERY = `
SELECT psa.id, psa.backup_urls, psa.source_cid, psa.content_cid, psa.name, psa.auth_key_id
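-- DISTINCT ON(psa.id) keeps one row per pin request when the joins produce pins in multiple locations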
SELECT DISTINCT ON(psa.id)
psa.id,
psa.backup_urls,
psa.source_cid,
psa.content_cid,
psa.name,
psa.auth_key_id,
pl.ipfs_peer_id
FROM psa_pin_request psa
JOIN pin p ON p.content_cid = psa.content_cid
JOIN pin_location pl ON pl.id = p.pin_location_id
WHERE p.status = 'Pinned'
AND psa.backup_urls IS NULL
AND psa.backup_urls = '{}'
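-- assumption: backup_urls defaults to an empty array ('{}') rather than NULL, hence the equality check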
LIMIT $1
`
this.s3 = new AWS.S3({})
@@ -64,12 +74,15 @@ export default class Backup {
* @param {string} bucketName
*/
uploadCar (s3, bucketName) {
// rebind this for curried function
const _this = this

/**
* @param {AsyncIterable<import('./bindings').BackupContent>} source
*/
return async function * (source) {
for await (const bak of source) {
const backupUrl = await this.s3Upload(s3, bucketName, bak)
const backupUrl = await _this.s3Upload(s3, bucketName, bak)
/** @type {import('./bindings').RemoteBackup} */
const backup = { ...bak, backupUrl }
yield backup
@@ -83,10 +96,11 @@
* @param {import('./bindings').BackupContent} bak
*/
async s3Upload (s3, bucketName, bak) {
const key = `complete/${bak.contentCid}/${bak.sourceCid}.car`
const region = await s3.config.region()
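// the object key is flattened to complete/<sourceCid>.car and the region now comes from config instead of the S3 client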
const key = `complete/${bak.sourceCid}.car`
const region = this.env.S3_BUCKET_REGION
const url = new URL(`https://${bucketName}.s3.${region}.amazonaws.com/${key}`)
this.log(`uploading to ${url}`)
this.log(`got bak ${JSON.stringify(bak)}`)

try {
// Request the head object of the file we are about to backup
Expand Down Expand Up @@ -117,26 +131,30 @@ export default class Backup {
*/
exportCar (ipfs, options = {}) {
/**
* @param {AsyncIterable<import('./bindings').BackupCandidate>} source
* @returns {AsyncIterableIterator<import('./bindings').BackupContent>}
* @param {AsyncIterable<import('./bindings').BackupCandidate>} source // TODO: Doesn't exist
* @returns {AsyncIterableIterator<import('./bindings').BackupContent>} // TODO: Doesn't exist
*/
const _this = this
return async function * (source) {
for await (const candidate of source) {
yield { ...candidate, content: this.ipfsDagExport(ipfs, candidate.sourceCid, options) }
for await (const pin of source) {
yield { ...pin, content: _this.ipfsDagExport(ipfs, pin.sourceCid, pin.peer, options) }
}
}
}

/**
* Export a CAR for the passed CID.
*
* @param {import('./ipfs-client').IpfsClient} ipfs
* @param {import('@nftstorage/ipfs-cluster').Cluster} ipfs
* @param {import('multiformats').CID} cid
* @param {string} peer
* @param {Object} [options]
* @param {number} [options.maxDagSize]
*/
async * ipfsDagExport (ipfs, cid, options) {
async * ipfsDagExport (ipfs, cid, peer, options) {
const maxDagSize = options.maxDagSize || this.MAX_DAG_SIZE
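// dial the IPFS peer that holds the pin and stream the DAG from it over libp2p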
const libp2p = await getLibp2p()
const dagula = await Dagula.fromNetwork(libp2p, { peer })

let reportInterval
try {
@@ -157,8 +175,8 @@ export default class Backup {
this.log(`received ${this.fmt(bytesReceived)} of ${formattedTotal} bytes`)
}, this.REPORT_INTERVAL)

for await (const chunk of ipfs.dagExport(cid, { timeout: this.BLOCK_TIMEOUT })) {
bytesReceived += chunk.byteLength
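// blocks from dagula.get expose .bytes, unlike the raw chunks from ipfs.dagExport which exposed .byteLength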
for await (const chunk of dagula.get(cid, { timeout: this.BLOCK_TIMEOUT })) {
bytesReceived += chunk.bytes.length
yield chunk
}

@@ -169,7 +187,7 @@ export default class Backup {
}

/**
* @param {import('./ipfs-client').IpfsClient} ipfs
* @param {import('@nftstorage/ipfs-cluster').Cluster} ipfs
* @param {import('multiformats').CID} cid
* @returns {Promise<number | undefined>}
*/
@@ -204,7 +222,8 @@ export default class Backup {
sourceCid,
contentCid: CID.parse(pinnedPin.content_cid),
authKeyId: String(pinnedPin.auth_key_id),
pinRequestId: String(pinnedPin.id)
pinRequestId: String(pinnedPin.id),
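// peer id of the IPFS node that holds this pin; dagula dials it directly when exporting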
peer: String(pinnedPin.ipfs_peer_id)
}
yield pin
}
@@ -224,24 +243,24 @@

await pipe(this.getPinsNotBackedUp(roPg), async (source) => {
for await (const pins of batch(source, concurrency)) {
this.log(`Got pins: ${JSON.stringify(pins)}`)
this.log(`Got ${pins.length} pins: ${JSON.stringify(pins)}`)
await Promise.all(pins.map(async pin => {
this.log(`processing pin ${JSON.stringify(pin)}`)
try {
await pipe(
[pin],
this.exportCar(cluster),
this.uploadCar(this.s3, this.env.s3PickupBucketName),
this.uploadCar(this.s3, this.env.S3_BUCKET_NAME),
this.registerBackup(rwPg, pin.contentCid, pin.pinRequestId)
)
totalSuccessful++
} catch (err) {
this.log(`failed to backup ${pin.sourceCid}`, err)
}
totalProcessed++
}))
totalProcessed++
}
this.log(`processed ${totalSuccessful} of ${totalProcessed} CIDs successfully`)
if (totalProcessed > 0) this.log(`processed ${totalSuccessful} of ${totalProcessed} CIDs successfully`)
})
this.log('backup complete 🎉')
}