More MongoDB bulkWrite() improvements.

This commit is contained in:
Ylian Saint-Hilaire 2021-01-10 06:19:16 -08:00
parent e41ac08221
commit b6a257c1e4

113
db.js
View File

@ -38,8 +38,20 @@ module.exports.CreateDB = function (parent, func) {
obj.dbRecordsDecryptKey = null; obj.dbRecordsDecryptKey = null;
obj.changeStream = false; obj.changeStream = false;
obj.pluginsActive = ((parent.config) && (parent.config.settings) && (parent.config.settings.plugins != null) && (parent.config.settings.plugins != false) && ((typeof parent.config.settings.plugins != 'object') || (parent.config.settings.plugins.enabled != false))); obj.pluginsActive = ((parent.config) && (parent.config.settings) && (parent.config.settings.plugins != null) && (parent.config.settings.plugins != false) && ((typeof parent.config.settings.plugins != 'object') || (parent.config.settings.plugins.enabled != false)));
obj.pendingSet = false;
obj.pendingSets = null; // MongoDB bulk write state
obj.filePendingSet = false;
obj.filePendingSets = null;
obj.filePendingCb = null;
obj.filePendingCbs = null;
obj.powerFilePendingSet = false;
obj.powerFilePendingSets = null;
obj.powerFilePendingCb = null;
obj.powerFilePendingCbs = null;
obj.eventsFilePendingSet = false;
obj.eventsFilePendingSets = null;
obj.eventsFilePendingCb = null;
obj.eventsFilePendingCbs = null;
obj.SetupDatabase = function (func) { obj.SetupDatabase = function (func) {
// Check if the database unique identifier is present // Check if the database unique identifier is present
@ -1031,15 +1043,17 @@ module.exports.CreateDB = function (parent, func) {
} }
} else if (obj.databaseType == 3) { } else if (obj.databaseType == 3) {
// Database actions on the main collection (MongoDB) // Database actions on the main collection (MongoDB)
obj.Set = function (data) { // Fast Set operation using bulkWrite(), this is much faster then using replaceOne() obj.Set = function (data, func) { // Fast Set operation using bulkWrite(), this is much faster then using replaceOne()
if (obj.pendingSet == false) { if (obj.filePendingSet == false) {
// Perform the operation now // Perform the operation now
obj.pendingSet = true; obj.pendingSets = null; obj.filePendingSet = true; obj.filePendingSets = null;
obj.file.bulkWrite([{ replaceOne: { filter: { _id: data._id }, replacement: performTypedRecordEncrypt(common.escapeLinksFieldNameEx(data)), upsert: true } }], bulkWriteCompleted); if (func != null) { obj.filePendingCbs = [func]; }
obj.file.bulkWrite([{ replaceOne: { filter: { _id: data._id }, replacement: performTypedRecordEncrypt(common.escapeLinksFieldNameEx(data)), upsert: true } }], fileBulkWriteCompleted);
} else { } else {
// Add this operation to the pending list // Add this operation to the pending list
if (obj.pendingSets == null) { obj.pendingSets = {} } if (obj.filePendingSets == null) { obj.filePendingSets = {} }
obj.pendingSets[data._id] = data; obj.filePendingSets[data._id] = data;
if (func != null) { if (obj.filePendingCb == null) { obj.filePendingCb = [ func ]; } else { obj.filePendingCb.push(func); } }
} }
}; };
obj.Get = function (id, func) { obj.Get = function (id, func) {
@ -1113,7 +1127,19 @@ module.exports.CreateDB = function (parent, func) {
// Database actions on the events collection // Database actions on the events collection
obj.GetAllEvents = function (func) { obj.eventsfile.find({}).toArray(func); }; obj.GetAllEvents = function (func) { obj.eventsfile.find({}).toArray(func); };
obj.StoreEvent = function (event, func) { obj.eventsfile.insertOne(event, func); }; obj.StoreEvent = function (event, func) { // Fast MongoDB event store using bulkWrite()
if (obj.eventsFilePendingSet == false) {
// Perform the operation now
obj.eventsFilePendingSet = true; obj.eventsFilePendingSets = null;
if (func != null) { obj.eventsFilePendingCbs = [func]; }
obj.eventsfile.bulkWrite([{ insertOne: { document: event } }], eventsFileBulkWriteCompleted);
} else {
// Add this operation to the pending list
if (obj.eventsFilePendingSets == null) { obj.eventsFilePendingSets = [] }
obj.eventsFilePendingSets.push(event);
if (func != null) { if (obj.eventsFilePendingCb == null) { obj.eventsFilePendingCb = [func]; } else { obj.eventsFilePendingCb.push(func); } }
}
};
obj.GetEvents = function (ids, domain, func) { obj.eventsfile.find({ domain: domain, ids: { $in: ids } }).project({ type: 0, _id: 0, domain: 0, ids: 0, node: 0 }).sort({ time: -1 }).toArray(func); }; obj.GetEvents = function (ids, domain, func) { obj.eventsfile.find({ domain: domain, ids: { $in: ids } }).project({ type: 0, _id: 0, domain: 0, ids: 0, node: 0 }).sort({ time: -1 }).toArray(func); };
obj.GetEventsWithLimit = function (ids, domain, limit, func) { obj.eventsfile.find({ domain: domain, ids: { $in: ids } }).project({ type: 0, _id: 0, domain: 0, ids: 0, node: 0 }).sort({ time: -1 }).limit(limit).toArray(func); }; obj.GetEventsWithLimit = function (ids, domain, limit, func) { obj.eventsfile.find({ domain: domain, ids: { $in: ids } }).project({ type: 0, _id: 0, domain: 0, ids: 0, node: 0 }).sort({ time: -1 }).limit(limit).toArray(func); };
obj.GetUserEvents = function (ids, domain, username, func) { obj.eventsfile.find({ domain: domain, $or: [{ ids: { $in: ids } }, { username: username }] }).project({ type: 0, _id: 0, domain: 0, ids: 0, node: 0 }).sort({ time: -1 }).toArray(func); }; obj.GetUserEvents = function (ids, domain, username, func) { obj.eventsfile.find({ domain: domain, $or: [{ ids: { $in: ids } }, { username: username }] }).project({ type: 0, _id: 0, domain: 0, ids: 0, node: 0 }).sort({ time: -1 }).toArray(func); };
@ -1133,7 +1159,20 @@ module.exports.CreateDB = function (parent, func) {
// Database actions on the power collection // Database actions on the power collection
obj.getAllPower = function (func) { obj.powerfile.find({}).toArray(func); }; obj.getAllPower = function (func) { obj.powerfile.find({}).toArray(func); };
obj.storePowerEvent = function (event, multiServer, func) { if (multiServer != null) { event.server = multiServer.serverid; } obj.powerfile.insertOne(event, func); }; obj.storePowerEvent = function (event, multiServer, func) { // Fast MongoDB event store using bulkWrite()
if (multiServer != null) { event.server = multiServer.serverid; }
if (obj.powerFilePendingSet == false) {
// Perform the operation now
obj.powerFilePendingSet = true; obj.powerFilePendingSets = null;
if (func != null) { obj.powerFilePendingCbs = [func]; }
obj.powerfile.bulkWrite([{ insertOne: { document: event } }], powerFileBulkWriteCompleted);
} else {
// Add this operation to the pending list
if (obj.powerFilePendingSets == null) { obj.powerFilePendingSets = [] }
obj.powerFilePendingSets.push(event);
if (func != null) { if (obj.powerFilePendingCb == null) { obj.powerFilePendingCb = [func]; } else { obj.powerFilePendingCb.push(func); } }
}
};
obj.getPowerTimeline = function (nodeid, func) { obj.powerfile.find({ nodeid: { $in: ['*', nodeid] } }).project({ _id: 0, nodeid: 0, s: 0 }).sort({ time: 1 }).toArray(func); }; obj.getPowerTimeline = function (nodeid, func) { obj.powerfile.find({ nodeid: { $in: ['*', nodeid] } }).project({ _id: 0, nodeid: 0, s: 0 }).sort({ time: 1 }).toArray(func); };
obj.removeAllPowerEvents = function () { obj.powerfile.deleteMany({}, { multi: true }); }; obj.removeAllPowerEvents = function () { obj.powerfile.deleteMany({}, { multi: true }); };
obj.removeAllPowerEventsForNode = function (nodeid) { obj.powerfile.deleteMany({ nodeid: nodeid }, { multi: true }); }; obj.removeAllPowerEventsForNode = function (nodeid) { obj.powerfile.deleteMany({ nodeid: nodeid }, { multi: true }); };
@ -1421,16 +1460,56 @@ module.exports.CreateDB = function (parent, func) {
} }
// MongoDB pending bulk write operation, perform fast bulk document replacement. // MongoDB pending bulk write operation, perform fast bulk document replacement.
function bulkWriteCompleted() { function fileBulkWriteCompleted() {
if (obj.pendingSets != null) { // Callbacks
if (obj.filePendingCbs != null) { for (var i in obj.filePendingCbs) { obj.filePendingCbs[i](); } obj.filePendingCbs = null; }
if (obj.filePendingSets != null) {
// Perform pending operations // Perform pending operations
var ops = [], c = 0; var ops = [];
for (var i in obj.pendingSets) { c++; ops.push({ replaceOne: { filter: { _id: i }, replacement: performTypedRecordEncrypt(common.escapeLinksFieldNameEx(obj.pendingSets[i])), upsert: true } }); } obj.filePendingCbs = obj.filePendingCb;
obj.file.bulkWrite(ops, bulkWriteCompleted); obj.filePendingCb = null;
obj.pendingSets = null; for (var i in obj.filePendingSets) { ops.push({ replaceOne: { filter: { _id: i }, replacement: performTypedRecordEncrypt(common.escapeLinksFieldNameEx(obj.filePendingSets[i])), upsert: true } }); }
obj.file.bulkWrite(ops, fileBulkWriteCompleted);
obj.filePendingSets = null;
} else { } else {
// All done, no pending operations. // All done, no pending operations.
obj.pendingSet = false; obj.filePendingSet = false;
}
}
// MongoDB pending bulk write operation, perform fast bulk document replacement.
function eventsFileBulkWriteCompleted() {
// Callbacks
if (obj.eventsFilePendingCbs != null) { for (var i in obj.eventsFilePendingCbs) { obj.eventsFilePendingCbs[i](); } obj.eventsFilePendingCbs = null; }
if (obj.eventsFilePendingSets != null) {
// Perform pending operations
var ops = [];
for (var i in obj.eventsFilePendingSets) { ops.push({ document: obj.eventsFilePendingSets[i] }); }
obj.eventsFilePendingCbs = obj.eventsFilePendingCb;
obj.eventsFilePendingCb = null;
obj.eventsFilePendingSets = null;
obj.eventsfile.bulkWrite(ops, eventsFileBulkWriteCompleted);
} else {
// All done, no pending operations.
obj.eventsFilePendingSet = false;
}
}
// MongoDB pending bulk write operation, perform fast bulk document replacement.
function powerFileBulkWriteCompleted() {
// Callbacks
if (obj.powerFilePendingCbs != null) { for (var i in obj.powerFilePendingCbs) { obj.powerFilePendingCbs[i](); } obj.powerFilePendingCbs = null; }
if (obj.powerFilePendingSets != null) {
// Perform pending operations
var ops = [];
for (var i in obj.powerFilePendingSets) { ops.push({ document: obj.powerFilePendingSets[i] }); }
obj.powerFilePendingCbs = obj.powerFilePendingCb;
obj.powerFilePendingCb = null;
obj.powerFilePendingSets = null;
obj.powerfile.bulkWrite(ops, powerFileBulkWriteCompleted);
} else {
// All done, no pending operations.
obj.powerFilePendingSet = false;
} }
} }