
I have a Django application backed by a MySQL database. I recently moved a section of code out of the request flow and into a separate process. The code uses select_for_update() to lock the affected rows in the DB, but now I occasionally see the process update a record that should still be locked by the main thread. If I switch my executor from a ProcessPoolExecutor to a ThreadPoolExecutor, the locking works as expected. I thought select_for_update() operated at the database level, so it shouldn't matter whether the code runs in threads, processes, or even on another machine. What am I missing?

I've boiled my code down to a sample that exhibits the same behaviour:

```python
from concurrent import futures
import logging
from time import sleep

from django.db import transaction

from myapp.main.models import CompoundBase

logger = logging.getLogger()
executor = futures.ProcessPoolExecutor()
# executor = futures.ThreadPoolExecutor()


def test() -> None:
    pk = setup()
    f1 = executor.submit(select_and_sleep, pk)
    f2 = executor.submit(sleep_and_update, pk)
    futures.wait([f1, f2])


def setup() -> int:
    cb = CompoundBase.objects.first()
    cb.corporate_id = 'foo'
    cb.save()
    return cb.pk


def select_and_sleep(pk: int) -> None:
    try:
        with transaction.atomic():
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            print('Locking')
            sleep(5)
            cb.corporate_id = 'baz'
            cb.save()
            print('Updated after sleep')
    except Exception:
        logger.exception('select_and_sleep')


def sleep_and_update(pk: int) -> None:
    try:
        sleep(2)
        print('Updating')
        with transaction.atomic():
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            cb.corporate_id = 'bar'
            cb.save()
            print('Updated without sleep')
    except Exception:
        logger.exception('sleep_and_update')


test()
```

When run as shown I get:

```
Locking
Updating
Updated without sleep
Updated after sleep
```

But if I change to the ThreadPoolExecutor I get:

```
Locking
Updating
Updated after sleep
Updated without sleep
```


The good news is that you are most of the way there. I did some reading around, and what follows is based on an answer I found here.

I am assuming you are running on Linux, as that is the platform where this behaviour seems to occur.

It looks like under Linux the default process start method is fork, which is usually what you want. In this exact circumstance, however, it appears that resources such as DB connections are being shared: setup() opens a connection in the parent, and each forked worker inherits that same open connection instead of opening its own. MySQL then sees the workers' queries as coming from a single session, so the row lock taken by one worker's SELECT ... FOR UPDATE does not block the other. To get the behaviour you want, each process needs its own resources and must not share them with its parent process (or, by extension, with any of the parent's other children).
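The inherited-connection effect is not specific to Django or MySQL. Here is a minimal sketch that uses a local socket pair as a stand-in for the connection setup() opens in the parent; `parent_end`, `server_end`, `worker` and `run_forked_workers` are hypothetical names. Because the pool explicitly uses fork, both workers write through the very socket the parent created, and everything lands on the single "server" end, which is roughly what the database sees when two forked workers share one inherited connection.

```python
import os
import socket
from concurrent import futures
from multiprocessing import get_context

# Hypothetical stand-in for the DB connection opened in the parent before
# the pool starts: one end of a local socket pair.
parent_end, server_end = socket.socketpair()


def worker(tag: str) -> int:
    # Under "fork" the child inherits parent_end unchanged: it is the same
    # OS-level socket, not a new connection, so this write goes over the
    # parent's one and only "connection".
    parent_end.sendall(f"{tag}:{os.getpid()}\n".encode())
    return os.getpid()


def run_forked_workers() -> tuple[list[str], list[int]]:
    ctx = get_context("fork")  # Unix only; explicitly request fork
    with futures.ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        pids = list(pool.map(worker, ["select_and_sleep", "sleep_and_update"]))
    # Both workers' messages arrive on the one shared "server" end.
    received = b""
    while received.count(b"\n") < 2:
        received += server_end.recv(1024)
    return received.decode().splitlines(), pids


if __name__ == "__main__":
    messages, worker_pids = run_forked_workers()
    for line in messages:
        print(line)
```

Each printed line carries a worker's PID, yet all of them arrived over the socket that only the parent ever created.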

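It is possible to get the behaviour you want with the following code, but be aware that I had to split it into two files. Presumably this is because, with spawn or forkserver, each worker imports the submitted functions by module name in a clean process, so those functions need to live in an importable module that calls django.setup() itself.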

fn.py

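```python
import logging
from time import sleep

from django.db import transaction
import django

# Configure Django before importing any models. spawn/forkserver workers
# import this module in a clean process, so each one sets Django up (and
# opens its own DB connection). Assumes DJANGO_SETTINGS_MODULE is set.
django.setup()

from myapp.main.models import CompoundBase  # noqa: E402

logger = logging.getLogger(__name__)


def setup() -> int:
    cb = CompoundBase.objects.first()
    cb.corporate_id = 'foo'
    cb.save()
    return cb.pk


def select_and_sleep(pk: int) -> None:
    try:
        with transaction.atomic():
            # Row stays locked until this atomic block exits.
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            print('Locking')
            sleep(5)
            cb.corporate_id = 'baz'
            cb.save()
            print('Updated after sleep')
    except Exception:
        logger.exception('select_and_sleep')


def sleep_and_update(pk: int) -> None:
    try:
        sleep(2)
        print('Updating')
        with transaction.atomic():
            # Should block here until select_and_sleep commits.
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            cb.corporate_id = 'bar'
            cb.save()
            print('Updated without sleep')
    except Exception:
        logger.exception('sleep_and_update')
```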

proc_test.py

```python
from concurrent import futures
from multiprocessing import get_context

import fn

# forkserver (or spawn) starts workers from a clean process rather than a
# copy of this one, so no DB connection is inherited from the parent.
executor = futures.ProcessPoolExecutor(mp_context=get_context("forkserver"))
# executor = futures.ThreadPoolExecutor()


def test() -> None:
    pk = fn.setup()
    f1 = executor.submit(fn.select_and_sleep, pk)
    f2 = executor.submit(fn.sleep_and_update, pk)
    futures.wait([f1, f2])


if __name__ == "__main__":
    # Guard needed with spawn/forkserver: workers re-import the main module.
    test()
```

There are three start methods for a process: fork, spawn, and forkserver. Using either spawn or forkserver appears to give you the behaviour you are looking for.

