Skip to content
This repository has been archived by the owner on Aug 1, 2023. It is now read-only.

Commit

Permalink
chore: add gossipsub grafting delay
Browse files Browse the repository at this point in the history
Because of libp2p/go-libp2p-pubsub#331 we have
to wait for a the gossipsub peer to become grafted before sending messages
we expect to be recieved.  There's no way to tell from the API which peers
are grafted so we just have to add an abitrary wait and hope it happens
during that time window.

Also refactors the pubsub tests to remove duplicated code.
  • Loading branch information
achingbrain committed May 29, 2020
1 parent ddbda73 commit 314141e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 92 deletions.
56 changes: 33 additions & 23 deletions test/ipns-pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const base64url = require('base64url')
const ipns = require('ipns')
const delay = require('delay')
const last = require('it-last')
const drain = require('it-drain')
const pRetry = require('p-retry')
const waitFor = require('./utils/wait-for')
const { expect } = require('./utils/chai')
Expand All @@ -15,21 +16,23 @@ const daemonsOptions = {
args: ['--enable-namesys-pubsub'] // enable ipns over pubsub
}

const retryOptions = {
retries: 5
}

const namespace = '/record/'

const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU'

describe('ipns-pubsub', function () {
this.timeout(350 * 1000)
let nodes = []
let go
let js
let otherGo

// Spawn daemons
before(async function () {
nodes = await Promise.all([
[
go,
js,
otherGo
] = await Promise.all([
daemonFactory.spawn({
type: 'go',
test: true,
Expand All @@ -52,11 +55,11 @@ describe('ipns-pubsub', function () {

// Connect nodes and wait for republish
before(async function () {
await nodes[0].api.swarm.connect(nodes[1].api.peerId.addresses[0])
await go.api.swarm.connect(js.api.peerId.addresses[0])

// TODO: go-ipfs needs two nodes in the DHT to be able to publish a record
// Remove this when js-ipfs has a DHT
await nodes[0].api.swarm.connect(nodes[2].api.peerId.addresses[0])
await go.api.swarm.connect(otherGo.api.peerId.addresses[0])

console.log('wait for republish as we can receive the republish message first') // eslint-disable-line
await delay(60000)
Expand All @@ -65,7 +68,7 @@ describe('ipns-pubsub', function () {
after(() => daemonFactory.clean())

it('should get enabled state of pubsub', async function () {
for (const node of nodes) {
for (const node of [js, go]) {
const state = await node.api.name.pubsub.state()
expect(state).to.exist()
expect(state.enabled).to.equal(true)
Expand All @@ -75,14 +78,14 @@ describe('ipns-pubsub', function () {
it('should publish the received record to a go node and a js subscriber should receive it', async function () {
this.timeout(300 * 1000)
// TODO find out why JS doesn't resolve, might be just missing a DHT
await expect(last(nodes[1].api.name.resolve(nodes[0].api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/)
await subscribeToReceiveByPubsub(nodes[0], nodes[1], nodes[0].api.peerId.id, nodes[1].api.peerId.id)
await expect(last(js.api.name.resolve(go.api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/)
await subscribeToReceiveByPubsub(go, js, go.api.peerId.id, js.api.peerId.id)
})

it('should publish the received record to a js node and a go subscriber should receive it', async function () {
this.timeout(350 * 1000)
await last(nodes[0].api.name.resolve(nodes[1].api.peerId.id, { stream: false }))
await subscribeToReceiveByPubsub(nodes[1], nodes[0], nodes[1].api.peerId.id, nodes[0].api.peerId.id)
await drain(go.api.name.resolve(js.api.peerId.id, { stream: false }))
await subscribeToReceiveByPubsub(js, go, js.api.peerId.id, go.api.peerId.id)
})
})

Expand All @@ -106,6 +109,7 @@ const subscribeToReceiveByPubsub = async (nodeA, nodeB, idA, idB) => {
await waitForPeerToSubscribe(nodeB.api, topic)
await nodeB.api.pubsub.subscribe(topic, checkMessage)
await waitForNotificationOfSubscription(nodeA.api, topic, idB)
await delay(10000) // FIXME: gossipsub need this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331
const res1 = await nodeA.api.name.publish(ipfsRef, { resolve: false })
await waitFor(() => subscribed === true, (50 * 1000))
const res2 = await last(nodeB.api.name.resolve(idA))
Expand All @@ -115,23 +119,29 @@ const subscribeToReceiveByPubsub = async (nodeA, nodeB, idA, idB) => {
}

// wait until a peer know about other peer to subscribe a topic
const waitForNotificationOfSubscription = (daemon, topic, peerId) => pRetry(async () => {
const res = await daemon.pubsub.peers(topic)
const waitForNotificationOfSubscription = async (daemon, topic, peerId) => {
const start = Date.now()

if (!res || !res.length || !res.includes(peerId)) {
throw new Error('Could not find peer subscribing')
}
}, retryOptions)
await pRetry(async (attempt) => {
const res = await daemon.pubsub.peers(topic)

if (!res.includes(peerId)) {
throw new Error(`Could not find peer ${peerId} subscription in list ${res} after ${attempt} retries and ${Date.now() - start}ms`)
}
})
}

// Wait until a peer subscribes a topic
const waitForPeerToSubscribe = async (daemon, topic) => {
await pRetry(async () => {
const start = Date.now()

await pRetry(async (attempt) => {
const res = await daemon.pubsub.ls()

if (!res || !res.length || !res.includes(topic)) {
throw new Error('Could not find subscription')
if (!res.includes(topic)) {
throw new Error(`Could not find subscription to ${topic} in ${res} after ${attempt} retries and ${Date.now() - start}ms`)
}

return res[0]
}, retryOptions)
})
}
101 changes: 32 additions & 69 deletions test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,28 @@
const pRetry = require('p-retry')
const { expect } = require('./utils/chai')
const daemonFactory = require('./utils/daemon-factory')

const retryOptions = {
retries: 5
}
const delay = require('delay')

const waitForTopicPeer = (topic, peer, daemon) => {
return pRetry(async () => {
const start = Date.now()

return pRetry(async (attempt) => {
const peers = await daemon.api.pubsub.peers(topic)

if (!peers.includes(peer.id)) {
throw new Error(`Could not find peer ${peer.id}`)
throw new Error(`Could not find peer ${peer.id} after ${attempt} retries and ${Date.now() - start}ms`)
}
}, retryOptions)
})
}

const daemonOptions = {
args: ['--enable-pubsub-experiment']
}

const timeout = 20e3
const timeout = 60 * 1000

describe('pubsub', function () {
this.timeout(60 * 1000)
this.timeout(timeout)

const tests = {
'publish from Go, subscribe on Go': [() => daemonFactory.spawn({ ...daemonOptions, type: 'go' }), () => daemonFactory.spawn({ ...daemonOptions, type: 'go' })],
Expand All @@ -42,13 +41,10 @@ describe('pubsub', function () {
let daemon2

before('spawn nodes', async function () {
this.timeout(timeout)
;[daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn()))
[daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn()))
})

before('connect', async function () {
this.timeout(timeout)

await daemon1.api.swarm.connect(daemon2.api.peerId.addresses[0])
await daemon2.api.swarm.connect(daemon1.api.peerId.addresses[0])

Expand All @@ -63,82 +59,49 @@ describe('pubsub', function () {

after(() => daemonFactory.clean())

it('should exchange ascii data', function () {
const data = Buffer.from('hello world')
const topic = 'pubsub-ascii'
function testPubsub (data) {
const topic = 'pubsub-' + Math.random()

const subscriber = () => new Promise((resolve) => {
const subscriber = () => new Promise((resolve, reject) => {
daemon1.api.pubsub.subscribe(topic, () => {})
daemon2.api.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data.toString())
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)
resolve()
try {
expect(msg.data).to.deep.equal(data)
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)

resolve()
} catch (err) {
reject(err)
}
})
})

const publisher = async () => {
await waitForTopicPeer(topic, daemon1.api.peerId, daemon2)
await waitForTopicPeer(topic, daemon2.api.peerId, daemon1)
await delay(10000) // FIXME: https://github.com/libp2p/go-libp2p-pubsub/issues/331
await daemon1.api.pubsub.publish(topic, data)
}

return Promise.all([
subscriber(),
publisher()
])
}

it('should exchange ascii data', function () {
return testPubsub(Buffer.from('hello world'))
})

it('should exchange non ascii data', function () {
const data = Buffer.from('你好世界')
const topic = 'pubsub-non-ascii'

const subscriber = () => new Promise((resolve) => {
daemon2.api.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data.toString())
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)
resolve()
})
})

const publisher = async () => {
await waitForTopicPeer(topic, daemon2.api.peerId, daemon1)
await daemon1.api.pubsub.publish(topic, data)
}

return Promise.all([
subscriber(),
publisher()
])
return testPubsub(Buffer.from('你好世界'))
})

it('should exchange binary data', function () {
const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex')
const topic = 'pubsub-binary'

const subscriber = () => new Promise((resolve) => {
daemon2.api.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data.toString())
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)
resolve()
})
})

const publisher = async () => {
await waitForTopicPeer(topic, daemon2.api.peerId, daemon1)
await daemon1.api.pubsub.publish(topic, data)
}

return Promise.all([
subscriber(),
publisher()
])
return testPubsub(Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex'))
})
})
})
Expand Down

0 comments on commit 314141e

Please sign in to comment.