This document is the seventh of a set of notes, focusing on parallelization. The notes are not meant to be particularly complete in terms of useful functions (Google and LLMs can now provide that quite well), but rather to introduce the language and consider key programming concepts in the context of Julia.
Given that, the document relies heavily on demos, with interpretation in some cases left to the reader.
In some cases, I have set the code chunks to not execute when rendering this document, since the parallelization complicates execution and display of results.
Let’s first discuss some general concepts related to parallelization, based on the overview in this SCF tutorial.
Some of the considerations that apply when thinking about how effective a given parallelization approach will be include:
The following are some basic principles/suggestions for how to parallelize your computation.
More on GPUs in the next set of notes.
One of the nice things about Julia is that much of the parallelization functionality is available directly in the Base package/module or in a limited number of additional packages. In Python and R, there is a confusing explosion of different parallelization packages (though for R, I highly recommend the future package and related packages for all your non-GPU parallelization work). E.g., in Python there is ipyparallel, ray, dask, multiprocessing in addition to other numerical computing packages that can run code in parallel (and on the GPU), such as JAX and PyTorch.
This material is drawn from this SCF tutorial.
As with Python and R, Julia uses BLAS, a standard library of basic linear algebra operations (written in Fortran or C), for linear algebra operations. A fast BLAS can greatly speed up linear algebra relative to the default BLAS on a machine. Julia uses OpenBLAS, a fast, free, open source BLAS library. In addition to being fast when used on a single core, OpenBLAS is threaded: if your computer has multiple cores and there are free resources, your linear algebra will use multiple cores.
Here’s an example.
using LinearAlgebra
using Distributions
using BenchmarkTools
n = 6000;
x = rand(Uniform(0,1), n,n);
println(BLAS.get_num_threads())
4
function chol_xtx(x)
z = x'x; ## `z` is positive definite.
C = cholesky(z);
return C;
end
BLAS.set_num_threads(4);
@btime chol = chol_xtx(x);
  5.734 s (5 allocations: 549.32 MiB)
BLAS.set_num_threads(1);
@btime chol = chol_xtx(x);
  6.881 s (5 allocations: 549.32 MiB)
We see that using four threads is faster than using one, but in this case the speedup was surprisingly small. (In Python or R, I would expect a much bigger speedup; since all of them use BLAS/LAPACK, I don't understand why there is so little difference in Julia.)
By default, Julia will set the number of threads for linear algebra equal to the number of processors on your machine.
As seen above, you can check the number of threads being used with:
BLAS.get_num_threads()
1
You can also control the number of threads used for linear algebra via the OMP_NUM_THREADS (or OPENBLAS_NUM_THREADS) environment variable in the shell:
## Option 1
export OMP_NUM_THREADS=4
julia
## Option 2
OMP_NUM_THREADS=4 julia
In Julia, you can directly set up software threads to use for parallel processing.
Here we’ll see some examples of running a for loop in parallel, both acting on a single object and used as a parallel map operation.
Here we can operate on a vector in parallel:
using Base.Threads
n = 50000000;
x = rand(n);
@threads for i in 1:length(x)
x[i] = tan(x[i]) + 3*sin(x[i]);
end
The iterations are grouped into chunks; otherwise the latency from communicating about each individual task (iteration) would probably slow things down a lot. One has some control over this with the optional schedule argument, like this: @threads :static ...
If anyone is familiar with OpenMP, the use of the macro above to tag the loop for parallelization will look familiar.
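As a quick sketch of that scheduling control, the same loop can be run with a static schedule, which divides the iteration range into one contiguous chunk per thread up front (rather than letting tasks be scheduled dynamically):

```julia
using Base.Threads

x = rand(1_000_000);
## With `:static`, iterations are split into nthreads() equal chunks,
## one chunk per thread, assigned before the loop starts.
@threads :static for i in 1:length(x)
    x[i] = tan(x[i]) + 3*sin(x[i]);
end
```

Static scheduling has lower overhead but can load-balance poorly if iterations vary in cost.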
We could also use threads to carry out a parallel map operation, implemented as a for loop.
n = 1000;
function test(n)
x = rand(Uniform(0,1), n,n);
z = x'x;
C = cholesky(z);
return(C.U[1,1]) ## Arbitrary output to assure that the computation was done.
end
a = zeros(12)
@threads for i in 1:12
a[i] = test(n);
end
You can also create (aka spawn) individual tasks on threads, with the tasks running in parallel. Tasks will be allocated to the available threads as threads become available (i.e., you can have many more tasks than threads, with the threadpool working its way through the tasks).
Let’s see an example (taken from here) of sorting a vector in parallel, by sorting subsets of the vector in separate threads.
import Base.Threads.@spawn
"""
Sort the elements of `v` in place, from indices `lo` to `hi` inclusive.
"""
function psort!(v, lo::Int=1, hi::Int=length(v))
println(current_task(), ' ', lo, ' ', hi)
if lo >= hi # 1 or 0 elements; nothing to do.
return v
end
if hi - lo < 100000 # Below some cutoff, run in serial.
sort!(view(v, lo:hi), alg = MergeSort);
return v
end
mid = (lo+hi)>>>1 # Find the midpoint.
### Set up parallelization. ###
## Sort two halves in parallel, one in current call and one in a new task
## in a separate thread.
half = @spawn psort!(v, lo, mid) # Sort the lower half in new thread.
psort!(v, mid+1, hi) # Sort the upper half in the current call.
wait(half) # Wait for the lower half to finish.
temp = v[lo:mid]; # Create workspace for merging.
i, k, j = 1, lo, mid+1 # Merge the two sorted sub-arrays.
@inbounds while k < j <= hi # `@inbounds` skips array bound checking for efficiency.
if v[j] < temp[i]
v[k] = v[j]
j += 1
else
v[k] = temp[i]
i += 1
end
k += 1
end
@inbounds while k < j
v[k] = temp[i]
k += 1
i += 1
end
return v
end
psort!
How does this work? Let’s consider an example where we sort a vector of length 250000.
The vector gets split into elements 1:125000 (run in task #1) and 125001:250000 (run in the main call). Then the elements 1:125000 are split into 1:62500 (run in task #2) and 62501:125000 (run in task #1), while the elements 125001:250000 are split into 125001:187500 (run in task #3) and 187501:250000 (run in the main call). No more splitting occurs because vectors of length less than 100000 are run in serial.
Assuming we have at least four threads (including the main process), each of the tasks will run in a separate thread, and all four sorts on the vector subsets will run in parallel. With fewer threads than tasks, some tasks will have to wait.
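A quick check of the splitting arithmetic: the midpoint computation uses an unsigned right shift, which halves the sum correctly even if `lo + hi` were to overflow `Int` (the shift reinterprets the wrapped sum as unsigned):

```julia
lo, hi = 1, 250000
## `>>> 1` is an unsigned right shift: equivalent to integer division
## by two here, but overflow-safe for huge indices.
mid = (lo + hi) >>> 1
println(mid)   # 125000, so the halves are 1:125000 and 125001:250000
```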
x = rand(250000);
## Run once to invoke JIT.
psort!(x);
sort!(x);
Task (runnable) @0x00007fa897db7080 1 250000
Task (runnable) @0x00007fa897db7080 125001 250000
Task (runnable) @0x00007fa897db7080 187501 250000
Task (runnable) @0x00007fa89694c010 1 125000
Task (runnable) @0x00007fa89694c1a0 125001 187500
Task (runnable) @0x00007fa89694c010 62501 125000
Task (runnable) @0x00007fa8998841a0 1 62500
We see that the output from current_task() shows that the task labels correspond with what I stated above.
I’ll use @time since the repeated runs in @btime would end up sorting an already sorted vector.
x = rand(250000);
y = x[:];
@time psort!(x);
@time sort!(y; alg = MergeSort);
Task (runnable) @0x00007fa897db7080 1 250000
Task (runnable) @0x00007fa897db7080 125001 250000
Task (runnable) @0x00007fa89694f080 1 125000
Task (runnable) @0x00007fa89694f210 125001 187500
Task (runnable) @0x00007fa89694f080 62501 125000
Task (runnable) @0x00007fa8998801a0 1 62500
Task (runnable) @0x00007fa897db7080 187501 250000
0.006713 seconds (549 allocations: 2.877 MiB)
0.258231 seconds (174.99 k allocations: 16.453 MiB, 27.29% gc time, 93.61% compilation time)
Assuming that we started Julia such that there are multiple threads available (and also needing to account for the compilation time), we see that the parallel sort is faster than the non-parallel sort, though I was getting somewhat varied timing results on repeated runs.
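An alternative, assuming a reasonably recent BenchmarkTools version, is @btime’s setup keyword, which rebuilds the input before each evaluation so that repeated runs never sort an already sorted vector:

```julia
using BenchmarkTools

## `setup` runs before each sample; `evals = 1` ensures exactly one
## evaluation per fresh copy, so we never re-sort a sorted vector.
result = @btime sort!(y; alg = MergeSort) setup = (y = rand(10_000)) evals = 1;
```

The same setup/evals combination would work for timing psort! as well.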
The number of tasks running in parallel will be at most the number of threads set in the Julia session.
You can see the number of threads available:
Threads.nthreads()
4
You can control the number of threads used for threading in Julia (apart from linear algebra) either by setting the JULIA_NUM_THREADS environment variable in the shell before starting Julia, or by using the -t (or --threads) flag, e.g.: julia -t 4. Note that we can’t use OMP_NUM_THREADS as the Julia threading is not based on OpenMP.
Surprisingly, I’m not seeing a way to change the number of threads after Julia starts.
This material is drawn from this SCF tutorial.
We’d want to think about how JIT compilation fits into the multi-process parallelization. I have not considered that in these materials.
We can use pmap to run a parallel map operation across multiple Julia processes (on one or more machines). pmap is good for cases where each task takes a non-negligible amount of time, as there is overhead (latency) in starting the tasks.
Here we’ll carry out multiple computationally expensive calculations in the map.
using Distributed
if nprocs() == 1
addprocs(4)
end
nprocs()
WARNING: using Distributed.@spawn in module Main conflicts with an existing identifier.
5
We need to import packages and create the function on each of the worker processes using @everywhere.
@everywhere begin
using Distributions
using LinearAlgebra
function test(n)
x = rand(Uniform(0,1), n,n)
z = x'x
C = cholesky(z)
return C.U[2,3] ## Arbitrary output to assure that the computation was done.
end
end
result = pmap(test, repeat([5000],12))
12-element Vector{Float64}:
11.937251860428423
11.883408190400335
11.95987057332551
11.711543109565259
11.739308301710718
11.253047211559021
10.928092406221712
11.493726016418318
11.235452679899693
11.56918006302922
11.580243164873602
11.924951129390438
One can use static allocation (prescheduling) via the batch_size argument, which groups the tasks into batches of that size and sends a batch at a time to each worker, reducing latency.
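As a small sketch of the syntax (with a trivially cheap task, just to illustrate; with tasks this cheap a plain map would of course be faster), here tasks are sent in batches of ten:

```julia
using Distributed

## With batch_size = 10, pmap groups the 100 tasks into batches of 10,
## sending a batch at a time to each worker (or running serially if
## no workers have been added).
squares = pmap(i -> i^2, 1:100; batch_size = 10)
println(squares[end])   # 10000
```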
for loops
One can execute for loops in parallel across multiple worker processes as follows. This is particularly handy for cases where one uses a reduction operator (e.g., the + here) so that little data needs to be copied back to the main process. (And in this case we don’t copy any data to the workers either.)
Here we’ll sum over a large number of random numbers with chunks done on each of the workers, comparing the time to a basic for loop.
using BenchmarkTools
function forfun(n)
sum = 0.0
for i in 1:n
sum += rand(1)[1]
end
return(sum)
end
function pforfun(n)
out = @sync @distributed (+) for i = 1:n
rand(1)[1]
end
return(out)
end
n=50000000
@time forfun(n);
@btime forfun(n);
  2.587218 seconds (50.01 M allocations: 2.981 GiB, 14.32% gc time, 0.43% compilation time)
1.899 s (50000001 allocations: 2.98 GiB)
@time pforfun(n);
@btime pforfun(n);
  1.736884 seconds (498.52 k allocations: 33.223 MiB, 24.03% compilation time)
484.000 ms (317 allocations: 13.36 KiB)
The use of @sync causes the operation to block until the result is available so we can get the correct timing.
Without a reduction operation, depending on what one is doing, one could end up passing a lot of data back to the main process, and this could take a lot of time. For such calculations, one would generally be better off using threaded for loops in order to take advantage of shared memory.
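For comparison, here is my own sketch of a threaded, shared-memory version of the same summation. Rather than a race-prone shared counter, each thread accumulates a partial sum for its own contiguous chunk:

```julia
using Base.Threads

function tforfun(n)
    nchunks = nthreads()
    partials = zeros(nchunks)
    ## Each outer iteration handles one contiguous chunk and writes only
    ## to its own slot of `partials`, so there is no data race.
    @threads for c in 1:nchunks
        lo = div((c - 1) * n, nchunks) + 1
        hi = div(c * n, nchunks)
        s = 0.0
        for i in lo:hi
            s += rand()
        end
        partials[c] = s
    end
    return sum(partials)
end

println(tforfun(50_000_000))
```

Julia’s default RNG is task-local, so the rand() calls in different threads are safe, though the result is not reproducible across runs.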
We’d have to look into how the random number seed is set on each worker to better understand any issues that might arise from parallel random number generation. I believe that each worker has a different seed, but note that this does not explicitly ensure that the random number streams on the workers are distinct, as would be the case if one used the L’Ecuyer algorithm.
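One explicit option (my own sketch; it controls the seeds but still does not guarantee non-overlapping streams) is to seed each worker’s RNG remotely:

```julia
using Distributed
addprocs(2)
@everywhere using Random

## Give each worker a distinct (though not provably non-overlapping) seed.
for (i, w) in enumerate(workers())
    remotecall_fetch(Random.seed!, w, 1000 + i)
end
```

remotecall_fetch(f, id, args...) runs f(args...) on worker id and waits for the result, so the seeding is done before we proceed.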
With multiple workers, particularly on more than one machine, one generally wants to be careful about having to copy large data objects to each worker, as that could make up a substantial portion of the time involved in the computation.
One can explicitly copy a variable to the workers in an @everywhere block by using Julia’s interpolation syntax:
x = randn(5);
println(x[1])
@everywhere begin
x = $x # copy to workers using interpolation syntax
println(pointer_from_objref(x), ' ', x[1])
end
sleep(2) # There is probably a better way to wait until all worker printing is done.
-0.020421923133491972
Ptr{Nothing} @0x00007fa810ce2e90 -0.020421923133491972
From worker 4: Ptr{Nothing} @0x00007f3776e4c010 -0.020421923133491972
From worker 5: Ptr{Nothing} @0x00007fd32dc50010 -0.020421923133491972
From worker 2: Ptr{Nothing} @0x00007fb700c4c010 -0.020421923133491972
From worker 3: Ptr{Nothing} @0x00007f2b9ed50010 -0.020421923133491972
We see based on pointer_from_objref that each copy of x is stored at a distinct location in memory, even when processes are on the same machine.
Also note that if one creates a variable within an @everywhere block, that variable is available to all tasks run on the worker, so it is ‘global’ with respect to those tasks. Note the repeated values in the result here.
@everywhere begin
x = rand(5)
function test(i)
return sum(x)
end
end
result = pmap(test, 1:12, batch_size = 3)
12-element Vector{Float64}:
2.856845670431737
2.9430623763370254
2.8345321006917334
2.599262869484746
2.856845670431737
2.9430623763370254
2.8345321006917334
2.599262869484746
2.856845670431737
2.9430623763370254
2.8345321006917334
2.599262869484746
If one wants to have multiple processes all work on the same object, without copying it, one can consider using Julia’s SharedArray (one machine) or DArray from the DistributedArrays package (multiple machines) types, which break up arrays into pieces, with different pieces stored locally on different processes.
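As a minimal sketch of the SharedArray approach (single machine only), all local workers can write into disjoint pieces of one array backed by shared memory, without copying it:

```julia
using Distributed
addprocs(2)
@everywhere using SharedArrays

s = SharedArray{Float64}(10)   # backed by shared memory across local workers
@sync @distributed for i in 1:10
    s[i] = Float64(i)^2        # each worker fills its own indices
end
println(sum(s))   # 385.0
```

Because the memory is shared, the main process sees the workers’ writes directly; no data is sent back.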
One can use the Distributed.@spawnat macro to run tasks on separate workers/processes, in a fashion similar to using Threads.@spawn to run tasks on separate threads. More details can be found here.
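Here’s a minimal sketch of @spawnat, running an expression on a chosen worker and fetching the result:

```julia
using Distributed
addprocs(1)

## Run `myid()` on the first worker; `fetch` waits for and returns the result.
f = @spawnat workers()[1] myid()
println(fetch(f))   # the worker's id (> 1, since 1 is the main process)
```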
In addition to using processes on one machine, one can use processes across multiple machines. One can either start the processes when you start the main Julia session or you can start them from within the Julia session. In both cases you’ll need to have the ability to ssh to the other machines without entering your password. In some cases you as the user might need to set up SSH keys.
To start the processes when starting Julia, create a “machinefile” that lists the names of the machines and the number of worker processes to start on each machine.
Here’s an example machinefile:
radagast.berkeley.edu
radagast.berkeley.edu
gandalf.berkeley.edu
gandalf.berkeley.edu
Note that if you’re using Slurm on a Linux cluster, you could generate that file in the shell from within your Slurm allocation like this:
srun hostname > machines
Then start Julia like this:
julia --machine-file machines
If anyone has used MPI for programming parallel code, the idea of a “machine file” or “host file” will look familiar.
From within Julia, you can add processes like this (first we’ll remove the existing worker processes started using addprocs() previously):
rmprocs(workers())
addprocs([("radagast", 2), ("gandalf", 2)])
4-element Vector{Int64}:
6
7
8
9
To check on the number of processes:
nprocs()
5
When we demo this in class, we’ll try our parallel map example (the linear algebra calculation) and check that we have workers running on the various machines using top or ps after sshing to the machines.
result = pmap(test, repeat([5000],12))