Multi-Processing & Threading
Threading:
Threads share the same memory space and can read from and write to shared variables
Due to the Python Global Interpreter Lock (GIL), two threads are never executed at the same time; they run concurrently instead (for example via context switching)
Effective for I/O-bound tasks
Can be implemented with the threading module
(asyncio can also be considered for I/O-bound work with slow I/O and many different connections; a short sketch is given after this comparison)
Multi-processing:
Every process has its own memory space
Every process can contain one or more subprocesses/threads
Can be used to achieve parallelism on multi-core machines, since processes can run on different CPU cores
Effective for CPU-bound tasks
Can be implemented with the multiprocessing module (or concurrent.futures.ProcessPoolExecutor)
source: https://towardsdatascience.com/multithreading-multiprocessing-python-180d0975ab29
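As a quick illustration of the asyncio option mentioned above, here is a minimal sketch (the coroutine names and delays are purely illustrative):

import asyncio

async def fetch(name, delay):
    # Simulate a slow I/O operation (e.g. a network request)
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # Handle many slow "connections" concurrently on a single thread
    results = await asyncio.gather(fetch("a", 1.0), fetch("b", 0.5), fetch("c", 2.0))
    print(results)

asyncio.run(main())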
Below are examples showing the most important features for both multi-processing and threading:
Get the number of CPU cores available (see the snippet right after this list)
Set the number of processes/threads desired
Start the multi-processing/threading
Get the output from the process/thread
Stop/kill the process/thread (a sketch is given after the Locks example)
Communicate with the process/thread (e.g. get the progression, send signals)
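For the first two items, a minimal sketch (the doubling task and the worker count are purely illustrative):

import os
import concurrent.futures

# Number of CPU cores available on the machine
n_cores = os.cpu_count()  # multiprocessing.cpu_count() is equivalent
print(f"{n_cores} CPU cores available")

# Set the number of workers (threads here; process pools work the same way)
with concurrent.futures.ThreadPoolExecutor(max_workers=n_cores) as executor:
    results = list(executor.map(lambda x: x * 2, range(10)))
print(results)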
Threading
Basic example: The concurrent.futures way
[Ramarao Amara] -> https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python
In Python 3.2+, the stdlib concurrent.futures module provides a higher-level API for threading, including passing return values or exceptions from a worker thread back to the main thread.
import concurrent.futures
from numbers import Number
import time
import numpy as np

def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 4.5)
    return_value = future.result()
    print(return_value)
Basic example: The threading way
import threading
from numbers import Number
import time
import numpy as np

def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2

input_value = 4.5
x = threading.Thread(target=task, args=(input_value,))
x.start()
# Do whatever you want
x.join()
With this method we cannot directly get the return value from the thread; possible solutions are described in this Stack Overflow question: https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python
Example using the solution of extending the Thread class:
import threading
import sys
from numbers import Number
import time
import numpy as np

class ReturnValueThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.result = None

    def run(self):
        if self._target is None:
            return  # could alternatively raise an exception, depends on the use case
        try:
            self.result = self._target(*self._args, **self._kwargs)
        except Exception as exc:
            print(f'{type(exc).__name__}: {exc}', file=sys.stderr)  # properly handle the exception

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self.result

def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2

input_value = 4.5
x = ReturnValueThread(target=task, args=(input_value,))
x.start()
# Do whatever you want
result = x.join()
print("Result:", result)
Get data from the thread while it's alive
In many applications, we want to monitor the progression of a task and its state. In the example above, for instance, we would like to read the value of the progression variable while the thread is running.
One way to do this is to pass a mutable object as an input argument of the function (it can also be done by accessing self when the function is a method of a class).
Warning
You can also use the methods described in the multiprocessing case! (see :ref:`mp-get-data`)
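For example, a minimal sketch using the standard queue.Queue (the thread-safe FIFO equivalent of the multiprocessing.Queue used later) to report progression from a thread; the task shape mirrors the examples above:

import concurrent.futures
import queue
import time

def task(value, progress_queue):
    # Report progression through a thread-safe FIFO queue
    for k in range(0, 50):
        time.sleep(0.02)
        progress_queue.put(((k + 1) * 100.0) / 50)
    return value * 2

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 4.5, q)
    while not future.done():
        try:
            progression = q.get(timeout=0.1)
            print(f"Thread progression: {progression:.2f}%", end="\r")
        except queue.Empty:
            pass
print("\n")
print(future.result())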
Example using a mutable input argument:
import concurrent.futures
from numbers import Number
from typing import Dict
import time
import numpy as np

def task(value: Number, progression: Dict[int, float], index: int) -> Number:
    progression[index] = 0.0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression[index] = ((k + 1) * 100.0) / 50
    progression[index] = 100
    return value * 2

with concurrent.futures.ThreadPoolExecutor() as executor:
    index_thread = 0
    progression = {index_thread: 0.0}
    future = executor.submit(task, 4.5, progression, index_thread)
    while not future.done():
        time.sleep(0.1)
        print(f"Thread progression: {progression[index_thread]:.2f}%", end="\r")
    print("\n")
    return_value = future.result()
    print(return_value)
Example using a class attribute:
import concurrent.futures
from numbers import Number
import time
import numpy as np

class TaskClass:
    def __init__(self):
        self.progression = 0.0

    def task(self, value: Number) -> Number:
        self.progression = 0.0
        for k in range(0, 50):
            time.sleep(np.random.rand(1)[0])
            self.progression = ((k + 1) * 100.0) / 50
        self.progression = 100
        return value * 2

    def getProgression(self) -> float:
        return self.progression

task_inst = TaskClass()
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task_inst.task, 4.5)
    while not future.done():
        time.sleep(0.1)
        print(f"Thread progression: {task_inst.getProgression():.2f}%", end="\r")
    print("\n")
    return_value = future.result()
    print(return_value)
Multiprocessing
Basic Example
import multiprocessing as mp
from numbers import Number
import time
import numpy as np

def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
    progression = 100
    return value * 2

# The __main__ guard is required on platforms that spawn new processes (e.g. Windows, macOS)
if __name__ == "__main__":
    p = mp.Process(target=task, args=(4.5,))
    p.start()
    # Do whatever you want
    p.join()
Get the return value from the process
A simple and robust way to get the return value is to use a multiprocessing.Queue; example below:
import multiprocessing as mp
from numbers import Number
import time
import numpy as np

def task(value: Number) -> Number:
    progression = 0
    for k in range(0, 20):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 20
    progression = 100
    return value * 2

def _taskWrapper(q: mp.Queue, *args, **kwargs) -> None:
    ret = task(*args, **kwargs)
    q.put(ret)

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=_taskWrapper, args=(q, 4.5))
    p.start()
    # Do whatever you want
    result = q.get()
    print(result)
    p.join()
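Alternatively, concurrent.futures.ProcessPoolExecutor (mentioned at the beginning) returns values through futures, exactly like the threading examples above; a minimal sketch:

import concurrent.futures

def task(value):
    return value * 2

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        future = executor.submit(task, 4.5)
        print(future.result())  # 9.0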
Get data from the process while it's alive
Since a process does not share its memory space with the parent, we cannot use a mutable object as we did for threading. We therefore need a Queue to get data from the process while it is running.
import multiprocessing as mp
from numbers import Number
import time
import numpy as np

def task(value: Number, q: mp.Queue) -> None:
    progression = 0
    q.put(("progression", progression))
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression = ((k + 1) * 100) / 50
        q.put(("progression", progression))
    progression = 100
    q.put(("progression", progression))
    q.put(("result", value * 2))

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=task, args=(4.5, q))
    p.start()
    while (ret := q.get())[0] == "progression":
        progression = ret[1]
        print(f"Process progression: {progression:.2f}%", end="\r")
    print("\n")
    result = ret[1]
    print(result)
    p.join()
Warning
A Queue works on a First-In First-Out basis: you cannot directly retrieve the last element that was added.
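If only the most recent value matters, one possible workaround (a minimal sketch, relying on the standard queue.Empty exception that multiprocessing queues also raise) is to drain the queue and keep the last item:

import queue

def latest(q):
    # Empty the queue and return only the most recent item (None if the queue was empty)
    item = None
    try:
        while True:
            item = q.get_nowait()
    except queue.Empty:
        pass
    return item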
You can also use shared memory objects provided by the multiprocessing library (mp.Value and mp.Array):
import multiprocessing as mp
from numbers import Number
import time
import numpy as np

def task(value: Number, q: mp.Queue, progression: mp.Value) -> None:
    progression.value = 0
    for k in range(0, 50):
        time.sleep(np.random.rand(1)[0])
        progression.value = ((k + 1) * 100) / 50
    progression.value = 100
    q.put(("result", value * 2))

if __name__ == "__main__":
    q = mp.Queue()
    progression = mp.Value('f')
    p = mp.Process(target=task, args=(4.5, q, progression))
    p.start()
    while p.is_alive():
        time.sleep(0.05)
        print(f"Process progression: {progression.value:.2f}%", end="\r")
    print("\n")
    result = q.get()[1]
    print(result)
    p.join()
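mp.Array works the same way when several values have to be shared, for example one progression slot per process; a minimal sketch (the two-process setup is purely illustrative):

import multiprocessing as mp
import time

def task(index, progression):
    # Write this worker's progression into its slot of the shared array
    for k in range(0, 10):
        time.sleep(0.05)
        progression[index] = ((k + 1) * 100.0) / 10

if __name__ == "__main__":
    progression = mp.Array('f', 2)  # one float slot per process
    procs = [mp.Process(target=task, args=(i, progression)) for i in range(2)]
    for p in procs:
        p.start()
    while any(p.is_alive() for p in procs):
        time.sleep(0.05)
        print(f"Progressions: {progression[0]:.1f}% / {progression[1]:.1f}%", end="\r")
    for p in procs:
        p.join()
    print("\n")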
Locks
If a part of your program must only be accessed by one process at a time, you can use a Lock; simple example below:
import multiprocessing as mp

def process_function(lock):
    lock.acquire()  # the lock can also be used as a context manager: "with lock:"
    # CRITICAL SECTION
    print("CRITICAL SECTION")
    print("Only one process can be here at a given time")
    lock.release()

if __name__ == "__main__":
    lock = mp.Lock()
    p1 = mp.Process(target=process_function, args=(lock,))
    p2 = mp.Process(target=process_function, args=(lock,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
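Finally, about the "Stop/kill the process/thread" item from the feature list at the top: a process can be stopped from the outside with terminate() (or kill()), while a thread cannot be killed and has to be stopped cooperatively, for example with a threading.Event; a minimal sketch:

import multiprocessing as mp
import threading
import time

def mp_task():
    while True:  # runs until the process is terminated from the outside
        time.sleep(0.1)

def thread_task(stop_event):
    while not stop_event.is_set():  # runs until asked to stop
        time.sleep(0.1)

if __name__ == "__main__":
    # Processes can be stopped from the outside
    p = mp.Process(target=mp_task)
    p.start()
    time.sleep(0.5)
    p.terminate()  # p.kill() sends SIGKILL instead of SIGTERM (Python 3.7+)
    p.join()

    # Threads cannot be killed: signal them to stop instead
    stop_event = threading.Event()
    t = threading.Thread(target=thread_task, args=(stop_event,))
    t.start()
    time.sleep(0.5)
    stop_event.set()
    t.join()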
A lot of basic features/concepts have not been mentioned here (daemon processes, pools, pipes, etc.); you can learn more in the official multiprocessing documentation: https://docs.python.org/3/library/multiprocessing.html
Sources:
Multi-threading and Multi-processing in Python (Giorgos Myrianthous): https://towardsdatascience.com/multithreading-multiprocessing-python-180d0975ab29
Stack Overflow - How to get the return value from a thread in Python?: https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python
How to return a result from a Python thread (Alexandra Zaharia): https://alexandra-zaharia.github.io/posts/how-to-return-a-result-from-a-python-thread/
Multiprocessing - Pipe vs Queue: https://stackoverflow.com/questions/8463008/multiprocessing-pipe-vs-queue
Pipes, Queues and Locks in multiprocessing in Python: https://www.educative.io/answers/pipes-queues-and-lock-in-multiprocessing-in-python