How multiprocessing works in Python under the hood

I've been writing Python for quite some time, and in many projects I have used multiprocessing, a Python standard library package that provides an interface for working with processes, queues, process pools, and many other convenient tools for parallel programming. At some point I realized that I lacked a more detailed understanding of how this library works.

I wanted to dig into the multiprocessing source code, figure it out, and write an article along the way. This article is mainly intended for those new to Python and for anyone who wants to understand in detail how exactly processes and pools are created in Python and dive into the implementation details.

In this article I will not explain what processes are and why they are needed; the basics of operating systems and processes are well covered elsewhere. It is also important to clarify that all code given in the article corresponds to Python version 3.11.4.

Contents

  1. Creating a new process in the OS

  2. Ways to create a new process in multiprocessing

  3. Context

  4. Process

    1. Creating a process via fork

    2. Creating a process via spawn

    3. Ending the process

  5. Process Pool

Creating a new process in the OS

Processes in the OS are created using system calls – low-level functions of the operating system that allow user programs to interact with the OS.

On UNIX-like systems, the fork system call is used to create a new process. There is actually a whole family of such system calls: fork, vfork, clone. They are very similar in essence, and for this article it is enough to consider only fork.
The fork system call creates a copy of the current process; it returns zero in the child process and the PID of the child in the parent process. It is important to note that no actual copying or memory allocation happens when the process is created. Instead, copy-on-write is used: a memory page is copied only when an attempt is made to write to it. This reduces the amount of memory consumed by processes and significantly speeds up process creation.
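
To make fork's return-value convention concrete, here is a minimal sketch using plain os.fork (POSIX only; this is an illustration, not multiprocessing code):

import os

# fork() returns 0 in the child and the child's PID in the parent
pid = os.fork()
if pid == 0:
    print(f"child: fork() returned 0, my pid is {os.getpid()}")
    os._exit(0)  # exit the child without running cleanup handlers
else:
    print(f"parent: fork() returned child pid {pid}")
    os.waitpid(pid, 0)  # wait for the child to finish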

On Windows, processes are created with the CreateProcess call from the WinAPI. Unlike on UNIX-like systems, on Windows the program passed in the call's arguments is immediately loaded into the newly created process.

Ways to create a new process in multiprocessing

The multiprocessing package has 3 main methods for creating a new process: fork, spawn, and forkserver.

fork:

  • As the name suggests, it uses the fork system call to create a new process

  • The default start method in multiprocessing on POSIX systems other than macOS

  • Not supported on Windows

spawn:

  • Starts a new process using the command passed to it. In our case, this is a command that starts the Python interpreter; its arguments are the path to the file to be run, plus a few other service arguments

  • The default start method in multiprocessing on macOS and Windows

  • Usually slower than fork

forkserver:

  • A server process is created that spawns processes using fork. When a new process is required, the parent process connects to the server and asks it to fork a new process. This method combines the speed of fork with good reliability (since the server process from which the child is forked has a simple state)

  • Not supported on Windows

In this article I will discuss only the first two methods, since they are the most frequently used.

Context

The first thing you need to understand when diving into the implementation of multiprocessing is the context class. A context has the same API as the multiprocessing module itself, but it lets you pin down how new processes are created: fork, spawn, or forkserver.

import multiprocessing as mp

if __name__ == '__main__':
    ctx_spawn = mp.get_context('spawn')
    ctx_fork = mp.get_context('fork')

A context allows you to use different process creation methods within the same program. In the example above, all processes created through the context object ctx_spawn will be created using the spawn method, and those created through ctx_fork, respectively, via fork.
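
A process can be started through a context object exactly as through the module itself. A small usage sketch:

import multiprocessing as mp

def hello():
    print('hello from', mp.current_process().name)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Process(target=hello)  # a SpawnProcess under the hood
    p.start()
    p.join()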

For each process creation method there is a context class that inherits from BaseContext. These classes differ only in the process class used to spawn child processes:

class ForkContext(BaseContext):
    _name = 'fork'
    Process = ForkProcess  # all created processes will have type ForkProcess

class SpawnContext(BaseContext):
    _name = 'spawn'
    Process = SpawnProcess  # all created processes will have type SpawnProcess

BaseContext, in turn, simply implements the same API as the multiprocessing package; you can view the source code in CPython's Lib/multiprocessing/context.py.

You don't have to use a context to create processes; you can create multiprocessing.Process directly. In that case the default context is used, which lives in the global variable _default_context. The default context is determined by your OS: for Windows and macOS it is spawn, for everything else it is fork. This can be clearly seen in the source code:

if sys.platform != 'win32':
    # the context and process classes for POSIX systems are declared here
    ...

    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }
    if sys.platform == 'darwin':
        # macOS uses spawn by default
        _default_context = DefaultContext(_concrete_contexts['spawn'])
    else:
        # all other POSIX systems use fork by default
        _default_context = DefaultContext(_concrete_contexts['fork'])
else:
    # the context and process classes for Windows are declared here
    ...

    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    # Windows uses spawn by default
    _default_context = DefaultContext(_concrete_contexts['spawn'])

Attentive readers will have noticed the DefaultContext class: this context class simply uses the default process creation method for the current OS.
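
The default method can be inspected and overridden through module-level helpers that delegate to this default context. A small sketch (the printed value depends on your OS):

import multiprocessing as mp

if __name__ == '__main__':
    print(mp.get_start_method())  # 'fork' on most POSIX systems, 'spawn' on macOS and Windows
    mp.set_start_method('spawn')  # pin the start method for the whole program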

Process

Let's look at the simplest example of creating a process using the multiprocessing module and try to figure out what's happening under the hood.

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

In this example the Process class accepts the parameters target, the function to be run in the child process, and args, the arguments for that function. Let's look at the declaration of the Process class:

class Process(process.BaseProcess):  
    _start_method = None 
     
    @staticmethod  
    def _Popen(process_obj):  
        return _default_context.get_context().Process._Popen(process_obj)  
  
    @staticmethod  
    def _after_fork():  
        return _default_context.get_context().Process._after_fork()

Process is the default process class; it inherits from BaseProcess, the base class of all processes. There are also the classes ForkProcess, SpawnProcess, and ForkServerProcess, the implementations for the specific process creation methods. All of these classes implement the static methods _Popen, which creates a new process, and _after_fork, which cleans up after a child process is created.
Note that Process itself does not fix a process creation method. That is because the method is taken from the default context: _default_context.get_context() returns an object of type ForkContext, SpawnContext, or ForkServerContext. The context has a Process attribute whose _Popen and _after_fork methods have a concrete implementation for that launch type.

Let's return to our example. When p.start() is called, the static method _Popen() is invoked inside the process class. It initializes the Popen class (see the code below), which is responsible for creating the process and interacting with it.

class ForkProcess(process.BaseProcess):  
    _start_method = 'fork'  
    @staticmethod  
    def _Popen(process_obj):  
        from .popen_fork import Popen  
        return Popen(process_obj)

class SpawnProcess(process.BaseProcess):  
    _start_method = 'spawn'  
    @staticmethod  
    def _Popen(process_obj):  
        from .popen_spawn_posix import Popen  
        return Popen(process_obj)  
  
    @staticmethod  
    def _after_fork():  
        # process is spawned, nothing to do  
        pass

Creating a process via fork

First, let's look at how processes are created with the fork method. As we already know, when the process is started, the Popen class is initialized; it has an implementation for each of the process creation methods. For fork the implementation lives in the file popen_fork.py.

class Popen(object):
    method = 'fork'

    def __init__(self, process_obj):
        util._flush_std_streams()
        self.returncode = None
        self.finalizer = None
        # the _launch method is called during initialization
        self._launch(process_obj)

    # other methods
    # ...

    def _launch(self, process_obj):
        code = 1
        # create two pipes (two pairs of file descriptors)
        parent_r, child_w = os.pipe()
        child_r, parent_w = os.pipe()
        self.pid = os.fork()  # create the new process
        if self.pid == 0:
            # this branch runs in the child process
            try:
                os.close(parent_r)
                os.close(parent_w)
                code = process_obj._bootstrap(parent_sentinel=child_r)
            finally:
                os._exit(code)
        else:
            # this branch runs in the parent process
            os.close(child_w)
            os.close(child_r)
            self.finalizer = util.Finalize(self, util.close_fds,
                                           (parent_r, parent_w,))
            self.sentinel = parent_r

Here process_obj is the multiprocessing.Process object.

To create the process, the os.fork() function is used, which wraps the fork system call. Like the system call, os.fork() returns zero in the child process and the PID of the child in the parent process. fork completely copies the parent process, so program execution continues from the same instruction in both.
The child process then calls the process object's _bootstrap method, inside which the desired user function is invoked.

We have now traced the whole path from p.start() to launching the target function. Let's recap. Inside p.start() the default context is taken and the Popen class is created, which has an implementation for each of the process creation methods. In the case of fork, the os.fork() function is used to create a copy of the parent process. The child process calls the target function, while the parent process cleans up objects it no longer needs and finishes executing p.start().

Creating a process via spawn

How does the process start when SpawnProcess is used? The multiprocessing package contains implementations of the spawn method for both Windows and POSIX systems. They work the same way; they just use different interfaces to talk to the OS. For simplicity, we will therefore consider the POSIX implementation. The whole flow is almost identical to the fork implementation; the main difference is in the Popen class used to create new processes:

class Popen(popen_fork.Popen):
    method = 'spawn'
    DupFd = _DupFd

    def __init__(self, process_obj):
        self._fds = []
        super().__init__(process_obj)

    # other methods
    # ...

    def _launch(self, process_obj):
        from . import resource_tracker
        tracker_fd = resource_tracker.getfd()
        self._fds.append(tracker_fd)
        prep_data = spawn.get_preparation_data(process_obj._name)
        fp = io.BytesIO()
        set_spawning_popen(self)
        try:
            reduction.dump(prep_data, fp)  # serialize the dict with process info
            reduction.dump(process_obj, fp)  # serialize the process object
        finally:
            set_spawning_popen(None)

        parent_r = child_w = child_r = parent_w = None
        try:
            parent_r, child_w = os.pipe()  # create 4 descriptors for data exchange
            child_r, parent_w = os.pipe()
            cmd = spawn.get_command_line(tracker_fd=tracker_fd,
                                         pipe_handle=child_r)
            self._fds.extend([child_r, child_w])
            self.pid = util.spawnv_passfds(spawn.get_executable(),
                                           cmd, self._fds)
            self.sentinel = parent_r
            with open(parent_w, 'wb', closefd=False) as f:
                f.write(fp.getbuffer())
        finally:
            fds_to_close = []
            for fd in (parent_r, parent_w):
                if fd is not None:
                    fds_to_close.append(fd)
            self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)

            for fd in (child_r, child_w):
                if fd is not None:
                    os.close(fd)

Here process_obj is again the multiprocessing.Process object.

Let's look at the main parts:

prep_data = spawn.get_preparation_data(process_obj._name)

First, information about the parent process is collected that the child will need in order to deserialize the process object. This information includes the path of the file being interpreted, the launch arguments, the directory the file was launched from, and so on.

reduction.dump(prep_data, fp)
reduction.dump(process_obj, fp)

Then the dictionary with information about the parent process and the process object itself are serialized. Internally, reduction.dump calls the standard pickle.dump. Pickle is a Python module that converts Python objects into a byte stream (serializes them) and back (deserializes them). The multiprocessing package uses it to pass Python objects between processes.
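
As a quick illustration of what reduction.dump relies on, here is pickle's round trip on a plain dictionary (the values are made up for the example):

import pickle

data = {'target': 'f', 'args': ('bob',)}
blob = pickle.dumps(data)   # serialize the object into a byte stream
print(pickle.loads(blob))   # deserialize it back: {'target': 'f', 'args': ('bob',)}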

cmd = spawn.get_command_line(tracker_fd=tracker_fd,  
							 pipe_handle=child_r)  
self._fds.extend([child_r, child_w])  
self.pid = util.spawnv_passfds(spawn.get_executable(),  
							   cmd, self._fds) 

Next, a command line is built to launch the interpreter, with all the necessary arguments. util.spawnv_passfds then runs this command in a new process, passing along the file descriptors that should remain open in the child.
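
You can peek at what this command looks like (the exact contents vary between Python versions, and the fd numbers here are purely illustrative):

import multiprocessing.spawn as spawn

print(spawn.get_command_line(tracker_fd=5, pipe_handle=7))
# Something like:
# ['/usr/bin/python3', '-c',
#  'from multiprocessing.spawn import spawn_main; '
#  'spawn_main(tracker_fd=5, pipe_handle=7)',
#  '--multiprocessing-fork']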

with open(parent_w, 'wb', closefd=False) as f:  
	f.write(fp.getbuffer())  

The serialized information about the process is then written into the pipe to the child process. The child deserializes the process object and runs the target function. Profit!

Let's dive deeper and see exactly how the process is created in the util.spawnv_passfds function:

# Start a program with only specified fds kept open  
def spawnv_passfds(path, args, passfds):  
    import _posixsubprocess  
    import subprocess  
    passfds = tuple(sorted(map(int, passfds)))  
    errpipe_read, errpipe_write = os.pipe()  
    try:  
        return _posixsubprocess.fork_exec(  
            args, [path], True, passfds, None, None,  
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,  
            False, False, -1, None, None, None, -1, None,  
            subprocess._USE_VFORK)  
    finally:  
        os.close(errpipe_read)  
        os.close(errpipe_write)

A new process is created here with the fork-exec technique, i.e. with two system calls. First, fork creates a child process that is a copy of the parent. Then, in the child, exec is called (strictly speaking, there is no single exec system call; the name refers to a family of several similar calls that all do the same thing). Exec runs a new executable in the context of the existing process, replacing the previous one. Thus the executable changes within the same process.
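
Here is a minimal fork-exec sketch in plain Python (POSIX only; it illustrates the technique, not multiprocessing's actual code):

import os
import sys

pid = os.fork()  # step 1: create a copy of the current process
if pid == 0:
    # step 2 (in the child): replace this process image with a new executable
    os.execv(sys.executable,
             [sys.executable, '-c', 'print("hello from the new executable")'])
else:
    os.waitpid(pid, 0)  # parent: wait for the child to finish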

Ending the process

Let's go back to the original example:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Now we know what happens when we create a process and call its start() method. What happens when join() is called?

When join() is called, the parent process waits for the child process to finish. After a few checks, the wait method of the already familiar Popen class is called:

class Popen(object):
    # other methods
    # ...

    def poll(self, flag=os.WNOHANG):
        if self.returncode is None:
            try:
                # if the process has not finished yet, wait for it
                pid, sts = os.waitpid(self.pid, flag)
            except OSError:
                # Child process not yet created. See #1731717
                # e.errno == errno.ECHILD == 10
                return None
            if pid == self.pid:
                self.returncode = os.waitstatus_to_exitcode(sts)
        return self.returncode

    def wait(self, timeout=None):
        # check whether the process has already finished
        if self.returncode is None:
            if timeout is not None:
                from multiprocessing.connection import wait
                if not wait([self.sentinel], timeout):
                    return None
            # This shouldn't block if wait() returned successfully.
            return self.poll(os.WNOHANG if timeout == 0.0 else 0)
        return self.returncode

wait() first checks whether the process has already finished; if so, it simply returns the child's returncode. If not, it waits for the child process to terminate using os.waitpid().

Process pool

In addition to processes, the multiprocessing package provides many useful classes and functions built on top of them. One of the most commonly used tools is the process pool. A pool (Pool) lets you parallelize the execution of a function over a set of values across several processes.

When creating a Pool object, you specify the number of worker processes that will execute tasks. You can also pass it a context to be used for launching the processes. In this article we are interested in four main methods of the Pool class:

  • Pool.apply() – calls a function with the given arguments

  • Pool.apply_async() – the asynchronous variant of Pool.apply(); that is, apply_async() does not wait for the function to finish

  • Pool.map() – the multiprocess analogue of the built-in map(), which applies a function to every item of an iterable and returns the list of results

  • Pool.map_async() – the asynchronous variant of map()

Example of using a process pool:

import time  
from multiprocessing.pool import Pool  
  
def wait_and_return(x):  
    time.sleep(1)  
    return x  
  
if __name__ == "__main__":  
    with Pool(4) as pool:  
        result = pool.map(wait_and_return, [1,2,3,4])  
        print(result)

The program prints [1, 2, 3, 4]. Although the total waiting time adds up to four seconds, the program finishes in about one second, because the arguments are distributed across four worker processes that run in parallel.

The process pool has a job queue, to which new jobs are added when apply_async(), map(), and the other methods are called. Simplifying a bit, worker processes take tasks from this queue. The queues are multiprocessing queues (SimpleQueue in CPython), which allow data to be passed safely between processes. After completing a task, a worker process puts the result into a result queue shared by all workers.

Let's look at apply_async(). It takes a function with its arguments and puts them into the job queue. The method returns an ApplyResult object. At its core this class is a future: the encapsulated result of an operation that has not yet completed. When you try to get the result (ApplyResult.get()), the calling process blocks until the operation completes and the result arrives.
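
A small usage sketch of this behavior (the one-second sleep stands in for real work):

import time
from multiprocessing.pool import Pool

def slow_square(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    with Pool(2) as pool:
        future = pool.apply_async(slow_square, (3,))  # returns an ApplyResult immediately
        # ... the parent is free to do other work here ...
        print(future.get())  # blocks until the worker is done, prints 9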

map() takes a function and an iterable of arguments on which to run it. Internally it calls map_async(), which returns a MapResult future, and blocks on MapResult.get() until all results are ready. Jobs are divided into chunks, and it is these chunks that are sent to the job queue. In other words, the queue contains not individual tasks but lists of tasks, and each worker process pulls a whole chunk from the queue at a time. For apply_async() the chunk size is one.
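
A simplified sketch of the chunking idea (modeled on what Pool does internally, not the exact code):

import itertools

def chunk_tasks(iterable, chunksize):
    # yield tuples of at most `chunksize` items, like Pool does with jobs
    it = iter(iterable)
    while chunk := tuple(itertools.islice(it, chunksize)):
        yield chunk

print(list(chunk_tasks(range(10), 3)))  # [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]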

Let's figure out exactly how the worker processes are created and how tasks are distributed among them. When the pool is initialized, three service threads are created (a toy sketch of the underlying worker-loop pattern follows the list):

  • _worker_handler – this thread creates worker processes and monitors how many of them are alive. When some workers exit, it creates new ones so that the number of workers always equals the value set at initialization.

  • _task_handler – this thread handles the queue into which user-submitted jobs are placed when apply_async(), map(), and other methods are called. Jobs from this queue are transferred to another queue, the one the worker processes actually consume. The workers, in turn, put the results of completed jobs into a shared result queue.

  • _result_handler – this thread collects the results of completed jobs from the shared result queue and writes them into the corresponding ApplyResult (or MapResult) object.
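
Here is a toy sketch of that worker-loop pattern, greatly simplified relative to the real Pool implementation (one worker, one job, a None sentinel for shutdown):

from multiprocessing import Process, SimpleQueue

def worker(inqueue, outqueue):
    # take tasks until the shutdown sentinel arrives
    while True:
        task = inqueue.get()
        if task is None:
            break
        job_id, func, args = task
        outqueue.put((job_id, func(*args)))

if __name__ == '__main__':
    inq, outq = SimpleQueue(), SimpleQueue()
    p = Process(target=worker, args=(inq, outq))
    p.start()
    inq.put((0, pow, (2, 10)))  # submit one job
    print(outq.get())           # (0, 1024)
    inq.put(None)               # ask the worker to stop
    p.join()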

Conclusion

I hope this article has helped you gain a deeper understanding of processes and the use of the multiprocessing package in Python. Remember that understanding the basics of working with processes opens up new opportunities to optimize and speed up the execution of your programs. Thanks for your attention and good luck!
