From 314141e99850e1eea9a748bd98840228c877ea69 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 27 May 2020 20:08:20 +0100 Subject: [PATCH] chore: add gossipsub grafting delay Because of https://github.com/libp2p/go-libp2p-pubsub/issues/331 we have to wait for a the gossipsub peer to become grafted before sending messages we expect to be recieved. There's no way to tell from the API which peers are grafted so we just have to add an abitrary wait and hope it happens during that time window. Also refactors the pubsub tests to remove duplicated code. --- test/ipns-pubsub.js | 56 ++++++++++++++---------- test/pubsub.js | 101 ++++++++++++++------------------------------ 2 files changed, 65 insertions(+), 92 deletions(-) diff --git a/test/ipns-pubsub.js b/test/ipns-pubsub.js index 2501e4d0..e431f050 100644 --- a/test/ipns-pubsub.js +++ b/test/ipns-pubsub.js @@ -6,6 +6,7 @@ const base64url = require('base64url') const ipns = require('ipns') const delay = require('delay') const last = require('it-last') +const drain = require('it-drain') const pRetry = require('p-retry') const waitFor = require('./utils/wait-for') const { expect } = require('./utils/chai') @@ -15,21 +16,23 @@ const daemonsOptions = { args: ['--enable-namesys-pubsub'] // enable ipns over pubsub } -const retryOptions = { - retries: 5 -} - const namespace = '/record/' const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' describe('ipns-pubsub', function () { this.timeout(350 * 1000) - let nodes = [] + let go + let js + let otherGo // Spawn daemons before(async function () { - nodes = await Promise.all([ + [ + go, + js, + otherGo + ] = await Promise.all([ daemonFactory.spawn({ type: 'go', test: true, @@ -52,11 +55,11 @@ describe('ipns-pubsub', function () { // Connect nodes and wait for republish before(async function () { - await nodes[0].api.swarm.connect(nodes[1].api.peerId.addresses[0]) + await go.api.swarm.connect(js.api.peerId.addresses[0]) // TODO: go-ipfs needs two nodes in the DHT to be able to publish a record // Remove this when js-ipfs has a DHT - await nodes[0].api.swarm.connect(nodes[2].api.peerId.addresses[0]) + await go.api.swarm.connect(otherGo.api.peerId.addresses[0]) console.log('wait for republish as we can receive the republish message first') // eslint-disable-line await delay(60000) @@ -65,7 +68,7 @@ describe('ipns-pubsub', function () { after(() => daemonFactory.clean()) it('should get enabled state of pubsub', async function () { - for (const node of nodes) { + for (const node of [js, go]) { const state = await node.api.name.pubsub.state() expect(state).to.exist() expect(state.enabled).to.equal(true) @@ -75,14 +78,14 @@ describe('ipns-pubsub', function () { it('should publish the received record to a go node and a js subscriber should receive it', async function () { this.timeout(300 * 1000) // TODO find out why JS doesn't resolve, might be just missing a DHT - await expect(last(nodes[1].api.name.resolve(nodes[0].api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/) - await subscribeToReceiveByPubsub(nodes[0], nodes[1], nodes[0].api.peerId.id, nodes[1].api.peerId.id) + await expect(last(js.api.name.resolve(go.api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/) + await subscribeToReceiveByPubsub(go, js, go.api.peerId.id, js.api.peerId.id) }) it('should publish the received record to a js node and a go subscriber should receive it', async function () { this.timeout(350 * 1000) - await last(nodes[0].api.name.resolve(nodes[1].api.peerId.id, { stream: false })) - await subscribeToReceiveByPubsub(nodes[1], nodes[0], nodes[1].api.peerId.id, nodes[0].api.peerId.id) + await drain(go.api.name.resolve(js.api.peerId.id, { stream: false })) + await subscribeToReceiveByPubsub(js, go, js.api.peerId.id, go.api.peerId.id) }) }) @@ -106,6 +109,7 @@ const subscribeToReceiveByPubsub = async (nodeA, nodeB, idA, idB) => { await waitForPeerToSubscribe(nodeB.api, topic) await nodeB.api.pubsub.subscribe(topic, checkMessage) await waitForNotificationOfSubscription(nodeA.api, topic, idB) + await delay(10000) // FIXME: gossipsub need this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331 const res1 = await nodeA.api.name.publish(ipfsRef, { resolve: false }) await waitFor(() => subscribed === true, (50 * 1000)) const res2 = await last(nodeB.api.name.resolve(idA)) @@ -115,23 +119,29 @@ const subscribeToReceiveByPubsub = async (nodeA, nodeB, idA, idB) => { } // wait until a peer know about other peer to subscribe a topic -const waitForNotificationOfSubscription = (daemon, topic, peerId) => pRetry(async () => { - const res = await daemon.pubsub.peers(topic) +const waitForNotificationOfSubscription = async (daemon, topic, peerId) => { + const start = Date.now() - if (!res || !res.length || !res.includes(peerId)) { - throw new Error('Could not find peer subscribing') - } -}, retryOptions) + await pRetry(async (attempt) => { + const res = await daemon.pubsub.peers(topic) + + if (!res.includes(peerId)) { + throw new Error(`Could not find peer ${peerId} subscription in list ${res} after ${attempt} retries and ${Date.now() - start}ms`) + } + }) +} // Wait until a peer subscribes a topic const waitForPeerToSubscribe = async (daemon, topic) => { - await pRetry(async () => { + const start = Date.now() + + await pRetry(async (attempt) => { const res = await daemon.pubsub.ls() - if (!res || !res.length || !res.includes(topic)) { - throw new Error('Could not find subscription') + if (!res.includes(topic)) { + throw new Error(`Could not find subscription to ${topic} in ${res} after ${attempt} retries and ${Date.now() - start}ms`) } return res[0] - }, retryOptions) + }) } diff --git a/test/pubsub.js b/test/pubsub.js index 1ced647a..ac786293 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -5,29 +5,28 @@ const pRetry = require('p-retry') const { expect } = require('./utils/chai') const daemonFactory = require('./utils/daemon-factory') - -const retryOptions = { - retries: 5 -} +const delay = require('delay') const waitForTopicPeer = (topic, peer, daemon) => { - return pRetry(async () => { + const start = Date.now() + + return pRetry(async (attempt) => { const peers = await daemon.api.pubsub.peers(topic) if (!peers.includes(peer.id)) { - throw new Error(`Could not find peer ${peer.id}`) + throw new Error(`Could not find peer ${peer.id} after ${attempt} retries and ${Date.now() - start}ms`) } - }, retryOptions) + }) } const daemonOptions = { args: ['--enable-pubsub-experiment'] } -const timeout = 20e3 +const timeout = 60 * 1000 describe('pubsub', function () { - this.timeout(60 * 1000) + this.timeout(timeout) const tests = { 'publish from Go, subscribe on Go': [() => daemonFactory.spawn({ ...daemonOptions, type: 'go' }), () => daemonFactory.spawn({ ...daemonOptions, type: 'go' })], @@ -42,13 +41,10 @@ describe('pubsub', function () { let daemon2 before('spawn nodes', async function () { - this.timeout(timeout) - ;[daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn())) + [daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn())) }) before('connect', async function () { - this.timeout(timeout) - await daemon1.api.swarm.connect(daemon2.api.peerId.addresses[0]) await daemon2.api.swarm.connect(daemon1.api.peerId.addresses[0]) @@ -63,23 +59,30 @@ describe('pubsub', function () { after(() => daemonFactory.clean()) - it('should exchange ascii data', function () { - const data = Buffer.from('hello world') - const topic = 'pubsub-ascii' + function testPubsub (data) { + const topic = 'pubsub-' + Math.random() - const subscriber = () => new Promise((resolve) => { + const subscriber = () => new Promise((resolve, reject) => { + daemon1.api.pubsub.subscribe(topic, () => {}) daemon2.api.pubsub.subscribe(topic, (msg) => { - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').and.to.include(topic) - expect(msg).to.have.property('from', daemon1.api.peerId.id) - resolve() + try { + expect(msg.data).to.deep.equal(data) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').and.to.include(topic) + expect(msg).to.have.property('from', daemon1.api.peerId.id) + + resolve() + } catch (err) { + reject(err) + } }) }) const publisher = async () => { + await waitForTopicPeer(topic, daemon1.api.peerId, daemon2) await waitForTopicPeer(topic, daemon2.api.peerId, daemon1) + await delay(10000) // FIXME: https://github.com/libp2p/go-libp2p-pubsub/issues/331 await daemon1.api.pubsub.publish(topic, data) } @@ -87,58 +90,18 @@ describe('pubsub', function () { subscriber(), publisher() ]) + } + + it('should exchange ascii data', function () { + return testPubsub(Buffer.from('hello world')) }) it('should exchange non ascii data', function () { - const data = Buffer.from('你好世界') - const topic = 'pubsub-non-ascii' - - const subscriber = () => new Promise((resolve) => { - daemon2.api.pubsub.subscribe(topic, (msg) => { - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').and.to.include(topic) - expect(msg).to.have.property('from', daemon1.api.peerId.id) - resolve() - }) - }) - - const publisher = async () => { - await waitForTopicPeer(topic, daemon2.api.peerId, daemon1) - await daemon1.api.pubsub.publish(topic, data) - } - - return Promise.all([ - subscriber(), - publisher() - ]) + return testPubsub(Buffer.from('你好世界')) }) it('should exchange binary data', function () { - const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') - const topic = 'pubsub-binary' - - const subscriber = () => new Promise((resolve) => { - daemon2.api.pubsub.subscribe(topic, (msg) => { - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').and.to.include(topic) - expect(msg).to.have.property('from', daemon1.api.peerId.id) - resolve() - }) - }) - - const publisher = async () => { - await waitForTopicPeer(topic, daemon2.api.peerId, daemon1) - await daemon1.api.pubsub.publish(topic, data) - } - - return Promise.all([ - subscriber(), - publisher() - ]) + return testPubsub(Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex')) }) }) })