Skip to content

Commit

Permalink
live query fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Mar 10, 2016
1 parent 23c1679 commit f86f13a
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 10 deletions.
42 changes: 32 additions & 10 deletions lib/transport/binary/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ var net = require('net'),
OperationStatus = require('./operation-status'),
EventEmitter = require('events').EventEmitter,
Promise = require('bluebird'),
Operation = require('./protocol28/operation'); //TODO refactor this!!!
Operation;


function Connection(config) {
Expand Down Expand Up @@ -199,7 +199,7 @@ Connection.prototype.negotiateProtocol = function () {
this.logger.debug('server protocol: ' + this.protocolVersion);
if (this.protocolVersion >= 33) {
this.protocol = require('./protocol33');
}else if (this.protocolVersion >= 28) {
} else if (this.protocolVersion >= 28) {
this.protocol = require('./protocol28');
}
else if (this.protocolVersion === 26) {
Expand Down Expand Up @@ -274,21 +274,31 @@ Connection.prototype.bindToSocket = function () {
* @param {Buffer} data The data received from the server.
*/
Connection.prototype.handleSocketData = function (data) {
var buffer, result, offset;
var buffer, result, offset, processed, lastOp;
if (this.remaining) {


buffer = new Buffer(this.remaining.length + data.length);
this.remaining.copy(buffer);
data.copy(buffer, this.remaining.length);
}
else {

buffer = data;
}
offset = this.process(buffer);

processed = this.process(buffer);
offset = processed[0];
lastOp = processed[1];
if (buffer.length - offset === 0) {
this.remaining = null;
}
else {
this.remaining = buffer.slice(offset);
// TODO refactor. In case of pending buffers but not Reading the precess should continue on the remaining Buffer
if (lastOp != OperationStatus.READING) {
this.handleSocketData(new Buffer(0));
}
}
};

Expand Down Expand Up @@ -356,13 +366,16 @@ Connection.prototype.destroySocket = function () {
*
* @param {Buffer} buffer The buffer to process.
* @param {Integer} offset The offset to start processing from, defaults to 0.
* @return {Integer} The offset that was successfully read up to.
* @return {Array} The offset that was successfully read up to.
*/
Connection.prototype.process = function (buffer, offset) {
var code, parsed, result, status, item, op, deferred, err, token, operation;
offset = offset || 0;
if (this.queue.length === 0) {
op = new Operation();//TODO refactor this!
if (!Operation) {
Operation = require('./protocol' + this.protocolVersion + '/operation');
}
op = new Operation();
parsed = op.consume(buffer, offset);
status = parsed[0];
if (status === OperationStatus.PUSH_DATA) {
Expand All @@ -376,9 +389,10 @@ Connection.prototype.process = function (buffer, offset) {
result = parsed[3];
offset = parsed[4];
this.emit('live-query-result', token, operation, result);
return offset;
return [offset, status];
}
}

while ((item = this.queue.shift())) {
op = item[0];
deferred = item[1];
Expand All @@ -389,15 +403,23 @@ Connection.prototype.process = function (buffer, offset) {
if (status === OperationStatus.READING) {
// operation is incomplete, buffer does not contain enough data
this.queue.unshift(item);
return offset;
return [offset, status];
}
else if (status === OperationStatus.PUSH_DATA) {
this.emit('update-config', result);
this.queue.unshift(item);
return offset;
//return offset;
}
else if (status === OperationStatus.COMPLETE) {
deferred.resolve(result);
} else if (status === OperationStatus.LIVE_RESULT) {
token = parsed[1];
operation = parsed[2];
result = parsed[3];
offset = parsed[4];
this.emit('live-query-result', token, operation, result);
this.queue.unshift(item);
//return offset;
}
else if (status === OperationStatus.ERROR) {
if (result.status.error) {
Expand All @@ -416,5 +438,5 @@ Connection.prototype.process = function (buffer, offset) {
deferred.reject(new errors.Protocol('Unsupported operation status: ' + status));
}
}
return offset;
return [offset, status];
};
94 changes: 94 additions & 0 deletions test/db/live-query.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
var Promise = require('bluebird');
describe("Database API - Live Query ", function () {

this.timeout(20000);
before(function () {
return CREATE_TEST_DB(this, 'testdb_live_query')
.bind(this)

.then(function () {

this.db = USE_TOKEN_DB("testdb_live_query");
return this.db.open();
}).then(function () {
this.writeDB = USE_TOKEN_DB("testdb_live_query");
return this.writeDB.open();
})
.then(function () {
return this.db.class.create('Test', 'V');
})
.then(function (item) {
this.class = item;
return this.class.property.create([
{
name: 'name',
type: 'String'
},
{
name: 'creation',
type: 'DateTime'
}
]);
})
.then(function () {
return this.class.create([
{
name: 'a',
creation: '2001-01-01 00:00:01'
},
{
name: 'b',
creation: '2001-01-02 12:00:01'
},
{
name: 'c',
creation: '2009-01-01 00:12:01'
},
{
name: 'd',
creation: '2014-09-01 00:01:01'
},
{
name: 'e',
creation: '2014-09-01 00:24:01'
}
])
});
});
after(function () {
return DELETE_TEST_DB('testdb_live_query');
});


it('should trigger live query', function (done) {

var TOTAL = 2;
var record;
var count = 0;

this.db.liveQuery("LIVE SELECT FROM Test").on('live-insert', function (data) {
count++;
if (count === TOTAL) {
data.content.name.should.eql('a');
done();
}
});
var self = this;
setTimeout(function () {
var promises = [];
for (var i = 0; i < TOTAL; i++) {
promises.push(self.db.create("VERTEX", "Test")
.set(
{
name: 'a',
creation: '2001-01-01 00:00:01'
})
.one())
}
Promise.all(promises).then(function (rec) {
})
}, 1000);

});

});
15 changes: 15 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ global.CREATE_TEST_DB = createTestDb.bind(null, TEST_SERVER);
global.DELETE_TEST_DB = deleteTestDb.bind(null, TEST_SERVER);
global.CREATE_POOL = createPool.bind(null, TEST_SERVER);
global.USE_ODB = useOdb.bind(null, TEST_SERVER);
global.USE_TOKEN_DB = useOdbWithToken.bind(null, TEST_SERVER);


global.CREATE_REST_DB = createTestDb.bind(null, REST_SERVER);
Expand All @@ -91,6 +92,20 @@ function useOdb(server, name) {

//context.pool.config.server.logger.debug = console.log.bind(console, '[ORIENTDB]');
}

function useOdbWithToken(server, name) {

return new global.LIB.ODatabase({
host: TEST_DB_CONFIG.host,
port: TEST_DB_CONFIG.port,
username: TEST_DB_CONFIG.username,
password: TEST_DB_CONFIG.password,
name: name,
useToken : true
})

//context.pool.config.server.logger.debug = console.log.bind(console, '[ORIENTDB]');
}
function createPool(server, context, name) {
context.pool = new global.LIB.Pool({
host: TEST_DB_CONFIG.host,
Expand Down

2 comments on commit f86f13a

@kaYcee
Copy link

@kaYcee kaYcee commented on f86f13a Mar 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi maggiolo00,
saw your LiveQuery fix (guess in response to #36, so thanks!) in this branch and tried it out in 2.1.13. As this version is using protocol 32 the change in line 376 of connection.js unfortunately fails. I hard-coded protocol28 for the sake of testing, since this directory was available. Good news is that the error "Unsupported operation status: 6" is issued less often than before, but could still be provoked by emitting 3 x 20 simple updates ("update #22:6 set name = 'lastcontact'") to the same record within 2 seconds from Studio (BATCH run) and listening to a live-update-event. The error now thrown has updated line numbers, of course:

Unhandled rejection OrientDB.ProtocolError: Unsupported operation status: 6 at Connection.process (/.../node_modules/orientjs/lib/transport/binary/connection.js:414:23) at Connection.handleSocketData (/.../node_modules/orientjs/lib/transport/binary/connection.js:284:17) at emitOne (events.js:90:13) at Socket.emit (events.js:182:7) at readableAddChunk (_stream_readable.js:153:18) at Socket.Readable.push (_stream_readable.js:111:10) at TCP.onread (net.js:529:20)

Any ideas?

@wolf4ood
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @kaYcee

i've tried this fix against the 2.2-beta. I will backport it to 2.1.x in the next days.

Please sign in to comment.