Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE #981

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,195 changes: 546 additions & 649 deletions package-lock.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,23 @@
"winston-daily-rotate-file": "^4.7.1"
},
"devDependencies": {
"@babel/cli": "^7.23.0",
"@babel/core": "^7.23.2",
"@babel/plugin-syntax-import-attributes": "^7.22.5",
"@babel/preset-env": "^7.23.2",
"@babel/cli": "^7.23.4",
"@babel/core": "^7.23.5",
"@babel/plugin-syntax-import-attributes": "^7.23.3",
"@babel/preset-env": "^7.23.5",
"@babel/register": "^7.22.15",
"@commitlint/cli": "^18.0.0",
"@commitlint/config-conventional": "^18.0.0",
"@commitlint/cli": "^18.4.3",
"@commitlint/config-conventional": "^18.4.3",
"babel-plugin-module-resolver": "^5.0.0",
"chai": "^4.3.10",
"chai-http": "^4.4.0",
"eslint": "^8.52.0",
"eslint": "^8.55.0",
"eslint-plugin-es": "^4.1.0",
"eslint-plugin-mocha": "^10.2.0",
"husky": "^8.0.3",
"mocha": "^10.2.0",
"semver": "^7.5.4",
"sinon": "^17.0.0",
"sinon": "^17.0.1",
"socket.io-client": "^4.7.2",
"standard-version": "^9.5.0",
"supertest": "^6.3.3"
Expand Down
113 changes: 77 additions & 36 deletions src/datalayer/persistance.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,32 @@ const getMirrors = async (storeId) => {
}
};

/**
 * Ask the datalayer RPC to discard any pending (unconfirmed) roots for the
 * given store, via POST /clear_pending_roots.
 *
 * @param {string} storeId - The datalayer store whose pending roots to clear.
 * @returns {Promise<boolean>} true when the RPC reports success, false on an
 *   unsuccessful response or any transport error (errors are logged, never
 *   thrown).
 */
const clearPendingRoots = async (storeId) => {
  const endpoint = `${CONFIG.DATALAYER_URL}/clear_pending_roots`;
  const { cert, key, timeout } = getBaseOptions();

  try {
    const { body } = await superagent
      .post(endpoint)
      .key(key)
      .cert(cert)
      .timeout(timeout)
      .send({ store_id: storeId });

    if (body.success) {
      return true;
    }

    logger.error(`Unable to clear pending root for ${storeId}`);
  } catch (error) {
    // Network/TLS/timeout failures are logged and reported as a soft failure
    // so callers can decide whether to retry.
    logger.error(error);
  }

  return false;
};

const addMirror = async (storeId, url, forceAddMirror = false) => {
await wallet.waitForAllTransactionsToConfirm();
const homeOrg = await Organization.getHomeOrg();
Expand Down Expand Up @@ -398,51 +424,65 @@ const getRoots = async (storeIds) => {
};

/**
 * Push a changelist to the datalayer via POST /batch_update, retrying up to
 * maxAttempts times when the datalayer reports a stuck pending root.
 *
 * NOTE(review): the original span was a flattened diff with old and new
 * implementation lines interleaved; this is the reconstructed post-change
 * (retry-loop) implementation.
 *
 * On a "Key already present" error the data may have been pushed but never
 * broadcast to the blockchain, leaving a pending root; we clear pending roots
 * with clearPendingRoots(), wait 5 seconds, and retry the batch update.
 *
 * @param {string} storeId - Target datalayer store id.
 * @param {Array<object>} changelist - Datalayer change operations to submit.
 * @returns {Promise<boolean>} true when the changes were accepted, false on
 *   error or once maxAttempts retries are exhausted (never throws).
 */
const pushChangeListToDataLayer = async (storeId, changelist) => {
  let attempts = 0;
  const maxAttempts = 5;

  while (attempts < maxAttempts) {
    try {
      // Do not submit while earlier wallet transactions are unconfirmed.
      await wallet.waitForAllTransactionsToConfirm();

      const url = `${CONFIG.DATALAYER_URL}/batch_update`;
      const { cert, key, timeout } = getBaseOptions();

      const response = await superagent
        .post(url)
        .key(key)
        .cert(cert)
        .timeout(timeout)
        .send({
          changelist,
          id: storeId,
          fee: _.get(CONFIG, 'DEFAULT_FEE', 300000000),
        });

      const data = response.body;
      console.log(data);

      if (data.success) {
        logger.info(
          `Success!, Changes were submitted to the datalayer for storeId: ${storeId}`,
        );
        return true;
      }

      // A stuck pending root makes the datalayer reject the update with
      // "Key already present"; clear it and retry after a short delay.
      if (data.error.includes('Key already present')) {
        logger.info('Pending root detected, waiting 5 seconds and retrying');
        const rootsCleared = await clearPendingRoots(storeId);

        if (rootsCleared) {
          attempts++;
          await new Promise((resolve) => setTimeout(resolve, 5000));
          continue; // Retry
        }
      }

      logger.error(
        `There was an error pushing your changes to the datalayer, ${JSON.stringify(
          data,
        )}`,
      );
      return false;
    } catch (error) {
      logger.error(error.message);
      logger.info('There was an error pushing your changes to the datalayer');
      return false;
    }
  }

  logger.error(
    'Maximum attempts reached. Unable to push changes to the datalayer.',
  );
  return false;
};

const createDataLayerStore = async () => {
Expand Down Expand Up @@ -688,5 +728,6 @@ export {
cancelOffer,
verifyOffer,
takeOffer,
clearPendingRoots,
getValue,
};
172 changes: 2 additions & 170 deletions src/datalayer/syncService.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import _ from 'lodash';

import { decodeHex, decodeDataLayerResponse } from '../utils/datalayer-utils';
import { Organization, Staging, ModelKeys, Simulator } from '../models';
import { decodeDataLayerResponse } from '../utils/datalayer-utils';
import { Simulator } from '../models';
import { getConfig } from '../utils/config-loader';
import { logger } from '../config/logger.cjs';

Expand All @@ -11,171 +11,6 @@ import * as simulator from './simulator';
const { USE_SIMULATOR } = getConfig().APP;

const POLLING_INTERVAL = 5000;
const frames = ['-', '\\', '|', '/'];

/**
 * Poll the datalayer for subscribed stores whose root hash has changed and
 * sync each updated store into the climate warehouse, then persist the new
 * root hash on the owning organization so the same update is not re-processed
 * on the next poll.
 *
 * @returns {Promise<void>} resolves once every updated store has been synced.
 */
const startDataLayerUpdatePolling = async () => {
  logger.info('Start Datalayer Update Polling');
  const updateStoreInfo = await dataLayerWasUpdated();
  if (updateStoreInfo.length) {
    await Promise.all(
      updateStoreInfo.map(async (store) => {
        // Bug fix: index over frames.length instead of the hard-coded 3 so
        // the last spinner frame ('/') can actually be selected.
        const frame = frames[Math.floor(Math.random() * frames.length)];
        logger.info(
          `Updates found syncing storeId: ${store.storeId} ${frame}`,
        );
        await syncDataLayerStoreToClimateWarehouse(
          store.storeId,
          store.rootHash,
        );

        console.log('UPDATE STORE', store.storeId, store.rootHash);
        // Record the synced hash so dataLayerWasUpdated() stops reporting
        // this store as changed.
        await Organization.update(
          { registryHash: store.rootHash },
          { where: { registryId: store.storeId } },
        );
      }),
    );
  }
};

/**
 * Replace the local climate-warehouse copy of an organization's registry data
 * with the contents of its datalayer store at the given root hash.
 *
 * Destructive: all model rows belonging to the owning organization are deleted
 * first, then rebuilt by upserting every key/value pair from the store.
 * Statement order matters — do not reorder the destroy/upsert phases.
 *
 * @param {string} storeId - Datalayer store (registryId) to read from.
 * @param {string} rootHash - Root hash identifying the store generation.
 * @returns {Promise<void>} resolves when the sync completes; errors during the
 *   rebuild are logged via console.trace and swallowed.
 */
const syncDataLayerStoreToClimateWarehouse = async (storeId, rootHash) => {
  let storeData;

  // In simulator mode store contents come from the local simulator instead of
  // a real datalayer node.
  if (USE_SIMULATOR) {
    storeData = await simulator.getStoreData(storeId, rootHash);
  } else {
    storeData = await dataLayer.getStoreData(storeId, rootHash);
  }

  // Nothing to sync when the store has no key/value pairs (or the fetch
  // returned no usable payload).
  if (!_.get(storeData, 'keys_values', []).length) {
    return;
  }

  // Resolve which local organization owns this registry store.
  const organizationToTruncate = await Organization.findOne({
    attributes: ['orgUid'],
    where: { registryId: storeId },
    raw: true,
  });

  try {
    if (_.get(organizationToTruncate, 'orgUid')) {
      // Phase 1: delete every model row owned by this organization so the
      // rebuild below starts from a clean slate.
      const truncateOrganizationPromises = Object.keys(ModelKeys).map((key) =>
        ModelKeys[key].destroy({
          where: { orgUid: organizationToTruncate.orgUid },
        }),
      );

      await Promise.all(truncateOrganizationPromises);

      // Phase 2: rebuild from the store contents. Keys are hex-encoded as
      // "<storeId>_<modelKey>|..."; the prefix is stripped and the segment
      // before '|' selects the target model.
      await Promise.all(
        storeData.keys_values.map(async (kv) => {
          const key = decodeHex(kv.key.replace(`${storeId}_`, ''));
          const modelKey = key.split('|')[0];
          let value;

          try {
            value = JSON.parse(decodeHex(kv.value));
          } catch (err) {
            // Unparseable values are logged and skipped; `value` stays
            // undefined and the upsert below still runs for known model keys.
            console.trace(err);
            logger.error(`Cant parse json value: ${decodeHex(kv.value)}`);
          }

          if (ModelKeys[modelKey]) {
            await ModelKeys[modelKey].upsert(value);

            // Units/projects that just landed from the datalayer are no
            // longer "staged" locally, so clear their staging record.
            const stagingUuid =
              modelKey === 'unit'
                ? value.warehouseUnitId
                : modelKey === 'project'
                ? value.warehouseProjectId
                : undefined;

            if (stagingUuid) {
              await Staging.destroy({
                where: { uuid: stagingUuid },
              });
            }
          }
        }),
      );

      // clean up any staging records that involved delete commands,
      // since we can't track that they came in through the uuid,
      // we can infer this because diff.original is null instead of empty object.
      await Staging.cleanUpCommitedAndInvalidRecords();
    }
  } catch (error) {
    // NOTE(review): failures here leave the organization partially truncated;
    // the error is only traced, not rethrown — confirm this is intentional.
    console.trace('ERROR DURING SYNC TRANSACTION', error);
  }
};

/**
 * Determine which subscribed organizations' datalayer stores have a new,
 * confirmed root hash compared to the hash recorded locally.
 *
 * @returns {Promise<Array<{storeId: string, rootHash: string}>>} one entry per
 *   store that changed; empty array when nothing changed or the root lookup
 *   failed.
 */
const dataLayerWasUpdated = async () => {
  const organizations = await Organization.findAll({
    attributes: ['registryId', 'registryHash'],
    where: { subscribed: true },
    raw: true,
  });

  // exit early if there are no subscribed organizations
  if (!organizations.length) {
    return [];
  }

  const subscribedOrgIds = organizations.map((org) => org.registryId);

  // NOTE(review): redundant with the check above (map preserves length);
  // kept as a defensive guard.
  if (!subscribedOrgIds.length) {
    return [];
  }

  // Fetch current root hashes, from the simulator or a real datalayer node.
  let rootResponse;
  if (USE_SIMULATOR) {
    rootResponse = await simulator.getRoots(subscribedOrgIds);
  } else {
    rootResponse = await dataLayer.getRoots(subscribedOrgIds);
  }

  if (!rootResponse.success) {
    return [];
  }

  const updatedStores = rootResponse.root_hashes.filter((rootHash) => {
    // Root ids come back 0x-prefixed; local registryIds are stored bare.
    // NOTE(review): loose == is used here — presumably to tolerate type
    // differences between the stored id and the RPC response; confirm before
    // tightening to ===.
    const org = organizations.find(
      (org) => org.registryId == rootHash.id.replace('0x', ''),
    );

    if (org) {
      // When a transfer is made, the climate warehouse is locked from making updates
      // while waiting for the transfer to either be completed or rejected.
      // This means that we know the transfer completed when the root hash changed
      // and we can remove it from the pending staging table.
      // NOTE(review): this destroy is deliberately not awaited (filter
      // callbacks are synchronous) — the promise is fire-and-forget.
      if (org.isHome == 1 && org.registryHash != rootHash.hash) {
        Staging.destroy({ where: { isTransfer: true } });
      }

      // store has been updated if its confirmed and the hash has changed
      return rootHash.confirmed && org.registryHash != rootHash.hash;
    }

    return false;
  });

  if (!updatedStores.length) {
    return [];
  }

  // Strip the 0x prefix and shape the result for callers.
  const updateStoreInfo = await Promise.all(
    updatedStores.map(async (rootHash) => {
      const storeId = rootHash.id.replace('0x', '');
      return {
        storeId,
        rootHash: rootHash.hash,
      };
    }),
  );

  return updateStoreInfo;
};

const unsubscribeFromDataLayerStore = async (storeId) => {
if (!USE_SIMULATOR) {
Expand Down Expand Up @@ -399,9 +234,6 @@ export const waitForAllTransactionsToConfirm = async () => {
};

export default {
startDataLayerUpdatePolling,
syncDataLayerStoreToClimateWarehouse,
dataLayerWasUpdated,
subscribeToStoreOnDataLayer,
getSubscribedStoreData,
getRootHistory,
Expand Down
Loading
Loading