Django batching/bulk update_or_create?

I have data in the database which needs updating periodically. The source of the data returns everything that’s available at that point in time, so it will include new data that is not already in the database.

As I loop through the source data I don’t want to be making 1000s of individual writes if possible.

Is there anything such as update_or_create but works in batches?

One thought was using update_or_create in combination with manual transactions, but I’m not sure if that just queues up the individual writes or if it would combine it all into one SQL insert?
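For reference, the pattern described above looks roughly like this (a minimal sketch; MyModel, source_data, and the field names are placeholders). transaction.atomic() collapses everything into a single commit, but Django still issues a separate lookup plus insert or update for each row, so it does not combine the writes into one SQL statement:

from django.db import transaction

with transaction.atomic():
    for row in source_data:
        # Still one SELECT plus one INSERT or UPDATE per row,
        # but only a single COMMIT at the end of the block.
        MyModel.objects.update_or_create(
            external_id=row["id"],
            defaults={"name": row["name"]},
        )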

Or, similarly, could using @commit_on_success() on a function with update_or_create inside the loop work?

I am not doing anything with the data other than translating it and saving it to a model. Nothing is dependent on that model existing during the loop.

Since Django added support for bulk_update, this is now somewhat possible, though you need to do 3 database calls (a get, a bulk create, and a bulk update) per batch. It’s a bit challenging to make a good interface for a general-purpose function here, as you want it to support both efficient querying and the updates. Here is a method I implemented that is designed for bulk update_or_create where you have a number of common identifying keys (which could be empty) and one identifying key that varies among the batch.

This is implemented as a method on a base model, but can be used independently of that. It also assumes that the base model has an auto_now timestamp field named updated_on; if this is not the case, the lines of code that assume this have been commented for easy modification.

In order to use this in batches, chunk your updates into batches before calling it (a chunking sketch follows the usage example below). This is also a way to handle data where a secondary identifier can take one of a small number of values, without having to change the interface.

from django.db import models, transaction


class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)
    
    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: {field_name: field_value}
        unique_key_name: field_name
        unique_key_to_defaults: {field_value: {field_name: field_value}}
        
        ex. Event.bulk_update_or_create(
            {"organization": organization}, "external_id", {1234: {"started": True}}
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
            existing_objs = {
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            }
            
            create_data = {
                k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
            }
            for unique_key_value, obj in create_data.items():
                obj[unique_key_name] = unique_key_value
                obj.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = {"updated_on"}
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """ Helper method to update objects """
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = {"updated_on"}
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)

Example usage:

class Event(BaseModel):
    organization = models.ForeignKey(Organization, on_delete=models.CASCADE)
    external_id = models.IntegerField(unique=True)
    started = models.BooleanField()


organization = Organization.objects.get(...)
updates_by_external_id = {
    1234: {"started": True},
    2345: {"started": True},
    3456: {"started": False},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)
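In line with the note above about chunking before calling this, one possible way to split a large mapping into batches (a sketch; BATCH_SIZE and the chunked helper are hypothetical, not part of the method above):

import itertools

BATCH_SIZE = 500  # assumed batch size; tune for your workload

def chunked(mapping, size):
    # Yield successive dicts of at most `size` items from `mapping`.
    it = iter(mapping.items())
    while True:
        batch = dict(itertools.islice(it, size))
        if not batch:
            break
        yield batch

for batch in chunked(updates_by_external_id, BATCH_SIZE):
    Event.bulk_update_or_create(
        {"organization": organization}, "external_id", batch
    )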

Possible Race Conditions

The code above leverages a transaction and select-for-update to prevent race conditions on updates. There is, however, a possible race condition on inserts if two threads or processes are trying to create objects with the same identifiers.

The easy mitigation is to ensure that the combination of your common_keys and your unique_key is a database-enforced uniqueness constraint (which is the intended use of this function). This can be achieved either with the unique_key referencing a field with unique=True, or with the unique_key combined with a subset of the common_keys enforced as unique together by a UniqueConstraint. With database-enforced uniqueness protection, if multiple threads are trying to perform conflicting creates, all but one will fail with an IntegrityError. Due to the enclosing transaction, threads that fail will perform no changes and can be safely retried or ignored (a conflicting create that failed can just be treated as a create that happened first and was then immediately overwritten).
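For example, if uniqueness should hold per organization rather than globally, the Event model could declare the constraint explicitly (a sketch; drop unique=True from external_id in that case and adapt the fields and constraint name to your schema):

class Event(BaseModel):
    organization = models.ForeignKey(Organization, on_delete=models.CASCADE)
    external_id = models.IntegerField()
    started = models.BooleanField()

    class Meta:
        constraints = [
            # Database-enforced uniqueness for the common_keys + unique_key pair
            models.UniqueConstraint(
                fields=["organization", "external_id"],
                name="unique_event_per_organization",
            ),
        ]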

If leveraging uniqueness constraints is not possible, then you will either need to implement your own concurrency control or lock the entire table.

Batching your updates is effectively an upsert command, and as @imposeren said, Postgres 9.5 gives you that ability (INSERT ... ON CONFLICT). I think MySQL 5.7 does as well (see http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html), depending on your exact needs. That said, it’s probably easiest to just use a db cursor. Nothing wrong with that; it’s there for when the ORM just isn’t enough.

Something along these lines should work. It’s pseudo-ish code, so don’t just cut-and-paste it, but the concept is there for you.

import itertools

from django.db import connection, transaction


class GroupByChunk(object):
    """Stateful key function for itertools.groupby that flips its return
    value every `size` items, splitting the iterable into chunks."""
    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle


def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        with connection.cursor() as cursor:
            # groupby yields (key, group) pairs; the key is just the toggle flag
            for _, chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
                cursor.executemany(upsert_sql, list(chunk))

Assumptions here are:

  • db_results is some kind of results iterator, either a list or a dictionary
  • A result from db_results can be fed directly into a raw SQL executemany call (an example upsert_sql is sketched below)
  • If any of the batch updates fail, you’ll be rolling back ALL of them. If you want rollback to apply per chunk instead, just push the with block down a bit
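As a concrete illustration of the upsert_sql parameter, here is a hedged sketch for the Event example above, using PostgreSQL 9.5+ ON CONFLICT syntax (the app_event table and its column names are assumptions; adjust to your schema):

# Hypothetical upsert statement; relies on a unique constraint on external_id.
upsert_sql = """
    INSERT INTO app_event (organization_id, external_id, started)
    VALUES (%s, %s, %s)
    ON CONFLICT (external_id)
    DO UPDATE SET started = EXCLUDED.started
"""

# Each entry of db_results is then a parameter tuple for executemany, e.g.:
db_results = [
    (organization.id, 1234, True),
    (organization.id, 2345, True),
    (organization.id, 3456, False),
]
batch_update(db_results, upsert_sql)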

There is a django-bulk-update-or-create library for Django that can do that.
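A hedged sketch of how it is typically wired up, based on the library’s README at the time of writing (check the project documentation for the exact, current API):

from bulk_update_or_create import BulkUpdateOrCreateQuerySet
from django.db import models


class Event(models.Model):
    # The library provides a queryset/manager exposing the bulk method
    objects = BulkUpdateOrCreateQuerySet.as_manager()

    external_id = models.IntegerField(unique=True)
    started = models.BooleanField(default=False)


items = [Event(external_id=i, started=True) for i in (1234, 2345, 3456)]
Event.objects.bulk_update_or_create(items, ["started"], match_field="external_id")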


The answers/resolutions are collected from Stack Overflow and are licensed under cc by-sa 2.5, cc by-sa 3.0 and cc by-sa 4.0.