I have a Django application backed by a MySQL database. I recently moved a section of code out of the request flow and into a separate process. The code uses select_for_update() to lock the affected rows in the DB, but I now occasionally see the process update a record while that record should be locked by the main thread. If I switch my executor from a ProcessPoolExecutor to a ThreadPoolExecutor, the locking works as expected. I thought select_for_update() operated at the database level, so it shouldn't matter whether the code runs in threads, in processes, or even on another machine. What am I missing?
I've boiled my code down to a sample that exhibits the same behaviour:
    from concurrent import futures
    import logging
    from time import sleep

    from django.db import transaction

    from myapp.main.models import CompoundBase

    logger = logging.getLogger()
    executor = futures.ProcessPoolExecutor()
    # executor = futures.ThreadPoolExecutor()


    def test() -> None:
        pk = setup()
        f1 = executor.submit(select_and_sleep, pk)
        f2 = executor.submit(sleep_and_update, pk)
        futures.wait([f1, f2])


    def setup() -> int:
        cb = CompoundBase.objects.first()
        cb.corporate_id = 'foo'
        cb.save()
        return cb.pk


    def select_and_sleep(pk: int) -> None:
        try:
            with transaction.atomic():
                cb = CompoundBase.objects.select_for_update().get(pk=pk)
                print('Locking')
                sleep(5)
                cb.corporate_id = 'baz'
                cb.save()
                print('Updated after sleep')
        except Exception:
            logger.exception('select_and_sleep')


    def sleep_and_update(pk: int) -> None:
        try:
            sleep(2)
            print('Updating')
            with transaction.atomic():
                cb = CompoundBase.objects.select_for_update().get(pk=pk)
                cb.corporate_id = 'bar'
                cb.save()
                print('Updated without sleep')
        except Exception:
            logger.exception('sleep_and_update')


    test()

When run as shown I get:
    Locking
    Updating
    Updated without sleep
    Updated after sleep

But if I change to the ThreadPoolExecutor I get:
    Locking
    Updating
    Updated after sleep
    Updated without sleep
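One diagnostic that might help narrow this down is to log which OS process and which MySQL session each worker is actually using. The sketch below is not part of the sample above and has not been run against it: `session_tag` is a hypothetical helper, and it assumes the cursor passed in is a DB-API cursor connected to MySQL (for example one obtained from `django.db.connection.cursor()`), so that `SELECT CONNECTION_ID()` is available.

```python
import os


def session_tag(cursor) -> str:
    """Return a string identifying the current OS process and MySQL session.

    `cursor` is assumed to be a DB-API cursor on a MySQL connection,
    e.g. one from django.db.connection.cursor().
    """
    cursor.execute("SELECT CONNECTION_ID()")  # MySQL's id for this session
    (connection_id,) = cursor.fetchone()
    return f"pid={os.getpid()} mysql_connection_id={connection_id}"


# Hypothetical use inside select_and_sleep / sleep_and_update:
#
#     from django.db import connection
#     with connection.cursor() as cursor:
#         print(session_tag(cursor))
```

If two workers with different PIDs were to report the same `mysql_connection_id`, they would be talking to MySQL through a single session, and a row lock taken by one would not block the other. Whether that is what happens here is an open question that the printed values would answer.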