forked from mpareja/gearshaft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
document-projection.js
82 lines (65 loc) · 2.45 KB
/
document-projection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
const assert = require('assert')
const { operationError } = require('../errors')
const { retry } = require('../retry')
const { StaleDocumentError } = require('../document-store')
const createError = operationError('document-projection')
exports.createDocumentProjection = (options) => {
assertOptions(options)
const {
documentStore,
entity: Entity,
identify,
log,
projection,
versionField = 'globalPosition'
} = options
// use projection-specific field name to avoid data type
// collisions across projections
const loggedIdField = Entity.name + 'Id'
const handler = async (message) => {
const id = identify(message)
const globalPosition = message.metadata.globalPosition
await retry([StaleDocumentError], async () => {
const foundDocument = await getDocument(id)
const doc = foundDocument || new Entity()
const save = foundDocument ? documentStore.update : documentStore.insert
const meta = {
foundDocumentVersion: foundDocument ? foundDocument[versionField] : undefined,
globalPosition,
[loggedIdField]: id,
messageId: message.id,
type: message.constructor.name
}
if (foundDocument && foundDocument[versionField] >= globalPosition) {
log.info(meta, `${Entity.name} document-projection: message ignored, already processed`)
return
}
projection.project(doc, message)
doc[versionField] = globalPosition
await save(doc)
log.info(meta, `${Entity.name} document-projection: document updated successfully`)
})
}
const registerHandlers = (register) => {
projection.registerHandlers((MessageType) => {
register(MessageType, handler)
})
}
const getDocument = async (id) => {
try {
return await documentStore.get(id)
} catch (inner) {
throw createError('error retrieving document', inner)
}
}
return { documentStore, registerHandlers, handler }
}
const errorMessage = (msg) => `document-projection: ${msg}`
const assertOptions = (options) => {
assert(typeof options === 'object', errorMessage('options required'))
assert(options.documentStore, errorMessage('documentStore required'))
assert(typeof options.entity === 'function', errorMessage('entity required'))
assert(typeof options.identify === 'function', errorMessage('identify required'))
assert(options.log, errorMessage('log required'))
assert(options.projection, errorMessage('projection required'))
}