Skip to content

Commit

Permalink
Batch insert support (#127)
Browse files Browse the repository at this point in the history
* Batch insert support. Closes #125

* don't use .returning() with redshift dialect

* add a test case: create with $noSelect and without allowedInsert

* fix: Objection.js hooks don't work with '.toKnexQuery()' method
  • Loading branch information
mdmitry01 authored Nov 10, 2020
1 parent db29655 commit 4c3b529
Showing 1 changed file with 92 additions and 43 deletions.
135 changes: 92 additions & 43 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -616,58 +616,107 @@ class Service extends AdapterService {
});
}

/**
* `create` service function for Objection.
* @param {object} data
* @param {object} params
*/
async _create (data, params) {
const transaction = await this._createTransaction(params);
const create = (data, params) => {
const q = this._createQuery(params);
const allowedUpsert = this.mergeRelations(this.allowedUpsert, params.mergeAllowUpsert);
const allowedInsert = this.mergeRelations(this.allowedInsert, params.mergeAllowInsert);

if (this.createUseUpsertGraph) {
if (allowedUpsert) {
q.allowGraph(allowedUpsert);
}
q.upsertGraphAndFetch(data, this.upsertGraphOptions);
} else if (allowedInsert) {
q.allowGraph(allowedInsert);
q.insertGraph(data, this.insertGraphOptions);
} else {
q.insert(data, this.id);
}

return q
.then(row => {
if (params.query && params.query.$noSelect) { return data; }

let id;
_getCreatedRecords (insertResults, inputData, params) {
if (params.query && params.query.$noSelect) {
return inputData;
}
if (!Array.isArray(insertResults)) {
insertResults = [insertResults];
}

if (Array.isArray(this.id)) {
id = [];
const findQuery = Object.assign({ $and: [] }, params.query);
const idsQueries = [];

for (const idKey of this.id) {
id.push(row && row[idKey] ? row[idKey] : data[idKey]);
}
if (Array.isArray(this.id)) {
for (const insertResult of insertResults) {
const ids = [];
for (const idKey of this.id) {
if (idKey in insertResult) {
ids.push(insertResult[idKey]);
} else {
id = row && row[this.id] ? row[this.id] : data[this.id];
return inputData;
}
}
idsQueries.push(this.getIdsQuery(ids));
}
} else {
const ids = [];
for (const insertResult of insertResults) {
if (this.id in insertResult) {
ids.push(insertResult[this.id]);
} else {
return inputData;
}
}
idsQueries.push(this.getIdsQuery(null, ids));
}

if (!id || (Array.isArray(id) && !id.length)) { return data; }
if (idsQueries.length > 1) {
findQuery.$and.push({ $or: idsQueries });
} else {
findQuery.$and = findQuery.$and.concat(idsQueries);
}

return this._get(id, params);
})
.catch(errorHandler);
};
return this._find(Object.assign({}, params, { query: findQuery }))
.then(page => {
const records = page.data || page;
if (Array.isArray(inputData)) {
return records;
}
return records[0];
});
}

if (Array.isArray(data)) {
return Promise.all(data.map(current => create(current, params))).then(this._commitTransaction(transaction), this._rollbackTransaction(transaction));
/**
* @param data
* @param params
* @returns {Promise<Object|Object[]>}
* @private
*/
_batchInsert (data, params) {
const { dialect } = this.Model.knex().client;
// batch insert only works with Postgresql and SQL Server
if (dialect === 'postgresql' || dialect === 'mssql') {
return this._createQuery(params)
.insert(data)
.returning(this.id);
}
if (!Array.isArray(data)) {
return this._createQuery(params).insert(data);
}
const promises = data.map(dataItem => {
return this._createQuery(params).insert(dataItem);
});
return Promise.all(promises);
}

return create(data, params).then(this._commitTransaction(transaction), this._rollbackTransaction(transaction));
/**
* `create` service function for Objection.
* @param {object} data
* @param {object} params
*/
async _create (data, params) {
const transaction = await this._createTransaction(params);
const q = this._createQuery(params);
let promise = q;
const allowedUpsert = this.mergeRelations(this.allowedUpsert, params.mergeAllowUpsert);
const allowedInsert = this.mergeRelations(this.allowedInsert, params.mergeAllowInsert);

if (this.createUseUpsertGraph) {
if (allowedUpsert) {
q.allowGraph(allowedUpsert);
}
q.upsertGraph(data, this.upsertGraphOptions);
} else if (allowedInsert) {
q.allowGraph(allowedInsert);
q.insertGraph(data, this.insertGraphOptions);
} else {
promise = this._batchInsert(data, params);
}
return promise
.then(insertResults => this._getCreatedRecords(insertResults, data, params))
.then(this._commitTransaction(transaction), this._rollbackTransaction(transaction))
.catch(errorHandler);
}

/**
Expand Down

0 comments on commit 4c3b529

Please sign in to comment.