Distributed Image Reconstruction

MPIReco image reconstructions can also be executed across different computers, thanks to DaggerImageReconstruction. With this feature one can perform a reconstruction over the network on a machine that offers a specific resource, be it a particular system matrix or access to a GPU.

To enable a distributed reconstruction, one first has to use the Distributed standard library to add a new Julia process on the remote machine.

using Distributed
workers = addprocs(["remote_address"])
worker = first(workers)
worker = 1 # comment out to properly connect to a different process
1
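
If the Julia installation or the working directory on the remote machine differs from the local one, the SSH-based addprocs call from the Distributed standard library also accepts keyword arguments such as exename and dir. The host name and paths in the following sketch are placeholders and have to be adjusted to your setup:

workers = addprocs(["user@remote_address"];
                   exename = "/path/to/remote/julia",  # Julia executable on the remote machine
                   dir = "/path/to/remote/workdir")    # working directory on the remote machine
worker = first(workers)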

The worker is the Julia process id, which identifies where data is moved to and where computations are performed. Afterwards we just load both MPIReco and DaggerImageReconstruction:

using MPIReco, DaggerImageReconstruction

Instead of an MPIFile, we now want to create a DMPIFile, a distributed MPIFile. Such a file expects a path on the remote machine together with the worker id:

distr_bSF = DMPIFile(joinpath(datadir, "calibrations", "12.mdf"), worker = worker)
distr_b = DMPIFile(joinpath(datadir, "measurements", "20211226_203916_MultiPatch", "1.mdf"), worker = worker)
DMPIFile{MDFFileV2{Base.ReshapedArray{Int32, 4, Base.ReinterpretArray{Int32, 1, UInt8, Vector{UInt8}, false}, Tuple{}}}}
	Study: "FocusField", Dates.DateTime("2014-11-13T16:20:01.217")
	Experiment: "Ph5DotsDF10Overlap0LL (E99)", Dates.DateTime("2014-11-27T14:40:01.280")

Note that you might need to consider differences between the operating systems of both machines. For example, in this case we construct our filepath locally, while it is evaluated on the remote. You could also construct the path on the worker with:

remotecall_fetch(() -> joinpath("..."), worker)
"..."

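For instance, to stay independent of the local path conventions, the path can also be assembled entirely on the worker, relative to the remote home directory. The directory layout in this sketch is hypothetical:

# joinpath and homedir are evaluated on the worker, so the resulting path
# follows the remote operating system's conventions; it could then be passed
# to DMPIFile together with the worker id
remote_path = remotecall_fetch(() -> joinpath(homedir(), "data", "calibrations", "12.mdf"), worker)
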
Once we have our distributed files, we can configure our reconstruction as usual and the algorithm figures out on which worker to execute based on the provided distributed MPIFiles. In this case, both files are located on the remote machine. If the measurements are located on the local machine, the data has to be transferred over the network instead.

c1 = reconstruct("SinglePatch", distr_b;
                   SNRThresh=5,
                   sf = distr_bSF,
                   frames=1:acqNumFrames(distr_b),
                   minFreq=80e3,
                   recChannels=1:rxNumChannels(distr_b),
                   iterations=1,
                   spectralLeakageCorrection=true);
[ Info: Loading SM
[ Info: Preparing SF
[ Info: Adapting SF

Most MPIFile instances are just handles to one or more local files and thus can't be meaningfully sent over the network. To get around this, you can transform an MDF into an MDFv2InMemory, which is a fully in-memory representation of an MDF.

b = MPIFile(joinpath(datadir, "measurements", "20211226_203916_MultiPatch", "1.mdf"))
bInMemory = MDFv2InMemory(b)
MPIFile
	Study: "FocusField", Dates.DateTime("2014-11-13T16:20:01.217")
	Experiment: "Ph5DotsDF10Overlap0LL (E99)", Dates.DateTime("2014-11-27T14:40:01.280")

Now the algorithm can't determine the worker from the measurement anymore and we have to utilize the loadDaggerPlan function from DaggerImageReconstruction. This function expects a path to a local RecoPlan, which is then constructed on the given worker:

planpaths = getRecoPlanList(;full=true)
index = findfirst(endswith("SinglePatch.toml"), planpaths)
distr_plan = loadDaggerPlan(planpaths[index], getRecoPlanModules(), worker = 1)
RecoPlan{DaggerReconstructionAlgorithm}
└─ parameter::RecoPlan{DaggerReconstructionParameter}
   └─ algo::RecoPlan{SinglePatchReconstructionAlgorithm} [Distributed, Worker 1]
      └─ parameter::RecoPlan{SinglePatchParameters}
         ├─ reco::RecoPlan{SinglePatchReconstructionParameter}
         │  ├─ solverParams::RecoPlan{ElaborateSolverParameters}
         │  │  ├─ solver
         │  │  ├─ iterations
         │  │  ├─ enforceReal
         │  │  ├─ enforcePositive
         │  │  ├─ callbacks
         │  │  ├─ normalizeReg
         │  │  ├─ randomized
         │  │  ├─ subMatrixFraction
         │  │  ├─ shuffleRows
         │  │  ├─ seed
         │  │  ├─ greedy_randomized
         │  │  └─ theta
         │  ├─ weightingParams::RecoPlan{NoWeightingParameters} [Cached, 1]
         │  ├─ reg
         │  ├─ sf
         │  ├─ arrayType
         │  └─ sfLoad::RecoPlan{DenseSystemMatixLoadingParameter} [Cached, 3]
         │     ├─ tfCorrection
         │     ├─ loadasreal
         │     ├─ freqFilter::RecoPlan{SNRThresholdFrequencyFilterParameter}
         │     │  ├─ numPeriodAverages
         │     │  ├─ numPeriodGrouping
         │     │  ├─ maxMixingOrder
         │     │  ├─ numSidebandFreqs
         │     │  ├─ SNRThresh
         │     │  ├─ minFreq
         │     │  ├─ recChannels
         │     │  └─ maxFreq
         │     ├─ bgCorrection
         │     └─ gridding::RecoPlan{SystemMatrixGriddingParameter}
         │        ├─ fov
         │        ├─ deadPixels
         │        ├─ gridsize
         │        └─ center
         ├─ post::RecoPlan{NoPostProcessing}
         └─ pre::RecoPlan{CommonPreProcessingParameters} [Cached, 1]
            ├─ tfCorrection
            ├─ neglectBGFrames
            ├─ numPeriodAverages
            ├─ numPeriodGrouping
            ├─ numAverages
            ├─ spectralLeakageCorrection
            ├─ bgParams::RecoPlan{NoBackgroundCorrectionParameters}
            ├─ frames
            └─ loadasreal

We can then configure the plan as usual:

setAll!(distr_plan; SNRThresh=5,
                   sf = distr_bSF,
                   frames=1:acqNumFrames(bInMemory),
                   minFreq=80e3,
                   recChannels=1:rxNumChannels(bInMemory),
                   iterations=1,
                   spectralLeakageCorrection=true)

And perform a normal reconstruction with it:

distr_algo = build(distr_plan)
c2 = reconstruct(distr_algo, bInMemory)
isapprox(c1.data.data, c2.data.data)
true
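
When the distributed reconstruction is finished, any remote processes that were added earlier can be shut down again with rmprocs from the Distributed standard library. This step only applies if you actually connected to a remote machine; the main process with id 1 cannot be removed:

rmprocs(workers) # remove the previously added remote worker processes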

This page was generated using Literate.jl.