Skip to content

Commit

Permalink
Merge pull request OriginTrail#6 from OriginTrail/feature/publish-modes
Browse files Browse the repository at this point in the history
Feature: Publish modes (public, paranet, curated paranet)
  • Loading branch information
nesovic authored Oct 24, 2024
2 parents c57dd46 + c297cb5 commit f746b89
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 69 deletions.
96 changes: 70 additions & 26 deletions controllers/knowledgeBankController.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,77 @@ exports.getDatasets = async (req, res) => {

exports.getAssets = async (req, res) => {
try {
const edgeNodePublishMode = req.user.config.find(
item => item.option === 'edge_node_publish_mode'
).value || null;
const paranetUAL = req.user.config.find(
item => item.option === 'edge_node_paranet_ual'
).value || null;

const offset = parseInt(req.query.offset) || 0;
const limit = parseInt(req.query.limit) || 10;

const assets = await internalSequelize.sequelize.query(
`WITH RankedAssets AS (SELECT *,
ROW_NUMBER() OVER (PARTITION BY ual ORDER BY created_at DESC, id DESC) AS row_num
FROM synced_assets)
if(edgeNodePublishMode === 'public') {

const { count, rows: assets } = await Asset.findAndCountAll({
attributes: [
'id',
'ual',
[Sequelize.col('assertion_id'), 'public_assertion_id'],
[Sequelize.col('created_at'), 'backend_synced_at']
],
where: {
publishing_status: "COMPLETED"
},
limit: limit,
offset: offset
});


res.json({
totalItems: count,
offset: offset,
limit: limit,
data: assets
});
} else {
const assets = await internalSequelize.sequelize.query(
`WITH RankedAssets AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY ual ORDER BY created_at DESC, id DESC) AS row_num
FROM synced_assets
WHERE paranet_ual = :paranetUAL -- Add the filter for paranet_ual
)
SELECT *
FROM RankedAssets
WHERE row_num = 1
ORDER BY created_at DESC, id DESC
LIMIT ${limit}
OFFSET ${offset}`,
{
type: internalSequelize.Sequelize.QueryTypes.SELECT
}
);
{
type: internalSequelize.Sequelize.QueryTypes.SELECT,
replacements: { paranetUAL }
}
);

const total = await internalSequelize.sequelize.query(
`select ual, count(*)
from synced_assets
group by ual`,
{
type: internalSequelize.Sequelize.QueryTypes.SELECT
}
);
const total = await internalSequelize.sequelize.query(
`SELECT ual, COUNT(*)
FROM synced_assets
WHERE paranet_ual = :paranetUAL -- Add the filter for paranet_ual
GROUP BY ual`,
{
type: internalSequelize.Sequelize.QueryTypes.SELECT,
replacements: { paranetUAL }
}
);

res.json({
totalItems: total.length,
offset: offset,
limit: limit,
data: assets
});
res.json({
totalItems: total.length,
offset: offset,
limit: limit,
data: assets
});
}
} catch (error) {
console.error('Error fetching paginated assets:', error);
res.status(500).json({ error: 'Failed to fetch assets' });
Expand All @@ -74,9 +112,15 @@ exports.previewAssetExternal = async (req, res) => {
const formattedUserConfig = publishService.setUserConfig(userConfig);

const DkgClient = publishService.initDkgClient(blockchain);
let result = await DkgClient.asset.get(assetUAL, {
paranetUAL: formattedUserConfig.edge_node_paranet_ual
});
let result;
if(formattedUserConfig.edge_node_publish_mode === "public") {
result = await DkgClient.asset.get(assetUAL);
} else {
result = await DkgClient.asset.get(assetUAL, {
paranetUAL: formattedUserConfig.edge_node_paranet_ual
});
}

let formattedKnowledgeAsset = {};
formattedKnowledgeAsset.private = result.private.assertion;
formattedKnowledgeAsset.public = result.public.assertion;
Expand Down Expand Up @@ -266,7 +310,7 @@ exports.confirmAndCreateAssets = async (req, res) => {
if (operationStatus === OPERATION_STATUSES.COMPLETED) {
const vectorizationEnabled = req.user.config.find(
item => item.option === 'vectorization_enabled'
).value;
).value || null;
if (vectorizationEnabled === 'true') {
await vectorService.vectorizeKnowledgeAsset(
result,
Expand Down
18 changes: 9 additions & 9 deletions create-paranet.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
const DKG = require('dkg.js');
require('dotenv').config();
const {
PARANET_NODES_ACCESS_POLICY,
PARANET_MINERS_ACCESS_POLICY
} = require('dkg.js/constants.js');

let DkgClient = new DKG({
environment: '',
endpoint: '',
environment: process.env.DKG_ENV,
endpoint: process.env.RUNTIME_NODE_ENDPOINT,
port: '8900',
blockchain: {
name: 'base:84532',
publicKey: '',
privateKey:
''
publicKey: process.env.PUB_KEY,
privateKey: process.env.PRIV_KEY
},
maxNumberOfRetries: 30,
frequency: 2,
Expand All @@ -22,14 +22,14 @@ let DkgClient = new DKG({
let KAContent = {
public: {
'@context': ['https://schema.org'],
'@id': 'urn:id:paranet:4',
paranetName: 'Stable (Staging) Paranet',
paranetDescription: 'Stable (Staging) Paranet'
'@id': 'urn:id:paranet:5',
paranetName: 'Testnet Paranet 5',
paranetDescription: 'Testnet Paranet 5'
}
};

// Operational wallet public key of the node
const NODE1_PUBLIC_KEY = '0xbdd9a9bdacb827890c33dff74a69c6866f1022d2';
const NODE1_PUBLIC_KEY = '';

async function createParanet() {
let KA = await DkgClient.asset.create(KAContent, {
Expand Down
28 changes: 23 additions & 5 deletions services/publishService.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PublishService {
case 'internal':
return this.internalPublishService(
asset,
this.userConfig.edge_node_publish_mode,
this.userConfig.edge_node_paranet_ual,
wallet
);
Expand All @@ -48,6 +49,7 @@ class PublishService {
default:
return this.internalPublishService(
asset,
this.userConfig.edge_node_publish_mode,
this.userConfig.edge_node_paranet_ual,
wallet
);
Expand Down Expand Up @@ -87,11 +89,27 @@ class PublishService {
}
}

async internalPublishService(asset, paranetUAL, wallet = null) {
return await this.dkgClient.asset.localStore(asset, {
epochsNum: 2,
paranetUAL: paranetUAL
});
async internalPublishService(asset, edgeNodePublishMode, paranetUAL, wallet = null) {
switch (edgeNodePublishMode) {
case "public":
return await this.dkgClient.asset.create(asset, {
epochsNum: 2
});
case "paranet":
return await this.dkgClient.asset.create(asset, {
epochsNum: 2,
paranetUAL: paranetUAL
});
case "curated_paranet":
return await this.dkgClient.asset.localStore(asset, {
epochsNum: 2,
paranetUAL: paranetUAL
});
default:
return await this.dkgClient.asset.create(asset, {
epochsNum: 2
});
}
}

definePublishType(endpoint) {
Expand Down
86 changes: 57 additions & 29 deletions sync-assets-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const sequelize = require('sequelize');
const { SyncedAsset, Notification } = require('./models');
const { Queue, Worker } = require('bullmq');
const redis = require('ioredis');
const axios = require('axios');
const connection = new redis({
maxRetriesPerRequest: null
});
Expand All @@ -18,12 +19,34 @@ new Worker(
'syncQueue',
async (job) => {
console.log(`Starting sync job...Time: ${job.data.timestamp}`);
console.log(`Checking Edge node publish mode...Time: ${job.data.timestamp}`);

const publicConfig = await axios.get(
`${process.env.AUTH_SERVICE_ENDPOINT}/auth/params/public`
);
const edgeNodePublishMode = publicConfig.data.config.find(
item => item.option === 'edge_node_publish_mode'
).value || null;

if(edgeNodePublishMode === "public") {
console.log(`Edge node publish mode is public, aborting sync operation...Time: ${job.data.timestamp}`);
return;
}

const paranetUAL = publicConfig.data.config.find(
item => item.option === 'edge_node_paranet_ual'
).value || null;

try {
let internalSyncedAssets = await SyncedAsset.count();
if (internalSyncedAssets === 0) {
const internalSyncedAssets = await SyncedAsset.findAll({
where: {
paranet_ual: paranetUAL,
},
});
if (internalSyncedAssets.length === 0) {
console.log(`First time query...`);
const assets = await externalSequelize.query(
getInitialQuery(),
getInitialQuery(paranetUAL),
{
type: QueryTypes.SELECT
}
Expand All @@ -32,13 +55,16 @@ new Worker(
let notification = await storeNotification(assets);
await storeSyncedAssets(assets, notification);
}
} else if (internalSyncedAssets > 0) {
} else if (internalSyncedAssets.length > 0) {
const lastSyncedAsset = await SyncedAsset.findOne({
where: {
paranet_ual: paranetUAL,
},
order: [['id', 'DESC']]
});
const date = new Date(lastSyncedAsset.backend_synced_at);
const utcDateString = date.toISOString().replace('Z', '').replace('T', ' ').slice(0, 19);
const assets = await externalSequelize.query(getNextQuery(utcDateString)
const assets = await externalSequelize.query(getNextQuery(utcDateString, paranetUAL)
,
{
type: QueryTypes.SELECT
Expand All @@ -60,13 +86,7 @@ new Worker(
}
);

// Add Jobs Every 30 Seconds
setInterval(async () => {
console.log('Queueing sync job...');
await syncQueue.add('syncJob', { timestamp: Date.now() });
}, 10000);

const getInitialQuery = () => {
const getInitialQuery = (paranetUAL) => {
return `
SELECT sa.*
FROM paranet_synced_asset sa
Expand All @@ -77,26 +97,28 @@ const getInitialQuery = () => {
(SELECT ual, MAX(created_at)
FROM paranet_synced_asset
GROUP BY ual)
GROUP BY ual) latest
ON sa.id = latest.max_id`;
AND paranet_ual = '${paranetUAL}'
GROUP BY ual
) latest ON sa.id = latest.max_id;`;
};

const getNextQuery = (date) => {
const getNextQuery = (date, paranetUAL) => {
return `
SELECT sa.*
FROM paranet_synced_asset sa
INNER JOIN (
SELECT ual, MAX(id) AS max_id
FROM paranet_synced_asset
WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00')
AND (ual, created_at) IN
(SELECT ual, MAX(created_at)
FROM paranet_synced_asset
WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00')
GROUP BY ual)
GROUP BY ual
) latest
ON sa.id = latest.max_id;`;
SELECT sa.*
FROM paranet_synced_asset sa
INNER JOIN (
SELECT ual, MAX(id) AS max_id
FROM paranet_synced_asset
WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00')
AND (ual, created_at) IN
(SELECT ual, MAX(created_at)
FROM paranet_synced_asset
WHERE created_at > CONVERT_TZ('${date}', @@session.time_zone, '+00:00')
GROUP BY ual)
AND paranet_ual = '${paranetUAL}'
GROUP BY ual
) latest ON sa.id = latest.max_id;
`;
};

function getCurrentTimeProperFormat() {
Expand Down Expand Up @@ -157,3 +179,9 @@ async function storeSyncedAssets(assets, notification) {
}
return true;
}

// Add Jobs Every 10 Seconds
setInterval(async () => {
console.log('Queueing sync job...');
await syncQueue.add('syncJob', { timestamp: Date.now() });
}, 10000);

0 comments on commit f746b89

Please sign in to comment.