Parallelization data organization/transfer

11 views (last 30 days)
Matthew Lang
Matthew Lang on 5 Dec 2023
Edited: Matthew Lang on 21 Dec 2023
I've got Nf files recorded from Nu field units that contain Nd data samples from Ns sensors. Since each recording is a different duration, I've been storing them in a larger struct array "DM" using a ".val" container and pre-loading in RAM for speed. I also save "DM" as a .mat file for easier/quicker loading later.
I pull the "data" within a double for loop from the ".val" sub-container. The main function takes "data" as an input, calculates many (Np) parameters for each sample and spits back a parameter array, or "values matrix" VM, which is then saved/aggregated into a larger "values matrix array" (VMA) similarly structured to "DM". There's other inputs like unit calibration factors and threshold constants that get passed to mainfunction and other outputs similar to VMA, but this simplified section hopefully gets things across:
% <section to pre-allocate VMA here>
for ui = 1:Nu
for fi = 1:Nf
data = DM(ui,fi).val; % size(data) = [Nd,Ns]
VM = mainfunction(data); % size(VM) = [Nd,Np]
VMA(ui,fi).val = VM;
end
end
I've only ever pegged 20-30% CPU utilization, so I was hoping to spin each "ui" batch of Nf recordings to a separate (local) worker. Parfor does technically work, but struggles to keep all CPUs utilized and blows out the RAM needed, so there's lots of copying going on and the order everything is run is definitely a bit random. Not against using it, but seems like it lacks some kind of control knobs that are needed here to be efficient.
Spmd seems maybe a better way to control things, and I can make "DM" a constant then get rid of the "for ui" loop and use data = DM(labindex,fi).val; but I can't see how to save all my "VM" results back to the client. Spmd can't aggregate that VM array back into VMA b/c of the ".val" method, so I'm looking for a better way to structure things for spmd (or, perhaps, parfeval) to work.
Also, RAM usage. This is maybe on the edge of "big data". Once I have "DM" loaded & "VMA" pre-allocated, I'm sitting near 110GB of RAM utilized. With single-thread, I didn't need much more than that and was able to just store the results into their slot in VMA w/ a minimal extra amount of RAM needed for mainfunction. So far, though, my parallelize efforts have required a ton more RAM behind the scenes, even trying DM as a "parallel.pool.constant". It'd be nice to limit it more closely to that 110GB. I did try making DM distributed, but spmd can't use the .val way of extracting data from a distributed "DM", and has the same limitations on communicating VMA results back to the client.
I'm looking for a way for each worker to pull from a different section of a common source dataset, and then write back a larger batch of calculated parameters to a different section of a common results dataset. That seems simple enough, so perhaps it's my data structure that is getting in the way? None of the workers need to communicate anything besides the results back to the client, so this seems like a rather vanilla parallelization effort, I just can't quite see the correct clearing in this forest yet.

Answers (2)

Edric Ellis
Edric Ellis on 7 Dec 2023
parfor is designed to "just work" for a wide range of cases, but it's true that sometimes you need a bit more control. parforOptions is one approach if you want to be in charge of partitioning the work, but I suspect that will not help you. You've also tried parallel.pool.Constant, which gives a degree more control over data transfer, but might actually get in the way here of partitioning the data. (In parfor, your code will "slice" DM which means that only the relevant portions of DM will be sent to each worker - i.e. each worker does not get a full copy of DM).
Even though your serial implementation uses only ~25% CPU, this doesn't necessarily mean that you can speed things up using parallelisation. Some algorithms are "memory bound" (rather than "compute bound"), which means that the limiting factor is the speed at which your system can get data to the CPU from main memory (and back again). It's frustratingly hard to tell when you're in this situation, but one simple experiment you can run is to run two MATLAB clients at the same time running your serial code. (This assumes you can do that without having to page memory to disk using swap space...) If they're both able to run at the normal speed without interfering with each other, then it's a fair bet that you can get benefit from parallelisation.
From what you've said, I suspect the key to making things work efficiently is to ensure that only the process operating on a given portion of DM actually loads that data into memory. Most likely this means not loading it at the client and transferring it to the workers; rather, you want to arrange for the workers to load the data they need directly. How much this affects things depends on how many workers you're running.
One way to do that is to use spmd, a bit like this:
spmd
% Ignoring the case where Nu isn't equally divisible...
nuPerWorker = Nu / numlabs;
uStart = 1 + (labindex-1) * nuPerWorker;
uStop = uStart + nuPerWorker;
myDM = loadRangeOfDM(uStart, uStop);
myVMA = preallocateVMA();
for ui = 1:nuPerWorker
% operate on myDM(ui,...)
end
end
% Collect results from workers using Composite indexing
VMA = [VMA{:}];
This avoids loading DM on the client; it does duplicate VMA in memory at the end when retrieving the results.
  1 Comment
Matthew Lang
Matthew Lang on 21 Dec 2023
Edited: Matthew Lang on 21 Dec 2023
Thanks Edric,
The root of my problem is that my application is a bit of an inverse of the typical parallel application. I think a big assumption is that you can have loads and loads of data to process through, but the result of that crunching is smaller set of results. For me, any one individual recording is Nd x 3, but the results are Nd x ~300. So the bulk of my problem is on the back end and getting the much larger result back together on the client.
It seems like Matlab fundamentally lacks a common-memory access functionality within the toolbox (vs direct memory access within e.g. OpenMP). I even tried passing a handle to an object to workers, thinking it's the closest thing to "pass by reference" in Matlab, but not for spmd (it made Nworker copies of the object). This makes sense when you have many workers spread out over many physical machines, but is kind of frustrating for a single machine w/ large RAM & high core count specifically to try and keep only one copy of both source & results in RAM. It's not dissimilar to this post a few years ago. I think that user ended up just changing the order in which they distributed an input array and were happy about the results. Unfortunately, my back-end result being much larger than the input places extra limitations. I tried to chronicle all the different options (that ended up working) in another self-answer. I would welcome any comments on my methods or perhaps they would spark a new idea from the community.

Sign in to comment.


Matthew Lang
Matthew Lang on 21 Dec 2023
Edited: Matthew Lang on 21 Dec 2023
Alright. After a lot of reading, bit of Mathworks technical help, and a bunch of trial & error, I think I've come up with several ways to accomplish a high-core count, large RAM effort of keeping a large source dataset & even larger results dataset in RAM.
TL;DR: surprisingly enough, placing parfor on my inner loop (against the 'best practices' advice) got me to 50% of the ultimate speedup possible, while keeping peak RAM to a sane limit, and still able to index the results very fast. The other method that seemed to work (assuming you have a very fast NVME drive) is to use spmd & ValueStore, though getting those results back into RAM to do further work on the results is slower than you'd think.
Now, the details. Below I list out 11 methods I tried to test different structures and methods to accomplish running my mainfunction() algorithm on a large number of recordings from different devices. Below the descriptions of the method is a table that lists the results.
A) - single thread, using struct array for VMA results
for ui = 1:Nu
for fi = 1:Nf
data = DM(ui,fi).data;
VMA(ui,fi).val = mainfunction(data);
end
end
B) - single thread, using cell array for VMA results
for ui = 1:Nu
for fi = 1:Nf
data = DM(ui,fi).data;
VMA{ui,fi} = mainfunction(data);
end
end
C) - parfor on outer ui loop
parfor ui = 1:Nu
for fi = 1:Nf
data = DM(ui,fi).data;
VMA(ui,fi).val = mainfunction(data);
end
end
D) - parfor on inner fi loop
for ui = 1:Nu
parfor fi = 1:Nf
data = DM(ui,fi).data;
VMA(ui,fi).val = mainfunction(data);
end
end
E) - spmd: use manual gather method
DMd = distributed(DM'); % DM has 24 columns, and there are 24 workers
clear DM
tic;
spmd % Each worker operating on one unit's recordings
for fi = 1:Nf
VMw{fi} = mainfunction(getLocalPart(DMd(fi,labindex)).data);
end
end
toc;
tic;
for ui = 1:Nu
VMt = VMw{ui};
VMA{ui,:} = VMt'; % VMA is now accessed like this: VM = VMA{ui}{fi};
end
spmd % clear variables still residing on workers
VMw=[];
DMd=[];
fi=[];
end
clear VMt VMw
delete(gcp('nocreate')); % shutdown pool, release any remaining RAM for workers
toc;
F) - spmd: use builtin gather function
DMd = distributed(DM');
clear DM
p = parpool;
tic;
spmd
for fi = 1:Nf
VMAw{fi} = mainfunction(getLocalPart(DMd(fi,spmdIndex)).data);
end
end
toc;
tic;
VMA = gather(VMAw);
clear VMAw
toc;
delete(p,'NoCreate'); % release pool + worker RAM
G) - spmd: no gather, access worker results directly
DMd = distributed(DM');
clear DM
tic;
spmd
for fi = 1:Nf
VMAw{fi} = mainfunction(getLocalPart(DMd(fi,spmdIndex)).data);
end
end
toc;
% Index using VM = VMAw{fi}{ui};
H) - feval w/ fetchNext
sizeDM = size(DM);
for ii = 1:Nu
for jj = 1:Nf
idx = sub2ind(sizeDM,ii,jj);
F(idx) = parallel.FevalFuture;
end
end
tic;
for ui = 1:Nu
for fi = 1:Nf
data = DM(ui,fi).data;
idx = sub2ind(sizeDM,ui,fi);
F(idx) = parfeval(@mainfunction,1,data);
end
end
toc;
tic;
for ii = 1:Nu*Nf
[completedIdx,value] = fetchNext(F,30);
[ui,fi] = ind2sub(sizeDM,completedIdx);
VMA{ui,fi} = value;
end
toc;
I) spmd: use ValueStore directly
p = parpool;
DMd = distributed(DM');
clear DM
tic;
spmd
VS = getCurrentValueStore;
for fi = 1:Nf % (wfi = worker file index)
vkey = sprintf('VM_%02d_%02d',spmdIndex,fi);
VS(vkey) = mainfunction(getLocalPart(DMd(fi,spmdIndex)).data); % size(VM) = [Nd,Np]
end
end
CVS = p.ValueStore; % indexing looks like: VM = CVS(sprintf('VM_%02d_%02d',ui,fi));
toc;
J) spmd: recreate VMA from ValueStore
p = parpool;
DMd = distributed(DM');
clear DM
tic;
spmd
VS = getCurrentValueStore;
for fi = 1:Nf
vkey = sprintf('VM_%02d_%02d',spmdIndex,fi);
VS(vkey) = mainfunction(getLocalPart(DMd(fi,spmdIndex)).data);
end
end
toc;
tic;
CVS = p.ValueStore;
for ui = 1:Nu
for fi = 1:Nf
VMA{ui,fi} = CVS(sprintf('VM_%02d_%02d',ui,fi));
end
end
delete(gcp('nocreate')); %shutdown pool to release RAM
toc;
K) spmd: just like J, but with larger variable chunks in the ValueStore
p = parpool;
DMd = distributed(DM');
%}
tic;
spmd
VS = getCurrentValueStore;
for fi = 1:Nf
VMw{fi} = mainfunction(getLocalPart(DMd(fi,spmdIndex)).data);
end
vkey = sprintf('VM_%02d',spmdIndex);
VS(vkey) = VMw;
end
toc;
tic;
CVS = p.ValueStore;
for ui = 1:Nu
VMt = CVS(sprintf('VM_%02d',ui));
VMA{ui} = VMt';
end
delete(gcp('nocreate')); %shutdown pool to release RAM
toc;
For all the tests, I setup a pool with 24 workers (as I had 24 units of recordings in the dataset). I took note of peak RAM usage (in GB), the time (in seconds) to crunch the files separately from the time it took to gather the results back together, as that 2nd piece seemed to be quite the challenge. I also used a little test of selecting a specific result and plotting a parameter to see how quickly I could index a result (in ms). I have follow-on analysis that loops through the entire results array which would be drastically impacted if there's even a small increase in time to index the results:
% VMA indexing speed check
fi=1;ui=2;
tic;
VM = VMA(ui,fi).val;
figure(1);clf;plot(VM(:,1));
toc;
Observations:
-First off, it was interesting to note no difference between A) & B), so no performance hit for using a cell array vs a struct array.
-The better performance of parfor on the file # index was surprising. (fyi, I have 24 units, but ~800 files for each unit). I would have assumed it was better to put the parfor on the loop with fewer iterations. The fact that it consumed more RAM and took longer still kind of boggles me.
-Once I found an spmd method that could actually run (using getLocalPart()), I was shocked at how quick it was, nearly twice as fast in just running the mainfunction() operation as the best parfor result. The problem remains getting the data back to the client. This is where being able to save results back to a share memory object would be awesome. But whether I gathered the data myself or used the built-in gather() function, I was frankly shocked that the "Gather Time" was nearly four times as long as just the "Process Time". This is definitely stretching my default concept that no re-arranging of data should be faster than if everything is in RAM. Perhaps there's some efficiency to be had in how the variables RAM is structured, but seems like this is a symptom of some deep inefficiency in either how I have my data storage setup, or the way Matlab/Windows does memory management.
-If I just ignored the gather, then I can have results to look at in just ~20seconds and with pretty low peak RAM. This got me very excited. The problem is then if I try to access the worker composite object result directly without "gathering", there's something about a composite object that adds a significant amount of overhead to indexing the result. Absolutely crushed that this didn't work.
-Using feval to generate a job queue and get the results sounded intriguing, and it achieved similar Process Time, but the gather time was only marginally better than the spmd, and the peak RAM was similar.
-The last three tests used ValueStore within spmd to save the results. This also surprised me at how good it was. Since it didn't store results in RAM, and since I have a pretty fast NVME drive, the peak RAM was actually lower than the single thread result. The problem is that accessing the ValueStore pulls data from disk every time you access a new entry, which makes the results indexing acceptable if a human was rifling through the results matrix, but is nearly twice as slow when looping through the results, adding a LOT of disk activity as well.
-Unfortunately, it's not really like saving a variable to a .mat file and takes a lot longer than I would think to loop through and pull each entry from the ValueStore & copy it to RAM. If I save the results in larger chunks (e.g. an entire unit's recordings results), the loading happens faster, but the peak RAM was actually the worst of the bunch.
So, when I started this process, I assumed I'd be able to find a method that
1) Cut the process time down by >10x, while
2) only using a marginal amount of extra RAM and
3) leaving the results in RAM in a fashion that is amenable to fast follow-on analysis
While there was no method that ticked all boxes, parfor placed on the loop with the larger # of iterations (kinda counterintuitive) seemed to come out as the best option. The ValueStore was both surprising (I've never seen an NVME drive show 4Gb/s throughput in normal use), and slightly frustrating that saving of the results is so quick, but loading of the results is not.
Comments or fingers to poke holes through my logic and/or methods are welcome.

Categories

Find more on Parallel Computing Fundamentals in Help Center and File Exchange

Products


Release

R2021a

Community Treasure Hunt

Find the treasures in MATLAB Central and discover how the community can help you!

Start Hunting!