This repository has been archived by the owner on May 3, 2022. It is now read-only.
forked from decentralized-identity/element
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstorage-manager.js
125 lines (112 loc) · 3.33 KB
/
storage-manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
const { objectToMultihash } = require('../../func');
class StorageManager {
constructor(db, storage, options) {
this.db = db;
this.storage = storage;
this.options = options || { autoPersist: false, retryIntervalSeconds: 5 };
this.logger = console;
}
retryUntilDone() {
this.interval = setInterval(async () => {
const pendingCount = (await this.getNotPersisted()).length;
if (pendingCount === 0) {
clearInterval(this.interval);
return false;
}
await this.retryAllNotPersisted();
await this.db.awaitableSync();
return true;
}, 1 * this.options.retryIntervalSeconds * 1000); // every 5 seconds
}
async getNotPersisted() {
return (await this.db.readCollection(
'element:sidetree:cas-cachable'
)).filter(doc => doc.persisted === false);
}
async retryAllNotPersisted() {
const allUnPersisted = await this.getNotPersisted();
await Promise.all(
allUnPersisted.map(async item => {
try {
const cid = await this.storage.write(item.object);
if (cid !== item.multihash) {
throw new Error('CID is not valid.');
}
await this.db.write(item.id, {
type: 'element:sidetree:cas-cachable',
multihash: item.multihash,
object: item.object,
persisted: true,
});
} catch (e) {
// console.log('still failing');
}
})
);
}
async write(object) {
const key = await objectToMultihash(object);
const cacheWriteResult = await this.db.write(
`element:sidetree:cas-cachable:${key}`,
{
type: 'element:sidetree:cas-cachable',
multihash: key,
object,
persisted: false,
}
);
// console.log('cacheWriteResult: ', cacheWriteResult);
try {
const cid = await this.storage.write(object);
if (cid !== key || cid !== cacheWriteResult.multihash) {
throw new Error('CID is not valid.');
}
await this.db.write(`element:sidetree:cas-cachable:${key}`, {
type: 'element:sidetree:cas-cachable',
multihash: key,
object,
persisted: true,
});
} catch (e) {
// ipfs failed, check options and maybe retry forever...
if (this.options.autoPersist) {
this.retryUntilDone();
}
}
return key;
}
async read(cid) {
try {
const data = await this.db.read(`element:sidetree:cas-cachable:${cid}`);
// maybe data was created on another node, try to read ipfs.
if (data === null || data === undefined) {
const fromStorage = await this.storage.read(cid);
if (fromStorage) {
await this.db.write(`element:sidetree:cas-cachable:${cid}`, {
type: 'element:sidetree:cas-cachable',
multihash: cid,
object: fromStorage,
persisted: true,
});
return fromStorage;
}
}
if (data.persisted) {
return data.object;
}
this.logger.warn(
`Data returned from manager, but not persisted... ${JSON.stringify(
data
)}`
);
return data.object;
} catch (e) {
throw new Error('Could not read element:sidetree:cas-cachable');
}
}
close() {
this.storage.close();
this.db.close();
}
}
module.exports = StorageManager;