Multi-Processing & Threading

Threading:

  • Threads share the same memory space and can read from and write to shared variables

  • Due to the Python Global Interpreter Lock (GIL), two threads won’t execute Python bytecode at the same time; they run concurrently instead (for example via context switching)

  • Effective for I/O-bound tasks

  • Can be implemented with the threading module

(asyncio can also be considered for I/O-bound workloads with slow I/O and many different connections)
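As a quick illustration of that last point, here is a minimal asyncio sketch (the `fetch` coroutine and its delay are made up for the example) that runs many slow I/O operations concurrently on a single thread:

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    # Stand-in for a slow I/O operation (network request, DB query, ...)
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> list:
    # Launch many "connections" concurrently on a single thread
    tasks = [fetch(f"conn-{i}", 0.1) for i in range(10)]
    return await asyncio.gather(*tasks)  # results keep the submission order


results = asyncio.run(main())
print(results)
```

The ten coroutines all sleep at the same time, so the whole run takes roughly 0.1 s instead of 1 s.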

Multi-processing:

  • Every process has its own memory space

  • Every process can contain one or more threads

  • Can be used to achieve parallelism by taking advantage of multi-core machines since processes can run on different CPU cores

  • Effective for CPU-bound tasks

  • Can be implemented with the multiprocessing module (or concurrent.futures.ProcessPoolExecutor)

source: https://towardsdatascience.com/multithreading-multiprocessing-python-180d0975ab29

Below are examples showing the most important features of both multi-processing and threading:

  • Get the number of CPU cores available

  • Set the desired number of processes/threads

  • Start the processes/threads

  • Get the output from the process/thread

  • Stop/kill the process/thread

  • Communicate with the process/thread (e.g. get its progression, send it signals)
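The first two points can be covered directly with the standard library; a small sketch (the `pow` tasks are just placeholders):

```python
import os
import multiprocessing as mp
import concurrent.futures

# Number of logical CPU cores available on the machine
n_cores = os.cpu_count()  # may return None if it cannot be determined
print(mp.cpu_count())     # multiprocessing exposes the same information

# Set the desired number of workers explicitly when creating a pool
with concurrent.futures.ThreadPoolExecutor(max_workers=n_cores) as executor:
    futures = [executor.submit(pow, 2, k) for k in range(4)]
    results = [f.result() for f in futures]
print(n_cores, results)
```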

Threading

Basic example: The concurrent.futures way

[Ramarao Amara] -> https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python

In Python 3.2+, the stdlib concurrent.futures module provides a higher-level API to threading, including a way to pass return values or exceptions from a worker thread back to the main thread.

import concurrent.futures
from numbers import Number
import time
import numpy as np


def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2


with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 4.5)
    return_value = future.result()
    print(return_value)
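The executor is not limited to a single submit call; a sketch running several inputs at once and collecting results as they finish (`quick_task` is a lightweight stand-in for the `task` above, without the sleeps):

```python
import concurrent.futures


def quick_task(value: float) -> float:
    # Lightweight stand-in for the task() above
    return value * 2


inputs = [1.0, 2.5, 4.5]
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to its input value
    futures = {executor.submit(quick_task, v): v for v in inputs}
    # as_completed yields futures in completion order, not submission order
    for future in concurrent.futures.as_completed(futures):
        results[futures[future]] = future.result()
print(results)
```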

Basic example: The threading way

import threading
from numbers import Number
import time
import numpy as np


def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2

input_value = 4.5
x = threading.Thread(target=task, args=(input_value, ))
x.start()
# Do whatever you want
x.join()

With this method, we cannot directly get the return value from the thread; solutions are described in this Stack Overflow question: https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python

Example using the solution of extending the Thread class:

import threading
import sys
from numbers import Number
import time
import numpy as np


class ReturnValueThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.result = None

    def run(self):
        if self._target is None:
            return  # could alternatively raise an exception, depends on the use case
        try:
            self.result = self._target(*self._args, **self._kwargs)
        except Exception as exc:
            print(f'{type(exc).__name__}: {exc}', file=sys.stderr)  # properly handle the exception

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self.result


def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2

input_value = 4.5
x = ReturnValueThread(target=task, args=(input_value, ))
x.start()
# Do whatever you want
result = x.join()
print("Result:", result)
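Regarding the Stop/kill point of the feature list: Python offers no safe way to kill a thread from the outside, so the usual pattern is cooperative cancellation, for example with a threading.Event that the thread checks regularly (a sketch; the names are illustrative):

```python
import threading
import time


def worker(stop_event: threading.Event, results: list) -> None:
    # Check the event regularly and exit cleanly when asked to stop
    while not stop_event.is_set():
        results.append("working")
        time.sleep(0.01)


stop_event = threading.Event()
results = []
t = threading.Thread(target=worker, args=(stop_event, results))
t.start()
time.sleep(0.1)
stop_event.set()  # ask the thread to stop
t.join()          # wait until it has actually exited
print(len(results))
```

The thread keeps full control of its cleanup; the main thread only signals the request to stop.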

Get data from the thread while it’s alive

In many applications, we want to monitor the progression of a task and its state. In the example above, for instance, we would like to read the value of the progression variable.

One way to do this is to pass a mutable object as an input argument to the function (it can also be done by accessing self if the function is a method of a class).

Warning

You can also use the methods described in the multiprocessing case! (see :ref:`mp-get-data`)

Example using an input mutable argument:

import concurrent.futures
from numbers import Number
from typing import Dict
import time
import numpy as np


def task(value: Number, progression: Dict[int, float], index: int) -> Number:
    progression[index] = 0.0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression[index] = ((k + 1) * 100.0) / 50
    progression[index] = 100
    return value * 2


with concurrent.futures.ThreadPoolExecutor() as executor:
    index_thread = 0
    progression = {index_thread: 0.0}
    future = executor.submit(task, 4.5, progression, index_thread)
    while not future.done():
        time.sleep(0.1)
        print(f"Thread progression: {progression[index_thread]:.2f}%", end="\r")
    print("\n")
    return_value = future.result()
    print(return_value)

Example using a class attribute:

import concurrent.futures
from numbers import Number
import time
import numpy as np


class TaskClass:
    def __init__(self):
        self.progression = 0.0

    def task(self, value: Number) -> Number:
        self.progression = 0.0
        for k in range(0, 50):
            time.sleep(np.random.rand(1)[0])
            self.progression = ((k + 1) * 100.0) / 50
        self.progression = 100
        return value * 2

    def getProgression(self) -> float:
        return self.progression


task_inst = TaskClass()
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task_inst.task, 4.5)
    while not future.done():
        time.sleep(0.1)
        print(f"Thread progression: {task_inst.getProgression():.2f}%", end="\r")
    print("\n")
    return_value = future.result()
    print(return_value)

Multiprocessing

Basic Example

import multiprocessing as mp
from numbers import Number
import time
import numpy as np


def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2


p = mp.Process(target=task, args=(4.5, ))
p.start()
# Do whatever you want
p.join()

Get the return value from the process

The best way to get the return value is to use a multiprocessing.Queue; example below:

import multiprocessing as mp
from numbers import Number
import time
import numpy as np


def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 20):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 20
    progression = 100
    return value * 2

def _taskWrapper(q:mp.Queue, *args, **kwargs) -> None:
    ret = task(*args, **kwargs)
    q.put(ret)

q = mp.Queue()
p = mp.Process(target=_taskWrapper, args=(q, 4.5, ))
p.start()
# Do whatever you want
result = q.get()
print(result)
p.join()

Get data from the process while it’s alive

As a process does not share the same memory space, we cannot use a mutable object as we did for threading. So we also need to use a Queue if we want to get data from the process while it’s running.

import multiprocessing as mp
from numbers import Number
import time
import numpy as np


def task(value: Number, q: mp.Queue) -> None:
    progression = 0
    q.put(("progression", progression))
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
        q.put(("progression", progression))
    progression = 100
    q.put(("progression", progression))
    q.put(("result" , value * 2))

q = mp.Queue()
p = mp.Process(target=task, args=(4.5, q, ))
p.start()
while (ret := q.get())[0] == "progression":
    progression = ret[1]
    print(f"Process progression: {progression:.2f}%", end="\r")
print("\n")
result = ret[1]
print(result)
p.join()

Warning

A Queue works on the First-In First-Out principle: you cannot directly get the last element added to the queue.
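If only the most recent value matters, one workaround is to drain the queue and keep the last item; a sketch (the short sleep gives the queue’s internal feeder thread time to flush the items into the pipe):

```python
import multiprocessing as mp
import queue
import time

q = mp.Queue()
for k in range(5):
    q.put(("progression", k * 20))
time.sleep(0.5)  # let the queue's feeder thread flush the items

# Drain the queue, keeping only the most recent item
latest = None
while True:
    try:
        latest = q.get_nowait()
    except queue.Empty:  # mp.Queue raises queue.Empty when drained
        break
print(latest)
```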

You can also use shared memory objects provided by the multiprocessing library (mp.Value and mp.Array):

import multiprocessing as mp
from numbers import Number
import time
import numpy as np


def task(value: Number, q: mp.Queue, progression: mp.Value) -> None:
    progression.value = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression.value = ((k + 1) * 100) / 50
    progression.value = 100
    q.put(("result" , value * 2))


q = mp.Queue()
progression = mp.Value('f')
p = mp.Process(target=task, args=(4.5, q, progression))
p.start()
while p.is_alive():
    time.sleep(0.05)
    print(f"Process progression: {progression.value:.2f}%", end="\r")
print("\n")
result = q.get()[1]
print(result)
p.join()

Locks

If a part of your program must be accessed by only one process at a time, you can use a Lock; simple example below:

import multiprocessing as mp

def process_function(lock):
    lock.acquire()
    # CRITICAL SECTION
    print("CRITICAL SECTION")
    print("Only One Process has to access at a given time")
    lock.release()

lock = mp.Lock()
p1 = mp.Process(target=process_function, args=(lock,))
p2 = mp.Process(target=process_function, args=(lock,))
p1.start()
p2.start()
p1.join()
p2.join()

A lot of basic features/concepts have not been mentioned (daemons, pools, pipes, etc.); you can learn more in the official multiprocessing documentation: https://docs.python.org/3/library/multiprocessing.html

