diff --git a/lib/connection-pool.js b/lib/connection-pool.js index 6b3902e..d3a4070 100644 --- a/lib/connection-pool.js +++ b/lib/connection-pool.js @@ -7,17 +7,20 @@ Connection.prototype.release = function() { this.pool.release(this); }; +var PENDING = 0; +var FREE = 1; +var USED = 2; +var RETRY = 3; + function ConnectionPool(poolConfig, connectionConfig) { - this.free = []; - this.used = []; - this.pending = []; + this.connections = []; this.waitingForConnection = []; this.connectionConfig = connectionConfig; this.max = poolConfig.max || 50; this.min = poolConfig.min || 10; this.idleTimeout = poolConfig.idleTimeout || poolConfig.idletimeoutMillis || 30000; //5 min - this.retryDelay = poolConfig.retryDelay || 30; + this.retryDelay = poolConfig.retryDelay || 5000; this.log = poolConfig.log; setTimeout(fill.bind(this), 4); @@ -25,144 +28,150 @@ function ConnectionPool(poolConfig, connectionConfig) { util.inherits(ConnectionPool, EventEmitter); -function delayCreateConnection() { - if (this.pending === undefined) //pool has been drained +function createConnection(pooled) { + if (this.connections === undefined) //pool has been drained return; - var self = this; - var placeholder = {}; - this.pending.push(placeholder); - - setTimeout(function() { - var i = self.pending.indexOf(placeholder); - self.pending.splice(i, 1); - createConnection.call(self); - }, this.retryDelay); -} - -function createConnection() { var self = this; var connection = new Connection(this.connectionConfig); connection.pool = this; - this.pending.push(connection); + if (!pooled) + pooled = { + con: connection, + status: PENDING + }; + this.connections.push(pooled); connection.on('connect', function (err) { - if (self.pending === undefined) { //pool has been drained + if (self.connections === undefined) { //pool has been drained connection.close(); return; } - var i = self.pending.indexOf(connection); - if (i === -1) - throw new Error('connection not in pool'); - - self.pending.splice(i, 1); - if (err) { if (EventEmitter.listenerCount(self, 'error')) self.emit('error', err); + + pooled.status = RETRY; + pooled.con = undefined; connection.removeAllListeners('end'); connection.close(); - delayCreateConnection.call(self); + + setTimeout(createConnection.bind(self, pooled), this.retryDelay); return; } - connection.this = this; - - self.release(connection); + var callback = self.waitingForConnection.shift(); + if (callback !== undefined) + setUsed.call(this, pooled, callback); + else + setFree.call(this, pooled); }); connection.on('end', function () { - if (self.pending === undefined) //pool has been drained + if (self.connections === undefined) //pool has been drained return; - var i = self.used.indexOf(connection); - if (i > -1) { - self.used.splice(i, 1); - } else { - i = self.free.indexOf(connection); - if (i > -1) - self.free.splice(i, 1); + for (var i = self.connections.length - 1; i >= 0; i--) { + if (self.connections[i].con === connection) { + self.connections.splice(i, 1); + fill.call(self); + return; + } } - - fill.call(self); }); } function fill() { - if (this.free === undefined) //pool has been drained + if (this.connections === undefined) //pool has been drained return; - var total = this.pending.length + this.free.length + this.used.length; + var available = 0; + for (var i = this.connections.length - 1; i >= 0; i--) { + if (this.connections[i].status !== USED) { + available++; + } + } var amount = Math.min( - this.max - total, //max that can be created - this.waitingForConnection.length - this.pending.length - this.free.length); //how many are needed, minus how many are available + this.max - this.connections.length, //max that can be created + this.waitingForConnection.length - available); //how many are needed, minus how many are available amount = Math.max( - this.min - total, //amount to create to reach min + this.min - this.connections.length, //amount to create to reach min amount); - for (var i = 0; i < amount; i++) { + for (i = 0; i < amount; i++) { createConnection.call(this); } } ConnectionPool.prototype.acquire = function (callback) { - if (this.pending === undefined) //pool has been drained + if (this.connections === undefined) //pool has been drained return; - var connection; + var free; - //look for valid free connection - while (true) { - connection = this.free.shift(); + //look for free connection + var l = this.connections.length; + for (var i = 0; i < l; i++) { + var pooled = this.connections[i]; - if (connection === undefined) { //no valid connection found - this.waitingForConnection.push(callback); - fill.call(this); - return; - } - - if (!connection.closed) //remove closed connections + if (pooled.status === FREE) { + free = pooled; break; + } } - this.used.push(connection); - callback(connection); + if (free === undefined) { //no valid connection found + this.waitingForConnection.push(callback); + fill.call(this); + } else { + setUsed.call(this, free, callback); + } }; +function setUsed(pooled, callback) { + pooled.status = USED; + if (pooled.timeout) + clearTimeout(pooled.timeout); + callback(pooled.con); +} + +function setFree(pooled) { + pooled.status = FREE; + pooled.timeout = setTimeout(function() { + pooled.con.close(); + }, this.idleTimeout); +} + ConnectionPool.prototype.release = function(connection) { - if (this.pending === undefined) //pool has been drained + if (this.connections === undefined) //pool has been drained return; var callback = this.waitingForConnection.shift(); + if (callback !== undefined) { callback(connection); } else { - var i = this.used.indexOf(connection); - if (i > -1) - this.used.splice(i, 1); - - this.free.push(connection); + for (var i = this.connections.length - 1; i >= 0; i--) { + var pooled = this.connections[i]; + if (pooled.con === connection) { + setFree.call(this, pooled); + break; + } + } } }; ConnectionPool.prototype.drain = function () { - if (this.pending === undefined) //pool has been drained + if (this.connections === undefined) //pool has been drained return; - var i; - - for (i = this.free.length - 1; i >= 0; i--) - this.free[i].close(); - - for (i = this.free.length - 1; i >= 0; i--) - this.used[i].close(); + for (var i = this.connections.length - 1; i >= 0; i--) + this.connections[i].con.close(); - this.free = undefined; - this.used = undefined; - this.pending = undefined; + this.connections = undefined; this.waitingForConnection = undefined; }; diff --git a/test/connection-pool.test.js b/test/connection-pool.test.js index 82a9d7a..608864e 100644 --- a/test/connection-pool.test.js +++ b/test/connection-pool.test.js @@ -18,17 +18,11 @@ describe('ConnectionPool', function () { var poolConfig = {min: 2}; var pool = new ConnectionPool(poolConfig, connectionConfig); - function testMin() { - if (pool.pending.length > 0) { //wait for connections to be created - setTimeout(testMin, 100); - return; - } - assert.equal(pool.free.length, poolConfig.min); + setTimeout(function() { + assert.equal(pool.connections.length, poolConfig.min); done(); pool.drain(); - } - - setTimeout(testMin, 100); + }, 4); }); it('max', function (done) { @@ -43,10 +37,7 @@ describe('ConnectionPool', function () { //run more queries than pooled connections runQueries(pool, count, 200, function() { run++; - var d = pool.pending.length + pool.free.length + pool.used.length <= poolConfig.max; - if (!d) - debugger; - assert(pool.pending.length + pool.free.length + pool.used.length <= poolConfig.max); + assert(pool.connections.length <= poolConfig.max); if (run === count) { done(); pool.drain(); @@ -67,7 +58,7 @@ describe('ConnectionPool', function () { it('connection error retry', function (done) { this.timeout(10000); - var poolConfig = {min: 2, max: 5, retryDelay: 5}; + var poolConfig = {min: 1, max: 5, retryDelay: 5}; var pool = new ConnectionPool(poolConfig, {}); pool.on('error', function(err) { assert(!!err); @@ -75,17 +66,33 @@ describe('ConnectionPool', function () { }); function testConnected() { - if (pool.pending.length > 0) { //wait for connections to be created - setTimeout(testConnected, 100); - return; + for (var i = pool.connections.length - 1; i >= 0; i--) { + if (pool.connections[i].status === 3/*RETRY*/) { + setTimeout(testConnected, 100); + return; + } } - assert.equal(pool.free.length, poolConfig.min); + + assert.equal(pool.connections.length, poolConfig.min); done(); pool.drain(); } setTimeout(testConnected, 100); }); + + it('idle timeout', function (done) { + this.timeout(10000); + var poolConfig = {min: 1, max: 5, idleTimeout: 100}; + var pool = new ConnectionPool(poolConfig, connectionConfig); + + setTimeout(function() { + runQueries(pool, 1, 0, function() { + done(); + pool.drain(); + }); + }, 300); + }); }); function runQueries(pool, count, keepOpen, complete) {