diff --git a/lib/connection-pool.js b/lib/connection-pool.js index b5283f9..6b3902e 100644 --- a/lib/connection-pool.js +++ b/lib/connection-pool.js @@ -1,45 +1,169 @@ 'use strict'; -var poolModule = require('generic-pool'); -var PooledConnection = require('./pooled-connection'); +var Connection = require('tedious').Connection; +var EventEmitter = require('events').EventEmitter; +var util = require('util'); + +Connection.prototype.release = function() { + this.pool.release(this); +}; function ConnectionPool(poolConfig, connectionConfig) { + this.free = []; + this.used = []; + this.pending = []; + this.waitingForConnection = []; + this.connectionConfig = connectionConfig; + + this.max = poolConfig.max || 50; + this.min = poolConfig.min || 10; + this.idleTimeout = poolConfig.idleTimeout || poolConfig.idletimeoutMillis || 30000; //5 min + this.retryDelay = poolConfig.retryDelay || 30; + this.log = poolConfig.log; + + setTimeout(fill.bind(this), 4); +} + +util.inherits(ConnectionPool, EventEmitter); + +function delayCreateConnection() { + if (this.pending === undefined) //pool has been drained + return; + var self = this; + var placeholder = {}; + this.pending.push(placeholder); - this.pool = poolModule.Pool({ - name: poolConfig.name || "", - create: function (callback) { - var connection = new PooledConnection(self, connectionConfig); - connection.on('connect', function (err) { - if (err) - callback(err, null); - else - callback(null, connection); - }); - }, - destroy: function (connection) { - connection.destroyed = true; + setTimeout(function() { + var i = self.pending.indexOf(placeholder); + self.pending.splice(i, 1); + createConnection.call(self); + }, this.retryDelay); +} + +function createConnection() { + var self = this; + var connection = new Connection(this.connectionConfig); + connection.pool = this; + this.pending.push(connection); + + connection.on('connect', function (err) { + if (self.pending === undefined) { //pool has been drained + connection.close(); + return; + } + + var i = self.pending.indexOf(connection); + if (i === -1) + throw new Error('connection not in pool'); + + self.pending.splice(i, 1); + + if (err) { + if (EventEmitter.listenerCount(self, 'error')) + self.emit('error', err); + connection.removeAllListeners('end'); connection.close(); - }, - validate: function(connection) { - return !connection.closed; - }, - max: poolConfig.max || 10, - min: poolConfig.min || 0, - idleTimeoutMillis: poolConfig.idleTimeoutMillis || 30000, - log: poolConfig.log + delayCreateConnection.call(self); + return; + } + + connection.this = this; + + self.release(connection); + }); + + connection.on('end', function () { + if (self.pending === undefined) //pool has been drained + return; + + var i = self.used.indexOf(connection); + if (i > -1) { + self.used.splice(i, 1); + } else { + i = self.free.indexOf(connection); + if (i > -1) + self.free.splice(i, 1); + } + + fill.call(self); }); } +function fill() { + if (this.free === undefined) //pool has been drained + return; + + var total = this.pending.length + this.free.length + this.used.length; + + var amount = Math.min( + this.max - total, //max that can be created + this.waitingForConnection.length - this.pending.length - this.free.length); //how many are needed, minus how many are available + + amount = Math.max( + this.min - total, //amount to create to reach min + amount); + + for (var i = 0; i < amount; i++) { + createConnection.call(this); + } +} + ConnectionPool.prototype.acquire = function (callback) { - return this.pool.acquire(callback); + if (this.pending === undefined) //pool has been drained + return; + + var connection; + + //look for valid free connection + while (true) { + connection = this.free.shift(); + + if (connection === undefined) { //no valid connection found + this.waitingForConnection.push(callback); + fill.call(this); + return; + } + + if (!connection.closed) //remove closed connections + break; + } + + this.used.push(connection); + callback(connection); }; -ConnectionPool.prototype.drain = function (callback) { - var self = this; +ConnectionPool.prototype.release = function(connection) { + if (this.pending === undefined) //pool has been drained + return; - this.pool.drain(function () { - self.pool.destroyAllNow(callback); - }); + var callback = this.waitingForConnection.shift(); + if (callback !== undefined) { + callback(connection); + } else { + var i = this.used.indexOf(connection); + if (i > -1) + this.used.splice(i, 1); + + this.free.push(connection); + } +}; + +ConnectionPool.prototype.drain = function () { + if (this.pending === undefined) //pool has been drained + return; + + var i; + + for (i = this.free.length - 1; i >= 0; i--) + this.free[i].close(); + + for (i = this.free.length - 1; i >= 0; i--) + this.used[i].close(); + + this.free = undefined; + this.used = undefined; + this.pending = undefined; + this.waitingForConnection = undefined; }; module.exports = ConnectionPool; diff --git a/lib/pooled-connection.js b/lib/pooled-connection.js deleted file mode 100644 index 2f6e369..0000000 --- a/lib/pooled-connection.js +++ /dev/null @@ -1,23 +0,0 @@ -var Connection = require('tedious').Connection; - -function PooledConnection(connectionPool, config) { - var self = this; - - this.connectionPool = connectionPool; - this.destroyed = false; - - Connection.call(this, config); - - this.on('end', function () { - if (!this.destroyed) - self.connectionPool.pool.destroy(self); - }); -} - -PooledConnection.prototype = Object.create(Connection.prototype); - -PooledConnection.prototype.release = function () { - this.connectionPool.pool.release(this); -}; - -module.exports = PooledConnection; diff --git a/package.json b/package.json index fe548cb..9475945 100644 --- a/package.json +++ b/package.json @@ -3,9 +3,6 @@ "version": "0.2.4", "description": "Connection Pool for tedious.", "main": "lib/connection-pool.js", - "dependencies": { - "generic-pool": "2.1.x" - }, "scripts": { "test": "test" }, @@ -45,9 +42,8 @@ } ], "devDependencies": { - "tedious": ">=0.0.7", - "mocha": ">=1.6.0", - "async": ">=0.1.22" + "tedious": ">=1.7.0", + "mocha": ">=1.6.0" }, "licenses": "MIT", "readmeFilename": "README.md", diff --git a/test/connection-pool.test.js b/test/connection-pool.test.js index 44424a3..82a9d7a 100644 --- a/test/connection-pool.test.js +++ b/test/connection-pool.test.js @@ -1,106 +1,113 @@ -var assert = require('assert') -var async = require('async') +var assert = require('assert'); var ConnectionPool = require('../lib/connection-pool'); var Request = require('tedious').Request; var connectionConfig = { - userName: 'test', - password: 'test', - server: 'dev1', - // options: { - // debug: { - // packet: true, - // data: true, - // payload: true, - // token: true - // } - // } + userName: 'test', + password: 'test', + server: 'dev1', + options: { + appName: 'pool-test' + } }; -describe('ConnectionPool', function() { - describe('one connection', function() { - var poolConfig = {max: 1, log: false}; +describe('ConnectionPool', function () { + it('min', function (done) { + this.timeout(10000); - it('should connect, and end', function(done) { - testPool(poolConfig, poolConfig.max, acquireAndClose, done); - }); - - it('should connect, select, and end', function(done) { - testPool(poolConfig, poolConfig.max, acquireSelectAndClose, done); - }); - }); + var poolConfig = {min: 2}; + var pool = new ConnectionPool(poolConfig, connectionConfig); - describe('multiple connections within pool maxSize', function() { - var poolConfig = {max: 5, log: false}; - var numberOfConnectionsToUse = poolConfig.max; + function testMin() { + if (pool.pending.length > 0) { //wait for connections to be created + setTimeout(testMin, 100); + return; + } + assert.equal(pool.free.length, poolConfig.min); + done(); + pool.drain(); + } - it('should connect, and end', function(done) { - testPool(poolConfig, numberOfConnectionsToUse, acquireAndClose, done); + setTimeout(testMin, 100); }); - it('should connect, select, and end', function(done) { - testPool(poolConfig, numberOfConnectionsToUse, acquireSelectAndClose, done); + it('max', function (done) { + this.timeout(10000); + + var poolConfig = {min: 2, max: 5}; + var pool = new ConnectionPool(poolConfig, connectionConfig); + + var count = 20; + var run = 0; + + //run more queries than pooled connections + runQueries(pool, count, 200, function() { + run++; + var d = pool.pending.length + pool.free.length + pool.used.length <= poolConfig.max; + if (!d) + debugger; + assert(pool.pending.length + pool.free.length + pool.used.length <= poolConfig.max); + if (run === count) { + done(); + pool.drain(); + } + }); }); - }); - describe('connections exceed pool maxSize', function() { - var poolConfig = {max: 5, log: false}; - var numberOfConnectionsToUse = 20; + it('connection error event', function (done) { - it('should connect, and end', function(done) { - testPool(poolConfig, numberOfConnectionsToUse, acquireAndClose, done); + var poolConfig = {min: 2, max: 5}; + var pool = new ConnectionPool(poolConfig, {}); + pool.on('error', function(err) { + assert(!!err); + done(); + pool.drain(); + }); }); - it('should connect, select, and end', function(done) { - testPool(poolConfig, numberOfConnectionsToUse, acquireSelectAndClose, done); + it('connection error retry', function (done) { + this.timeout(10000); + var poolConfig = {min: 2, max: 5, retryDelay: 5}; + var pool = new ConnectionPool(poolConfig, {}); + pool.on('error', function(err) { + assert(!!err); + pool.connectionConfig = connectionConfig; + }); + + function testConnected() { + if (pool.pending.length > 0) { //wait for connections to be created + setTimeout(testConnected, 100); + return; + } + assert.equal(pool.free.length, poolConfig.min); + done(); + pool.drain(); + } + + setTimeout(testConnected, 100); }); - }); }); -function testPool(poolConfig, numberOfConnectionsToUse, useConnectionFunction, done) { - var pool = new ConnectionPool(poolConfig, connectionConfig); - - function doIt(done) { - useConnectionFunction(pool, function() { - done(); - }); - } - - var functions = []; - for (var f = 0; f < numberOfConnectionsToUse; f++) { - functions.push(doIt); - } - - async.parallel(functions, function() { - pool.drain(function() { - done(); - }); - }); -} - -function acquireAndClose(pool, done) { - pool.acquire(function (err, connection) { - assert.ok(!err); - - connection.release(); - done(); - }); -} - -function acquireSelectAndClose(pool, done) { - pool.acquire(function (err, connection) { - assert.ok(!err); - - var request = new Request('select 42', function(err, rowCount) { - assert.strictEqual(rowCount, 1); - connection.release(); - done(); - }); - - request.on('row', function(columns) { - assert.strictEqual(columns[0].value, 42); - }); - - connection.execSql(request); - }); +function runQueries(pool, count, keepOpen, complete) { + var createRequest = function (connection) { + var request = new Request('select 42', function (err, rowCount) { + assert.strictEqual(rowCount, 1); + setTimeout(function() { + complete(); + connection.release(); + }, keepOpen); + }); + + request.on('row', function (columns) { + assert.strictEqual(columns[0].value, 42); + }); + + connection.execSql(request); + }; + + for (var i = 0; i < count; i++) { + setTimeout(function() { + pool.acquire(createRequest); + }) + } }