Add mechanism to check analysis before it's executed #215

Merged: 29 commits merged on Nov 10, 2016
Changes from 14 commits

Commits (29)
17925d1
Add mechanism to check analysis before it's executed
jgoizueta Oct 27, 2016
ba97e6e
Fix tests
jgoizueta Oct 27, 2016
023a5d5
:lipstick:
jgoizueta Oct 27, 2016
a6aae8d
Replace count by faster estimation of the number of rows
jgoizueta Oct 27, 2016
74b1fad
Keep precheck failed nodes status in the nodes themselves
jgoizueta Oct 27, 2016
6fc38a4
Fix base Node computeRequirements
jgoizueta Oct 27, 2016
f38729e
Compute requirements for aggregate-intersection analyses
jgoizueta Oct 27, 2016
9808d6c
Fix requirements problem with aliased nodes
jgoizueta Oct 27, 2016
a2e4520
:lipstick:
jgoizueta Oct 27, 2016
f91a33d
Replace comparison operator
jgoizueta Oct 27, 2016
86961cf
Syntax fixes
jgoizueta Oct 27, 2016
b5d2bbd
Fix Node limit computation
jgoizueta Oct 28, 2016
880fa16
Fix requirements & limits names
jgoizueta Oct 28, 2016
3b40450
Integration tests for requirements/limits
jgoizueta Oct 28, 2016
700a116
Add prechecking for line-creation analyses
jgoizueta Oct 28, 2016
fa211d9
Add prechecking for geocoding returning polygons
jgoizueta Oct 28, 2016
e36669e
Avoid using internal details of DatabaseService
jgoizueta Oct 28, 2016
6d6d71c
Handle SQL timeouts during pre-checks
jgoizueta Oct 28, 2016
3d26bfe
Remove testing remnant
jgoizueta Nov 4, 2016
e2e6e59
Estimate requirements and check limits in single step before registration
jgoizueta Nov 4, 2016
baf428c
Expose node id in limit-rejected analyses
jgoizueta Nov 4, 2016
722de54
:lipstick:
jgoizueta Nov 4, 2016
1309a75
Allow unlimited number of rows for a node's output.
jgoizueta Nov 7, 2016
27782e6
Move some limit-checking functionality to Requirements Class
jgoizueta Nov 8, 2016
4d97ee2
Don't limit analyses by number of output rows in general
jgoizueta Nov 8, 2016
e88b88e
Simplify sequential line requirement estimation.
jgoizueta Nov 8, 2016
aa12fef
Fix tests
jgoizueta Nov 8, 2016
e931566
Perform limit-checking before registering the analysis
jgoizueta Nov 8, 2016
b247f61
Raise limit for points per sequential line
jgoizueta Nov 10, 2016
14 changes: 12 additions & 2 deletions lib/analysis.js
@@ -9,6 +9,7 @@ var toposort = require('../lib/dag/toposort');
var validator = require('../lib/dag/validator');

var DatabaseService = require('./service/database');
var Requirements = require('./service/requirements');

var AnalysisLogger = require('./logging/logger');

@@ -60,6 +61,7 @@ AnalysisFactory.prototype.create = function(configuration, definition, callback)
configuration.batch,
configuration.limits
);
var requirements = new Requirements(databaseService, configuration.limits);
var logger = configuration.logger ? new AnalysisLogger(configuration.logger.stream, configuration.user) : undefined;

async.waterfall(
@@ -80,6 +82,16 @@ AnalysisFactory.prototype.create = function(configuration, definition, callback)
return done(err, analysis);
});
},
function analysis$collectRequirements(analysis, done) {
requirements.computeRequirements(analysis, function(err) {
return done(err, analysis);
});
},
function analysis$validateRequirements(analysis, done) {
Contributor:

I'd like to think this through, because I believe we can do it during analysis creation or validation, and probably before registering the analysis in the catalog...

Contributor:

I'd check limits after validating the analysis and before registering it. If some node reaches the limit, the analysis should fail and it shouldn't be registered.

Contributor Author:

I'm changing all this so we get the requirements and validate them in a single operation, and do it before registering, as we discussed.

requirements.validateRequirements(analysis, function(err) {
return done(err, analysis);
});
},
function analysis$queueOperations(analysis, done) {
databaseService.queueAnalysisOperations(analysis, function(err) {
return done(err, analysis);
@@ -100,10 +112,8 @@ AnalysisFactory.prototype.create = function(configuration, definition, callback)
if (err && err.message && err.message.match(/permission denied/i)) {
err = new Error('Analysis requires authentication with API key: permission denied.');
}

return callback(err);
}

return callback(null, analysis);
}
);
53 changes: 53 additions & 0 deletions lib/node/node.js
@@ -468,3 +468,56 @@ function validate(validator, params, expectedParamName) {

return param;
}

Node.prototype.computeRequirements = function(databaseService, limits, callback) {
// By default simply compute maximum of the inputs' number of rows.
// TODO: if the most common multi-input analysis is some kind of join we should use
// the product of the input numberOfRows instead
var maxRows = Math.max.apply(
null,
this.inputNodes.map(function(node) { return node.estimatedRequirements.numberOfRows || 0; })
);
if (maxRows < 0) {
maxRows = 0;
}
this.estimatedRequirements = {
numberOfRows: maxRows
};
this.limits = {
maximumNumberOfRows: getNodeLimit(limits, this.getType(), 'maximumNumberOfRows', 1000000)
Contributor Author:

Since limit values should be definable in Redis, maybe we should use snake_case instead of camelCase?

Contributor:

You mean maximumNumberOfRows?

Contributor Author:

Yes, having maximum_number_of_rows instead.

};
return callback(null, this.requirementMessages());
};

Node.prototype.requirementMessages = function() {
var messages = [];
if (this.estimatedRequirements.numberOfRows > this.limits.maximumNumberOfRows) {
messages.push('too many result rows');
}
return messages;
};

Node.prototype.validateRequirements = function(callback) {
var messages = this.requirementMessages();
var err;
if (messages.length > 0) {
this.status = STATUS.FAILED;
this.errorMessage = messages.join('\n');
err = new Error(this.errorMessage);
}
callback(err);
Contributor:

We must avoid using callbacks in synchronous functions.

Contributor Author:

I was making that function callable in an asynchronous way in case we needed to specialise it for cases in which limits (such as quotas) have to be obtained asynchronously.

But since we don't need async calls now, and we'll change this design anyway, I'll remove the callback.

};

function getNodeLimit(globalLimits, nodeType, limitName, defaultValue) {
var limit = null;
var limits = globalLimits.analyses;
if (limits) {
if (limits[nodeType] !== undefined) {
limits = limits[nodeType];
}
limit = limits[limitName];
}
return limit || defaultValue;
}

module.exports.getNodeLimit = getNodeLimit;
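As a side note for reviewers, the resolution order `getNodeLimit` implements can be sketched in isolation. The function body below is copied from the diff; the sample limit objects are made-up examples, not real configuration:

```javascript
// Copy of getNodeLimit from the diff above, exercised with hypothetical
// limit objects to show the resolution order: a per-node-type entry wins
// over the global analyses entry, which wins over the hard-coded default.
function getNodeLimit(globalLimits, nodeType, limitName, defaultValue) {
    var limit = null;
    var limits = globalLimits.analyses;
    if (limits) {
        if (limits[nodeType] !== undefined) {
            limits = limits[nodeType];
        }
        limit = limits[limitName];
    }
    return limit || defaultValue;
}

// Hypothetical configuration: a global cap plus a stricter cap for sources.
var sampleLimits = {
    analyses: {
        maximumNumberOfRows: 500,             // global fallback
        source: { maximumNumberOfRows: 5 }    // per-node-type override
    }
};

console.log(getNodeLimit(sampleLimits, 'source', 'maximumNumberOfRows', 1000000)); // 5
console.log(getNodeLimit(sampleLimits, 'buffer', 'maximumNumberOfRows', 1000000)); // 500
console.log(getNodeLimit({}, 'buffer', 'maximumNumberOfRows', 1000000));           // 1000000
```

One subtlety of the `limit || defaultValue` return: a configured limit of `0` is falsy and would silently fall back to the default, so zero cannot be used to mean "no rows allowed".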
13 changes: 13 additions & 0 deletions lib/node/nodes/aggregate-intersection.js
@@ -77,3 +77,16 @@ var queryAggregateTemplate = Node.template([
'WHERE ST_Intersects(_cdb_analysis_source.the_geom, _cdb_analysis_target.the_geom)',
'GROUP BY {{=it.groupByColumns}}'
].join('\n'));

AggregateIntersection.prototype.computeRequirements = function(databaseService, limits, callback) {
// we estimate the maximum possible number of rows of the result
var product = this.source.estimatedRequirements.numberOfRows *
Contributor Author:

I guess this is not considered good practice; should I define an accessor and use this.source.getEstimatedRequirements().numberOfRows instead?
(Your opinion is welcome, @dgaubert.)

Contributor:

We have several places where we don't follow the getter/setter pattern. IMHO it's not useful in JavaScript: you can always get a member property directly with the dot operator. My advice is to avoid Cannot read property 'wadus' of null by using default values and so on.

this.target.estimatedRequirements.numberOfRows;
this.estimatedRequirements = {
numberOfRows: product
};
this.limits = {
maximumNumberOfRows: Node.getNodeLimit(limits, TYPE, 'maximumNumberOfRows', 1000000)
};
return callback(null);
Contributor:

Same here: don't use callbacks with synchronous code.

Contributor Author (@jgoizueta, Nov 4, 2016):

In this case we do need the callback parameter, so we have a common interface for all nodes, because some node classes need to perform asynchronous operations in the computeRequirements function (executing SQL code in the database). This particular specialization of computeRequirements is synchronous, but we need to provide for the other cases.

Contributor:

OK, but be careful with CPU-intensive tasks and consider using process.nextTick() if required.

};
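A minimal sketch of the reviewer's process.nextTick() suggestion. The node shape here is hypothetical, not the PR's actual Node class; it only illustrates how a synchronous computeRequirements can keep the same async callback contract as the SQL-backed implementations:

```javascript
// Hypothetical sketch: a synchronous requirements computation that still
// honors the async callback contract by deferring with process.nextTick(),
// so callers never observe the callback firing within the same tick.
function computeRequirementsSync(node, callback) {
    var product = node.source.estimatedRequirements.numberOfRows *
                  node.target.estimatedRequirements.numberOfRows;
    node.estimatedRequirements = { numberOfRows: product };
    process.nextTick(function() {
        callback(null);
    });
}
```

Mixing same-tick and deferred callback invocations across node types is the hazard being discussed; deferring uniformly keeps caller behavior predictable.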
12 changes: 12 additions & 0 deletions lib/node/nodes/filter-category.js
@@ -25,3 +25,15 @@ module.exports = FilterCategory;
FilterCategory.prototype.sql = function() {
return this.category.sql(this.source.getQuery());
};

FilterCategory.prototype.computeRequirements = function(databaseService, limits, callback) {
// We use a very simplistic approach: estimate as many rows as the unfiltered source
// (the actual value is always less than or equal to that)
this.estimatedRequirements = {
numberOfRows: this.source.estimatedRequirements.numberOfRows
};
this.limits = {
maximumNumberOfRows: Node.getNodeLimit(limits, TYPE, 'maximumNumberOfRows', 1000000)
};
return callback(null);
Contributor:

Same here.

};
23 changes: 22 additions & 1 deletion lib/node/nodes/source.js
@@ -30,5 +30,26 @@ Source.prototype.sql = function() {
* @returns {Node.STATUS}
*/
Source.prototype.getStatus = function() {
return Node.STATUS.READY;
return Node.STATUS.READY; // TODO: this ignores the possibility of requirements exceeding the limits
};

var estimatedCountTemplate = Node.template('EXPLAIN (FORMAT JSON) {{=it.sourceQuery}}');

Source.prototype.computeRequirements = function(databaseService, limits, callback) {
var sql = estimatedCountTemplate({
sourceQuery: this.query
});
var self = this;
databaseService.run(sql, function(err, resultSet){
if (err) {
return callback(err);
}
self.estimatedRequirements = {
numberOfRows: resultSet.rows[0]['QUERY PLAN'][0].Plan['Plan Rows']
};
self.limits = {
maximumNumberOfRows: Node.getNodeLimit(limits, TYPE, 'maximumNumberOfRows', 1000000)
};
return callback(null);
});
};
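The row estimate above comes from PostgreSQL's EXPLAIN (FORMAT JSON) output rather than a full COUNT(*), so it is the planner's estimate and can differ from the true row count. A sketch of the result-set shape the code assumes (the sample values are made up, not actual query output):

```javascript
// Hypothetical sample of what databaseService.run would hand back for
// EXPLAIN (FORMAT JSON) over a source query: a single row whose
// 'QUERY PLAN' column is an array holding the top-level Plan object.
var resultSet = {
    rows: [{
        'QUERY PLAN': [{
            Plan: {
                'Node Type': 'Seq Scan',
                'Relation Name': 'postal_codes',
                'Plan Rows': 6          // the planner's row estimate
            }
        }]
    }]
};

// The same access path used by Source.prototype.computeRequirements:
var numberOfRows = resultSet.rows[0]['QUERY PLAN'][0].Plan['Plan Rows'];
console.log(numberOfRows); // 6
```

Because the estimate depends on table statistics being fresh (ANALYZE), stale stats could let an over-limit source slip past the precheck, or reject a source that is actually within limits.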
107 changes: 107 additions & 0 deletions lib/service/requirements.js
@@ -0,0 +1,107 @@
'use strict';

var async = require('async');
var Node = require('../node/node');
var debug = require('../util/debug')('requirements');

var QUERY_RUNNER_READONLY_OP = true;
var QUERY_RUNNER_WRITE_OP = !QUERY_RUNNER_READONLY_OP;

// A priori checking of the requirements/limits of an analysis
function Requirements(databaseService, limits) {
this.databaseService = databaseService;
this.limits = limits;
}

// TODO: consider doing computation & validation in one single process
Contributor:

I'd implement computation & validation for each node in one single process.

Contributor Author:

Sure!

Requirements.prototype.computeRequirements = function (analysis, callback) {
var sortedNodes = analysis.getSortedNodes();
var allNodes = analysis.getNodes();
var aliasedNodesPresent = allNodes.length > sortedNodes.length;
var self = this;
async.eachSeries(
sortedNodes,
function(node, done) {
node.computeRequirements(self.databaseService, self.limits, function(err) {
if (aliasedNodesPresent) {
// some nodes are aliased (multiple nodes with the same id);
// we need to replicate the requirements and limits to them, because
// another node later in the sequence may try to access them
replicateRequirementsToAliases(node, allNodes);
}
return done(err);
});
},
function finish(err) {
if (err) {
return callback(err);
}
return callback(null);
}
);
};

// Validates analysis requirements, node by node individually; as soon as
// a node fails to pass the requirements this is aborted, the node status
// and error message are stored in the catalog, and the error is returned to
// the callback.
Requirements.prototype.validateRequirements = function (analysis, callback) {
var self = this;
async.eachSeries(
analysis.getSortedNodes(),
function(node, done) {
node.validateRequirements(function(err) {
if (err) {
// register the failed status
var sql = updateNodeAsFailedAtAnalysisCatalogQuery([node.id()], err.message);
self.databaseService.queryRunner.run(sql, QUERY_RUNNER_WRITE_OP, function(sql_err) {
if (sql_err) {
// FIXME: what should we do if saving the status fails?
debug('SQL ERROR:', sql_err);
}
return done(err);
});
} else {
return done(err);
}
});
},
callback
);
};

module.exports = Requirements;

function replicateRequirementsToAliases(node, allNodes) {
var id = node.id();
allNodes.forEach(function(otherNode) {
Contributor:

forEach visits all items of the collection; there is no way to abort the loop once we've replicated the requirements into the first matching node (the following iterations are unnecessary). I'd use a classic for (var i = 0; ...) {} and return when the node requirements are replicated.

Contributor:

Uhmm! 🤔 We need to replicate into more than one node. Sorry, I didn't see it.

if (otherNode.id() === id && !otherNode.estimatedRequirements) {
otherNode.estimatedRequirements = node.estimatedRequirements;
otherNode.limits = node.limits;
}
});
}

function pgQuoteCastMapper(cast) {
return function(input) {
return '\'' + input + '\'' + (cast ? ('::' + cast) : '');
};
}

function updateNodeAtAnalysisCatalogQuery(nodeIds, columns) {
nodeIds = Array.isArray(nodeIds) ? nodeIds : [nodeIds];
return [
'UPDATE cdb_analysis_catalog SET',
columns.join(','),
'WHERE node_id IN (' + nodeIds.map(pgQuoteCastMapper()).join(', ') + ')'
].join('\n');
}

function updateNodeAsFailedAtAnalysisCatalogQuery(nodeIds, errorMessage) {
var status = Node.STATUS.FAILED;
return updateNodeAtAnalysisCatalogQuery(nodeIds, [
'status = \'' + status + '\'',
'last_error_message = $last_error_message$' + errorMessage + '$last_error_message$',
'updated_at = NOW()'
]);
}
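The $last_error_message$…$last_error_message$ dollar quoting above keeps error messages containing single quotes from breaking the generated SQL. A sketch of the statement the builders produce, reusing the functions from the diff (the node id is made up, and the status value is hard-coded as the string 'failed' for illustration rather than read from Node.STATUS):

```javascript
// Copies of the query builders above, exercised with a hypothetical node id
// and an error message containing a single quote.
function pgQuoteCastMapper(cast) {
    return function(input) {
        return '\'' + input + '\'' + (cast ? ('::' + cast) : '');
    };
}

function updateNodeAtAnalysisCatalogQuery(nodeIds, columns) {
    nodeIds = Array.isArray(nodeIds) ? nodeIds : [nodeIds];
    return [
        'UPDATE cdb_analysis_catalog SET',
        columns.join(','),
        'WHERE node_id IN (' + nodeIds.map(pgQuoteCastMapper()).join(', ') + ')'
    ].join('\n');
}

function updateNodeAsFailedAtAnalysisCatalogQuery(nodeIds, errorMessage) {
    return updateNodeAtAnalysisCatalogQuery(nodeIds, [
        'status = \'failed\'',
        'last_error_message = $last_error_message$' + errorMessage + '$last_error_message$',
        'updated_at = NOW()'
    ]);
}

var sql = updateNodeAsFailedAtAnalysisCatalogQuery('a0fe3c87', "node doesn't fit limits");
console.log(sql);
// UPDATE cdb_analysis_catalog SET
// status = 'failed',last_error_message = $last_error_message$node doesn't fit limits$last_error_message$,updated_at = NOW()
// WHERE node_id IN ('a0fe3c87')
```

Dollar quoting still breaks if the message itself ever contains the literal $last_error_message$ tag; a parameterized query would be more robust than string interpolation here.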
13 changes: 13 additions & 0 deletions test/fixtures/table/postal_codes.sql
@@ -0,0 +1,13 @@
CREATE TABLE postal_codes (
cartodb_id integer NOT NULL,
the_geom geometry(Geometry,4326),
the_geom_webmercator geometry(Geometry,3857),
code text
);

ALTER TABLE ONLY postal_codes
ADD CONSTRAINT postal_codes_pkey PRIMARY KEY (cartodb_id);

CREATE INDEX postal_codes_the_geom_idx ON postal_codes USING gist (the_geom);

CREATE INDEX postal_codes_the_geom_webmercator_idx ON postal_codes USING gist (the_geom_webmercator);
66 changes: 66 additions & 0 deletions test/integration/analysis.js
@@ -102,6 +102,72 @@ describe('workflow', function() {
});
});

it('should compute node requirements and limits for source', function(done) {
Analysis.create(testConfig, sourceAnalysisDefinition, function(err, analysis) {
assert.ok(!err, err);
assert.equal(analysis.getRoot().estimatedRequirements.numberOfRows, 6);
assert.equal(analysis.getRoot().limits.maximumNumberOfRows, 1000000);
done();
});
});

it('should abort analysis over the limits for source', function(done) {
var limitedConfig = testConfig.create({
limits: {
analyses: {
source: {
maximumNumberOfRows: 5
}
}
}
});
Analysis.create(limitedConfig, sourceAnalysisDefinition, function(err) {
assert.ok(err);
done();
});
});

it('should compute node requirements and limits for trade areas', function(done) {
var enqueueFn = BatchClient.prototype.enqueue;

BatchClient.prototype.enqueue = function(query, callback) {
return callback(null, {status: 'ok'});
};

Analysis.create(testConfig, tradeAreaAnalysisDefinition, function(err, analysis) {
BatchClient.prototype.enqueue = enqueueFn;

assert.ok(!err, err);
assert.equal(analysis.getRoot().estimatedRequirements.numberOfRows, 6);
assert.equal(analysis.getRoot().limits.maximumNumberOfRows, 1000000);
done();
});
});

it('should abort analysis over the limits for trade areas', function(done) {
var limitedConfig = testConfig.create({
limits: {
analyses: {
'trade-area': {
maximumNumberOfRows: 5
}
}
}
});

var enqueueFn = BatchClient.prototype.enqueue;
BatchClient.prototype.enqueue = function(query, callback) {
return callback(null, {status: 'ok'});
};

Analysis.create(limitedConfig, tradeAreaAnalysisDefinition, function(err) {
BatchClient.prototype.enqueue = enqueueFn;

assert.ok(err);
done();
});
});

});

});
3 changes: 2 additions & 1 deletion test/setup.js
@@ -33,7 +33,8 @@ before(function setupTestDatabase(done) {

fs.realpathSync('./test/fixtures/table/madrid_districts.sql'),
fs.realpathSync('./test/fixtures/table/atm_machines.sql'),
fs.realpathSync('./test/fixtures/table/airbnb_rooms.sql')
fs.realpathSync('./test/fixtures/table/airbnb_rooms.sql'),
fs.realpathSync('./test/fixtures/table/postal_codes.sql')
];

async.waterfall(