Dear Salonnières,
thanks for attending compu-salon episode 8, multiprocessing. Here's a summary of our discussion.
We're not meeting today (Mar 23) in observance of PCGM. Next week (Mar 30, at 4pm) we'll talk about using file formats (and even databases!) in Python.
I'll send out a reminder next week. Until then!
Michele
Here's what we discussed and programmed together.
Parallel programming is difficult; there's no way around it. It's especially difficult to do in the transparent, zen-like way that we pursue in this salon. This may be because of the immaturity of tools and concepts, so things may improve in the future. (Maybe, but things sometimes get worse: GPU programming can be more painful than cluster programming.) So be warned: pain lies this way. But if you still want to tread this path, here's the gentlest, quickest introduction I could come up with.
This is a big topic with lots of theory behind it, and many treatments. We only mentioned a few important themes. Paraphrasing Konrad Hinsen, we define parallelism as the exploitation of multiple processors (or cores) to obtain results faster, and concurrency as the management of multiple computations that are under way at the same time and may interact.
Generally in scientific computing we are happy with parallelism alone, but some pitfalls of concurrency do appear in its implementation. The main architectural concerns in managing parallelism in scientific applications are decomposition (e.g., of a computational domain) and distribution (e.g., of tasks). These are tackled through a model (or a combination of models), most commonly message passing (processes with separate memory spaces exchange data explicitly, as in MPI) and shared memory (threads or processes operate on common data structures, with synchronization to keep them consistent).
Another model worth mentioning is task farming (e.g., repeating a simulation for many different sets of parameters). Here there is no communication between processes, and the main job is dispatching tasks efficiently and collecting results. (The Pool.map search we coded below is exactly of this kind.)
A chief concern in parallel programming is correctness: are you sure that your code is really doing what it should? Explicit message passing introduces synchronization problems such as deadlocks (multiple processes waiting on each other in an unresolvable fashion) and race conditions (code that yields different results depending on the contingent order of execution of the processes).
While not necessarily incorrect, the result of a computation can also be nondeterministic. At the salon, Nick brought up the example of the self-optimizing library FFTW, whose timing-based planner can choose different algorithms on different runs, changing results at the level of floating-point roundoff.
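To make the deadlock scenario concrete, here's a minimal sketch (not something we coded at the salon) using multiprocessing Locks; the function names one and two are just placeholders. With the sleeps in place the script hangs for good, so be ready to kill it with Ctrl-C.

# --- sketch: a deadlock ---
# two processes acquire the same two Locks in opposite order,
# so each ends up waiting on the other forever
import time, multiprocessing as mp

def one(a,b):
    with a:
        time.sleep(0.1)   # give 'two' time to grab b...
        with b:           # ...so this acquisition blocks forever
            pass

def two(a,b):
    with b:
        time.sleep(0.1)   # give 'one' time to grab a...
        with a:           # ...so this acquisition blocks forever
            pass

a, b = mp.Lock(), mp.Lock()
p1 = mp.Process(target=one,args=(a,b))
p2 = mp.Process(target=two,args=(a,b))
p1.start(); p2.start()
p1.join(); p2.join()      # neither join ever returns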
Last, we spoke about the notions of process and thread. An operating-system process consists of code, memory, and one or more threads of execution.
All threads within a process share the same data, but execute instructions at different points within the code. The operating system can switch between threads without warning (and on multicore CPUs, threads may truly be running in parallel), so threads must be careful when reading and writing variables, because these could be changed under their feet (as it were) if a thread gets suspended in the middle of an operation. These problems are managed with synchronization mechanisms such as locks, semaphores, and more.
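Here's a minimal sketch (again, not from the salon) of such a race between threads: the increment counter += 1 is a read-modify-write, so unsynchronized threads can lose updates, and wrapping it in a Lock restores correctness. The names counter and bump are just for illustration.

# --- sketch: a race condition between threads ---
import threading

counter = 0
lock = threading.Lock()

def bump(n=100000,safe=False):
    global counter
    for i in xrange(n):
        if safe:
            with lock:       # serialized: no updates are ever lost
                counter += 1
        else:
            counter += 1     # a thread can be suspended mid-update

threads = [threading.Thread(target=bump) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print counter   # often less than 400000; with safe=True, always exactly 400000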
In Python the benefits of multithreading are limited: because of the complexity of managing memory access in a high-level, interpreted language, the standard Python interpreter restricts access to Python objects to one thread at a time. This restriction is known as the Global Interpreter Lock (GIL). Thus, the preferred multi-core parallel-computing model in Python is multiprocessing, supported by both message passing and shared memory. The Python multiprocessing module deals with the single-machine, multi-core case, but multiprocessing with a similar interface can also be achieved on networked clusters.
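To see the practical effect of the GIL, here's a minimal sketch (not something we coded together) that times a CPU-bound toy function burn run by four threads and then by four processes; on a multicore machine only the processes should show a real speedup.

# --- sketch: the GIL in action ---
import time, threading, multiprocessing as mp

def burn(n=10**7):
    s = 0
    for i in xrange(n):
        s = s + i

t0 = time.time()
workers = [threading.Thread(target=burn) for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print "4 threads:  ", time.time() - t0   # about as slow as (or slower than) four sequential calls

t0 = time.time()
workers = [mp.Process(target=burn) for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print "4 processes:", time.time() - t0   # roughly four times faster on a 4-core machine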
The multiprocessing module is a relatively recent addition to the Python canon (version 2.6) that standardizes a previously third-party package. For more, see Doug Hellmann's Python Module of the Week article on multiprocessing (in general, all of his series is excellent). It's also worth reading the threading module documentation, since multiprocessing duplicates the threading interface as much as possible.
Note: on OS X (at least on Snow Leopard) the multiprocessing distributed with Python 2.6 is broken. The best way forward may be to update Python (brew install python; then make sure your $PATH includes /usr/local and /usr/local/share/python). You will need to rebuild your virtualenvs after updating Python.
In our salon, we live-coded a very simple example from gravitational-wave data analysis: a search for an inspiral signal using matched filtering. Here's a cleaned-up version. We start by defining the signal and the matched-filtering products between signals; we'll put these in a separate module, filtering.py:
# --- module filtering.py ---
# have division of integers return a float, so we avoid the classic
# "C" bug that 3/5 = 0; this is standard in Python 3, by the way
from __future__ import division
import math, numpy as N
# we'll need a couple of constants
Msun = 4.9257e-6 # Mass of the Sun in seconds
Mpcs = 1.02927e14 # megaparsec in seconds
# we work with a basic array of frequencies from 0 to fmax;
# we take one more element than a power of two, so that the inverse real FFT
# has 2^N elements; this is consistent with the standard arrangements
# of frequencies in numerical FFTs, which goes as
# [0,df,2*df,...,f_Nyquist,-(f_Nyquist-df),..,-df]
fmax = 1024
f = N.linspace(0,fmax,2**16 + 1) # enough samples to cover chirp mass
# between 0.8 and 1.75 Msun
df = f[1] - f[0] # frequency spacing
fmin = 40
def template(Mc):
    """Return a 'Newtonian' chirp with chirp mass Mc [Msun],
    as defined in the frequency domain by the stationary-phase approximation.
    See, e.g., Maggiore (2007)."""
    # by operating on f[1:] we avoid taking "difficult" powers at f = 0
    phasing = (3/128) * (math.pi * Mc * Msun * f[1:])**(-5/3)
    # the last factor (a Boolean) sets to 0 all elements with f < fmin
    amplitude = math.sqrt(5/24) * math.pi**(-2/3) * (Mc * Msun)**(5/6) * f[1:]**(-7/6) * (f[1:] > fmin)
    # but we include f = 0 in the final resulting array
    waveform = N.zeros(len(f),'complex128')
    waveform[1:] = amplitude * N.exp(-1j * phasing)
    return waveform
# to see these waveforms in the time domain you could do
# import matplotlib.pyplot as P
# P.plot(N.fft.irfft(template(0.8)))
# P.plot(N.fft.irfft(template(1.74)),'r')
def normsq(signal):
    """Squared "noise" norm of a signal (with noise variance = 1)."""
    return (4/df) * N.sum(N.abs(signal)**2)

def normtemplate(Mc):
    """Return a normalized template by calling template and normsq."""
    t = template(Mc)
    return t / math.sqrt(normsq(t))

def maxsnr(signal,temp):
    """Signal--template correlation, maximized over time shifts
    with the 'fft trick'. Again, see Maggiore (2007)."""
    return N.max((4/df) * N.real(N.fft.irfft(N.conj(signal) * temp)))
We then set up a parallel code to evaluate the signal-to-noise ratio (SNR) of a fiducial signal after filtering by all the templates in a bank. We will do so using the multiprocessing abstraction Pool. Specifically:
- Pool initializes the desired number of worker processes, which are spawned with the fork function of the operating system. Each spawned process gets a complete copy of the memory space of the current process, including all function and variable definitions. The copy is done efficiently: blocks of memory are actually copied only when they are first modified in the new process. (This is not quite true on Windows, but we shall not worry about that.)
- The workers in the Pool are then asked (with the method map) to evaluate a function (in this case getsnr) on all the elements of a list (Mcs); the list elements are spread among the workers, which operate in parallel, but the results are returned in the correct order. All of this happens transparently, but you can bet that it requires quite a bit of message passing under the hood.
Here's a script that does all this:
# --- script search.py ---
import time, multiprocessing as mp
import numpy as N, matplotlib.pyplot as P
import filtering as F # our own module with data-analysis definitions
# we're going to evaluate SNRs against this fiducial signal...
signal = F.template(1.2)
# ...for normalized templates in this range of chirp masses
Mcs = N.arange(0.8,1.74,0.01)
def getsnr(Mc):
    """Evaluate SNR of fiducial signal after filtering by a normalized
    template with chirp mass Mc."""
    return F.maxsnr(signal,F.normtemplate(Mc))
# we first do it (and time it) on a single CPU
t0 = time.time()
snrs = map(getsnr,Mcs) # equivalent to [getsnr(Mc) for Mc in Mcs]
print "Monoprocessing:", time.time() - t0
# next, we create a multiprocessing _pool_ of processes
# each is _forked_ with a copy of all variables and functions defined so far
pool = mp.Pool(processes=4)
# and now we ask the pool to evaluate getsnr() on all elements of Mcs,
# by distributing them among the workers using multiprocessing.map()
t0 = time.time()
snrs = pool.map(getsnr,Mcs)
print "Multiprocessing:", time.time() - t0
# gracefully close all worker processes
pool.close()
# let's see what they look like, using matplotlib
P.plot(Mcs,snrs)
P.show()
Pool objects support a few other methods, including apply_async(func,args), which assigns a single task to the first available worker. The function is asynchronous, since we don't want to wait for the worker to complete before we can assign more tasks; so we get back an AsyncResult object that can be queried for the actual func result with get() (when ready() is True, or after calling wait()).
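As a quick illustration, here's a rough sketch of the same SNR computation done with apply_async instead of map; it assumes the definitions (mp, getsnr, Mcs) from search.py above.

# --- sketch: apply_async instead of map (assumes the definitions in search.py) ---
pool = mp.Pool(processes=4)
# each call returns immediately with an AsyncResult...
asyncs = [pool.apply_async(getsnr,(Mc,)) for Mc in Mcs]
# ...and get() blocks until the corresponding result is ready
snrs = [a.get() for a in asyncs]
pool.close()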
The multiprocessing module also supports operation at a lower level, with explicit data passing. To show this we'll modify our example, and set up standing worker processes that wait for input (signals) from a Pipe and return output (SNRs) on a Queue. Pipes and queues are examples of shared multiprocessing objects that can be used for communication. (Remember that once a process is forked, all changes to its memory space remain local and are not propagated to other processes. The multiprocessing objects are actually proxies that achieve propagation by hidden message passing.)
A convenient feature of multiprocessing is that messages can be any type of Python object (whereas message-passing libraries such as MPI are limited to arrays of basic types such as integers and floats). Internally, this is achieved by serializing ("pickling") the objects.
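For instance, here's a minimal sketch (not from the salon) of sending an arbitrary Python object through a Pipe; the names master_end, worker_end, and child are just for illustration.

# --- sketch: arbitrary objects through a Pipe ---
import multiprocessing as mp

def child(conn):
    msg = conn.recv()                          # unpickled automatically
    conn.send({'got': msg, 'items': len(msg)})
    conn.close()

master_end, worker_end = mp.Pipe()             # the two ends of a duplex Pipe
p = mp.Process(target=child,args=(worker_end,))
p.start()
master_end.send(['any',('Python','object'),3.14])
print master_end.recv()                        # a dict comes back, pickled and unpickled transparently
p.join()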
The multiprocessing communication objects include Events and Conditions (which act as signals between processes), Locks (used to control exclusive access to resources), and Semaphores (used to control shared but limited access to resources), as well as shared, synchronized versions of Python lists, dictionaries, and even namespaces, all obtained through multiprocessing.Manager. Updates to these objects are seen in all processes. Shared-memory objects are also possible, specifically Value and Array (these are statically-typed ctypes objects).
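To make this concrete, here's a minimal sketch (not from the salon) of a Manager-backed shared dictionary alongside a shared-memory Array; the function work is just a placeholder.

# --- sketch: Manager dict and shared Array ---
import multiprocessing as mp

manager = mp.Manager()
d = manager.dict()                # a proxy: updates travel by hidden messages
a = mp.Array('d',4)               # four ctypes doubles in shared memory

def work(i):
    d[i] = i**2
    a[i] = 0.5 * i

procs = [mp.Process(target=work,args=(i,)) for i in range(4)]
for p in procs:
    p.start()
for p in procs:
    p.join()

print dict(d)    # {0: 0, 1: 1, 2: 4, 3: 9}
print list(a)    # [0.0, 0.5, 1.0, 1.5]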
If the last paragraph was a little scary, don't worry! As I hinted above, parallel programming is nice if you can stay within high-level abstractions, but gets "dirty" pretty soon if you have to worry about the details of concurrency. So I hope you don't!
Here's a script that implements the cached evaluation of SNRs on a set of worker processes:
# --- script cachedsearch.py ---
import time, multiprocessing as mp, random
import numpy as N, matplotlib.pyplot as P
import filtering as F # our own module with data-analysis definitions
# this time we're going to evaluate the SNR of multiple fiducial signals
# against the same bank of templates,
nsignals = 10
Mcs = N.arange(0.8,1.74,0.01)
# to speed up the computation, we're going to cache subsets
# of pre-computed templates within each worker process
def worker(Mcs,pipe,results):
    """Worker function for parallel processes: takes as input
    a list 'Mcs' of chirp masses, a Pipe to receive fiducial
    signals from the master, and a Queue 'results' to return
    maximum SNRs and the corresponding chirp mass."""
    # make a list of cached templates
    templates = [F.normtemplate(Mc) for Mc in Mcs]
    # loop forever!
    while True:
        # receive a signal from the master process
        signal = pipe.recv()
        # exit if it is a poison pill
        if signal is None:
            return
        # evaluate all the SNRs
        snrs = [F.maxsnr(signal,t) for t in templates]
        # put into the queue the location and value of the max SNR
        maxt = N.argmax(snrs)
        results.put((Mcs[maxt],snrs[maxt]))
# we will run 4 worker processes
cpus = 4
# we create shared, synchronized objects to send signals
# and to receive results
pipes = [mp.Pipe() for i in range(cpus)] # a Pipe for every worker
results = mp.Queue() # a single Queue for all
# (that's almost poetry)
# we will not use the multiprocessing.Pool abstraction,
# but instead create a list of processes and have each execute
# the function 'worker' with arguments given by a subset of chirp masses,
# the worker "end" of the appropriate Pipe, and the Queue
procs = [mp.Process(name = 'worker-%s' % i,
                    target = worker,
                    args = (Mcs[i::cpus],pipes[i][1],results)
                    ) for i in range(cpus)]
# start all the processes
for p in procs:
    p.start()

# repeat 'nsignals' times
for j in range(nsignals):
    # create a signal with random chirp mass
    trueMc = random.uniform(0.8,1.74)
    signal = F.template(trueMc)

    # get maximum SNRs from workers
    t0 = time.time()           # start timing
    for p in pipes:            # send signal to all workers...
        p[0].send(signal)      # ...using the master "end" of the Pipe

    # collect 'cpus' results from the Queue, place them into an array
    snrs = N.array([results.get() for i in range(cpus)])

    print "This iteration:", time.time() - t0

    # compare the Mc with the maximum SNR to the true one
    print "True: %.3f Msun -> best fit: %.3f Msun" % (trueMc,snrs[N.argmax(snrs[:,1]),0])

# send poison pills to all workers
for p in pipes:
    p[0].send(None)

# wait for worker processes to terminate
for p in procs:
    p.join()
Cool! That's where we left things, but I should give you a few more pointers. In order of wonkishness:
- The Pool abstraction can also be used on networked clusters (with the help of third-party packages), but it requires a little work to set these up.
- It is possible to use multiprocessing shared Array objects to pass numpy arrays between processes economically (a rough sketch follows). Not pretty, but see also numpy-sharedmem.
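For what it's worth, here's the rough sketch promised above: one common recipe wraps a lock-free multiprocessing Array in a numpy array with N.frombuffer, so that worker processes can fill it in place without copies. It relies on fork, so it should work as written on OS X and Linux; treat it as a starting point rather than something I've battle-tested.

# --- sketch: a numpy view of a shared multiprocessing Array ---
import multiprocessing as mp, numpy as N

buf = mp.Array('d',2**16,lock=False)        # raw shared buffer of doubles
data = N.frombuffer(buf,dtype='float64')    # numpy view of the same memory, no copy

def fill(start,stop):
    data[start:stop] = N.arange(start,stop) # writes land in shared memory

chunk = 2**14
procs = [mp.Process(target=fill,args=(i*chunk,(i+1)*chunk)) for i in range(4)]
for p in procs:
    p.start()
for p in procs:
    p.join()

print data[:3], data[-3:]    # the workers' writes are visible in the master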