wake-up-neo.com

Bulk-Upert in MongoDB mit Mungo

Gibt es eine Option, Bulk-Uperts mit Mungo durchzuführen? Im Grunde ein Array haben und jedes Element einfügen, wenn es nicht existiert, oder es aktualisieren, wenn es existiert? (Ich verwende Zoll _ids)

Wenn ich .insert verwende, gibt MongoDB einen Fehler E11000 für doppelte Schlüssel zurück (die aktualisiert werden sollten). Das Einfügen mehrerer neuer Dokumente funktioniert jedoch gut: 

var Users = self.db.collection('Users');

Users.insert(data, function(err){
            if (err) {
                callback(err);
            }
            else {
                callback(null);
            }
        });

Die Verwendung von .save gibt einen Fehler zurück, bei dem der Parameter ein einzelnes Dokument sein muss:

Users.save(data, function(err){
   ...
}

Diese Antwort lässt vermuten, dass es keine solche Option gibt, sie ist jedoch spezifisch für C # und auch schon 3 Jahre alt. Also habe ich mich gefragt, ob es eine Option gibt, dies mit Mungo zu tun?

Vielen Dank!

26
user3122267

Nicht speziell in "Mungo" oder zumindest noch nicht schriftlich. Die MongoDB-Shell ab Version 2.6 verwendet die "Bulk-Operations-API" "under the hood" sozusagen für alle allgemeinen Hilfsmethoden. In seiner Implementierung versucht dies zuerst, und wenn ein älterer Versionsserver entdeckt wird, gibt es einen "Fallback" für die ältere Implementierung.

Alle mongoose-Methoden verwenden "derzeit" die "Legacy" -Implementierung oder die Write-Concentration-Antwort und die grundlegenden Legacymethoden. Es gibt jedoch einen .collection-Zugriff von einem beliebigen Moose-Modell, der im Wesentlichen auf das "Collection-Objekt" von dem zugrunde liegenden "Node-Native-Treiber" aus zugreift, auf dem Moose selbst implementiert ist:

 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        });
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });

Der Hauptnachteil dabei ist, dass "Mungos" -Methoden tatsächlich wissen, dass möglicherweise noch keine Verbindung hergestellt wird, und "Schlange stehen", bis dies abgeschlossen ist. Der native Treiber, nach dem Sie suchen, unterscheidet diesen Unterschied nicht.

Sie müssen sich also wirklich darüber im Klaren sein, dass die Verbindung auf irgendeine Art und Weise hergestellt wird. Sie können jedoch die systemeigenen Treibermethoden verwenden, solange Sie mit dem, was Sie tun, vorsichtig sind.

22
Neil Lunn

Sie müssen kein Limit (1000) verwalten, wie @ neil-lunn vorschlägt. Mongoose macht das schon. Ich habe seine großartige Antwort als Grundlage für diese vollständige, auf Promise basierende Implementierung und dieses Beispiels verwendet:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already..
 * @return {Priomise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are  not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });

Dies hat den Vorteil, dass die Verbindung geschlossen wird, wenn sie fertig ist. Sie zeigt eventuell vorhandene Fehler an, ignoriert sie jedoch, wenn dies nicht der Fall ist (Fehlerrückrufe in Versprechen sind optional.) Es ist auch sehr schnell. Lass das hier, um meine Erkenntnisse zu teilen. Wenn Sie alle eztv-Shows in einer Datenbank speichern möchten, können Sie das eztv-Zeug kommentieren.

18
konsumer

Ich habe ein Plugin für Mongoose veröffentlicht, das eine statische upsertMany-Methode zur Ausführung von Bulk-Upert-Vorgängen mit einer Versprechungsschnittstelle verfügbar macht.

Ein weiterer Vorteil der Verwendung dieses Plugins gegenüber der Initialisierung Ihres eigenen Bulk-Op für die zugrunde liegende Collection ist, dass dieses Plugin Ihre Daten zuerst in das Mongoose-Modell konvertiert und dann vor dem Upert wieder in einfache Objekte. Dadurch wird sichergestellt, dass die Validierung des Mongoose-Schemas angewendet wird und die Daten entvölkert werden und für das Einfügen von Rohdaten geeignet sind.

https://github.com/meanie/mongoose-upsert-manyhttps://www.npmjs.com/package/@meanie/mongoose-upsert-many

Ich hoffe es hilft!

3
Adam Reis

Wenn Sie die Massenmethoden nicht in Ihrer db.collection sehen, dh Sie erhalten einen Fehler mit der Auswirkung der Variablen Xxx, hat keine Methode: initializeOrderedBulkOp ()

Aktualisieren Sie Ihre Mungo-Version. Anscheinend durchlaufen ältere Mungo-Versionen nicht alle zugrunde liegenden Mongo db.collection-Methoden. 

npm mongoose installieren

passte für mich auf.

1
zstew

Dies musste ich kürzlich beim Speichern von Produkten in meiner E-Commerce-App erreichen. In meiner Datenbank kam es zu einem Timeout, da ich alle 4 Stunden 10000 Elemente aktualisieren musste. Eine Option für mich bestand darin, socketTimeoutMS und connectTimeoutMS während der Verbindung zur Datenbank in mongoose zu setzen, aber es fühlte sich irgendwie hackig an und ich wollte die Standardwerte für die Verbindungszeitüberschreitung der Datenbank nicht manipulieren. Ich sehe auch, dass die Lösung von @neil lunn einen einfachen Ansatz für die Synchronisation des Moduls innerhalb der for-Schleife verwendet. Hier ist eine asynchrone Version von mir, von der ich glaube, dass sie den Job viel besser macht

let BATCH_SIZE = 500
Array.prototype.chunk = function (groupsize) {
    var sets = [];
    var chunks = this.length / groupsize;

    for (var i = 0, j = 0; i < chunks; i++ , j += groupsize) {
        sets[i] = this.slice(j, j + groupsize);
    }

    return sets;
}

function upsertDiscountedProducts(products) {

    //Take the input array of products and divide it into chunks of BATCH_SIZE

    let chunks = products.chunk(BATCH_SIZE), current = 0

    console.log('Number of chunks ', chunks.length)

    let bulk = models.Product.collection.initializeUnorderedBulkOp();

    //Get the current time as timestamp
    let timestamp = new Date(),

        //Keep track of the number of items being looped
        pendingCount = 0,
        inserted = 0,
        upserted = 0,
        matched = 0,
        modified = 0,
        removed = 0,

        //If atleast one upsert was performed
        upsertHappened = false;

    //Call the load function to get started
    load()
    function load() {

        //If we have a chunk to process
        if (current < chunks.length) {
            console.log('Current value ', current)

            for (let i = 0; i < chunks[current].length; i++) {
                //For each item set the updated timestamp to the current time
                let item = chunks[current][i]

                //Set the updated timestamp on each item
                item.updatedAt = timestamp;

                bulk.find({ _id: item._id })
                    .upsert()
                    .updateOne({
                        "$set": item,

                        //If the item is being newly inserted, set a created timestamp on it
                        "$setOnInsert": {
                            "createdAt": timestamp
                        }
                    })
            }

            //Execute the bulk operation for the current chunk
            bulk.execute((error, result) => {
                if (error) {
                    console.error('Error while inserting products' + JSON.stringify(error))
                    next()
                }
                else {

                    //Atleast one upsert has happened
                    upsertHappened = true;
                    inserted += result.nInserted
                    upserted += result.nUpserted
                    matched += result.nMatched
                    modified += result.nModified
                    removed += result.nRemoved

                    //Move to the next chunk
                    next()
                }
            })



        }
        else {
            console.log("Calling finish")
            finish()
        }

    }

    function next() {
        current++;

        //Reassign bulk to a new object and call load once again on the new object after incrementing chunk
        bulk = models.Product.collection.initializeUnorderedBulkOp();
        setTimeout(load, 0)
    }

    function finish() {

        console.log('Inserted ', inserted + ' Upserted ', upserted, ' Matched ', matched, ' Modified ', modified, ' Removed ', removed)

        //If atleast one chunk was inserted, remove all items with a 0% discount or not updated in the latest upsert
        if (upsertHappened) {
            console.log("Calling remove")
            remove()
        }


    }

    /**
     * Remove all the items that were not updated in the recent upsert or those items with a discount of 0
     */
    function remove() {

        models.Product.remove(
            {
                "$or":
                [{
                    "updatedAt": { "$lt": timestamp }
                },
                {
                    "discount": { "$eq": 0 }
                }]
            }, (error, obj) => {
                if (error) {
                    console.log('Error while removing', JSON.stringify(error))
                }
                else {
                    if (obj.result.n === 0) {
                        console.log('Nothing was removed')
                    } else {
                        console.log('Removed ' + obj.result.n + ' documents')
                    }
                }
            }
        )
    }
}
0
PirateApp