Parallelism in Python

Here we will use Python's multiprocessing module to run a section of code across multiple cores.

Warning

Note that in Python, threading is for concurrent I/O: because of the global interpreter lock (GIL), it will not make your CPU-bound code faster, whereas multiprocessing will.

Note

Python has different executors for pools of threads and pools of processes -- see https://docs.python.org/3/library/concurrent.futures.html. In some cases one will achieve a speedup and the other won't! Typically, you should try using threads first.
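As a minimal sketch of this (the cpu_bound function is illustrative, and exact timings will vary by machine), compare the two executors on the same CPU-bound function; with threads the GIL typically keeps the runtime close to serial, while processes can use several cores:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def cpu_bound(n):
    # Pure computation, no I/O: the GIL limits threads here
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    work = [5_000_000] * 4
    for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with executor_cls(max_workers=4) as executor:
            results = list(executor.map(cpu_bound, work))
        print(f"{executor_cls.__name__}: {time.time() - start:.2f}s")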

On Windows and OS X, Python uses spawn to create each new process, which starts a whole new Python interpreter (potentially taking seconds). Unix uses fork, which is faster. Bear in mind that if you are developing software you might need to support both methods!
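If your code needs to behave the same on all of these platforms, you can request a start method explicitly rather than relying on the platform default. A small sketch (the work function is illustrative):

from multiprocessing import get_context

def work(x):
    return x * x

if __name__ == "__main__":
    # "spawn" works everywhere; "fork" is Unix-only but starts faster
    ctx = get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        print(pool.map(work, range(4)))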

We will use a Python multiprocessing Pool to parallelise the outer loop of the sum_lol() function from the optimisation chapter.

The setup is quite slow, so I will keep using numba even though it is excluded from the timing:

from numba import njit
import numpy as np
from multiprocessing import Pool
import time

@njit
def setup(N):
    xlen = N
    ylen = N

    rand_mat = np.random.rand(xlen, ylen)

    lol = list()
    for x in range(xlen):
        sublist = list()
        for y in range(ylen):
            sublist.append(rand_mat[x, y])
        lol.append(sublist)
    return lol

# Serial version
def sum_lol(lol):
    total = 0  # avoid shadowing the builtin sum()
    for x in range(len(lol)):
        for y in range(len(lol[x])):
            total += lol[x][y]
    return total

# Parallel map which sums one sublist
def sum_list(sublist):
    partial_sum = 0
    for entry in sublist:
        partial_sum += entry
    return partial_sum

def run_test(N, threads):
    lol = setup(N)
    start_t = time.time_ns()

    if threads > 1:
        # map
        with Pool(processes=threads) as pool:
            partial_sums = pool.map(sum_list, lol)
        # reduce
        final_sum = sum_list(partial_sums)
    else:
        final_sum = sum_lol(lol)

    end_t = time.time_ns()
    time_ms = (end_t - start_t) / 1000000

    print(f"Time to sum {N**2} entries with {threads} threads: {time_ms}ms")

def main():
    for threads in range(1, 5):
        for N_exp in range(5):
            run_test(10**(N_exp), threads)

if __name__ == "__main__":
    main()

Exercise: What are the speedup and efficiency? Plot these as the size of the list grows, varying the size of the pool.

Parallel sum solution

On OS X, this gives me the following:

Time to sum 1 entries with 1 threads: 0.004ms
Time to sum 100 entries with 1 threads: 0.009ms
Time to sum 10000 entries with 1 threads: 0.63ms
Time to sum 1000000 entries with 1 threads: 66.252ms
Time to sum 100000000 entries with 1 threads: 6856.647ms
Time to sum 1 entries with 2 threads: 369.239ms
Time to sum 100 entries with 2 threads: 350.236ms
Time to sum 10000 entries with 2 threads: 344.328ms
Time to sum 1000000 entries with 2 threads: 378.785ms
Time to sum 100000000 entries with 2 threads: 4554.182ms
Time to sum 1 entries with 3 threads: 372.856ms
Time to sum 100 entries with 3 threads: 360.495ms
Time to sum 10000 entries with 3 threads: 361.229ms
Time to sum 1000000 entries with 3 threads: 384.375ms
Time to sum 100000000 entries with 3 threads: 3399.802ms
Time to sum 1 entries with 4 threads: 394.478ms
Time to sum 100 entries with 4 threads: 375.59ms
Time to sum 10000 entries with 4 threads: 388.037ms
Time to sum 1000000 entries with 4 threads: 425.455ms
Time to sum 100000000 entries with 4 threads: 2956.621ms

For smaller lists, the time is always around 300-400ms whenever a Pool is used, which is just the overhead of creating the Pool and spawning its worker processes. For the largest list, more workers do give more speed: around 75% efficiency with two workers, dropping to around 60% efficiency with four.
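Using the usual definitions, speedup S(p) = T(1) / T(p) and efficiency E(p) = S(p) / p; a quick check of those figures from the largest list above:

t1 = 6856.647                                    # 1-worker time (ms)
times = {2: 4554.182, 3: 3399.802, 4: 2956.621}  # workers -> time (ms)
for p, tp in times.items():
    speedup = t1 / tp
    efficiency = speedup / p
    print(f"{p} workers: speedup {speedup:.2f}, efficiency {efficiency:.0%}")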

Here we are running one worker for each go around the outer loop, i.e. for x in range(len(lol)). We could also use workers to parallelise the inner loop, as sketched below. What would be the advantages and disadvantages of this approach?
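One way the inner loop could be parallelised is to split each sublist into chunks and sum the chunks in parallel. A sketch under that assumption (sum_chunk and sum_list_parallel are illustrative names, not part of the example above):

from multiprocessing import Pool

def sum_chunk(chunk):
    return sum(chunk)

def sum_list_parallel(sublist, pool, n_chunks):
    # Split one sublist into roughly n_chunks pieces and sum each in parallel
    step = max(1, len(sublist) // n_chunks)
    chunks = [sublist[i:i + step] for i in range(0, len(sublist), step)]
    return sum(pool.map(sum_chunk, chunks))

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print(sum_list_parallel(list(range(1000)), pool, n_chunks=4))  # 499500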

Returning results and shared memory

The above example is simple in that the function being parallelised takes a single iterable and returns a single scalar value, so the return from Pool.map is a list of these scalars.

If you want to run a function which takes other arguments, you can use functools.partial to bind those arguments first, then pass the resulting callable to the map call.
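For example, a sketch with a hypothetical two-argument function (only the first argument, the sublist, is iterated over by the map):

from functools import partial
from multiprocessing import Pool

def scaled_sum(sublist, scale):
    return scale * sum(sublist)

if __name__ == "__main__":
    lol = [[1.0, 2.0], [3.0, 4.0]]
    with Pool(processes=2) as pool:
        # partial binds scale; pool.map supplies each sublist in turn
        partial_sums = pool.map(partial(scaled_sum, scale=0.5), lol)
    print(partial_sums)  # [1.5, 3.5]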

If you want each worker to write results into (non-overlapping) parts of shared memory, this is possible with SharedMemory, introduced in Python 3.8. This is also useful if you have very large input arrays, which would otherwise be copied into every process, increasing memory use.

We won't go into this much further, but here is an example putting all of this together, which assigns samples to an HDBSCAN model in parallel:

# Imports
import sys
import collections
import numpy as np
import hdbscan
from functools import partial
from tqdm.contrib.concurrent import thread_map, process_map
try:
    from multiprocessing import shared_memory
    from multiprocessing.managers import SharedMemoryManager
    NumpyShared = collections.namedtuple('NumpyShared', ('name', 'shape', 'dtype'))
except ImportError as e:
    sys.stderr.write("This code requires Python v3.8 or higher\n")
    sys.exit(1)

def assign_samples(chunk, X, y, model, n_samples, chunk_size):
    # Load in shared memory
    if isinstance(X, NumpyShared):
        X_shm = shared_memory.SharedMemory(name = X.name)
        X = np.ndarray(X.shape, dtype = X.dtype, buffer = X_shm.buf)
    if isinstance(y, NumpyShared):
        y_shm = shared_memory.SharedMemory(name = y.name)
        y = np.ndarray(y.shape, dtype = y.dtype, buffer = y_shm.buf)

    # Set range thread is operating on
    start = chunk * chunk_size
    end = min((chunk + 1) * chunk_size, n_samples)

    # Run the prediction for this range
    y[start:end] = hdbscan.approximate_predict(model.hdb, X[start:end, :])[0]

# Driver code: wrapped in a function here (the name is illustrative) so that
# the final `return y` is valid; X, dbscan_model, n_samples and threads are
# assumed to come from the caller
def assign_all(X, dbscan_model, n_samples, threads):
    # Set output to zeros
    y = np.zeros(n_samples, dtype=int)
    block_size = 100000
    with SharedMemoryManager() as smm:
        # Make a shared memory array for input X and copy data into it
        shm_X = smm.SharedMemory(size = X.nbytes)
        X_shared_array = np.ndarray(X.shape, dtype = X.dtype, buffer = shm_X.buf)
        X_shared_array[:] = X[:]
        X_shared = NumpyShared(name = shm_X.name, shape = X.shape, dtype = X.dtype)

        # Make a shared memory array for output y and copy data into it
        shm_y = smm.SharedMemory(size = y.nbytes)
        y_shared_array = np.ndarray(y.shape, dtype = y.dtype, buffer = shm_y.buf)
        y_shared_array[:] = y[:]
        y_shared = NumpyShared(name = shm_y.name, shape = y.shape, dtype = y.dtype)

        # Run the function `assign_samples` over each chunk in parallel
        thread_map(partial(assign_samples,
                           X = X_shared,
                           y = y_shared,
                           model = dbscan_model,
                           n_samples = n_samples,
                           chunk_size = block_size),
                   range((n_samples - 1) // block_size + 1),
                   max_workers=threads)

        # Copy results back into y
        y[:] = y_shared_array[:]

    return y

Notes:

  • thread_map from tqdm is used to provide a progress bar.
  • partial is used to bind the other arguments of assign_samples, as the map only iterates over the first argument (chunk).
  • A SharedMemoryManager is used to share the numpy arrays between workers without copying them. With thread_map the memory is already shared; the SharedMemory machinery is what lets the same code also work with process_map, where each process would otherwise receive its own copy.
  • The main code needs to set these arrays up using a buffer.
  • The called function needs to unpack them.
  • The non-overlapping range of rows each worker operates on is calculated manually from the chunk index and chunk_size.