From ee179088f9c4e86562e8a93292ccefbfc7ae95c5 Mon Sep 17 00:00:00 2001 From: Stevan Nesovic Date: Wed, 23 Oct 2024 19:32:06 +0200 Subject: [PATCH 1/4] Edge node publish mode implementation --- controllers/knowledgeBankController.js | 96 +++++++++++++++++++------- create-paranet.js | 18 ++--- services/publishService.js | 26 +++++-- sync-assets-queue.js | 86 +++++++++++++++-------- 4 files changed, 158 insertions(+), 68 deletions(-) diff --git a/controllers/knowledgeBankController.js b/controllers/knowledgeBankController.js index 4a72024..a0e0a1b 100644 --- a/controllers/knowledgeBankController.js +++ b/controllers/knowledgeBankController.js @@ -21,39 +21,77 @@ exports.getDatasets = async (req, res) => { exports.getAssets = async (req, res) => { try { + const edgeNodePublishMode = req.user.config.find( + item => item.option === 'edge_node_publish_mode' + ).value || null; + const paranetUAL = req.user.config.find( + item => item.option === 'edge_node_paranet_ual' + ).value || null; + const offset = parseInt(req.query.offset) || 0; const limit = parseInt(req.query.limit) || 10; - const assets = await internalSequelize.sequelize.query( - `WITH RankedAssets AS (SELECT *, - ROW_NUMBER() OVER (PARTITION BY ual ORDER BY created_at DESC, id DESC) AS row_num - FROM synced_assets) + if(edgeNodePublishMode === 'public') { + + const { count, rows: assets } = await Asset.findAndCountAll({ + attributes: [ + 'id', + 'ual', + [Sequelize.col('assertion_id'), 'public_assertion_id'], + [Sequelize.col('created_at'), 'backend_synced_at'] + ], + where: { + publishing_status: "COMPLETED" + }, + limit: limit, + offset: offset + }); + + + res.json({ + totalItems: count, + offset: offset, + limit: limit, + data: assets + }); + } else { + const assets = await internalSequelize.sequelize.query( + `WITH RankedAssets AS ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY ual ORDER BY created_at DESC, id DESC) AS row_num + FROM synced_assets + WHERE paranet_ual = :paranetUAL -- Add the filter for paranet_ual + ) SELECT * FROM RankedAssets WHERE row_num = 1 ORDER BY created_at DESC, id DESC LIMIT ${limit} OFFSET ${offset}`, - { - type: internalSequelize.Sequelize.QueryTypes.SELECT - } - ); + { + type: internalSequelize.Sequelize.QueryTypes.SELECT, + replacements: { paranetUAL } + } + ); - const total = await internalSequelize.sequelize.query( - `select ual, count(*) - from synced_assets - group by ual`, - { - type: internalSequelize.Sequelize.QueryTypes.SELECT - } - ); + const total = await internalSequelize.sequelize.query( + `SELECT ual, COUNT(*) + FROM synced_assets + WHERE paranet_ual = :paranetUAL -- Add the filter for paranet_ual + GROUP BY ual`, + { + type: internalSequelize.Sequelize.QueryTypes.SELECT, + replacements: { paranetUAL } + } + ); - res.json({ - totalItems: total.length, - offset: offset, - limit: limit, - data: assets - }); + res.json({ + totalItems: total.length, + offset: offset, + limit: limit, + data: assets + }); + } } catch (error) { console.error('Error fetching paginated assets:', error); res.status(500).json({ error: 'Failed to fetch assets' }); @@ -74,9 +112,15 @@ exports.previewAssetExternal = async (req, res) => { const formattedUserConfig = publishService.setUserConfig(userConfig); const DkgClient = publishService.initDkgClient(blockchain); - let result = await DkgClient.asset.get(assetUAL, { - paranetUAL: formattedUserConfig.edge_node_paranet_ual - }); + let result; + if(formattedUserConfig.edge_node_publish_mode === "public") { + result = await DkgClient.asset.get(assetUAL); + } else { + result = await DkgClient.asset.get(assetUAL, { + paranetUAL: formattedUserConfig.edge_node_paranet_ual + }); + } + let formattedKnowledgeAsset = {}; formattedKnowledgeAsset.private = result.private.assertion; formattedKnowledgeAsset.public = result.public.assertion; @@ -266,7 +310,7 @@ exports.confirmAndCreateAssets = async (req, res) => { if (operationStatus === OPERATION_STATUSES.COMPLETED) { const vectorizationEnabled = req.user.config.find( item => item.option === 'vectorization_enabled' - ).value; + ).value || null; if (vectorizationEnabled === 'true') { await vectorService.vectorizeKnowledgeAsset( result, diff --git a/create-paranet.js b/create-paranet.js index 334590e..47b97d8 100644 --- a/create-paranet.js +++ b/create-paranet.js @@ -1,18 +1,18 @@ const DKG = require('dkg.js'); +require('dotenv').config(); const { PARANET_NODES_ACCESS_POLICY, PARANET_MINERS_ACCESS_POLICY } = require('dkg.js/constants.js'); let DkgClient = new DKG({ - environment: '', - endpoint: '', + environment: process.env.DKG_ENV, + endpoint: process.env.RUNTIME_NODE_ENDPOINT, port: '8900', blockchain: { name: 'base:84532', - publicKey: '', - privateKey: - '' + publicKey: process.env.PUB_KEY, + privateKey: process.env.PRIV_KEY }, maxNumberOfRetries: 30, frequency: 2, @@ -22,14 +22,14 @@ let DkgClient = new DKG({ let KAContent = { public: { '@context': ['https://schema.org'], - '@id': 'urn:id:paranet:4', - paranetName: 'Stable (Staging) Paranet', - paranetDescription: 'Stable (Staging) Paranet' + '@id': 'urn:id:paranet:5', + paranetName: 'Testnet Paranet 5', + paranetDescription: 'Testnet Paranet 5' } }; // Operational wallet public key of the node -const NODE1_PUBLIC_KEY = '0xbdd9a9bdacb827890c33dff74a69c6866f1022d2'; +const NODE1_PUBLIC_KEY = '0xe7144d3a965b166878212d6c6caf00c0bceab513'; async function createParanet() { let KA = await DkgClient.asset.create(KAContent, { diff --git a/services/publishService.js b/services/publishService.js index 7943eb6..b284daf 100644 --- a/services/publishService.js +++ b/services/publishService.js @@ -40,6 +40,7 @@ class PublishService { case 'internal': return this.internalPublishService( asset, + this.userConfig.edge_node_publish_mode, this.userConfig.edge_node_paranet_ual, wallet ); @@ -48,6 +49,7 @@ class PublishService { default: return this.internalPublishService( asset, + this.userConfig.edge_node_publish_mode, this.userConfig.edge_node_paranet_ual, wallet ); @@ -87,10 +89,26 @@ class PublishService { } } - async internalPublishService(asset, paranetUAL, wallet = null) { - return await this.dkgClient.asset.localStore(asset, { - epochsNum: 2, - paranetUAL: paranetUAL + async internalPublishService(asset, edgeNodePublishMode, paranetUAL, wallet = null) { + if(edgeNodePublishMode === "public") { + return await this.dkgClient.asset.create(asset, { + epochsNum: 2 + }); + } + if(edgeNodePublishMode === "paranet") { + return await this.dkgClient.asset.create(asset, { + epochsNum: 2, + paranetUAL: paranetUAL + }); + } + if(edgeNodePublishMode === "curated_paranet") { + return await this.dkgClient.asset.localStore(asset, { + epochsNum: 2, + paranetUAL: paranetUAL + }); + } + return await this.dkgClient.asset.create(asset, { + epochsNum: 2 }); } diff --git a/sync-assets-queue.js b/sync-assets-queue.js index 70a9f7f..ed21c7c 100644 --- a/sync-assets-queue.js +++ b/sync-assets-queue.js @@ -4,6 +4,7 @@ const sequelize = require('sequelize'); const { SyncedAsset, Notification } = require('./models'); const { Queue, Worker } = require('bullmq'); const redis = require('ioredis'); +const axios = require('axios'); const connection = new redis({ maxRetriesPerRequest: null }); @@ -18,12 +19,34 @@ new Worker( 'syncQueue', async (job) => { console.log(`Starting sync job...Time: ${job.data.timestamp}`); + console.log(`Checking Edge node publish mode...Time: ${job.data.timestamp}`); + + const publicConfig = await axios.get( + `${process.env.AUTH_SERVICE_ENDPOINT}/auth/params/public` + ); + const edgeNodePublishMode = publicConfig.data.config.find( + item => item.option === 'edge_node_publish_mode' + ).value || null; + + if(edgeNodePublishMode === "public") { + console.log(`Edge node publish mode is public, aborting sync operation...Time: ${job.data.timestamp}`); + return; + } + + const paranetUAL = publicConfig.data.config.find( + item => item.option === 'edge_node_paranet_ual' + ).value || null; + try { - let internalSyncedAssets = await SyncedAsset.count(); - if (internalSyncedAssets === 0) { + const internalSyncedAssets = await SyncedAsset.findAll({ + where: { + paranet_ual: paranetUAL, + }, + }); + if (internalSyncedAssets.length === 0) { console.log(`First time query...`); const assets = await externalSequelize.query( - getInitialQuery(), + getInitialQuery(paranetUAL), { type: QueryTypes.SELECT } @@ -32,13 +55,16 @@ new Worker( let notification = await storeNotification(assets); await storeSyncedAssets(assets, notification); } - } else if (internalSyncedAssets > 0) { + } else if (internalSyncedAssets.length > 0) { const lastSyncedAsset = await SyncedAsset.findOne({ + where: { + paranet_ual: paranetUAL, + }, order: [['id', 'DESC']] }); const date = new Date(lastSyncedAsset.backend_synced_at); const utcDateString = date.toISOString().replace('Z', '').replace('T', ' ').slice(0, 19); - const assets = await externalSequelize.query(getNextQuery(utcDateString) + const assets = await externalSequelize.query(getNextQuery(utcDateString, paranetUAL) , { type: QueryTypes.SELECT @@ -60,13 +86,7 @@ new Worker( } ); -// Add Jobs Every 30 Seconds -setInterval(async () => { - console.log('Queueing sync job...'); - await syncQueue.add('syncJob', { timestamp: Date.now() }); -}, 10000); - -const getInitialQuery = () => { +const getInitialQuery = (paranetUAL) => { return ` SELECT sa.* FROM paranet_synced_asset sa @@ -77,26 +97,28 @@ const getInitialQuery = () => { (SELECT ual, MAX(created_at) FROM paranet_synced_asset GROUP BY ual) - GROUP BY ual) latest - ON sa.id = latest.max_id`; + AND paranet_ual = '${paranetUAL}' + GROUP BY ual + ) latest ON sa.id = latest.max_id;`; }; -const getNextQuery = (date) => { +const getNextQuery = (date, paranetUAL) => { return ` - SELECT sa.* - FROM paranet_synced_asset sa - INNER JOIN ( - SELECT ual, MAX(id) AS max_id - FROM paranet_synced_asset - WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00') - AND (ual, created_at) IN - (SELECT ual, MAX(created_at) - FROM paranet_synced_asset - WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00') - GROUP BY ual) - GROUP BY ual - ) latest - ON sa.id = latest.max_id;`; + SELECT sa.* + FROM paranet_synced_asset sa + INNER JOIN ( + SELECT ual, MAX(id) AS max_id + FROM paranet_synced_asset + WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00') + AND (ual, created_at) IN + (SELECT ual, MAX(created_at) + FROM paranet_synced_asset + WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00') + GROUP BY ual) + AND paranet_ual = '${paranetUAL}' + GROUP BY ual + ) latest ON sa.id = latest.max_id; + `; }; function getCurrentTimeProperFormat() { @@ -157,3 +179,9 @@ async function storeSyncedAssets(assets, notification) { } return true; } + +// Add Jobs Every 30 Seconds +setInterval(async () => { + console.log('Queueing sync job...'); + await syncQueue.add('syncJob', { timestamp: Date.now() }); +}, 10000); From 9c8437ff1ebf67ec7654fa81b00e4a20a9105663 Mon Sep 17 00:00:00 2001 From: Stevan Nesovic Date: Wed, 23 Oct 2024 19:32:53 +0200 Subject: [PATCH 2/4] Remove operation key of testing node --- create-paranet.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/create-paranet.js b/create-paranet.js index 47b97d8..d11a675 100644 --- a/create-paranet.js +++ b/create-paranet.js @@ -29,7 +29,7 @@ let KAContent = { }; // Operational wallet public key of the node -const NODE1_PUBLIC_KEY = '0xe7144d3a965b166878212d6c6caf00c0bceab513'; +const NODE1_PUBLIC_KEY = ''; async function createParanet() { let KA = await DkgClient.asset.create(KAContent, { From 2e601e87edf0e009b31ec647eead341f6d3183f0 Mon Sep 17 00:00:00 2001 From: Stevan Nesovic Date: Wed, 23 Oct 2024 19:35:08 +0200 Subject: [PATCH 3/4] Code optimization --- services/publishService.js | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/services/publishService.js b/services/publishService.js index b284daf..d4ff395 100644 --- a/services/publishService.js +++ b/services/publishService.js @@ -90,26 +90,26 @@ class PublishService { } async internalPublishService(asset, edgeNodePublishMode, paranetUAL, wallet = null) { - if(edgeNodePublishMode === "public") { - return await this.dkgClient.asset.create(asset, { - epochsNum: 2 - }); - } - if(edgeNodePublishMode === "paranet") { - return await this.dkgClient.asset.create(asset, { - epochsNum: 2, - paranetUAL: paranetUAL - }); - } - if(edgeNodePublishMode === "curated_paranet") { - return await this.dkgClient.asset.localStore(asset, { - epochsNum: 2, - paranetUAL: paranetUAL - }); + switch (edgeNodePublishMode) { + case "public": + return await this.dkgClient.asset.create(asset, { + epochsNum: 2 + }); + case "paranet": + return await this.dkgClient.asset.create(asset, { + epochsNum: 2, + paranetUAL: paranetUAL + }); + case "curated_paranet": + return await this.dkgClient.asset.localStore(asset, { + epochsNum: 2, + paranetUAL: paranetUAL + }); + default: + return await this.dkgClient.asset.create(asset, { + epochsNum: 2 + }); } - return await this.dkgClient.asset.create(asset, { - epochsNum: 2 - }); } definePublishType(endpoint) { From c297cb5b4fd07caf84a1fe8525988b112eecd0dc Mon Sep 17 00:00:00 2001 From: Stevan Nesovic Date: Wed, 23 Oct 2024 19:35:47 +0200 Subject: [PATCH 4/4] Code optimization 2 --- sync-assets-queue.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sync-assets-queue.js b/sync-assets-queue.js index ed21c7c..ad72eca 100644 --- a/sync-assets-queue.js +++ b/sync-assets-queue.js @@ -180,7 +180,7 @@ async function storeSyncedAssets(assets, notification) { return true; } -// Add Jobs Every 30 Seconds +// Add Jobs Every 10 Seconds setInterval(async () => { console.log('Queueing sync job...'); await syncQueue.add('syncJob', { timestamp: Date.now() });