Skip to content

Commit

Permalink
Calls Connection.Reset() when the connection is released to the pool.…
Browse files Browse the repository at this point in the history
… This is very unlikely to cause anyone trouble.

Added a callback argument to ConnectionPool.drain()
Updated Tests
  • Loading branch information
ben-page committed Nov 24, 2014
1 parent 63239f9 commit a88a1ec
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 56 deletions.
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ Acquire a Tedious Connection object from the pool.
* `err` {Object} An Error object is an error occurred trying to acquire a connection, otherwise null.
* `connection` {Object} A [Connection](http://pekim.github.com/tedious/api-connection.html)

### connectionPool.drain()
### connectionPool.drain(callback)
Close all pooled connections and stop making new ones. The pool should be discarded after it has been drained.
* `callback()` {Function} Callback function

### connectionPool.error {event}
The 'error' event is emitted when a connection fails to connect to the SQL Server.
Expand All @@ -88,17 +89,21 @@ The following method is added to the Tedious [Connection](http://pekim.github.co
### Connection.release()
Release the connect back to the pool to be used again

## Version 0.3.x Changes
## Version 0.3.1 Changes
* Calls Connection.Reset() when the connection is released to the pool. This is very unlikely to cause anyone trouble.
* Added a callback argument to ConnectionPool.drain()

## Version 0.3.0 Changes
* Removed dependency on the `generic-pool` node module.
* Added `poolConfig` options `retryDelay`
* Added `poolConfig` options `aquireTimeout` **(Possibly Breaking)**
* Added `poolConfig` options `log`
* `idleTimeoutMillis` renamed to `idleTimeout` **(Possibly Breaking)**
* The `ConnectionPool` `'error'` event added
* The behavior of the err parameter of the callback passed to `acquire()` has changed. It only returns errors related to acquiring a connection not Tedious Connection errors. Connection errors can happen anytime the pool is being filled and could go unnoticed if only passed the the callback. Subscribe to the `'error'` event on the pool to be notified of all connection errors. **(Possibly Breaking)**
* `PooledConnection` object removed.
* `PooledConnection` object removed.

## Version 0.2.x Breaking Changes
* To acquire a connection, call on `acquire()` on a `ConnectionPool` rather than `requestConnection()`.
* After acquiring a `PooledConnection`, do not wait for the `'connected'` event. The connection is received connected.
* Call `release()` on a `PooledConnection` to release the it back to the pool. `close()` permenantly closes the connection (as `close()` behaves in in tedious).
* Call `release()` on a `PooledConnection` to release the it back to the pool. `close()` permanently closes the connection (as `close()` behaves in in tedious).
84 changes: 59 additions & 25 deletions lib/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ var USED = 2;
var RETRY = 3;

function ConnectionPool(poolConfig, connectionConfig) {
this.connections = [];
this.waiting = [];
this.connections = []; //open connections of any state
this.waiting = []; //acquire() callbacks that are waiting for a connection to come available
this.connectionConfig = connectionConfig;

this.max = poolConfig.max || 50;
Expand Down Expand Up @@ -44,7 +44,7 @@ function ConnectionPool(poolConfig, connectionConfig) {
} else {
this.log = function() {};
}

this.drained = false;

setTimeout(fill.bind(this), 4);
Expand Down Expand Up @@ -212,49 +212,83 @@ function setFree(pooled) {
}, this.idleTimeout);
}



ConnectionPool.prototype.release = function(connection) {
if (this.drained) //pool has been drained
return;

var waiter = this.waiting.shift();
var self = this;
var i, pooled;

connection.reset(function (err) {
if (err) { //there is an error, don't reuse the connection, just close it
for (i = self.connections.length - 1; i >= 0; i--) {
pooled = self.connections[i];
if (pooled.con === connection) {
pooled.con.close();
return;
}
}
}
self.log('connection reset');

if (waiter !== undefined) {
if (waiter.timeout)
clearTimeout(waiter.timeout);
waiter.callback(null, connection);
} else {
for (var i = this.connections.length - 1; i >= 0; i--) {
var pooled = this.connections[i];
if (pooled.con === connection) {
setFree.call(this, pooled);
break;
var waiter = self.waiting.shift();

if (waiter !== undefined) {
if (waiter.timeout)
clearTimeout(waiter.timeout);
waiter.callback(null, connection);
} else {
for (i = self.connections.length - 1; i >= 0; i--) {
pooled = self.connections[i];
if (pooled.con === connection) {
setFree.call(self, pooled);
break;
}
}
}
}
});
};

ConnectionPool.prototype.drain = function () {
ConnectionPool.prototype.drain = function (callback) {
this.log('draining pool');
if (this.drained) //pool has been drained
if (this.drained) {//pool has been drained
callback();
return;
}

this.drained = true;

for (var i = this.connections.length - 1; i >= 0; i--) {
var pooled = this.connections[i];
pooled.con.close();
if (pooled.timeout)
clearTimeout(pooled.timeout);
}
var eventTotal = this.connections.length + this.waiting.length;
var eventCount = 0;

var ended = function() {
if (++eventCount === eventTotal)
if (callback)
callback();
};

for (i = this.waiting.length - 1; i >= 0; i--) {
var waiter = this.waiting[i];

if (waiter.timeout)
clearTimeout(waiter.timeout);
}

this.connections = null;
this.waiting = null;

for (var i = this.connections.length - 1; i >= 0; i--) {
var pooled = this.connections[i];
pooled.con.on('end', ended);

if (pooled.timeout)
clearTimeout(pooled.timeout);

pooled.con.close();
}

this.connections = null;
};

module.exports = ConnectionPool;
module.exports = ConnectionPool;
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tedious-connection-pool",
"version": "0.3.0",
"version": "0.3.1",
"description": "Connection Pool for tedious.",
"main": "lib/connection-pool.js",
"scripts": {
Expand Down Expand Up @@ -47,7 +47,6 @@
},
"licenses": "MIT",
"readmeFilename": "README.md",
"gitHead": "22e4053020343706eb9c998e9354263cf2ced759",
"bugs": {
"url": "https://github.com/pekim/tedious-connection-pool/issues"
},
Expand Down
59 changes: 46 additions & 13 deletions test/connection-pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ describe('ConnectionPool', function () {

setTimeout(function() {
assert.equal(pool.connections.length, poolConfig.min);
done();
pool.drain();
pool.drain(done);
}, 4);
});

Expand All @@ -64,8 +63,8 @@ describe('ConnectionPool', function () {
run++;
assert(pool.connections.length <= poolConfig.max);
if (run === count) {
done();
pool.drain();
pool.drain(done);
return;
}
connection.release();
}, 200);
Expand All @@ -90,8 +89,7 @@ describe('ConnectionPool', function () {
var pool = new ConnectionPool(poolConfig, {});
pool.on('error', function(err) {
assert(!!err);
done();
pool.drain();
pool.drain(done);
});
});

Expand All @@ -114,8 +112,7 @@ describe('ConnectionPool', function () {
}

assert.equal(pool.connections.length, poolConfig.min);
done();
pool.drain();
pool.drain(done);
}

setTimeout(testConnected, 100);
Expand Down Expand Up @@ -150,8 +147,7 @@ describe('ConnectionPool', function () {

var request = new Request('select 42', function (err, rowCount) {
assert.strictEqual(rowCount, 1);
done();
pool.drain();
pool.drain(done);
});

request.on('row', function (columns) {
Expand All @@ -171,8 +167,7 @@ describe('ConnectionPool', function () {

pool.on('error', function(err) {
assert(err && err.name === 'ConnectionError');
done();
pool.drain();
pool.drain(done);
});

//This simulates a lost connections by creating a job that kills the current session and then deletesthe job.
Expand All @@ -192,7 +187,7 @@ describe('ConnectionPool', function () {
'EXECUTE msdb..sp_add_jobstep @job_id=@jobId, @step_name=\'Step2\', @command = @deletecommand;' +

'EXECUTE msdb..sp_start_job @job_id=@jobId;' +
'WAITFOR DELAY \'00:00:10\';' +
'WAITFOR DELAY \'01:00:00\';' +
'SELECT 42';

var request = new Request(command, function (err, rowCount) {
Expand All @@ -206,4 +201,42 @@ describe('ConnectionPool', function () {
connection.execSql(request);
});
});

it('release(), reset()', function (done) {
this.timeout(10000);

var poolConfig = {max: 1};
var pool = new ConnectionPool(poolConfig, connectionConfig);


var createRequest = function(query, value, callback) {
var request = new Request(query, function (err, rowCount) {
assert.strictEqual(rowCount, 1);
callback();
});

request.on('row', function (columns) {
assert.strictEqual(columns[0].value, value);
});

return request;
};

pool.acquire(function(err, conn) {
assert(!err);

conn.execSql(createRequest('SELECT 42', 42, function () {
pool.release(conn); //release the connect

pool.acquire(function (err, conn) { //re-acquire the connection
assert(!err);

conn.execSql(createRequest('SELECT 42', 42, function () {

pool.drain(done);
}));
});
}));
});
});
});
25 changes: 13 additions & 12 deletions test/memory-usage.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//test for memory leak on bad connections
//test for memory leak after closing connections
var ConnectionPool = require('../lib/connection-pool');
var fs = require('fs');

var count = 0;
var mem = [];
var groupCount = 10;
var groupCount = 20;
var poolSize = 1000;

var pool = new ConnectionPool({ max: poolSize, min: poolSize, retryDelay: 1}, {
Expand All @@ -14,15 +14,16 @@ var pool = new ConnectionPool({ max: poolSize, min: poolSize, retryDelay: 1}, {
});

pool.on('error', function() {
var i = Math.floor(count++ / poolSize);
if (i === groupCount) {
var previous = 0;
for (var f = 0; f < groupCount; f++) {
var size = (mem[f] / poolSize);
fs.writeSync(1, ((f+1) * poolSize) + ': ' + Math.round(mem[f] / poolSize * 100) + 'KB\n');
previous = size;
}
process.exit(0);
var i = Math.floor(count++ / poolSize);
if (i === groupCount) {
var previous = 0;
for (var f = 0; f < groupCount; f++) {
var size = (mem[f] / poolSize);
fs.writeSync(1, ((f + 1) * poolSize) + ': ' + Math.round(mem[f] / poolSize * 100) + 'KB\n');
global.gc();
previous = size;
}
mem[i] = (mem[i] || 0) + (process.memoryUsage().heapUsed / 1000); //kilobytes
process.exit(0);
}
mem[i] = (mem[i] || 0) + (process.memoryUsage().heapUsed / 1000); //kilobytes
});

0 comments on commit a88a1ec

Please sign in to comment.