Speed up Python by 2x with multiprocessing, asyncio and MapReduce

Disadvantages of Python: Slow performance and GIL issue

Python is one of the most popular programming languages in the world thanks to its simplicity and ease of use. But like any language, Python has its drawbacks, and the main one is speed. One of the reasons the language is slow is its dynamic nature: Python is an interpreted language, so the CPython interpreter compiles code to bytecode and executes it in a virtual machine rather than as native machine code. This makes Python run slower than compiled languages such as C++ or Java.

The GIL (Global Interpreter Lock) is a feature of the CPython interpreter that limits multithreading and parallel execution: it is a lock that allows only one thread to execute Python bytecode at any given time, even on multi-core processors.
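To see the effect of the GIL in practice, here is a minimal sketch (not from the original article) that runs the same kind of CPU-bound counting in two threads. On CPython the total time is roughly the same as running the two calls one after the other, because only one thread can hold the GIL at a time.

"""Two threads do not speed up CPU-bound work (illustrative sketch)"""
import time
from threading import Thread

def count(count_to: int) -> None:
    counter = 0
    while counter < count_to:
        counter = counter + 1

if __name__ == "__main__":
    start = time.time()
    threads = [Thread(target=count, args=(100_000_000,)),
               Thread(target=count, args=(100_000_000,))]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # On CPython this takes about as long as two sequential calls
    print(f'Total running time with threads: {time.time() - start}')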

Is Python slow everywhere?

If you dive deeper into Python, you will find the excellent asyncio library, which was added to the standard library in Python 3.4 (the async/await syntax followed in 3.5, in 2015). The library lets you do asynchronous programming with concurrently executing coroutines, and it speeds Python up considerably in I/O-bound workloads. For example, the asyncpg driver can read rows from PostgreSQL at a rate of around 1 million rows per second! So Python is not that slow after all.
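As a quick illustration of what asyncio gives us for I/O-bound work, here is a minimal sketch in which asyncio.sleep stands in for real I/O waits (such as the database queries asyncpg would perform): three one-second "requests" complete in about one second in total rather than three.

"""Concurrent I/O with asyncio (illustrative sketch)"""
import asyncio
import time

async def fake_request(i: int) -> int:
    # asyncio.sleep stands in for a real I/O wait (a database query, an HTTP call)
    await asyncio.sleep(1)
    return i

async def main() -> None:
    start = time.time()
    results = await asyncio.gather(*(fake_request(i) for i in range(3)))
    print(results, f'took {time.time() - start:.2f} s')  # about 1 s, not 3 s

if __name__ == "__main__":
    asyncio.run(main())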

What about CPU-bound tasks?

I/O is what asyncio was created for in the first place; when writing code, you need to make sure no CPU-heavy work runs inside the coroutines. At first glance this seriously limits asyncio, but in fact the library is more versatile: it has an API for handing work off to the Python multiprocessing library.

The GIL prevents more than one piece of Python bytecode from executing in parallel. This means that for anything other than I/O, multithreading gives no performance benefit, unlike in languages such as Java and C++. At first glance Python seems to offer no way to parallelize computational work, but there is one: the multiprocessing library. Instead of starting threads to parallelize the work, the parent process launches child processes, and each child process runs its own Python interpreter with its own GIL.

Fewer words, more practice!

Machine specifications:
Processor: Ryzen 5 5500U, 6 cores / 12 threads, max frequency 4 GHz
Memory: 16 GB RAM at 2667 MHz, SSD storage

Let's write a function. It will be very simple, counting from zero up to a large number, but it will let us see how the API works and what kind of speed-up can be achieved.

"""Два параллельных процесса"""
import time
from multiprocessing import Process

def count(count_to: int) -> int:
    start = time.time()
    counter = 0
    while counter < count_to:
        counter = counter + 1
    end = time.time()
    print(f'Закончен подсчет до {count_to} за время {end-start}')
    return counter

if __name__ == "__main__":
    start_time = time.time()

    #Создаём процесс для выполнения функции count
    to_one_hundred_million = Process(target=count, args=(100000000,))
    to_two_hundred_million = Process(target=count, args=(200000000,))
    #Запускаем процесс. Этот метод возвращает управление немедленн
    to_one_hundred_million.start()
    to_two_hundred_million.start()
    #Ждём завершения процесса. 
    #Этот метод блокирует выполнение, пока процесс не завершится
    to_one_hundred_million.join()
    to_two_hundred_million.join()
    end_time = time.time()
    print(f'Полное время работы {end_time-start_time}')

Here we define a simple function called count that takes an integer and increments a counter in a loop until it reaches the upper bound we pass in. We then create two processes, one that counts to 100,000,000 and one that counts to 200,000,000. The Process class takes two arguments: target, the function to be executed, and args, a tuple of arguments passed to it. The start method is then called for each process; it returns immediately and starts the process running. In this example both processes are started one after the other, and then the join method is called for each. This makes the main process block until both child processes have finished.

As a result, we get the following output:
Finished counting to 100000000 in 6.757261514663696
Finished counting to 200000000 in 12.582566976547241
Total running time 12.697719097137451

Run sequentially, the two count calls would have taken a little over 19 seconds in total, but our application finished in under 13 seconds, saving about 7 seconds compared to the sequential version. That is a good performance gain, but the code looks inelegant because we have to call start and join for every process we launch. Moreover, we do not know which process will finish first.

Using a process pool

In the previous example, we manually created processes and called their start and join methods to launch them and wait for them to complete. This approach has several drawbacks, from verbosity to the inability to access the values returned by the target function. The multiprocessing module provides an API that addresses these problems: process pools.

Let's look at an example:

"""Работа пула процессов"""
from multiprocessing import Pool

def say_hello(name: str) -> str:
    return f'Привет, {name}'

if __name__ == "__main__":
    with Pool() as process_pool:#Создание пула процессов
        hi_jeff = process_pool.apply(say_hello, args=('Jeff',))
        hi_john = process_pool.apply(say_hello, args=('John',))
        print(hi_jeff)
        print(hi_john)

Here we create a process pool in the with Pool() as process_pool statement. We use a context manager because, once we are done with the pool, we need to shut down the processes Python created; if we don't, we risk leaking a valuable resource: processes. When the pool is created, it automatically starts as many worker processes as there are CPU cores on the machine (the number of cores can be obtained from multiprocessing.cpu_count()). If that does not suit us, we can pass an arbitrary integer to Pool() in the processes argument, but the default is usually a good starting point.

Next, we use the pool's apply method to execute the say_hello function in a separate process. This works, but there is a problem: apply blocks until the function completes. That means that if each call to say_hello took 10 seconds, the whole program would run for about 20 seconds, because execution is sequential and all our effort to make it parallel is wasted. The problem is solved by the pool's apply_async method, an asynchronous version of Pool.apply() that does not wait for the function to complete.
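A minimal sketch of the apply_async variant (the say_hello function is the one from the listing above): both calls are submitted without blocking, and get() is called only when we actually need each result.

"""Pool.apply_async returns immediately (illustrative sketch)"""
from multiprocessing import Pool

def say_hello(name: str) -> str:
    return f'Hello, {name}'

if __name__ == "__main__":
    with Pool() as process_pool:
        # Both calls are submitted without waiting for them to finish
        hi_jeff = process_pool.apply_async(say_hello, args=('Jeff',))
        hi_john = process_pool.apply_async(say_hello, args=('John',))
        # get() blocks until the corresponding result is ready
        print(hi_jeff.get())
        print(hi_john.get())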

Using process pool executors with asyncio

We've seen how to use process pools to perform counting operations concurrently. Process pools are fine for simple cases, but Python also provides an abstraction on top of them in the concurrent.futures module: the abstract class Executor, which defines two methods for performing work asynchronously. The first, submit, takes a callable and returns a Future object (the equivalent of Pool.apply_async). The second, map, takes a callable and a list of arguments and executes the callable asynchronously with each of those arguments, returning an iterator over the results of the calls.
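For instance, submit can be used like this (a small sketch that is not part of the article's original examples; the square function is just a stand-in):

"""Executor.submit returns a Future (illustrative sketch)"""
from concurrent.futures import ProcessPoolExecutor

def square(x: int) -> int:
    return x * x

if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        future = pool.submit(square, 21)  # returns a Future immediately
        print(future.result())            # blocks until the result is ready: 441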
To demonstrate how ProcessPoolExecutor works, let's take the number-counting example again and run it with a few small upper bounds and one large bound to see how the results arrive.

"""Исполнители пула процессов"""
import time
from concurrent.futures import ProcessPoolExecutor

def count(count_to: int) -> int:
    start = time.time()
    counter = 0
    while counter < count_to:
        counter = counter + 1

    end = time.time()
    print(f'Finished counting to {count_to} in {end - start}')
    return counter

if __name__ == "__main__":
    with ProcessPoolExecutor() as process_pool:
        numbers = [1, 3, 5, 22, 100000000]
        for result in process_pool.map(count, numbers):
            print(result)

Finished counting to 1 in 0.0
Finished counting to 3 in 0.0
Finished counting to 5 in 0.0
1
Finished counting to 22 in 0.0
3
5
22
Finished counting to 100000000 in 6.688919544219971
100000000

As before, the ProcessPoolExecutor object is created under the control of a context manager, and the number of workers defaults to the number of CPU cores, just as with the process pool. We then use the process_pool.map method to execute the count function with a list of upper bounds. Running the program shows that calls to count with small upper bounds complete quickly and are printed almost immediately, while the call with the 100000000 bound takes much longer and is printed after the small numbers. Although it looks as if the program processes the numbers simultaneously, the iteration order is deterministic and is set by the order of the numbers in the numbers list. This means that if 100000000 were the first number, we would have to wait for that call to complete before we could print the other results, even though they were computed earlier.
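If you want results as soon as they are ready rather than in submission order, submit can be combined with concurrent.futures.as_completed; a minimal sketch (reusing the same count function, simplified to drop the timing code):

"""Results in completion order with as_completed (illustrative sketch)"""
from concurrent.futures import ProcessPoolExecutor, as_completed

def count(count_to: int) -> int:
    counter = 0
    while counter < count_to:
        counter = counter + 1
    return counter

if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        futures = {pool.submit(count, n): n for n in [100000000, 1, 3, 5, 22]}
        for future in as_completed(futures):
            # The small counts are printed first, even though 100000000 was submitted first
            print(futures[future], '->', future.result())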

Process pool executors combined with the event loop

Now that we're familiar with how process pool executors work, let's look at how to plug them into an asyncio event loop. This will allow us to use the gather* and as_completed** functions to manage multiple processes.
*asyncio.gather() lets the caller group several awaitable objects; once grouped, they are run concurrently.
**asyncio.as_completed() starts the tasks/awaitables passed to it and returns an iterator that yields their results as soon as each one becomes available.

Once we have a pool, we can use a special asyncio event loop method, run_in_executor. It takes a callable and an executor (a process pool or a thread pool), runs the callable inside that pool, and returns an awaitable that can be used in an await expression or passed to an API function such as gather.

Let's implement the previous example again, now with a process pool executor. We will feed the executor several counting tasks and wait for them all to complete using gather. The run_in_executor method accepts only a callable and does not let us specify function arguments, so we get around this by using functools.partial to construct calls to count that require no arguments.

"""Исполнитель пула процессов с asyncio"""
import asyncio
from asyncio.events import AbstractEventLoop
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import List

def count(count_to: int) -> int:
    counter = 0
    while counter < count_to:
        counter = counter + 1
    return counter

async def main():
    with ProcessPoolExecutor() as process_pool:
        loop: AbstractEventLoop = asyncio.get_running_loop()
        nums = [1, 3, 5, 22, 100000000]
        # Create a partially applied count with a fixed argument for each number,
        # collecting all the process-pool calls in a list
        calls: List[partial[int]] = [partial(count, num) for num in nums]
        call_coros = []

        for call in calls:
            call_coros.append(loop.run_in_executor(process_pool, call))
        # Wait for the results
        results = await asyncio.gather(*call_coros)

        for result in results:
            print(result)

if __name__ == "__main__":
    asyncio.run(main())

First, we create a process pool executor as before. Then we get the asyncio event loop, since run_in_executor is a method of AbstractEventLoop. Using partial function application, we prepare a call to count for each number in the nums list, because a direct call with arguments cannot be handed to the executor. We loop over the resulting callables, calling loop.run_in_executor for each one and storing the returned awaitables in the call_coros list. Finally, we pass this list to asyncio.gather and wait for all the calls to complete.
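As a variation (not part of the original example), asyncio.as_completed can replace gather when you want to print each result as soon as its process finishes instead of waiting for all of them:

"""asyncio.as_completed instead of gather (illustrative sketch)"""
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

def count(count_to: int) -> int:
    counter = 0
    while counter < count_to:
        counter = counter + 1
    return counter

async def main():
    with ProcessPoolExecutor() as process_pool:
        loop = asyncio.get_running_loop()
        calls = [partial(count, num) for num in [1, 3, 5, 22, 100000000]]
        futures = [loop.run_in_executor(process_pool, call) for call in calls]
        # Results arrive in completion order, not in submission order
        for finished in asyncio.as_completed(futures):
            print(await finished)

if __name__ == "__main__":
    asyncio.run(main())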

Solving the problem using MapReduce and asyncio

Now comes the best part. To understand what kinds of problems can be solved with MapReduce, let's consider a hypothetical one and then apply the idea to a similar problem with a large, publicly available data set. Suppose our site receives a large amount of text through the "Feedback" field of a form on the technical support portal. Since the site is successful, this data set of customer feedback is already measured in terabytes and grows every day.

A simple solution is to run a single process and loop through all the comments, remembering how many times each word appears. This will work, but since the dataset is large, going through it sequentially can take a long time.

It is for this kind of task that MapReduce technology is designed. In the MapReduce programming model, a large data set is first broken down into smaller pieces. We can then solve the problem for a subset of the data rather than the entire set—this is called mapping because we are “mapping” the data onto a partial result. Once the problems for all subsets have been solved, we can combine the results into a final answer. This step is called reduction because we “reduce” several answers into one. Counting the frequency of occurrences of words in a large text data set is a canonical MapReduce task. If the data set is large enough, splitting it into smaller pieces can provide performance benefits because all mapping operations can be performed in parallel, as shown in this figure:

Figure: a large data set is divided into partitions, the mapping function produces intermediate results for each partition, and those results are then combined into the final answer.

To understand better how MapReduce works, let's look at a concrete example. Suppose we have a file in which each line contains text, and we want to count how many times each word appears in this data set.
First, we need to break the data into smaller chunks. For simplicity, let's say one line is one chunk. Then we define the mapping operation: since we want word frequencies, we split each line on spaces, which gives us a list of the words in that line, and then loop over it, recording in a dictionary how many times each word occurs.
Finally, we define the reduction operation. It takes two or more results of mapping operations and combines them into one. In this example it takes two dictionaries built by the mapping operation and merges them: if a word exists in both dictionaries, its counts are added together; if not, the count is simply copied into the resulting dictionary. With these operations defined, we can apply the map operation to each line of text and then reduce the partial results pairwise. Let's look at the code that does this for a few lines of text.

"""Однопоточная модель MapReduce"""
import functools
from typing import Dict

def map_frequency(text: str) -> Dict[str, int]:
    words = text.split(' ')
    frequencies = {}
    for word in words:
        if word in frequencies:
            # If the word is already in the dictionary, add one to its counter
            frequencies[word] = frequencies[word] + 1
        else:
            # If the word is not in the dictionary yet, set its counter to one
            frequencies[word] = 1
    return frequencies

def merge_dictionaries(first: Dict[str, int], second: Dict[str, int]) -> Dict[str, int]:
    merged = first
    for key in second:
        if key in merged:
            # If the word appears in both dictionaries, add the counters together
            merged[key] = merged[key] + second[key]
        else:
            # Otherwise, copy the count from the second dictionary
            merged[key] = second[key]
    return merged

lines = [
    "Я люблю вечерний пир",
    "Где веселье председатель",
    "А свобода мой кумир",
    "За столом законодатель",
    "Где до утра слово пей!",
    "Заглушает крики песен",
    "Где просторен круг гостей",
    "А кружок бутылок тесен"
]
# Apply the map operation to each line of text
mapped_results = [map_frequency(line) for line in lines]

for result in mapped_results:
    print(result)
# Reduce all the intermediate counters into the final result
print(functools.reduce(merge_dictionaries, mapped_results))

We apply the map operation to each line of text, which gives a frequency counter for each line. The partial mapping results are then combined with our merge_dictionaries function using functools.reduce. The result looks like this:

{'Я': 1, 'люблю': 1, 'вечерний': 1, 'пир': 1}
{'Где': 1, 'веселье': 1, 'председатель': 1}
{'А': 1, 'свобода': 1, 'мой': 1, 'кумир': 1}
{'За': 1, 'столом': 1, 'законодатель': 1}
{'Где': 1, 'до': 1, 'утра': 1, 'слово': 1, 'пей!': 1}
{'Заглушает': 1, 'крики': 1, 'песен': 1}
{'Где': 1, 'просторен': 1, 'круг': 1, 'гостей': 1}
{'А': 1, 'кружок': 1, 'бутылок': 1, 'тесен': 1}
Final result:
{'Я': 1, 'люблю': 1, 'вечерний': 1, 'пир': 1, 'Где': 3, 'веселье': 1, 'председатель': 1, 'А': 2, 'свобода': 1, 'мой': 1, 'кумир': 1, 'За': 1, 'столом': 1, 'законодатель': 1, 'до': 1, 'утра': 1, 'слово': 1, 'пей!': 1, 'Заглушает': 1, 'крики': 1, 'песен': 1, 'просторен': 1, 'круг': 1, 'гостей': 1, 'кружок': 1, 'бутылок': 1, 'тесен': 1}
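As a side note, the standard library's collections.Counter can express the same map and reduce steps more compactly, since adding two Counters merges their counts; a minimal sketch with stand-in data:

"""The same word count with collections.Counter (illustrative sketch)"""
import functools
from collections import Counter

lines = ["a b a", "b c"]  # stand-in data; any list of strings works

mapped = [Counter(line.split(' ')) for line in lines]  # map step
total = functools.reduce(lambda x, y: x + y, mapped)   # reduce step: Counter addition merges counts
print(total)  # Counter({'a': 2, 'b': 2, 'c': 1})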

Now that we have covered the basics of MapReduce on a model problem, let's look at how to apply it to a real dataset where the multiprocessing library can provide performance benefits.

Large data set

We need a data set large enough to demonstrate the full benefits of combining MapReduce with the multiprocessing library. If the data set is too small, we are more likely to see a slowdown from the overhead of managing processes than any gain. The Google Books Ngram data set is sufficient for our purposes.

The Google Books Ngram dataset consists of n-grams taken from 8,000,000 books dating back to the year 1500. That's more than six percent of all the books ever published. The number of times each unique n-gram appears in the texts is counted, and the results are grouped by year. The dataset contains n-grams for n from 1 to 5, presented in tab-delimited format. Each row in the dataset contains the n-gram, the year it appeared, how many times it appeared, and in how many books. Let's look at the first few rows of the unigram dataset for the word aardvark:

Aardvark 1822 2 1
Aardvark 1824 3 1
Aardvark 1827 10 7

This means that in 1822 the word aardvark appeared twice in a single book, and in 1827 it appeared ten times across seven books. There are many more rows for aardvark in the data set (for example, it appeared 1,200 times in 2007), suggesting that aardvarks have been mentioned more and more often in literature over time. In this example we will count the occurrences of single words (unigrams) starting with the letter a; this file is approximately 1.8 GB. We will aggregate the counts for each word across all years, which lets us answer the question: "How many times has the word aardvark appeared in literature since 1500?"
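Since the rows are tab-separated, a single row can be unpacked into its four fields, which is exactly what the mapping code will rely on later (the variable names here are mine, not from the dataset documentation):

"""Parsing one row of the n-gram dataset (illustrative sketch)"""
line = 'Aardvark\t1827\t10\t7'
word, year, match_count, volume_count = line.split('\t')
print(word, year, int(match_count), int(volume_count))  # Aardvark 1827 10 7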

Using asyncio for mapping and reducing

To have a baseline for comparison, let's first write a synchronous version of the word-frequency count. First we load the entire data set into memory, then we build a dictionary that maps each word to its number of occurrences. For every line of the file, we check whether the word is already in the dictionary: if it is, its counter is increased by the count from that row; otherwise the word is added to the dictionary with that count.

"""Подсчет частот слов"""
import gzip
import time

freqs = {}

with gzip.open('googlebooks-eng-all-1gram-20120701-a.gz', 'rb') as f:
    lines = f.read().decode('utf-8').splitlines()

    start = time.time()
    for line in lines:
        data = line.split('\t')
        word = data[0]
        count = int(data[2])
        if word in freqs:
            freqs[word] = freqs[word] + count
        else:
            freqs[word] = count

    print(f"Aardvark встречается {freqs['Aardvark']} раз.")
    end = time.time()

print(f'{end-start:.4f}')

In the measured time we include only the counting itself, not the time spent loading the file. The run took 47.8429 seconds.

Let's see whether we can improve this result using the multiprocessing and asyncio libraries (on a machine with a different number of cores or other resources the results may differ). First, we split our data set into smaller chunks by defining a partition generator that takes a large list and yields chunks of a requested size.

def partition(data: List, 
              chunk_size: int) -> Generator[List[str], None, None]:
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

This generator can be used to create chunks of size chunk_size. We will need it to create chunks that are passed to the mapping functions, which will then be run in parallel. Next, we define the mapping function. It is not much different from the function from the previous example, except that it is adapted to our data set.

def map_frequencies(chunk: List[str]) -> Dict[str, int]:
    counter = {}
    for line in chunk:
        word, _, count, _ = line.split('\t')
        if counter.get(word):
            counter[word] = counter[word] + int(count)
        else:
            counter[word] = int(count)
    return counter

For now, let's keep the same reduction operation as in the previous example. We now have everything we need to parallelize the mapping operations: we will create a process pool, split the data into chunks, and run map_frequencies for each chunk on a worker from the pool. Only one question remains: how do we choose the chunk size?

There is no simple answer to this question. The rule of thumb is a balanced approach: the chunk should be neither too large nor too small. It should not be too small because each chunk is serialized (pickled) and sent to a worker, which then deserializes it. Serialization and deserialization take time and can negate any gains if they happen too often. For example, a chunk size of 2 is obviously a bad choice, because it would require almost 1,000,000 serialization and deserialization operations.

But too large a chunk is also bad, because it keeps us from fully utilizing the machine. For example, if there are 10 cores but only two chunks, eight cores that could be working in parallel sit idle.
For this example I chose a chunk size of 20,000. If you use this approach for your own data-processing problems, try different chunk sizes until you find one that works well for your machine and data set, or devise a heuristic to determine it. Now let's combine all these pieces with a process pool and the run_in_executor event loop method to parallelize the mapping operations.

"""Распараллеливание с помощью MapReduce и пула процессов"""
import asyncio
import concurrent.futures
import functools
import time
import gzip
from typing import Dict, Generator, List


def partition(data: List, 
              chunk_size: int) -> Generator[List[str], None, None]:
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

def map_frequencies(chunk: List[str]) -> Dict[str, int]:
    counter = {}
    for line in chunk:
        word, _, count, _ = line.split('\t')
        if counter.get(word):
            counter[word] = counter[word] + int(count)
        else:
            counter[word] = int(count)
    return counter

def merge_dictionaries(first: Dict[str, int], second: Dict[str, int]) -> Dict[str, int]:
    merged = first
    for key in second:
        if key in merged:
            merged[key] = merged[key] + second[key]
        else:
            merged[key] = second[key]
    return merged


async def main(partition_size: int):
    with gzip.open('googlebooks-eng-all-1gram-20120701-a.gz', 'rb') as f:
        contents = f.read().decode('utf-8').splitlines()
        loop = asyncio.get_running_loop()
        tasks = []
        start = time.time()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            for chunk in partition(contents, partition_size):
                # Run the mapping operation for each chunk in a separate process
                task = loop.run_in_executor(pool, 
                                            functools.partial(map_frequencies, chunk))
                tasks.append(task)
            # Wait for all the mapping operations to finish
            intermediate_results = await asyncio.gather(*tasks)
            # Reduce the intermediate results into the final answer
            final_result = functools.reduce(merge_dictionaries, intermediate_results)
        print(f"Aardvark appears {final_result['Aardvark']} times.")
        end = time.time()
        print(f'MapReduce time: {(end - start):.4f} seconds')

if __name__ == "__main__":
    asyncio.run(main(partition_size=20000))

In the main coroutine we create a process pool and split the data into chunks. For each chunk, the map_frequencies function is executed in a separate process. Then, using asyncio.gather, we wait for all the intermediate dictionaries. Once every mapping operation has finished, we run the reduce operation to obtain the final result.

This program completes in approximately 23.5326 seconds, i.e. about a 2x speed-up over the sequential version! Not bad, considering we didn't have to write that much additional code. You can experiment on machines with more cores and see whether this algorithm's performance can be pushed even further.
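If you want to experiment with the chunk size yourself, one option is to replace the if __name__ == "__main__": block of the listing above with a small loop over several candidate sizes (a hypothetical sketch; the timings and the best size will depend on your machine and data):

"""Trying several partition sizes (sketch; replaces the __main__ block of the listing above)"""
if __name__ == "__main__":
    for size in (5000, 20000, 60000, 200000):
        # main() reloads the file and prints its own MapReduce timing for each size
        asyncio.run(main(partition_size=size))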

What is the final conclusion?

In this article, I looked at ways to run several tasks at once in Python using a process pool. We learned how to create a process pool and run functions in parallel, and how to use asyncio to run tasks concurrently and await their results. We also applied the MapReduce programming model on top of a process pool and asyncio to solve a problem efficiently, ending up with a multi-process program that ran the computation about twice as fast!

I hope every reader finds something valuable here, whether new information or a refresher of things they already knew.

Inspired by the book: Asyncio and Concurrent Programming in Python
