Parallelism in Python
Here we will use Python's multiprocessing module to use multiple cores within one section of code. Note that in Python, threading is for asynchronous I/O: because of the global interpreter lock (GIL), it will not make your CPU-bound code faster, whereas multiprocessing will.
Python has different executors for pools of threads and pools of processes -- see https://docs.python.org/3/library/concurrent.futures.html. In some cases one will achieve a speedup and the other won't! Typically, you should try using threads first.
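As a minimal sketch of the difference (the function and worker counts here are illustrative, not from the original), running the same CPU-bound function through each executor shows threads being serialised by the GIL while processes can use multiple cores:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # Busy loop which holds the GIL, so threads cannot overlap it
    total = 0
    for i in range(n):
        total += i * i
    return total

def time_executor(executor_cls):
    start = time.time()
    with executor_cls(max_workers=4) as ex:
        results = list(ex.map(cpu_bound, [2_000_000] * 4))
    print(f"{executor_cls.__name__}: {time.time() - start:.2f}s")

if __name__ == "__main__":
    time_executor(ThreadPoolExecutor)   # limited by the GIL for CPU-bound work
    time_executor(ProcessPoolExecutor)  # can run on multiple cores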
On Windows and OS X, Python uses the spawn start method for each new process, which starts a whole new Python interpreter (potentially taking seconds). Unix uses fork, which is faster. Bear in mind that if you are developing software you might need to support both methods!
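You can query and request a start method explicitly; a minimal sketch (choosing "spawn" here is purely for illustration):

import multiprocessing as mp

def worker(x):
    return x * x

if __name__ == "__main__":
    print(mp.get_start_method())       # "fork" on Unix, "spawn" on Windows/OS X
    ctx = mp.get_context("spawn")      # request a specific start method
    with ctx.Pool(processes=2) as pool:
        print(pool.map(worker, range(4)))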
We will use a Python multiprocessing Pool to parallelise the outer loop of the sum_lol() function from the optimisation chapter. The setup is quite slow, so I will keep using numba even though it is excluded from the timing:
from numba import njit
import numpy as np
from multiprocessing import Pool
import time

@njit
def setup(N):
    xlen = N
    ylen = N
    rand_mat = np.random.rand(xlen, ylen)
    lol = list()
    for x in range(xlen):
        sublist = list()
        for y in range(ylen):
            sublist.append(rand_mat[x, y])
        lol.append(sublist)
    return lol

# Serial version
def sum_lol(lol):
    sum = 0
    for x in range(len(lol)):
        for y in range(len(lol[x])):
            sum += lol[x][y]
    return sum

# Parallel map which sums one sublist
def sum_list(sublist):
    partial_sum = 0
    for entry in sublist:
        partial_sum += entry
    return partial_sum

def run_test(N, threads):
    lol = setup(N)
    start_t = time.time_ns()
    if threads > 1:
        # map
        with Pool(processes=threads) as pool:
            partial_sums = pool.map(sum_list, lol)
        # reduce
        final_sum = sum_list(partial_sums)
    else:
        final_sum = sum_lol(lol)
    end_t = time.time_ns()
    time_ms = (end_t - start_t) / 1000000
    print(f"Time to sum {N**2} entries with {threads} threads: {time_ms}ms")

def main():
    for threads in range(1, 5):
        for N_exp in range(5):
            run_test(10**(N_exp), threads)

if __name__ == "__main__":
    main()
Exercise: What is the speedup and efficiency? Plot these as the size of the list grows, varying the size of the pool.
Parallel sum solution
On OS X, this gives me the following:
Time to sum 1 entries with 1 threads: 0.004ms
Time to sum 100 entries with 1 threads: 0.009ms
Time to sum 10000 entries with 1 threads: 0.63ms
Time to sum 1000000 entries with 1 threads: 66.252ms
Time to sum 100000000 entries with 1 threads: 6856.647ms
Time to sum 1 entries with 2 threads: 369.239ms
Time to sum 100 entries with 2 threads: 350.236ms
Time to sum 10000 entries with 2 threads: 344.328ms
Time to sum 1000000 entries with 2 threads: 378.785ms
Time to sum 100000000 entries with 2 threads: 4554.182ms
Time to sum 1 entries with 3 threads: 372.856ms
Time to sum 100 entries with 3 threads: 360.495ms
Time to sum 10000 entries with 3 threads: 361.229ms
Time to sum 1000000 entries with 3 threads: 384.375ms
Time to sum 100000000 entries with 3 threads: 3399.802ms
Time to sum 1 entries with 4 threads: 394.478ms
Time to sum 100 entries with 4 threads: 375.59ms
Time to sum 10000 entries with 4 threads: 388.037ms
Time to sum 1000000 entries with 4 threads: 425.455ms
Time to sum 100000000 entries with 4 threads: 2956.621ms
For smaller lists, the time is always around 300-400ms for any multithreaded implementation: this is just the overhead of creating the Pool. For the largest list, more threads give more speed, at around 75% efficiency for two threads, reducing to around 60% efficiency with four threads.
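As a check on these figures, speedup is the single-worker time divided by the parallel time, and efficiency is speedup divided by the number of workers. A quick calculation from the timings above for the largest list:

serial_ms = 6856.647  # 1-thread time for 10**8 entries, from above
for n, t in [(2, 4554.182), (3, 3399.802), (4, 2956.621)]:
    speedup = serial_ms / t
    print(f"{n} threads: speedup {speedup:.2f}x, efficiency {speedup / n:.0%}")
# 2 threads: speedup 1.51x, efficiency 75%
# 3 threads: speedup 2.02x, efficiency 67%
# 4 threads: speedup 2.32x, efficiency 58%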
Here we are running a thread for each iteration of the outer loop, i.e. for x in range(len(lol)). We could also run threads to parallelise the inner loop. What would be the advantages and disadvantages of this approach?
Returning results and shared memory
The above example is simple in that the function being parallelised takes a single iterable and returns a single value (a scalar), so the return from Pool.map is a list of these scalars.
If you want to run a function which has other arguments, you can use functools.partial to bind the other arguments before passing the resulting callable to the map call.
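For example, a minimal sketch of binding an extra scale argument with partial (the function and argument names here are illustrative):

from functools import partial
from multiprocessing import Pool

def scaled_sum(sublist, scale):
    return scale * sum(sublist)

if __name__ == "__main__":
    lol = [[1, 2], [3, 4], [5, 6]]
    with Pool(processes=2) as pool:
        # map only iterates over the first argument; scale is bound by partial
        partial_sums = pool.map(partial(scaled_sum, scale=0.5), lol)
    print(partial_sums)  # [1.5, 3.5, 5.5]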
If you want each thread to write results into (non-overlapping) parts of shared memory, this is possible with SharedMemory, introduced in Python 3.8. This is also useful if you have very large arrays as input, which would otherwise be copied into every worker, increasing memory use.
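A minimal sketch of the mechanism (the array contents are illustrative): one process creates a shared block and wraps it in a numpy array; a worker attaches to the same block by name, so no copy is made:

import numpy as np
from multiprocessing import shared_memory

a = np.arange(6, dtype=np.float64)

# Create a shared block and copy the array into it
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
shared_a = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
shared_a[:] = a[:]

# A worker process would attach to the same block by its name
# (shown here in the same process for brevity)
attached = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(a.shape, dtype=a.dtype, buffer=attached.buf)
view[0] = 42.0        # writes are visible through shared_a too
print(shared_a[0])    # 42.0

# Clean up: close every handle, then unlink the block once
attached.close()
shm.close()
shm.unlink()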
We won't go into this much further, but here is an example putting all of this together, which assigns samples to an HDBSCAN model in parallel:
# Imports
import sys
import collections
from functools import partial

import numpy as np
import hdbscan
from tqdm.contrib.concurrent import thread_map, process_map

try:
    from multiprocessing import shared_memory
    from multiprocessing.managers import SharedMemoryManager
    NumpyShared = collections.namedtuple('NumpyShared', ('name', 'shape', 'dtype'))
except ImportError:
    sys.stderr.write("This code requires python v3.8 or higher\n")
    sys.exit(1)

def assign_samples(chunk, X, y, model, n_samples, chunk_size):
    # Load in shared memory
    if isinstance(X, NumpyShared):
        X_shm = shared_memory.SharedMemory(name = X.name)
        X = np.ndarray(X.shape, dtype = X.dtype, buffer = X_shm.buf)
    if isinstance(y, NumpyShared):
        y_shm = shared_memory.SharedMemory(name = y.name)
        y = np.ndarray(y.shape, dtype = y.dtype, buffer = y_shm.buf)

    # Set range thread is operating on
    start = chunk * chunk_size
    end = min((chunk + 1) * chunk_size, n_samples)

    # Run the prediction for this range
    y[start:end] = hdbscan.approximate_predict(model.hdb, X[start:end, :])[0]

# Driver code, wrapped in a function here (the name is illustrative)
# so that the final `return y` is valid
def assign_parallel(X, dbscan_model, n_samples, threads):
    # Set output to zeros
    y = np.zeros(n_samples, dtype=int)
    block_size = 100000
    with SharedMemoryManager() as smm:
        # Make a shared memory array for input X and copy data into it
        shm_X = smm.SharedMemory(size = X.nbytes)
        X_shared_array = np.ndarray(X.shape, dtype = X.dtype, buffer = shm_X.buf)
        X_shared_array[:] = X[:]
        X_shared = NumpyShared(name = shm_X.name, shape = X.shape, dtype = X.dtype)

        # Make a shared memory array for output y and copy data into it
        shm_y = smm.SharedMemory(size = y.nbytes)
        y_shared_array = np.ndarray(y.shape, dtype = y.dtype, buffer = shm_y.buf)
        y_shared_array[:] = y[:]
        y_shared = NumpyShared(name = shm_y.name, shape = y.shape, dtype = y.dtype)

        # Run the function `assign_samples` multithreaded
        thread_map(partial(assign_samples,
                           X = X_shared,
                           y = y_shared,
                           model = dbscan_model,
                           n_samples = n_samples,
                           chunk_size = block_size),
                   range((n_samples - 1) // block_size + 1),
                   max_workers=threads)

        # Copy results back into y
        y[:] = y_shared_array[:]
    return y
Notes:
- thread_map from tqdm is used to provide a progress bar.
- partial is used to bind the arguments of assign_samples, as the map only iterates over the first argument.
- A SharedMemoryManager is used to share numpy arrays between threads without copying them.
- The main code needs to set these arrays up using a buffer.
- The called function needs to unpack them.
- Non-overlapping rows of the array each thread operates on are manually calculated.