Partition a Datastore in Parallel
Partitioning a datastore in parallel, with a portion of the datastore on each worker in a parallel pool, can provide benefits in many cases:
Perform some action on only one part of the whole datastore, or on several defined parts simultaneously.
Search for specific values in the datastore, with all workers acting simultaneously on their own partitions.
Perform a reduction calculation on the workers across all partitions.
This example shows how to use partition to parallelize the reading of data from a datastore. It uses a small datastore of airline data provided in MATLAB® and finds the mean of the non-NaN values from its 'ArrDelay' column.
A simple way to calculate the mean is to divide the sum of all the non-NaN values by the number of non-NaN values. The following code first does this for the datastore without any parallelism. To begin, define a function that accumulates the sum and the count. To run this example, save this function as sumAndCountArrivalDelay.m in a folder on the MATLAB search path.
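function [total,count] = sumAndCountArrivalDelay(ds)
% Accumulate the sum and the count of the non-NaN ArrDelay values in ds.
total = 0;
count = 0;
while hasdata(ds)
    data = read(ds);                                  % next chunk, as a table
    total = total + sum(data.ArrDelay,1,'OmitNaN');   % sum of non-NaN delays
    count = count + sum(~isnan(data.ArrDelay));       % number of non-NaN delays
end
end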
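The function works because the sum and the count of the non-NaN values can be accumulated chunk by chunk, with the division done only once at the end. The following sketch illustrates this on a small in-memory vector; the values in x and the chunk size of 4 are hypothetical stand-ins for the ArrDelay column and for successive read(ds) calls.

```matlab
% Hypothetical data standing in for the ArrDelay column; NaN marks missing values.
x = [3 NaN 5 -2 NaN 10 7 NaN 1 4];
chunkSize = 4;                      % hypothetical chunk size, like one read(ds) call
total = 0;
count = 0;
for k = 1:chunkSize:numel(x)
    chunk = x(k:min(k+chunkSize-1,numel(x)));   % next chunk of up to chunkSize values
    total = total + sum(chunk,'omitnan');       % running sum of non-NaN values
    count = count + sum(~isnan(chunk));         % running count of non-NaN values
end
avgFromChunks = total/count                     % 28/7 = 4
% mean with 'omitnan' computes the same value in one step. If a variable named
% mean exists in the workspace, run "clear mean" first.
avgDirect = mean(x,'omitnan')                   % also 4
```

Because only two running numbers are kept, the same idea carries over unchanged when the chunks come from separate partitions on separate workers.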
The following code creates a datastore, calls the function, and calculates the mean without any parallel execution. The tic and toc functions time the execution, here and in the later parallel cases.
ds = datastore(repmat({'airlinesmall.csv'},20,1),'TreatAsMissing','NA');
ds.SelectedVariableNames = 'ArrDelay';
reset(ds);
tic
[total,count] = sumAndCountArrivalDelay(ds)
sumtime = toc
mean = total/count
total = 17211680
count = 2417320
sumtime = 7.7905
mean = 7.1201
The partition function divides the datastore into smaller parts, each of which is itself a datastore. These smaller datastores work completely independently of each other, so you can use them inside parallel language features such as parfor loops and spmd blocks.
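Because each partition is an ordinary datastore, you can also loop over the partitions serially, without a parallel pool, to check that together they cover the whole datastore. This sketch assumes sumAndCountArrivalDelay.m is saved on the path as described earlier; the choice of N = 4 partitions and the names partTotal and partCount are hypothetical.

```matlab
% Assumes sumAndCountArrivalDelay.m is saved on the MATLAB path.
ds = datastore(repmat({'airlinesmall.csv'},20,1),'TreatAsMissing','NA');
ds.SelectedVariableNames = 'ArrDelay';
N = 4;                               % hypothetical number of partitions
partTotal = zeros(1,N);
partCount = zeros(1,N);
for ii = 1:N
    subds = partition(ds,N,ii);      % partition ii is itself a datastore
    [partTotal(ii),partCount(ii)] = sumAndCountArrivalDelay(subds);
end
% Compare with a single pass over the whole datastore.
reset(ds);
[total,count] = sumAndCountArrivalDelay(ds);
sameTotal = sum(partTotal) == total
sameCount = sum(partCount) == count
```

The partial sums and counts add up exactly here because the delays are whole numbers, so the summation order does not introduce rounding differences.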
In the following code, the numpartitions function sets the number of partitions based on the datastore itself (ds) and the size of the parallel pool (gcp). This number does not necessarily equal the number of workers in the pool. The number of parfor loop iterations is then set to the number of partitions (N).
The following code starts a parallel pool with the 'Processes' profile, which runs the workers on the local machine, and then partitions the datastore among the workers, one partition per loop iteration. Again, a separate function contains the parfor loop that accumulates the sum and count totals. To run the example, save this function as parforSumAndCountArrivalDelay.m.
function [total,count] = parforSumAndCountArrivalDelay(ds)
N = numpartitions(ds,gcp);   % number of partitions suggested for this pool
total = 0;
count = 0;
parfor ii = 1:N
    % Get partition ii of the datastore.
    subds = partition(ds,N,ii);
    [localTotal,localCount] = sumAndCountArrivalDelay(subds);
    total = total + localTotal;   % parfor reduction variable
    count = count + localCount;   % parfor reduction variable
end
end
The following code calls this new function, so that the non-NaN values are summed and counted in parallel loop iterations.
p = parpool('Processes',4);
reset(ds);
tic
[total,count] = parforSumAndCountArrivalDelay(ds)
parfortime = toc
mean = total/count
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to the parallel pool (number of workers: 4).
total = 17211680
count = 2417320
parfortime = 6.4133
mean = 7.1201
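To see how the number of partitions chosen by numpartitions compares with the size of the pool, you can query both. This sketch re-creates the datastore exactly as above and uses gcp to get the current pool (the pool started by parpool, if it is still open); the names nPartitions and nWorkers are hypothetical, and the values you see depend on your datastore and pool.

```matlab
% Same datastore as above.
ds = datastore(repmat({'airlinesmall.csv'},20,1),'TreatAsMissing','NA');
ds.SelectedVariableNames = 'ArrDelay';
p = gcp;                            % current pool; starts one only if none is open
nPartitions = numpartitions(ds,p)   % partitions suggested for this datastore and pool
nWorkers = p.NumWorkers             % workers in the pool; need not equal nPartitions
```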
Rather than letting the software calculate the number of partitions, you can set this value explicitly so that the data is partitioned to fit your algorithm. For example, to parallelize reading from within an spmd block, you can use the number of workers (spmdSize) as the number of partitions, with each worker reading the partition given by its own index (spmdIndex). The following function uses an spmd block to perform a parallel read in this way. After the spmd block, total and count are Composite objects that hold one value per worker, so the function concatenates and sums those values. To run this example, save the function as spmdSumAndCountArrivalDelay.m.
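function [total,count] = spmdSumAndCountArrivalDelay(ds)
spmd
    % Each worker reads its own partition of the datastore.
    subds = partition(ds,spmdSize,spmdIndex);
    [total,count] = sumAndCountArrivalDelay(subds);
end
% total and count are Composite objects; combine the per-worker values.
total = sum([total{:}]);
count = sum([count{:}]);
end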
The following code calls the function that uses an spmd block.
reset(ds);
tic
[total,count] = spmdSumAndCountArrivalDelay(ds)
spmdtime = toc
mean = total/count
total = 17211680
count = 2417320
spmdtime = 4.6729
mean = 7.1201
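A variation on this design performs the reduction on the workers with spmdPlus instead of combining the Composite values on the client. The function name spmdPlusSumAndCountArrivalDelay is hypothetical; the sketch assumes MATLAB R2022b or later (where spmdSize, spmdIndex, and spmdPlus are available), an open parallel pool, and sumAndCountArrivalDelay.m saved on the path.

```matlab
function [total,count] = spmdPlusSumAndCountArrivalDelay(ds)
% Hypothetical variant: reduce on the workers with spmdPlus.
spmd
    subds = partition(ds,spmdSize,spmdIndex);
    [localTotal,localCount] = sumAndCountArrivalDelay(subds);
    % spmdPlus adds the values from all workers and returns the sum on every worker.
    total = spmdPlus(localTotal);
    count = spmdPlus(localCount);
end
% total and count are Composite objects holding the same sum on every worker,
% so take the value from worker 1.
total = total{1};
count = count{1};
end
```

Save it as spmdPlusSumAndCountArrivalDelay.m and call it while the pool is still open, for example with reset(ds) followed by [total,count] = spmdPlusSumAndCountArrivalDelay(ds). Both versions compute the same totals; keeping the reduction inside the spmd block can be convenient when the workers need the combined result for further computation.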
delete(p);
Parallel pool using the 'Processes' profile is shutting down.
Comparing the times recorded in the variables sumtime, parfortime, and spmdtime gives an idea of the modest performance improvement. In the run shown here, the serial version took about 7.8 seconds, the parfor version about 6.4 seconds, and the spmd version about 4.7 seconds. Your results might vary, because performance depends on the datastore size, the parallel pool size, the hardware configuration, and other factors.
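One way to summarize the comparison is as a speedup: the ratio of the serial time to each parallel time. This sketch plugs in the times from the run shown above; the names parforSpeedup and spmdSpeedup are hypothetical.

```matlab
% Times, in seconds, from the run shown above. Skip these three assignments to
% use the times measured in your own session instead.
sumtime = 7.7905;
parfortime = 6.4133;
spmdtime = 4.6729;
parforSpeedup = sumtime/parfortime   % about 1.21
spmdSpeedup = sumtime/spmdtime       % about 1.67
```

With four workers, both ratios are well below 4, which is consistent with a modest improvement: file reading, partitioning, and pool communication do not all scale with the number of workers.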