I've been doing a bit of programming work lately that would greatly benefit from a speed boost
using parallel/concurrent processing tools. Essentially, I'm performing a parameter sweep: each
pair of values for two different simulation parameters is fed into the simulation, the simulation
runs, and the results are recorded to disk at the end. The point is to find out how the
simulation results vary with the parameter values.
In my current code, a new simulation is initialized with each pair of parameter values inside one
iteration of a for loop; each iteration of the loop is independent of the others. Spreading these
iterations over the 12 cores on my workstation should result in about a 12x decrease in the amount
of time the simulation takes to run.
I've had good success using the parfor loop construct in Matlab in the past, but my simulation
was written in Python and I want to learn more about Python's multiprocessing tools, so this post
will explore that module in the context of performing parameter sweeps.
Profile the code first to identify bottlenecks
First, I profiled my code to identify where any slowdowns might be occurring in the serial
program. I used a great tutorial at the Zapier Engineering blog to write a decorator for profiling
the main instance method of my class that was doing most of the work. Surprisingly, I found that a
few numpy methods were taking the most time, namely norm() and cross(). To address this, I
directly imported the Fortran BLAS nrm2() function using scipy's get_blas_funcs() function and
hard-coded the cross product in pure Python inside the method; these two steps alone resulted in a
4x decrease in simulation time. I suspect the reason is that the overhead of calling these
functions on small arrays outweighs the speed gained from Numpy's optimized C code: I was
normalizing single vectors and taking cross products of pairs of vectors many times during each
loop iteration.
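As a rough sketch of those two replacements (the helper name cross3 is mine, and the benefit will depend on how small the arrays are), the idea looks something like this:

import numpy as np
from scipy.linalg import get_blas_funcs

# Pull the double-precision BLAS Euclidean-norm routine out directly;
# the dtype of the sample array selects dnrm2.
nrm2, = get_blas_funcs(('nrm2',), (np.empty(3, dtype=np.float64),))

def cross3(a, b):
    """Hand-coded cross product for length-3 vectors."""
    return (a[1] * b[2] - a[2] * b[1],
            a[2] * b[0] - a[0] * b[2],
            a[0] * b[1] - a[1] * b[0])

v = np.array([1.0, 2.0, 3.0])
w = np.array([4.0, 5.0, 6.0])

print(nrm2(v))       # same value as np.linalg.norm(v)
print(cross3(v, w))  # same components as np.cross(v, w)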
A brief glance at Python's multiprocessing module
PyMOTW has a good, minimal description of the
main aspects of the multiprocessing module. They state that the simplest way to create tasks on
different cores of a machine is to create new Process objects with target functions. Each object
is then set to execute by calling its start() method.
The basic example from their site looks like this:
import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()
In this example, it's important to create the Process instances inside the
if __name__ == '__main__':
section of the script because child processes import the script where the target function is
contained. Placing the object instantiation in this section prevents an infinite, recursive chain
of such instantiations. An alternative workaround is to define the target function in a separate
module and import it into the script's namespace, as sketched below.
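A minimal sketch of that workaround, assuming a hypothetical module named workers.py, might look like this:

# workers.py: holds the target function so child processes can import it cleanly
def worker():
    """worker function"""
    print('Worker')

# main script: imports the target function and creates the processes
import multiprocessing
from workers import worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()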
To send arguments to the target function (worker() in the example above), we can use the args
keyword in the Process object instantiation like this:
p = multiprocessing.Process(target=worker, args=(i,))
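For instance, a minimal variation of the basic example that passes the loop index to each worker might look like this (args expects a tuple, hence the trailing comma):

import multiprocessing

def worker(num):
    """Worker function that receives its argument through args."""
    print('Worker', num)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    # Wait for all workers to finish before exiting
    for p in jobs:
        p.join()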
A very important thing to note is that the arguments must be objects that can be pickled using
Python's pickle module. If an argument is a class instance, this means that every attribute of that
class must be pickleable.
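A quick way to check this up front is simply to try pickling an instance; the Simulation class below is just a hypothetical stand-in:

import pickle

class Simulation:
    """Hypothetical stand-in for a class whose instances will be passed to workers."""
    def __init__(self, param1, param2):
        self.param1 = param1
        self.param2 = param2

# Raises an error if any attribute of the instance cannot be pickled
pickle.dumps(Simulation(1.0, 2.0))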
An important class in the multiprocessing module is a Pool. A Pool object
controls a pool of worker processes. Jobs can be submitted to the Pool, which then sends the jobs
to the individual workers.
The Pool.map() method achieves the same functionality as Matlab's parfor construct. This method
essentially applies a function to each element in an iterable and returns the results. For example,
if I wanted to square each number in a list of integers between 0 and 9 and perform the square
operation on multiple processors, I would write a function for squaring an argument, and supply
this function and the list of integers to Pool.map(). The code looks like this:
import multiprocessing

def funSquare(num):
    return num ** 2

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    results = pool.map(funSquare, range(10))
    print(results)
Design the solution to the problem
In my parameter sweep, I have two classes: one is an object that I'm simulating and the other acts
as a controller that sends parameters to the structure and collects the results of the
simulation. Everything was written in a serial fashion and I want to change it so the bulk of the
work is performed in parallel.
After the bottlenecks were identified in the serial code, I began thinking about how the
problem of parameter sweeps could be addressed using the multiprocessing module.
The solution requirements I identified for my parameter sweep are as follows:
- Accept two values (one for each parameter) from the range of values to test as inputs to the
simulation.
- For each pair of values, run the simulation as an independent process.
- Return the results of the simulation as a list or Numpy array.
I often choose to return the results as Numpy arrays since I can easily pickle them when saving to
disk. This may change depending on your specific problem.
Implementation of the solution
I'll now give a simplified example of how this solution to the parameter sweep can be implemented
using Python's multiprocessing module. I won't use objects like in my real code, but will first
demonstrate an example where Pool.map() is applied to a list of numbers.
import multiprocessing

def runSimulation(params):
    """This is the main processing function. It will contain whatever
    code should be run on multiple processors.
    """
    param1, param2 = params

    # Example computation
    processedData = []
    for ctr in range(1000000):
        processedData.append(param1 * ctr - param2 ** 2)

    return processedData

if __name__ == '__main__':
    # Define the parameters to test
    param1 = range(100)
    param2 = range(2, 202, 2)

    # Zip the parameters because pool.map() takes only one iterable
    params = zip(param1, param2)

    pool = multiprocessing.Pool()
    results = pool.map(runSimulation, params)
This is a rather silly example of a simulation, but I think it illustrates the point nicely. In the
__main__ portion of the code, I first define two lists of parameter values that I want to
'simulate.' These parameters are zipped together in this example because Pool.map() takes only
one iterable as its argument. The pool of worker processes is created with multiprocessing.Pool().
Most of the work is performed in the function runSimulation(params). It takes a tuple of two
parameters which are unpacked. Then, these parameters are used in the for loop to build a list of
simulated values which is eventually returned.
Returning to the __main__ section, each simulation is run on a different core of my machine using
the Pool.map() function. This applies the function called runSimulation() to the values in the
params iterable. In other words, it calls the code described in runSimulation() with a
different pair of values in params.
All the results are eventually returned in a list in the same order as the parameter iterable. This
means that the first element in the results list corresponds to parameters of 0 and 2 in this
example.
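Following the earlier note about returning Numpy arrays, a minimal sketch of the final save step (the file name here is arbitrary) could be:

import pickle
import numpy as np

# Stack the per-simulation results into a 2D array, one row per parameter
# pair, and pickle it to disk for later analysis.
resultsArray = np.array(results)
with open('sweepResults.pkl', 'wb') as f:
    pickle.dump(resultsArray, f)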
Iterables over arbitrary objects
In my real simulation code, I use a class to encapsulate a number of structural parameters and
methods for simulating a polymer model. So long as instances of this class can be pickled, I can use them as the iterable in Pool.map(),
not just lists of floating point numbers.
import multiprocessing

class simObject():
    def __init__(self, params):
        self.param1, self.param2 = params

def runSimulation(objInstance):
    """This is the main processing function. It will contain whatever
    code should be run on multiple processors.
    """
    param1, param2 = objInstance.param1, objInstance.param2

    # Example computation
    processedData = []
    for ctr in range(1000000):
        processedData.append(param1 * ctr - param2 ** 2)

    return processedData

if __name__ == '__main__':
    # Define the parameters to test
    param1 = range(100)
    param2 = range(2, 202, 2)

    objList = []
    # Create a list of objects to feed into pool.map()
    for p1, p2 in zip(param1, param2):
        objList.append(simObject((p1, p2)))

    pool = multiprocessing.Pool()
    results = pool.map(runSimulation, objList)
Again, this is a silly example, but it demonstrates that lists of objects can be used in the
parameter sweep, allowing for easy parallelization of object-oriented code.
Instead of runSimulation(), you may want to apply an instance method to a list in pool.map(). A
naïve way to do this is to replace runSimulation with the method name, but this too causes
problems. I won't go into the details here, but one solution is to use an instance's __call__()
method and pass the object instance into the pool; a sketch of that approach follows. More details
can be found here.
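As a rough sketch of that idea (not my actual simulation code), a callable instance can stand in for the top-level function handed to pool.map():

import multiprocessing

class SimRunner:
    """Hypothetical callable class; an instance is passed to pool.map()
    in place of a plain top-level function."""
    def __init__(self, scale):
        self.scale = scale

    def simulate(self, params):
        param1, param2 = params
        return [self.scale * (param1 * ctr - param2 ** 2)
                for ctr in range(1000)]

    def __call__(self, params):
        # Delegating to the instance method from __call__ lets the pickled
        # instance carry its own state into the worker processes.
        return self.simulate(params)

if __name__ == '__main__':
    runner = SimRunner(scale=0.5)
    params = zip(range(100), range(2, 202, 2))

    pool = multiprocessing.Pool()
    results = pool.map(runner, params)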
Comparing computation times
The following code makes a rough comparison between computation time for the parallel and serial
versions of map():
import multiprocessing
import time

def runSimulation(params):
    """This is the main processing function. It will contain whatever
    code should be run on multiple processors.
    """
    param1, param2 = params

    # Example computation
    processedData = []
    for ctr in range(1000000):
        processedData.append(param1 * ctr - param2 ** 2)

    return processedData

if __name__ == '__main__':
    # Define the parameters to test
    param1 = range(100)
    param2 = range(2, 202, 2)

    # Build a list so the parameters can be iterated over twice
    params = list(zip(param1, param2))

    pool = multiprocessing.Pool()

    # Parallel map
    tic = time.time()
    results = pool.map(runSimulation, params)
    toc = time.time()

    # Serial map (wrapped in list() to force evaluation)
    tic2 = time.time()
    results = list(map(runSimulation, params))
    toc2 = time.time()

    print('Parallel processing time: %r\nSerial processing time: %r'
          % (toc - tic, toc2 - tic2))
On my machine, pool.map() ran in 9.6 seconds, but the serial version took 163.3 seconds. My
laptop has 8 cores, so I would have expected the speedup to be a factor of 8, not a factor of
roughly 17. I'm not sure why the speedup is super-linear, but I suspect part of the reason is that
measuring wall-clock time with the time.time() function is not wholly accurate.
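For reference, the number of worker processes that Pool() starts by default matches the core count reported by the module itself:

import multiprocessing

# Pool() launches this many worker processes unless told otherwise
print(multiprocessing.cpu_count())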
Important points
I can verify that all the cores are being utilized on my machine while the code is running by using
the htop console program. In some cases, Python modules like Numpy,
scipy, etc. may limit processes in Python to running on only one core on Linux machines, which
defeats the purpose of writing concurrent code in this case. (See for example this discussion.)
To fix this, we can import Python's os module to reset the task affinity in our code:
import os

os.system("taskset -p 0xff %d" % os.getpid())
Conclusions
I think that Matlab's parfor construct is easier to use because one doesn't have to consider the
nuances of writing concurrent code. So long as each loop iteration is independent of the others,
you simply write parfor instead of for and you're set.
In Python, you have to prevent an infinite, recursive chain of process creation by placing your
code in the __main__ section of your script or by placing the target function in a different
script and importing it. You also have to be sure that Numpy and other Python modules that use
BLAS haven't reset the core affinity. What you gain over Matlab's implementation is the power of
using Python as a general programming language with a lot of tools for scientific computing. All
of this, including the multiprocessing module, is free, whereas Matlab's Parallel Computing
Toolbox requires an institute license or a separate purchase.