Skip to content

Commit

Permalink
Merged in feature-add-chain-function (pull request #1)
Browse files Browse the repository at this point in the history
added chain function for PubSub
  • Loading branch information
ravitejag committed Jul 21, 2015
2 parents 494fd01 + 87dae1f commit 17a8e3a
Showing 1 changed file with 99 additions and 0 deletions.
99 changes: 99 additions & 0 deletions packages/lib/PubSub.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* lib.PubSub.
*/

jsio('import lib.Callback as Callback');
from ..std.uuid import uuid;

var ctx = jsio.__env.global,
Expand Down Expand Up @@ -123,12 +124,110 @@ exports = Class(function () {
return this.subscribeOnce(type, this, f);
};

this.chain = function (evnt, func) {
var i,
cbs = this._callbacks || (this._callbacks = {}),
callback = new Callback(),
next = bind(this, function (val, cancel) {
var cb = this._callbacks[evnt][i] || {
// i is pointing to the end of the queue, so creating custom cb object.
fire: bind(this, function (val, cancel) {
// call pending callbacks from the next event.

var current = this._callbacks[evnt],
last = current[i];

// if we are chaining a second signal, that mapping cb will be added
// to the end of current event's queue, which becomes i'th value.
if (last) {
last.fire(cancel);
last.clear();
// this is to remove the last cb which is just a mapping
// between event queues.
current.pop();
}
// remove current event
this._activeCBs.shift();
})
};

// execution of next function in the queue can be aborted
// by passing true as a paramter to the cb.
if (cancel) {
// even if we want to cancel, we need to fire so that
// we can reset all the callbacks
cb.fire(val, cancel);
} else {
// call the registered callback function with value and
// fire function.
// binding val to fire so that next function in the chain will also
// get the value.
func(val, bind(cb, cb.fire, val));
}

callback.reset();
});

callback.run(next);

if (!cbs[evnt]) {
// we need to register only once, remaining will be taken care
// by the next function.
cbs[evnt] = [];
this.on(evnt, bind(this, function (val) {
var active_cbs = this._activeCBs || (this._activeCBs = []),
len = active_cbs.length,
pending, last;

// add to queue.
// this is needed for multi signal chaining
active_cbs.push(evnt);
if (len === 0) {

// if there is no active function, fire cb immedietly.
callback.fire(val);
} else {
// some other execution is ongoing, add to the queue.
last = active_cbs[len - 1];
pending = new Callback();
pending.run(bind(callback, callback.fire, val));
// add to the end of last event's queue
this._callbacks[last].push(pending);
}
}));
}
i = cbs[evnt].push(callback);
};

this.removeListener = function (type, f) {
this.unsubscribe(type, this, f);
return this;
};

this.removeAllListeners = function (type) {
var cbs = this._callbacks;

if (cbs) {
for (var evnt in cbs) {
if (type == null || type == evnt) {
for (var i = 0; i < cbs[evnt].length; i++) {
cbs[type][i].clear();
}
delete this._callbacks[evnt];
}
}
}

if (this._activeCBs) {
if (type == null) {
this._activeCBs.length = 0;
} else {
this._activeCBs = this._activeCBs.filter(function (event) {
return event !== type;
});
}
}

if (this._subscribers) {
for (var k in this._subscribers) {
if (type == null || type == k) {
Expand Down

0 comments on commit 17a8e3a

Please sign in to comment.