Skip to content

Commit

Permalink
Fix state/status semantics and refactor helia tests to use utils.
Browse files Browse the repository at this point in the history
  • Loading branch information
saul-jb committed Apr 9, 2024
1 parent 7127f42 commit e49373b
Show file tree
Hide file tree
Showing 20 changed files with 83 additions and 169 deletions.
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/benchmarks/src/import/import-bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const createImportBench = async (size: number, persistent: boolean): Prom

async run () {
const [{ cid }] = await client.import(group, dataFile, { path: '/test' })
const [item] = await client.getStatus([cid])
const [item] = await client.getState([cid])

return item
}
Expand Down
6 changes: 3 additions & 3 deletions packages/benchmarks/src/transfer/transfer-bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const createTransferBench = async (size: number, persistent: boolean): Pr

const dataFile = Path.join(dataPath, `${size}.data`)
const [{ cid }] = await clients[0].import(group, dataFile, { path: '/test' })
const [item] = await clients[0].getStatus([cid])
const [item] = await clients[0].getState([cid])

return {
blocks: item.blocks,
Expand All @@ -44,9 +44,9 @@ export const createTransferBench = async (size: number, persistent: boolean): Pr

async run () {
for (;;) {
const [{ state }] = await clients[1].getStatus([cid])
const [{ status }] = await clients[1].getState([cid])

if (state === 'COMPLETED') {
if (status === 'COMPLETED') {
break
}

Expand Down
20 changes: 10 additions & 10 deletions packages/cli/src/commands/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,20 @@ export const handler = createHandler<typeof builder>(async function * (argv) {
return
}

const statuses = await argv.client.getStatus(items.map(i => i.cid))
const states = await argv.client.getState(items.map(i => i.cid))

const getStatus = ({ cid }: { cid: string }): (typeof statuses)[number] =>
statuses.find(s => s.cid === cid) ?? { state: 'NOTFOUND', blocks: 0, size: 0, cid }
const getState = ({ cid }: { cid: string }): (typeof states)[number] =>
states.find(s => s.cid === cid) ?? { status: 'NOTFOUND', blocks: 0, size: 0, cid }

const peers = await argv.client.countPeers(items.map(i => i.cid))

const getPeers = ({ cid }: { cid: string }): number =>
peers.find(p => p.cid === cid)?.peers ?? 0

const completed = {
blocks: items.map(getStatus).reduce((a, b) => a + b.blocks, 0),
size: items.map(getStatus).reduce((a, b) => a + b.size, 0),
count: items.map(getStatus).filter(i => i.state === 'COMPLETED').length
blocks: items.map(getState).reduce((a, b) => a + b.blocks, 0),
size: items.map(getState).reduce((a, b) => a + b.size, 0),
count: items.map(getState).filter(i => i.status === 'COMPLETED').length
}

const speeds = await argv.client.getSpeeds(items.map(i => i.cid), 5000)
Expand Down Expand Up @@ -129,14 +129,14 @@ export const handler = createHandler<typeof builder>(async function * (argv) {
for (const [key, subtree] of Object.entries(tree)) {
try {
const [item] = List.Return.parse([subtree])
const timeRemaining = Math.ceil((item.size - getStatus(item).size) / (getSpeed(item) * 1000))
const timeRemaining = Math.ceil((item.size - getState(item).size) / (getSpeed(item) * 1000))

yield [
`${' '.repeat(depth)}${key}`.slice(0, 18).padEnd(20),
`${formatSize(getStatus(item).size)}/${formatSize(item.size)} (${formatPercent(getStatus(item).size / item.size)})`.slice(0, 25).padEnd(27),
`${formatSize(getState(item).size)}/${formatSize(item.size)} (${formatPercent(getState(item).size / item.size)})`.slice(0, 25).padEnd(27),
`${formatSize(getSpeed(item) * 1000)}/s ${isNaN(timeRemaining) ? '' : `(${timeRemaining} s)`}`.slice(0, 25).padEnd(27),
`${getStatus(item).blocks}/${item.blocks} (${formatPercent(getStatus(item).blocks / item.blocks)})`.slice(0, 18).padEnd(20),
getStatus(item).state.slice(0, 13).padEnd(15),
`${getState(item).blocks}/${item.blocks} (${formatPercent(getState(item).blocks / item.blocks)})`.slice(0, 18).padEnd(20),
getState(item).status.slice(0, 13).padEnd(15),
`${item.priority}`.slice(0, 8).padEnd(10),
`${revisionCounter[item.path] ?? 0}`.slice(0, 8).padEnd(10),
`${getPeers(item)}`.slice(0, 8).padEnd(10),
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/test/list.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('list', () => {
]

it('text', async () => {
const params = mockParams({ list: items, countPeers: [{ cid, peers: 1 }], getStatus: [{ cid, state: 'COMPLETED', size: 50, blocks: 5 }], getSpeeds: [{ cid, speed: 1000 }] }, { group: 'group-abc' })
const params = mockParams({ list: items, countPeers: [{ cid, peers: 1 }], getState: [{ cid, status: 'COMPLETED', size: 50, blocks: 5 }], getSpeeds: [{ cid, speed: 1000 }] }, { group: 'group-abc' })

const response = await all(handler(params))

Expand Down Expand Up @@ -76,7 +76,7 @@ describe('list', () => {
const params = mockParams({
list: items,
countPeers: [{ cid, peers: 1 }],
getStatus: [{ cid, state: 'COMPLETED', size: 50, blocks: 5 }],
getState: [{ cid, status: 'COMPLETED', size: 50, blocks: 5 }],
getSpeeds: [{ cid, speed: 1000 }]
}, {
group: 'group-abc',
Expand Down
10 changes: 5 additions & 5 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
Export,
GetSchedule,
GetSpeeds,
GetStatus,
GetState,
ID,
Import,
JoinGroup,
Expand Down Expand Up @@ -122,11 +122,11 @@ export class Client {
return GetSpeeds.Return.parse(raw)
}

async getStatus (cids: GetStatus.Params['cids']): Promise<GetStatus.Return> {
const params: GetStatus.Params = { cids }
const raw = await this.client.rpc.request(GetStatus.name, params)
async getState (cids: GetState.Params['cids']): Promise<GetState.Return> {
const params: GetState.Params = { cids }
const raw = await this.client.rpc.request(GetState.name, params)

return GetStatus.Return.parse(raw)
return GetState.Return.parse(raw)
}

async id (): Promise<ID.Return> {
Expand Down
10 changes: 5 additions & 5 deletions packages/client/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ describe('client', () => {
response: [
{
cid: 'QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN',
state: 'DOWNLOADING',
status: 'DOWNLOADING',
blocks: 12,
size: 1234
}
Expand All @@ -554,13 +554,13 @@ describe('client', () => {
response: [
{
cid: 'QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ',
state: 'COMPLETED',
status: 'COMPLETED',
blocks: 1,
size: 0
},
{
cid: 'QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa',
state: 'NOTFOUND',
status: 'NOTFOUND',
blocks: 0,
size: 0
}
Expand All @@ -570,8 +570,8 @@ describe('client', () => {

for (const { params, response } of requests) {
const [req, res] = await Promise.all([
getRequest(interfaces.GetStatus.name, async () => response),
client.getStatus(params.cids)
getRequest(interfaces.GetState.name, async () => response),
client.getState(params.cids)
])

assert.deepEqual(req, { ...params })
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { GetStatus } from '@organicdesign/db-rpc-interfaces'
import { GetState } from '@organicdesign/db-rpc-interfaces'
import { CID } from 'multiformats/cid'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod = ({ pinManager, net }) => {
net.rpc.addMethod(GetStatus.name, async (raw: unknown): Promise<GetStatus.Return> => {
const params = GetStatus.Params.parse(raw)
net.rpc.addMethod(GetState.name, async (raw: unknown): Promise<GetState.Return> => {
const params = GetState.Params.parse(raw)

return Promise.all(params.cids.map(async str => {
const cid = CID.parse(str)

const [state, blocks, size] = await Promise.all([
pinManager.getState(cid),
const [status, blocks, size] = await Promise.all([
pinManager.getStatus(cid),
pinManager.getBlockCount(cid),
pinManager.getSize(cid)
])

return {
cid: str,
state,
status,
blocks,
size
}
Expand Down
4 changes: 2 additions & 2 deletions packages/daemon/src/common/handle-commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import connections from './commands/connections.js'
import countPeers from './commands/count-peers.js'
import createGroup from './commands/create-group.js'
import getSpeeds from './commands/get-speeds.js'
import getStatus from './commands/get-status.js'
import getState from './commands/get-state.js'
import id from './commands/id.js'
import joinGroup from './commands/join-group.js'
import listGroups from './commands/list-groups.js'
Expand All @@ -22,7 +22,7 @@ export default (components: Components): void => {
countPeers,
createGroup,
getSpeeds,
getStatus,
getState,
id,
joinGroup,
listGroups,
Expand Down
4 changes: 2 additions & 2 deletions packages/daemon/src/common/pin-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ export class PinManager {
return this.pinManager.downloadSync(pin, options)
}

async getState (cid: CID): Promise<'COMPLETED' | 'DOWNLOADING' | 'DESTROYED' | 'UPLOADING' | 'NOTFOUND'> {
return this.pinManager.getState(cid)
async getStatus (cid: CID): Promise<'COMPLETED' | 'DOWNLOADING' | 'DESTROYED' | 'UPLOADING' | 'NOTFOUND'> {
return this.pinManager.getStatus(cid)
}

async getSpeed (cid: CID, range?: number): Promise<number> {
Expand Down
16 changes: 8 additions & 8 deletions packages/daemon/test/modules/downloader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,44 +118,44 @@ describe('downloader', () => {
const client = createNetClient(socket)
const key = Path.join('/', group, path)

const status1 = await client.rpc.request('get-status', {
const status1 = await client.rpc.request('get-state', {
cids: [dag[0].toString()]
})

assert.deepEqual(status1, [{
cid: dag[0].toString(),
blocks: 0,
size: 0,
state: 'NOTFOUND'
status: 'NOTFOUND'
}])

await components.pinManager.put(key, { priority: 1, cid: dag[0] })

const status2 = await client.rpc.request('get-status', {
const status2 = await client.rpc.request('get-state', {
cids: [dag[0].toString()]
})

assert.deepEqual(status2, [{
cid: dag[0].toString(),
blocks: 0,
size: 0,
state: 'DOWNLOADING'
status: 'DOWNLOADING'
}])

const value = await blockstore.get(dag[0])

await components.helia.blockstore.put(dag[0], value)
await new Promise(resolve => setTimeout(resolve, 100))

const status3 = await client.rpc.request('get-status', {
const status3 = await client.rpc.request('get-state', {
cids: [dag[0].toString()]
})

assert.deepEqual(status3, [{
cid: dag[0].toString(),
blocks: 1,
size: value.length,
state: 'DOWNLOADING'
status: 'DOWNLOADING'
}])

const values = await Promise.all(dag.map(async cid => {
Expand All @@ -168,15 +168,15 @@ describe('downloader', () => {

await new Promise(resolve => setTimeout(resolve, 500))

const status4 = await client.rpc.request('get-status', {
const status4 = await client.rpc.request('get-state', {
cids: [dag[0].toString()]
})

assert.deepEqual(status4, [{
cid: dag[0].toString(),
blocks: dag.length,
size: values.reduce((a, c) => a + c, 0),
state: 'COMPLETED'
status: 'COMPLETED'
}])

client.close()
Expand Down
1 change: 1 addition & 0 deletions packages/helia-pin-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"license": "GPL-3.0-or-later",
"devDependencies": {
"@ipld/dag-pb": "^4.1.0",
"@organicdesign/db-test-utils": "^0.1.0",
"aegir": "^42.2.4",
"helia": "^4.0.1",
"interface-blockstore": "^5.2.10",
Expand Down
24 changes: 12 additions & 12 deletions packages/helia-pin-manager/src/pin-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class PinManager {
const pin = await this.pins.get(cid)

if (pin != null) {
await this.pins.put(cid, { state: 'DESTROYED', depth: pin.depth })
await this.pins.put(cid, { status: 'DESTROYED', depth: pin.depth })
}

try {
Expand Down Expand Up @@ -87,7 +87,7 @@ export class PinManager {
this.events.dispatchEvent(new CIDEvent('pins:adding', cid))

const pin = await this.pins.getOrPut(cid, {
state: 'UPLOADING',
status: 'UPLOADING',
depth: Number.MAX_SAFE_INTEGER
})

Expand All @@ -113,7 +113,7 @@ export class PinManager {

await this.pins.put(cid, {
...pin,
state: 'COMPLETED'
status: 'COMPLETED'
})

this.events.dispatchEvent(new CIDEvent('pins:added', cid))
Expand All @@ -129,7 +129,7 @@ export class PinManager {

const depth = Number.MAX_SAFE_INTEGER

await this.pins.put(cid, { depth, state: 'DOWNLOADING' })
await this.pins.put(cid, { depth, status: 'DOWNLOADING' })
await this.downloads.getOrPut(cid, cid, { depth: 0 })

this.events.dispatchEvent(new CIDEvent('pins:adding', cid))
Expand All @@ -138,10 +138,10 @@ export class PinManager {
/**
* Get the current state of the pin.
*/
async getState (cid: CID): Promise<Pin['state'] | 'NOTFOUND'> {
async getStatus (cid: CID): Promise<Pin['status'] | 'NOTFOUND'> {
const pin = await this.pins.get(cid)

return pin == null ? 'NOTFOUND' : pin.state
return pin == null ? 'NOTFOUND' : pin.status
}

/**
Expand Down Expand Up @@ -174,7 +174,7 @@ export class PinManager {
const cids: CID[] = []

for await (const pin of this.pins.all()) {
if (pin.state === 'DOWNLOADING') {
if (pin.status === 'DOWNLOADING') {
cids.push(pin.cid)
}
}
Expand Down Expand Up @@ -226,7 +226,7 @@ export class PinManager {
throw new Error('no such pin')
}

if (pinData.state === 'COMPLETED') {
if (pinData.status === 'COMPLETED') {
return []
}

Expand All @@ -242,12 +242,12 @@ export class PinManager {
if (heads.length === 0) {
await addPinRef(this.helia, pin)

const isCompleted = pinData.state === 'COMPLETED'
const isCompleted = pinData.status === 'COMPLETED'

if (!isCompleted) {
await this.pins.put(pin, {
...pinData,
state: 'COMPLETED'
status: 'COMPLETED'
})

this.events.dispatchEvent(new CIDEvent('pins:added', pin))
Expand All @@ -266,7 +266,7 @@ export class PinManager {
throw new Error('no such pin')
}

if (pinData.state === 'COMPLETED') {
if (pinData.status === 'COMPLETED') {
return
}

Expand Down Expand Up @@ -319,7 +319,7 @@ export class PinManager {

await this.pins.put(pin, {
...pinData,
state: 'COMPLETED'
status: 'COMPLETED'
})

this.events.dispatchEvent(new CIDEvent('pins:added', pin))
Expand Down
Loading

0 comments on commit e49373b

Please sign in to comment.