How to do parallel programming in Python?


For C++, we can use OpenMP to do parallel programming; however, OpenMP will not work for Python. What should I do if I want to parallelize some parts of my Python program?

The structure of the code may be considered as:

solve1(A)
solve2(B)

Where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of in sequence, in order to reduce the running time?
The code is:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
            
        node1 = partition[0]
        node2 = partition[1]
    
        G = updateGraph(G, node1, node2)

        if i == 999:
            print("Maximum number of iterations reached")
        i += 1  # advance the counter so the loop stops after 1000 iterations
    print(inneropt)

Where setinner and setouter are two independent functions. That’s the part I want to parallelize…

You can use the multiprocessing module. For this case I might use a processing pool:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

This will spawn processes that can do generic work for you. Since we did not pass the processes argument, the pool spawns one worker process for each CPU core on your machine. Each CPU core can execute one process at a time.

If you want to map a single function over a list of arguments, you would do this:

args = [A, B]
results = pool.map(solve1, args)

Don’t use threads for CPU-bound work, because the GIL allows only one thread at a time to execute Python bytecode.
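The pieces of this answer can be put together into a small runnable sketch. The bodies of solve1 and solve2 here are placeholder stand-ins (the question does not show them), and run_both is a hypothetical helper name. The sketch also adds the if __name__ == "__main__" guard, which is needed on platforms where worker processes re-import the main module.

```python
from multiprocessing import Pool


def solve1(a):
    # Placeholder for the real solve1; must be a module-level function
    # so that it can be pickled and sent to a worker process.
    return a * a


def solve2(b):
    # Placeholder for the real solve2.
    return b + 1


def run_both(a, b):
    """Run solve1(a) and solve2(b) in separate worker processes."""
    with Pool(processes=2) as pool:
        result1 = pool.apply_async(solve1, [a])
        result2 = pool.apply_async(solve2, [b])
        # get() blocks until the result is ready and re-raises any
        # exception that occurred in the worker.
        return result1.get(timeout=10), result2.get(timeout=10)


if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block when
    # the main module is re-imported (e.g. with the spawn start method).
    print(run_both(3, 4))
```

If a result is not ready within the timeout, get() raises multiprocessing.TimeoutError, so the timeout should be chosen with the expected running time in mind.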

This can be done very elegantly with Ray.

To parallelize your example, you’d need to define your functions with the @ray.remote decorator, and then invoke them with .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

There are a number of advantages of this over the multiprocessing module.

  1. The same code will run on a multicore machine as well as a cluster of machines.
  2. Processes share data efficiently through shared memory and zero-copy serialization.
  3. Error messages are propagated nicely.
  4. These function calls can be composed together, e.g.,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. In addition to invoking functions remotely, classes can be instantiated remotely as actors.

Note that Ray is a framework I’ve been helping develop.

CPython uses the Global Interpreter Lock, which makes parallel programming a bit more interesting than in C++.

This topic has several useful examples and descriptions of the challenge:

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?

The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there are also charm4py and mpi4py (I am the developer of charm4py).

There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to workers in each of the 1000 iterations. Since at least one worker will reside on a different process, this involves copying and sending the arguments to the other process(es). This could be very costly depending on the size of the objects. Instead, it makes sense to have workers store state and simply send the updated information.

For example, in charm4py this can be done like this:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Note that for this example we really only need one worker. The main loop could execute one of the functions, and have the worker execute the other. But my code helps to illustrate a couple of things:

  1. Worker A runs in process 0 (same as the main loop). While result_a.get() is blocked waiting on the result, worker A does the computation in the same process.
  2. Arguments are automatically passed by reference to worker A, since it is in the same process (there is no copying involved).
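The idea of letting a worker keep its state and sending it only small updates can also be sketched with the standard library alone. This is not charm4py and not the author's implementation: it swaps in multiprocessing.Process and Pipe, and the names worker and run, the dictionary used as the graph, and the arithmetic are all hypothetical stand-ins for updateGraph and setinner.

```python
from multiprocessing import Pipe, Process


def worker(conn, graph):
    """Hold `graph` for the lifetime of the process and apply small updates."""
    state = dict(graph)  # the (possibly large) initial state arrives only once
    while True:
        message = conn.recv()
        if message is None:  # sentinel: shut the worker down
            break
        node1, node2 = message
        # Stand-in for updateGraph: bump a counter for each node.
        state[node1] = state.get(node1, 0) + 1
        state[node2] = state.get(node2, 0) + 1
        # Stand-in for setinner / setouter: send back a small result.
        conn.send(sum(state.values()))
    conn.close()


def run(graph, updates):
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker, args=(child_conn, graph))
    proc.start()
    results = []
    for update in updates:
        parent_conn.send(update)  # only the small update crosses the process boundary
        results.append(parent_conn.recv())
    parent_conn.send(None)
    proc.join()
    return results


if __name__ == "__main__":
    print(run({"a": 0, "b": 0}, [("a", "b"), ("a", "c")]))
```

Compared with a worker pool, the graph is pickled once at start-up rather than on every iteration, at the cost of managing the worker's lifetime by hand.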

In some cases, it’s possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Unfortunately, it seems that Numba only works with NumPy arrays, but not with other Python objects. In theory, it might also be possible to compile Python to C++ and then automatically parallelize it using the Intel C++ compiler, though I haven’t tried this yet.

You can use the joblib library to do parallel computation and multiprocessing.

from joblib import Parallel, delayed

Simply define a function foo that you want to run in parallel, then parallelize it with the following piece of code:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

Where num_cores can be obtained from the multiprocessing library as follows:

import multiprocessing

num_cores = multiprocessing.cpu_count()

If you have a function with more than one input argument, and you just want to iterate over one of the arguments using a list, you can use the partial function from the functools library as follows:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

You can find a complete explanation of Python and R multiprocessing, with a couple of examples, here.
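For readers without joblib installed, the same partial pattern can be sketched with the standard library's concurrent.futures. Note that this swaps ProcessPoolExecutor in for joblib's Parallel; the body of foo and the helper name run are hypothetical, chosen only so the example is runnable.

```python
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def foo(arg1, arg2, arg3, arg4):
    # Hypothetical body: combine the varying argument with the fixed ones.
    return arg1 * arg2 + arg3 - arg4


def run(inputs, arg2, arg3, arg4):
    # Fix every argument except arg1, which is taken from `inputs`.
    foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
    with ProcessPoolExecutor() as executor:
        # map() returns results in the same order as `inputs`.
        return list(executor.map(foo_, inputs))


if __name__ == "__main__":
    print(run([11, 32, 44], arg2=2, arg3=1, arg4=0))
```

A partial object of a module-level function can be pickled, which is what allows it to be sent to the worker processes.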

If you cannot invest the time to learn the requirements and assumptions of the libraries or modules recommended in the other answers, the following may suit you:

  1. Give your script options to run individual parts of the task.
  2. When ready to run n parts in parallel, launch them with child = subprocess.Popen(args = [sys.argv[0], ...]), providing the part number and other details in additional options and/or parameter files, and call child.wait() for each child.
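The two steps above can be sketched as follows. To keep the example self-contained, the child is an inline program passed with -c instead of the script itself; in a real script you would pass sys.argv[0] and your own options. CHILD_CODE and run_parts are hypothetical names.

```python
import subprocess
import sys

# Stand-in for "this script, run for one part": reads the part number from
# its first argument and exits with status 0 if it is valid.
CHILD_CODE = "import sys; part = int(sys.argv[1]); sys.exit(0 if part >= 0 else 1)"


def run_parts(n_parts):
    """Launch one child process per part, then wait for all of them."""
    children = [
        subprocess.Popen([sys.executable, "-c", CHILD_CODE, str(part)])
        for part in range(n_parts)
    ]
    # wait() blocks until the child exits and returns its exit code.
    return [child.wait() for child in children]


if __name__ == "__main__":
    print(run_parts(3))
```

All children are started before the first wait() call, so they run concurrently; the exit codes tell you which parts succeeded.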

If you want to monitor progress, launch more workers as soon as workers finish, or do something else while waiting, use child.poll() instead of child.wait() and check whether child.returncode is still None.
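A minimal sketch of such a polling loop, which keeps at most max_workers children running and starts a new one whenever a slot frees up, might look like this. The inline child program and the names CHILD_CODE, run_with_limit, max_workers, and poll_interval are assumptions made for illustration.

```python
import subprocess
import sys
import time

# Stand-in child: sleeps briefly, then exits with 0 for even parts and 1 for odd parts.
CHILD_CODE = "import sys, time; time.sleep(0.1); sys.exit(int(sys.argv[1]) % 2)"


def run_with_limit(parts, max_workers=2, poll_interval=0.05):
    """Run one child per part, with at most `max_workers` running at once."""
    pending = list(parts)
    running = {}      # part number -> Popen object
    exit_codes = {}   # part number -> exit code
    while pending or running:
        # Start new children while there are free slots.
        while pending and len(running) < max_workers:
            part = pending.pop(0)
            running[part] = subprocess.Popen(
                [sys.executable, "-c", CHILD_CODE, str(part)]
            )
        # poll() returns None while the child is still running.
        for part, child in list(running.items()):
            code = child.poll()
            if code is not None:
                exit_codes[part] = code
                del running[part]
        time.sleep(poll_interval)  # other work could be done here instead
    return exit_codes


if __name__ == "__main__":
    print(run_with_limit(range(4)))
```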

For big tasks, the overhead of launching new processes and writing and reading files is minimal. For many small tasks, one would want to launch workers only once and then communicate with them via pipes or sockets, but that’s a lot more work and has to be done carefully to avoid the possibility of deadlocks. In this situation, it is probably better to learn how to use the modules recommended in other answers.

I always use the ‘multiprocessing’ native library to handle parallelism in Python. To control the number of processes in the queue, I use a shared variable as a counter. In the following example, you can see how the parallel execution of simple processes works.

I made an update to the script to make it easier to use. Basically, the only thing you have to do is override the process method with the function you want to run in parallel. See the example; the procedure is very simple. You can also remove all of the execution logging if you don’t need it.

When I have some time, I’ll update the code to work with processes that return values.

Requirements

$ pip install coloredlogs==15.0.1

Code

Parallel processing script (copy and paste):

#!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting-up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60*60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        use_multithreading: bool
            Use multithreading instead of multiprocessing.
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        lock = manager.Lock()
        pool = Pool(processes=self._n_jobs) if not use_multithreading else ThreadPool(processes=self._n_jobs)

        start_time = datetime.now()

        with lock:  # Write-protecting the processes queue shared variable.
            for args in args_list:
                while True:
                    if self._queue.value < self._n_jobs:
                        self._queue.value += 1

                        # Running processes in parallel:
                        pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)

                        break
                    else:
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")

Example of use:

class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]

    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()

Execution and Output

$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934
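
The class above does not return values from process. As a rough sketch of one way to collect return values (not the author's planned update), the standard library's concurrent.futures can be used; the names process, run_all, and n_jobs and the body of process are hypothetical.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def process(name):
    # Hypothetical work: return a value derived from the argument.
    return name.lower() * 2


def run_all(names, n_jobs=2):
    """Run process(name) for every name and collect the return values."""
    results = {}
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        # Map each future back to the argument it was submitted with.
        futures = {executor.submit(process, name): name for name in names}
        for future in as_completed(futures):
            # result() returns the value or re-raises the worker's exception.
            results[futures[future]] = future.result()
    return results


if __name__ == "__main__":
    print(run_all(["A", "B", "C"]))
```

The executor limits the number of concurrent worker processes itself, so no shared counter is needed in this variant.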

Here is a complete example that works on Windows. It times the asynchronous version against the sequential one to show how much time asynchronous processing saves:

import multiprocessing
import time
from multiprocessing import Pool, freeze_support


def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1

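if __name__ == '__main__':
    # freeze_support() should come straight after the __main__ guard; it has
    # no effect unless the program is frozen into an executable.
    freeze_support()

    pool = Pool(multiprocessing.cpu_count())

    # Parallel: f1 and f2 run in separate worker processes.
    t0 = time.time()
    result1 = pool.apply_async(f1, [0])
    result2 = pool.apply_async(f2, [9])
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time()-t0)

    # Sequential: the same two calls, one after the other.
    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time()-t0)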

You can convert your pandas DataFrame to a Dask DataFrame, and Dask can handle the parallel computing for you.

import pandas as pd
import dask.dataframe as dd
pdf = pd.DataFrame({"A" : A, "B" : B})  # build a regular pandas DataFrame first
ddf = dd.from_pandas(pdf, npartitions=3)  # split it into 3 partitions
solve(ddf)


The answers/resolutions are collected from Stack Overflow and are licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.
