Dear Salonnières,

thanks for attending compu-salon episode 8, multiprocessing. Here's a summary of our discussion.

We're not meeting today (Mar 23) in observance of PCGM. Next week (Mar 30, at 4pm) we'll talk about using file formats (and even databases!) in Python.

I'll send out a reminder next week. Until then!

Michele

Compu-salon ep. 8 (2012/03/16) in summary

Here's what we discussed and programmed together.

A warning

Parallel programming is difficult; there's no way around it. It's especially difficult to do in the transparent, zen-like way that we pursue in this salon. This may be because of the immaturity of tools and concepts, so things may improve in the future. (Maybe, but things sometimes get worse: GPU programming can be more painful than cluster programming.) So be warned: pain lies this way. But if you still want to tread this path, here's the gentlest, quickest introduction I could come up with.

Parallel and concurrent programming: definitions

This is a big topic with lots of theory behind it, and many treatments. We only mentioned a few important themes. Paraphrasing Konrad Hinsen, we define

  • Parallelism: using multiple processors to make computations faster.
  • Concurrency: allowing multiple tasks to proceed in coordination, but without waiting for each other.

Generally in scientific computing we are happy with parallelism alone, but some pitfalls of concurrency do appear in its implementation. The main architectural concerns in managing parallelism in scientific applications are decomposition (e.g., of a computational domain) and distribution (e.g., of tasks). These are tackled through a model (or a combination of models):

  • Message passing: multiple processes, usually on different CPUs, collaborate by explicitly sending information around. This model is dominant on computing clusters, and the classic implementation is MPI.
  • Shared memory: multiple processes collaborate by accessing the same data objects (perhaps hierarchically, with each process having faster access to its local memory). This model is feasible on multi-core CPUs, on some high-end supercomputers, or on clusters through data-object libraries (such as Global Arrays) that hide the required message passing.
  • Multithreading: multiple threads on the same CPU work in the same data space. This model is especially suited to multicore CPUs, and it can be implemented explicitly (through threading libraries) or transparently (through directives that tell the compiler how to parallelize loops, or again through libraries that hide the threads internally, such as FFTW or PETSc).

Another model worth mentioning is task farming (e.g., repeating a simulation for many different sets of parameters). Here there is no communication between processes, and the main job is dispatching tasks efficiently and collecting results.

The main concerns in parallel programming are

  • Efficiency: that's why you're parallel-programming anyway, right? There are many aspects to efficiency, including load balancing, avoiding communication bottlenecks, and optimizing for specific processors and architectures (these days memory access is especially important). I would add my usual warning that human time consumed in development is as important as computer time used in execution, and the two add together in the crucial time-to-paper statistic.
  • Correctness: are you sure that your code is really doing what it should? Explicit message passing introduces synchronization problems such as deadlocks (multiple processes waiting on each other in a way that can never be resolved) and race conditions (code whose results depend on the contingent order in which processes execute).

    While not necessarily incorrect, the result of a computation can also be nondeterministic. At the salon, Nick brought up the example of the self-optimizing library FFTW, which times several candidate FFT algorithms and picks the fastest: since the winning algorithm can change from run to run, so can the least significant digits of the result.

Lastly, we discussed the notions of process and thread. A process consists of code, memory, and one or more threads.

All threads within a process share the same data, but execute instructions at different points within the code. The operating system can switch between threads without warning (and on multicore CPUs, threads may truly be running in parallel), so threads must be careful when writing and reading variables, because these could be changed under their feet (as it were) if a thread gets suspended in the middle of a write or a read. These problems are managed with synchronization mechanisms such as locks, semaphores, and more.
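To make this concrete, here is a minimal sketch (my own illustration, not something we coded at the salon; the function names are made up) of a race condition on a shared counter, and of the lock that would prevent it:

# --- example: a race condition on a shared counter (illustrative sketch) ---

import threading

counter = 0
lock    = threading.Lock()

def unsafe_add(n):
    """Increment the shared counter n times with no synchronization."""
    global counter
    for i in xrange(n):
        counter = counter + 1   # read-modify-write is not atomic: the interpreter
                                # can switch threads between the read and the write,
                                # so updates can be lost

def safe_add(n):
    """Increment the shared counter n times, taking the lock each time."""
    global counter
    for i in xrange(n):
        with lock:              # only one thread at a time may enter this block
            counter = counter + 1

threads = [threading.Thread(target=unsafe_add,args=(100000,)) for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()

print counter   # typically less than the expected 400000; replacing
                # unsafe_add with safe_add recovers the correct count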

In Python the benefits of multithreading are limited: because of the complexity of managing memory access in a high-level, interpreted language, the standard Python interpreter restricts the execution of Python bytecode (and hence access to all Python objects) to one thread at a time. This is known as the Global Interpreter Lock (GIL). Thus, the preferred multi-core parallel-computing model in Python is multiprocessing, supported by both message passing and shared memory. The Python multiprocessing module deals with the single-machine case, but multiprocessing can be achieved with similar interfaces on networked clusters.
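If you want to see the GIL at work, a toy benchmark along these lines (again my own sketch, with a made-up function burn) shows that a CPU-bound loop takes about the same wall-clock time whether you run it twice in a row or in two "parallel" threads:

# --- example: the GIL and CPU-bound threads (illustrative sketch) ---

import time, threading

def burn(n):
    """A purely CPU-bound loop: the GIL lets only one thread execute it at a time."""
    s = 0
    for i in xrange(n):
        s = s + i * i
    return s

n = 3 * 10**6

# serial: two calls, one after the other
t0 = time.time()
burn(n); burn(n)
print "Serial:  ", time.time() - t0

# threaded: two threads, but only one can execute Python bytecode at a time,
# so expect no speedup (often a small slowdown, from switching overhead)
t0 = time.time()
threads = [threading.Thread(target=burn,args=(n,)) for i in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print "Threaded:", time.time() - t0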

The Python multiprocessing module in practice

The multiprocessing module is a relatively recent addition to the Python canon (it appeared in version 2.6) that standardizes an earlier third-party package.

Note: on OS X (at least on Snow Leopard) the multiprocessing module distributed with Python 2.6 is broken. The best way forward may be to update Python (brew install python; then make sure your $PATH includes /usr/local and /usr/local/share/python). You will need to rebuild your virtualenvs after updating Python.

In our salon, we realtime-coded a very simple example from GW data analysis: a search for an inspiral signal using matched filtering. Here's a cleaned-up version. We start by defining the signal and the matched-filtering products between signals; we'll put these in a separate module, filtering.py:

# --- module filtering.py ---

# division of integers should return a float, so we avoid the classic
# "C" bug that 3/5 = 0; this is the standard behavior in Python 3, by the way
from __future__ import division

import math, numpy as N

# we'll need a couple of constants
Msun = 4.9257e-6    # Mass of the Sun in seconds
Mpcs = 1.02927e14   # megaparsec in seconds

# we work with a basic array of frequencies from 0 to fmax;
# we take one more element than a power of two, so that the inverse real FFT 
# has 2^N elements; this is consistent with the standard arrangements
# of frequencies in numerical FFTs, which goes as
# [0,df,2*df,...,f_Nyquist,-(f_Nyquist-df),..,-df]

fmax = 1024
f    = N.linspace(0,fmax,2**16 + 1) # enough samples to cover chirp mass
                                    # between 0.8 and 1.75 Msun
df   = f[1] - f[0]                  # frequency spacing

fmin = 40
def template(Mc):
    """Return a 'Newtonian' chirp with chirp mass Mc [Msun],
    as defined in the frequency domain by the stationary-phase approximation.
    See, e.g., Maggiore (2007)."""

    # by operating on f[1:] we avoid taking "difficult" powers at f = 0
    phasing   = (3/128) * (math.pi * Mc * Msun * f[1:])**(-5/3)
    # the last factor (a Boolean array) sets to 0 all elements with f < fmin
    amplitude = math.sqrt(5/24) * math.pi**(-2/3) * (Mc * Msun)**(5/6) * f[1:]**(-7/6) * (f[1:] > fmin)

    # but we include f = 0 in the final resulting array
    waveform = N.zeros(len(f),'complex128')    
    waveform[1:] = amplitude * N.exp(-1j * phasing)

    return waveform

# to see these waveforms in the time domain you could do
# import matplotlib.pyplot as P
# P.plot(N.fft.irfft(template(0.8)))
# P.plot(N.fft.irfft(template(1.74)),'r')

def normsq(signal):
    """Squared "noise" norm of a signal (with noise variance = 1)."""
    return (4/df) * N.sum(N.abs(signal)**2)

def normtemplate(Mc):
    """Return a normalized template by calling template and normsq."""
    t = template(Mc)
    return t / math.sqrt(normsq(t))

def maxsnr(signal,temp):
    """Signal--template correlation, maximized over time shifts
    with the 'fft trick'. Again, see Maggiore (2007)."""
    return N.max((4/df) * N.real(N.fft.irfft(N.conj(signal) * temp)))

We then set up a parallel code to evaluate the signal-to-noise ratio (SNR) of a fiducial signal after filtering by all templates in a bank. We will do so by using the multiprocessing abstraction Pool. Specifically:

  • Pool initializes the desired number of worker processes, which are spawned with the fork function of the operating system. Each spawned process gets a complete copy of the memory space of the current process, including all function and variable definitions. The copy is done efficiently: blocks of memory are actually copied only when they are first modified in the new process. (This is not quite true on Windows, but we shall not worry about that.)
  • The workers in the Pool are then asked (with the method map) to evaluate a function (in this case getsnr) on all the elements of a list (Mc); the list elements are spread among the workers, which operate in parallel; but the results are returned in the correct order. All of this happens transparently, but you can bet that it requires quite a bit of message passing under the hood.

Here's a script that does all this:

# --- script search.py ---

import time, multiprocessing as mp
import numpy as N, matplotlib.pyplot as P
import filtering as F   # our own module with data-analysis definitions

# we're going to evaluate SNRs against this fiducial signal...
signal = F.template(1.2)

# ...for normalized templates in this range of chirp masses
Mcs = N.arange(0.8,1.74,0.01)

def getsnr(Mc):
    """Evaluate SNR of fiducial signal after filtering by a normalized
    template with chirp mass Mc."""
    return F.maxsnr(signal,F.normtemplate(Mc))

# we first do it (and time it) on a single CPU
t0 = time.time()
snrs = map(getsnr,Mcs)      # equivalent to [getsnr(Mc) for Mc in Mcs]
print "Monoprocessing:", time.time() - t0

# next, we create a multiprocessing _pool_ of processes
# each is _forked_ with a copy of all variables and functions defined so far
pool = mp.Pool(processes=4)

# and now we ask the pool to evaluate getsnr() on all elements of Mcs,
# by distributing them among the workers using multiprocessing.map()
t0 = time.time()
snrs = pool.map(getsnr,Mcs)
print "Multiprocessing:", time.time() - t0

# gracefully close all worker processes
pool.close()

# let's see what they look like, using matplotlib
P.plot(Mcs,snrs)
P.show()

Pool objects support a few other methods, including apply_async(func,args), which assigns a single task to the first available worker. The call is asynchronous: since we don't want to wait for the worker to complete before we can assign more tasks, we get back an AsyncResult object that can be queried for the actual result of func with get() (once ready() is True, or after calling wait()).
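As an illustration, a minimal sketch of apply_async might look like this (square is a made-up stand-in for a real task such as getsnr):

# --- example: apply_async and AsyncResult (illustrative sketch) ---

import multiprocessing as mp

def square(x):
    """A stand-in task; in our search it would be getsnr."""
    return x * x

pool = mp.Pool(processes=4)

# each call returns immediately with an AsyncResult, not the actual value
asyncs = [pool.apply_async(square,(x,)) for x in range(10)]

# ...the master is free to do other work here while the workers compute...

# get() blocks until the corresponding result is ready
# (ready() polls without blocking, wait() blocks without fetching)
print [a.get() for a in asyncs]

pool.close()
pool.join()

In fact, Pool.map is essentially a convenience wrapper that submits all the tasks asynchronously and then waits for all the results.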

The multiprocessing module also supports operation at a lower level, with explicit data passing. To show this we'll modify our example, and set up standing worker processes that wait for input (signals) from a Pipe and return output (SNRs) on a Queue. Pipes and queues are examples of shared multiprocessing objects that can be used for communication. (Remember that once a process is forked, all changes to its memory space remain local and are not propagated to other processes. The multiprocessing objects are actually proxies that achieve propagation by hidden message passing.)

A convenient feature of multiprocessing is that messages can be any type of Python object (whereas message-passing libraries such as MPI are limited to arrays of basic types such as integers and floats). Internally, this is achieved with serialization (pickling).
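For instance, here is a minimal sketch (with made-up names) of a worker that receives a whole Python dict, containing a numpy array, through a Pipe:

# --- example: sending arbitrary Python objects through a Pipe (sketch) ---

import multiprocessing as mp
import numpy as N

def summarize(conn):
    """Receive one message (a dict), reply with a small summary, and exit."""
    msg = conn.recv()                       # unpickled back into a dict
    conn.send({'name': msg['name'], 'mean': float(N.mean(msg['data']))})

master_end, worker_end = mp.Pipe()          # a Pipe has two Connection ends
p = mp.Process(target=summarize,args=(worker_end,))
p.start()

# the message is an ordinary dict holding a numpy array;
# multiprocessing pickles and unpickles it behind the scenes
master_end.send({'name': 'segment-1', 'data': N.arange(10.0)})
print master_end.recv()                     # something like {'name': 'segment-1', 'mean': 4.5}

p.join()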

The multiprocessing communication objects include Events and Conditions (which act as signals between processes), Locks (used to control exclusive access to resources), Semaphores (used to control shared but limited access to resources), as well as shared, synchronized versions of Python lists, dictionaries, and even namespaces, all obtained through multiprocessing.Manager. Updates to these objects are seen in all processes. Shared memory objects are also possible, specifically Value and Array (these are statically-typed ctypes objects).
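As a small taste (again a made-up sketch, not something from the salon), here is a Manager-shared dictionary collecting results from several processes, together with a shared Value used as a counter:

# --- example: Manager dict and shared Value (illustrative sketch) ---

import multiprocessing as mp

def tally(shared,counter,k):
    """Record a result in the shared dict and bump the shared counter."""
    shared[k] = k**2                # the proxy forwards the update to a
                                    # server process, so everyone sees it
    with counter.get_lock():        # a Value carries its own lock
        counter.value += 1

manager = mp.Manager()
results = manager.dict()            # a synchronized, shared dictionary
done    = mp.Value('i',0)           # a shared ctypes integer ('i' = C int)

procs = [mp.Process(target=tally,args=(results,done,k)) for k in range(4)]
for p in procs: p.start()
for p in procs: p.join()

print dict(results), done.value     # e.g. {0: 0, 1: 1, 2: 4, 3: 9} 4

Note that the Manager objects are proxies, so updating them involves hidden message passing, whereas Value and Array live in genuine shared memory.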

If the last few paragraphs were a little scary, don't worry! As I hinted above, parallel programming is nice if you can stay within high-level abstractions, but gets "dirty" pretty soon if you have to worry about the details of concurrency. So I hope you don't!

Here's a script that implements the cached evaluation of SNRs on a set of worker processes:

# --- script cachedsearch.py ---

import time, multiprocessing as mp, random
import numpy as N, matplotlib.pyplot as P
import filtering as F   # our own module with data-analysis definitions

# this time we're going to evaluate the SNR of multiple fiducial signals
# against the same bank of templates
nsignals = 10
Mcs = N.arange(0.8,1.74,0.01)

# to speed up the computation, we're going to cache subsets
# of pre-computed templates within each worker process

def worker(Mcs,pipe,results):
    """Worker function for parallel processes: takes as input
    a list 'Mcs' of chirp masses, a Pipe to receive fiducial
    signals from the master, and a Queue 'results' to return
    maximum SNRs and the corresponding chirp mass."""

    # make a list of cached templates
    templates = [F.normtemplate(Mc) for Mc in Mcs]

    # loop forever!
    while True:
        # receive a signal from the master process
        signal = pipe.recv()
    
        # exit if it is a poison pill
        if signal is None:
            return
    
        # evaluate all the SNRs
        snrs = [F.maxsnr(signal,t) for t in templates]
    
        # put into queue the location and value of the max SNR
        maxt = N.argmax(snrs)
        results.put((Mcs[maxt],snrs[maxt]))


# we will run 4 worker processes
cpus = 4

# we create shared, synchronized objects to send signals
# and to receive results
pipes   = [mp.Pipe() for i in range(cpus)]  # a Pipe for every worker
results = mp.Queue()                        # a single Queue for all
                                            # (that's almost poetry)

# we will not use the multiprocessing.Pool abstraction,
# but instead create a list of processes and have each execute
# the function 'worker' with arguments given by a subset of chirp masses,
# the worker "end" of the appropriate Pipe, and the Queue
procs = [mp.Process(name = 'worker-%s' % i,
                    target = worker,
                    args = (Mcs[i::cpus],pipes[i][1],results)
                    ) for i in range(cpus)]

# start all the processes
for p in procs:
    p.start()

# repeat 'nsignals' times
for j in range(nsignals):
    # create a signal with random chirp mass
    trueMc = random.uniform(0.8,1.74)
    signal = F.template(trueMc)

    # get maximum SNRs from workers
    t0 = time.time()        # start timing
    for p in pipes:         # send signal to all workers...
        p[0].send(signal)   # ...using the master "end" of the Pipe
    snrs = N.array([results.get() for i in range(cpus)])
                            # collect 'cpus' results from the Queue,
                            # place them into an array
    print "This iteration:", time.time() - t0

    # compare the Mc of the template with the maximum SNR to the true Mc
    print "True: %.3f Msun -> best fit: %.3f Msun" % (trueMc,snrs[N.argmax(snrs[:,1]),0])

# send poison pills to all workers
for p in pipes:
    p[0].send(None)

# wait for worker processes to terminate
for p in procs:
    p.join()

Cool! That's where we left things, but I should give you some

Further resources

In order of wonkishness:

  • Using IPython for parallel computing. It lets you use abstractions similar to Pool on networked clusters, but requires a little work to set these up.
  • Classic MPI in Python. It lets you pass any type of object, is optimized for numpy, and can interoperate with Python C extensions that call MPI.
  • Using multiprocessing shared Array objects to pass numpy arrays economically. Not pretty, but see also numpy-sharedmem.
  • Cython has some native support for efficient multithreading; it also lets you release the GIL and use threads efficiently in C extensions under some conditions.