forked from peerlibrary/meteor-peerdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.coffee
615 lines (477 loc) · 20.8 KB
/
server.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
globals = @

# From Meteor's random/random.js
UNMISTAKABLE_CHARS = '23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz'

# Number of cooperating PeerDB instances and the zero-based index of this one,
# taken from the environment (defaults: one instance, index zero).
# An explicit radix of 10 is passed so that string values such as "08" are
# never subject to legacy octal interpretation by parseInt.
INSTANCES = parseInt(process.env.PEERDB_INSTANCES ? 1, 10)
INSTANCE = parseInt(process.env.PEERDB_INSTANCE ? 0, 10)

# A NaN result from parseInt fails both comparisons below, so malformed
# environment values are rejected here as well.
throw new Error "Invalid number of instances: #{ INSTANCES }" unless 0 <= INSTANCES <= UNMISTAKABLE_CHARS.length
throw new Error "Invalid instance index: #{ INSTANCE }" unless (INSTANCES is 0 and INSTANCE is 0) or 0 <= INSTANCE < INSTANCES

# TODO: Support also other types of _id generation (like ObjectID)
# TODO: We could do also a hash of an ID and then split, this would also prevent any DOS attacks by forcing IDs of a particular form

# Each instance handles only documents whose _id starts with a character from
# its slice of UNMISTAKABLE_CHARS; with a single instance the slice is everything.
PREFIX = UNMISTAKABLE_CHARS.split ''
if INSTANCES > 1
  range = UNMISTAKABLE_CHARS.length / INSTANCES
  PREFIX = PREFIX[Math.round(INSTANCE * range)...Math.round((INSTANCE + 1) * range)]

MESSAGES_TTL = 60 # seconds
# We augment the cursor so that it matches our extra method in documents manager.
MeteorCursor = Object.getPrototypeOf(MongoInternals.defaultRemoteCollectionDriver().mongo.find()).constructor

# Returns true if at least one document matches the cursor's selector,
# without fetching more than its _id. The probe cursor is cached on the
# instance so repeated exists() calls reuse it (via _rewind).
MeteorCursor::exists = ->
  # You can only observe a tailable cursor.
  throw new Error "Cannot call exists on a tailable cursor" if @_cursorDescription.options.tailable

  unless @_synchronousCursorForExists
    # A special cursor with limit forced to 1 and fields to only _id.
    cursorDescription = _.clone @_cursorDescription
    cursorDescription.options = _.clone cursorDescription.options
    cursorDescription.options.limit = 1
    cursorDescription.options.fields =
      _id: 1
    # NOTE(review): uses Meteor's private _createSynchronousCursor API —
    # may break across Meteor versions; confirm against the pinned version.
    @_synchronousCursorForExists = @_mongo._createSynchronousCursor cursorDescription,
      selfForIteration: @
      useTransform: false

  @_synchronousCursorForExists._rewind()
  # Coerce the first fetched object (or null) to a boolean.
  !!@_synchronousCursorForExists._nextObject()
# Inter-instance message queue collection.
# Fields:
#   created
#   type
#   data
# We use a lower case collection name to signal it is a system collection
globals.Document.Messages = new Mongo.Collection 'peerdb.messages'

# Auto-expire messages after MESSAGES_TTL seconds
# (MongoDB TTL index on the "created" timestamp).
globals.Document.Messages._ensureIndex
  created: 1
,
  expireAfterSeconds: MESSAGES_TTL
# Builds a MongoDB projection object from a list of field specifications.
# Each specification is either a field name (projected as 1) or an object
# merged into the projection as-is. _id is always projected so that
# deletions can still be detected when no other field is requested.
fieldsToProjection = (fields) ->
  # In the case we want only id, that is, detect deletions.
  projection = _id: 1
  for spec in fields
    unless _.isString spec
      _.extend projection, spec
    else
      projection[spec] = 1
  projection
# TODO: Should we add retry?
# Wraps an observer callback f so that it only fires for documents this
# instance is responsible for, and so that exceptions are logged instead
# of propagating into Meteor's observe machinery.
globals.Document._observerCallback = (f) ->
  return (obj, args...) ->
    try
      # obj is either a document (observe) or a plain id (observeChanges).
      id = if _.isObject obj then obj._id else obj
      # We call f only if the first character of id is in PREFIX.
      # By that we allow each instance to operate only on a subset
      # of documents, allowing simple coordination while scaling.
      f obj, args... if id[0] in PREFIX
    catch e
      # Deliberately swallow after logging: a broken callback must not
      # tear down the whole observer.
      Log.error "PeerDB exception: #{ e }: #{ util.inspect args, depth: 10 }"
      Log.error e.stack
# Walks obj along path (a list of keys) and returns the value found at
# the end of the path. An empty path returns obj itself.
extractValue = (obj, path) ->
  value = obj
  value = value[segment] for segment in path
  value
# Cannot use => here because we are not in the globals.Document._TargetedFieldsObservingField context.
# We have to modify prototype directly because there are classes which already inherit from the class
# and we cannot just override the class as we are doing for other server-side only methods.
# Observes the target collection of this field and propagates changes into
# the source collection. With updateAll set, runs once over all existing
# documents (the handle is stopped immediately) instead of staying live.
globals.Document._TargetedFieldsObservingField::_setupTargetObservers = (updateAll) ->
  if not updateAll and @ instanceof globals.Document._ReferenceField
    # Indexes that support the selectors used by updateSource/removeSource.
    index = {}
    index["#{ @sourcePath }._id"] = 1
    @sourceCollection._ensureIndex index

    if @reverseName
      index = {}
      index["#{ @reverseName }._id"] = 1
      @targetCollection._ensureIndex index

  # While initializing, "added" events merely replay pre-existing documents;
  # they are skipped unless we are explicitly updating everything.
  initializing = true

  observers =
    added: globals.Document._observerCallback (id, fields) =>
      @updateSource id, fields if updateAll or not initializing

  unless updateAll
    observers.changed = globals.Document._observerCallback (id, fields) =>
      @updateSource id, fields

    observers.removed = globals.Document._observerCallback (id) =>
      @removeSource id

  referenceFields = fieldsToProjection @fields
  handle = @targetCollection.find({}, fields: referenceFields).observeChanges observers

  initializing = false

  handle.stop() if updateAll
# Cannot use => here because we are not in the globals.Document._Trigger context.
# We are modifying prototype directly to match code style of
# _TargetedFieldsObservingField::_setupTargetObservers but in this case it is not
# really needed, because there are no already existing classes which would inherit
# from globals.Document._Trigger.
# Observes the trigger's collection and fires @trigger with (new, old)
# document pairs; an empty document stands in for the missing side on
# added/removed events. Pre-existing documents do not fire on startup.
globals.Document._Trigger::_setupObservers = ->
  initializing = true

  queryFields = fieldsToProjection @fields
  @collection.find({}, fields: queryFields).observe
    added: globals.Document._observerCallback (document) =>
      @trigger document, new @document({}) unless initializing

    changed: globals.Document._observerCallback (newDocument, oldDocument) =>
      @trigger newDocument, oldDocument

    removed: globals.Document._observerCallback (oldDocument) =>
      @trigger new @document({}), oldDocument

  initializing = false
# Server-side extension of _Trigger: dispatches observed changes to the
# user-provided generator function.
class globals.Document._Trigger extends globals.Document._Trigger
  # Bound method (=>) so it can be passed around as a bare callback.
  trigger: (newDocument, oldDocument) =>
    @generator newDocument, oldDocument
# Server-side extension of _ReferenceField: keeps denormalized copies of
# target-document fields (and reverse references) in sync with the target
# collection.
class globals.Document._ReferenceField extends globals.Document._ReferenceField
  # Copies changed target fields into every source document referencing id.
  # fields maps field names to new values; an undefined value means the
  # field was removed and is $unset instead of $set.
  updateSource: (id, fields) =>
    # Just to be sure
    return if _.isEmpty fields

    selector = {}
    selector["#{ @sourcePath }._id"] = id

    update = {}
    if @inArray
      for field, value of fields
        path = "#{ @ancestorArray }.$#{ @arraySuffix }.#{ field }"
        if _.isUndefined value
          update.$unset ?= {}
          update.$unset[path] = ''
        else
          update.$set ?= {}
          update.$set[path] = value

        # We cannot use top-level $or with $elemMatch
        # See: https://jira.mongodb.org/browse/SERVER-11537
        selector[@ancestorArray] ?= {}
        selector[@ancestorArray].$elemMatch ?=
          $or: []

        s = {}
        # We have to repeat id selector here as well
        # See: https://jira.mongodb.org/browse/SERVER-11536
        s["#{ @arraySuffix }._id".substring(1)] = id
        # Remove initial dot with substring(1)
        if _.isUndefined value
          s["#{ @arraySuffix }.#{ field }".substring(1)] =
            $exists: true
        else
          s["#{ @arraySuffix }.#{ field }".substring(1)] =
            $ne: value

        selector[@ancestorArray].$elemMatch.$or.push s

      # $ operator updates only the first matching element in the array,
      # so we have to loop until nothing changes
      # See: https://jira.mongodb.org/browse/SERVER-1243
      loop
        break unless @sourceCollection.update selector, update, multi: true
    else
      for field, value of fields
        path = "#{ @sourcePath }.#{ field }"
        s = {}
        if _.isUndefined value
          update.$unset ?= {}
          update.$unset[path] = ''
          s[path] =
            $exists: true
        else
          update.$set ?= {}
          update.$set[path] = value
          s[path] =
            $ne: value

        # Only touch documents where at least one copied field differs.
        selector.$or ?= []
        selector.$or.push s

      @sourceCollection.update selector, update, multi: true

  # Reacts to the referenced target document (id) being removed: depending
  # on the field's configuration, pulls the array entry, nulls the value,
  # or removes the whole source document.
  removeSource: (id) =>
    selector = {}
    selector["#{ @sourcePath }._id"] = id

    # If it is an array or a required field of a subdocument is in an array, we remove references from an array
    if @isArray or (@required and @inArray)
      update =
        $pull: {}
      update.$pull[@ancestorArray] = {}
      # @arraySuffix starts with a dot, so with .substring(1) we always remove a dot
      update.$pull[@ancestorArray]["#{ @arraySuffix or '' }._id".substring(1)] = id

      @sourceCollection.update selector, update, multi: true

    # If it is an optional field of a subdocument in an array, we set it to null
    else if not @required and @inArray
      path = "#{ @ancestorArray }.$#{ @arraySuffix }"

      update =
        $set: {}
      update.$set[path] = null

      # $ operator updates only the first matching element in the array.
      # So we have to loop until nothing changes.
      # See: https://jira.mongodb.org/browse/SERVER-1243
      loop
        break unless @sourceCollection.update selector, update, multi: true

    # If it is an optional reference, we set it to null
    else if not @required
      update =
        $set: {}
      update.$set[@sourcePath] = null

      @sourceCollection.update selector, update, multi: true

    # Else, we remove the whole document
    else
      @sourceCollection.remove selector

  # Reacts to the source document's reference value changing: validates the
  # value, refreshes the denormalized copy from the target document, and
  # maintains the target's reverse-reference array.
  updatedWithValue: (id, value) =>
    unless _.isObject(value) and _.isString(value._id)
      # Optional field
      return if _.isNull(value) and not @required

      # TODO: This is not triggered if required field simply do not exist or is set to undefined (does MongoDB support undefined value?)
      Log.error "Document '#{ @sourceDocument.Meta._name }' '#{ id }' field '#{ @sourcePath }' was updated with an invalid value: #{ util.inspect value }"
      return

    # Only _id is requested, we do not have to do anything
    unless _.isEmpty @fields
      referenceFields = fieldsToProjection @fields
      target = @targetCollection.findOne value._id,
        fields: referenceFields
        transform: null

      unless target
        Log.error "Document '#{ @sourceDocument.Meta._name }' '#{ id }' field '#{ @sourcePath }' is referencing a nonexistent document '#{ value._id }'"
        # TODO: Should we call reference.removeSource here? And remove from reverse fields?
        return

      # We omit _id because that field cannot be changed, or even $set to the same value, but is in target
      @updateSource target._id, _.omit target, '_id'

    return unless @reverseName

    # TODO: Current code is run too many times, for any update of source collection reverse field is updated
    # We just add the ID to the reverse field array and leave for any additional fields
    # (@reverseFields) to be added by reference fields configured through Meta._reverseFields
    selector =
      _id: value._id
    selector["#{ @reverseName }._id"] =
      $ne: id

    update = {}
    update[@reverseName] =
      _id: id

    @targetCollection.update selector,
      $addToSet: update
# Server-side extension of _GeneratedField: recomputes the generated value
# by calling the user-provided generator whenever observed target fields
# change, and writes the result into the source collection.
class globals.Document._GeneratedField extends globals.Document._GeneratedField
  # Writes a single generated value: the generator returns [selector, value];
  # undefined value $unsets the field, anything else $sets it.
  _updateSourceField: (id, fields) =>
    [selector, sourceValue] = @generator fields

    # A falsy selector means the generator opted out of this update.
    return unless selector

    if @isArray and not _.isArray sourceValue
      Log.error "Generated field '#{ @sourcePath }' defined as an array with selector '#{ selector }' was updated with a non-array value: #{ util.inspect sourceValue }"
      return

    if not @isArray and _.isArray sourceValue
      Log.error "Generated field '#{ @sourcePath }' not defined as an array with selector '#{ selector }' was updated with an array value: #{ util.inspect sourceValue }"
      return

    update = {}
    if _.isUndefined sourceValue
      update.$unset = {}
      update.$unset[@sourcePath] = ''
    else
      update.$set = {}
      update.$set[@sourcePath] = sourceValue

    @sourceCollection.update selector, update, multi: true

  # Writes generated values nested inside an array: the generator returns a
  # list of [selector, value] pairs, one per array index.
  _updateSourceNestedArray: (id, fields) =>
    assert @arraySuffix # Should be non-null

    values = @generator fields

    unless _.isArray values
      Log.error "Value returned from the generator for field '#{ @sourcePath }' is not a nested array despite field being nested in an array: #{ util.inspect values }"
      return

    for [selector, sourceValue], i in values
      continue unless selector

      if _.isArray sourceValue
        Log.error "Generated field '#{ @sourcePath }' not defined as an array with selector '#{ selector }' was updated with an array value: #{ util.inspect sourceValue }"
        continue

      # Address the i-th array element explicitly by index.
      path = "#{ @ancestorArray }.#{ i }#{ @arraySuffix }"

      update = {}
      if _.isUndefined sourceValue
        update.$unset = {}
        update.$unset[path] = ''
      else
        update.$set = {}
        update.$set[path] = sourceValue

      # Stop as soon as an update matches nothing.
      break unless @sourceCollection.update selector, update, multi: true

  # Entry point for observer callbacks: normalizes the changed-fields object
  # (fetching missing fields when only a subset changed) and dispatches to
  # the flat or nested-array writer.
  updateSource: (id, fields) =>
    if _.isEmpty fields
      fields._id = id
    # TODO: Not completely correct when @fields contain multiple fields from same subdocument or objects with projections (they will be counted only once) - because Meteor always passed whole subdocuments we could count only top-level fields in @fields, merged with objects?
    else if _.size(fields) isnt @fields.length
      targetFields = fieldsToProjection @fields
      fields = @targetCollection.findOne id,
        fields: targetFields
        transform: null

      # There is a slight race condition here, document could be deleted in meantime.
      # In such case we set fields as they are when document is deleted.
      unless fields
        fields =
          _id: id
      else
        fields._id = id

    # Only if we are updating value nested in a subdocument of an array we operate
    # on the array. Otherwise we simply set whole array to the value returned.
    if @inArray and not @isArray
      @_updateSourceNestedArray id, fields
    else
      @_updateSourceField id, fields

  # Deletion is modeled as an update with no fields.
  removeSource: (id) =>
    @updateSource id, {}

  updatedWithValue: (id, value) =>
    # Do nothing. Code should not be updating generated field by itself anyway.
# Server-side extension of Document: source-collection observers which keep
# reverse references in sync, plus prepare/startup hook machinery.
class globals.Document extends globals.Document
  # Removes this document's entry from the reverse field of all target
  # documents which are no longer referenced by value. Recurses through
  # ancestorSegments until it reaches the array level, then collects the
  # still-referenced ids and $pulls the entry from every other target.
  @_sourceFieldProcessDeleted: (field, id, ancestorSegments, pathSegments, value) ->
    if ancestorSegments.length
      assert ancestorSegments[0] is pathSegments[0]
      @_sourceFieldProcessDeleted field, id, ancestorSegments[1..], pathSegments[1..], value[ancestorSegments[0]]
    else
      value = [value] unless _.isArray value
      ids = (extractValue(v, pathSegments)._id for v in value when extractValue(v, pathSegments)?._id)

      assert field.reverseName

      query =
        _id:
          $nin: ids
      query["#{ field.reverseName }._id"] = id

      update = {}
      update[field.reverseName] =
        _id: id

      field.targetCollection.update query, {$pull: update}, multi: true

  # Handles one changed top-level field of a source document: dispatches to
  # the matching observing field's updatedWithValue, maintains reverse
  # fields, and recurses into subdocument fields.
  @_sourceFieldUpdated: (id, name, value, field, originalValue) ->
    # TODO: Should we check if field still exists but just value is undefined, so that it is the same as null? Or can this happen only when removing the field?
    if _.isUndefined value
      if field?.reverseName
        @_sourceFieldProcessDeleted field, id, [], name.split('.')[1..], originalValue
      return

    field = field or @Meta.fields[name]

    # We should be subscribed only to those updates which are defined in @Meta.fields
    assert field

    originalValue = originalValue or value

    if field instanceof globals.Document._ObservingField
      if field.ancestorArray and name is field.ancestorArray
        unless _.isArray value
          Log.error "Document '#{ @Meta._name }' '#{ id }' field '#{ name }' was updated with a non-array value: #{ util.inspect value }"
          return
      else
        value = [value]

      for v in value
        field.updatedWithValue id, v

      if field.reverseName
        # In updatedWithValue we added possible new entry/ies to reverse fields, but here
        # we have also to remove those which were maybe removed from the value and are
        # not referencing anymore a document which got added the entry to its reverse
        # field in the past. So we make sure that only those documents which are still in
        # the value have the entry in their reverse fields by creating a query which pulls
        # the entry from all other.
        pathSegments = name.split('.')

        if field.ancestorArray
          ancestorSegments = field.ancestorArray.split('.')

          assert ancestorSegments[0] is pathSegments[0]

          @_sourceFieldProcessDeleted field, id, ancestorSegments[1..], pathSegments[1..], originalValue
        else
          @_sourceFieldProcessDeleted field, id, [], pathSegments[1..], originalValue

    else if field not instanceof globals.Document._Field
      # A subdocument description object, recurse into its fields.
      value = [value] unless _.isArray value
      # If value is an array but it should not be, we cannot do much else.
      # Same goes if the value does not match structurally fields.
      for v in value
        for n, f of field
          # TODO: Should we skip calling @_sourceFieldUpdated if we already called it with exactly the same parameters this run?
          @_sourceFieldUpdated id, "#{ name }.#{ n }", v[n], f, originalValue

  # Fans out one observeChanges payload field-by-field.
  @_sourceUpdated: (id, fields) ->
    for name, value of fields
      @_sourceFieldUpdated id, name, value

  # Observes this document's own collection, projected down to the paths of
  # all observing fields, and feeds changes into @_sourceUpdated. With
  # updateAll set, performs a one-shot pass over existing documents.
  @_setupSourceObservers: (updateAll) ->
    return if _.isEmpty @Meta.fields

    indexes = []
    sourceFields =
      _id: 1 # To make sure we do not pass empty set of fields

    # Collects observed source paths (and reference indexes) recursively.
    sourceFieldsWalker = (obj) ->
      for name, field of obj
        if field instanceof globals.Document._ObservingField
          sourceFields[field.sourcePath] = 1
          if field instanceof globals.Document._ReferenceField
            index = {}
            index["#{ field.sourcePath }._id"] = 1
            indexes.push index
        else if field not instanceof globals.Document._Field
          sourceFieldsWalker field

    sourceFieldsWalker @Meta.fields

    unless updateAll
      for index in indexes
        @Meta.collection._ensureIndex index

    initializing = true

    observers =
      added: globals.Document._observerCallback (id, fields) =>
        @_sourceUpdated id, fields if updateAll or not initializing

    unless updateAll
      observers.changed = globals.Document._observerCallback (id, fields) =>
        @_sourceUpdated id, fields

    handle = @Meta.collection.find({}, fields: sourceFields).observeChanges observers

    initializing = false

    handle.stop() if updateAll

  # Broadcasts an updateAll request to all instances through the messages
  # collection (each instance then refreshes its own ID-prefix slice).
  @updateAll: ->
    sendMessage 'updateAll'

  # Performs the local part of updateAll by re-running all observers in
  # one-shot mode.
  @_updateAll: ->
    # It is only reasonable to run anything if this instance
    # is not disabled. Otherwise we would still go over all
    # documents, just we would not process any.
    return if globals.Document.instanceDisabled

    Log.info "Updating all references..."
    setupObservers true
    Log.info "Done"

  # Class-scoped state for the prepare/startup hook lists below.
  prepared = false
  prepareList = []
  started = false
  startList = []

  # Registers f to run at prepare time; runs immediately if already prepared.
  @prepare: (f) ->
    if prepared
      f()
    else
      prepareList.push f

  # Runs all queued prepare hooks exactly once.
  @runPrepare: ->
    assert not prepared
    prepared = true

    prepare() for prepare in prepareList
    return

  # Registers f to run at startup time; runs immediately if already started.
  @startup: (f) ->
    if started
      f()
    else
      startList.push f

  # Runs all queued startup hooks exactly once.
  @runStartup: ->
    assert not started
    started = true

    start() for start in startList
    return
# TODO: What happens if this is called multiple times? We should make sure that for each document observrs are made only once
# Wires up observers for every registered document class: triggers (only in
# live mode), target-field observers, and source observers. updateAll is
# passed through so fields can run a one-shot refresh instead.
setupObservers = (updateAll) ->
  observeTriggers = (triggers) ->
    trigger._setupObservers() for name, trigger of triggers

  observeTargetFields = (fields) ->
    for name, field of fields
      # There are no arrays anymore here, just objects (for subdocuments) or fields
      if field instanceof globals.Document._TargetedFieldsObservingField
        field._setupTargetObservers updateAll
      else if not (field instanceof globals.Document._Field)
        observeTargetFields field

  for document in globals.Document.list
    # We setup triggers only when we are not updating all
    observeTriggers document.Meta.triggers unless updateAll

    # For fields we pass updateAll on
    observeTargetFields document.Meta.fields

    document._setupSourceObservers updateAll
# Inserts a message into the shared peerdb.messages collection so that
# every instance (including this one) can react to it. The created
# timestamp also drives the collection's TTL expiry.
sendMessage = (type, data) ->
  message =
    created: moment.utc().toDate()
    type: type
    data: data
  globals.Document.Messages.insert message
# Starts listening on the messages collection and dispatches incoming
# messages by type. Messages already present when the observer starts are
# ignored — only new ones are processed.
setupMessages = ->
  loading = true

  onAdded = (id, fields) ->
    # Skip the initial replay of pre-existing messages.
    return if loading

    if fields.type is 'updateAll'
      globals.Document._updateAll()
    else
      Log.error "Unknown message type '#{ fields.type }': " + util.inspect _.extend({}, {_id: id}, fields), false, null

  globals.Document.Messages.find({}).observeChanges
    added: onAdded

  loading = false
# With PEERDB_INSTANCES=0 this instance performs no observing at all.
globals.Document.instanceDisabled = INSTANCES is 0
globals.Document.instances = INSTANCES

Meteor.startup ->
  # To try delayed references one last time, throwing any exceptions
  # (Otherwise setupObservers would trigger strange exceptions anyway)
  globals.Document.defineAll()

  # We first have to setup messages, so that migrations can run properly
  # (if they call updateAll, the message should be listened for)
  setupMessages() unless globals.Document.instanceDisabled

  globals.Document.runPrepare()

  if globals.Document.instanceDisabled
    Log.info "Skipped observers"
    # To make sure everything is really skipped
    PREFIX = []
  else
    if globals.Document.instances is 1
      Log.info "Enabling observers..."
    else
      Log.info "Enabling observers, instance #{ INSTANCE }/#{ globals.Document.instances }, matching ID prefix: #{ PREFIX.join '' }"
    setupObservers()
    Log.info "Done"

  globals.Document.runStartup()
# Export the (now extended) Document class under the package-level name.
Document = globals.Document

# Sanity checks: both server-side field classes must still sit in the
# _TargetedFieldsObservingField hierarchy after the prototype patching above.
assert globals.Document._ReferenceField.prototype instanceof globals.Document._TargetedFieldsObservingField
assert globals.Document._GeneratedField.prototype instanceof globals.Document._TargetedFieldsObservingField