
I have access to an HPC system. Let's say I have three nodes available. The details of each node are as follows:

scontrol show node

   Arch=x86_64 CoresPerSocket=10
   CPUAlloc=20 CPUTot=20 CPULoad=22.67
   AvailableFeatures=(null)
   ActiveFeatures=(null)
   Gres=(null)
   .
   .
   RealMemory=91000 AllocMem=0 FreeMem=77291 Sockets=2 Boards=1
   State=ALLOCATED ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A
   Partitions=cpu_normal_q
   BootTime=2023-10-20T12:56:13 SlurmdStartTime=2023-10-20T12:57:43
   CfgTRES=cpu=20,mem=91000M,billing=20
   AllocTRES=cpu=20
   CapWatts=n/a
   CurrentWatts=0 AveWatts=0
   ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s

I have an R code that uses the doParallel package and performs parallel computing.

library(doParallel)
library(Matrix)

# Set the number of cores to be used
num_cores <- detectCores()

# Initialize a parallel backend using doParallel
cl <- makeCluster(num_cores)

# Register the cluster for parallel processing
registerDoParallel(cl)

# Get the number of cores being utilized
cores_utilized <- getDoParWorkers()

# Function to perform matrix multiplication and inversion
matrix_mult_inv <- function() {

  # Generate random matrices
  mat <- matrix(rnorm(10000), nrow = 100)
  
  # Perform matrix multiplication
  result <- mat %*% mat
  
  # Compute the inverse of the result matrix
  inv_result <- solve(result)
  
  return(inv_result)
}

# Record the start time
start_time <- Sys.time()

# Perform the matrix multiplication and inversion in parallel
result <- foreach(i = 1:300, .combine = cbind) %dopar% {
  write.table(matrix_mult_inv(),paste("iteration_", i, ".txt", sep = ""))
}

# Record the end time
end_time <- Sys.time()

# Print the number of cores being utilized
print(paste("Number of cores being utilized:", cores_utilized))

# Print the time taken to run all the iterations:
print(paste("Time taken:", end_time - start_time))

# Stop the parallel backend
stopCluster(cl)

The code performs 300 iterations, and within each iteration a matrix multiplication and inversion are done. The output is the total time taken to run all 300 iterations and the number of cores utilized.

My goal is to run this code in the HPC environment such that 20 cores from each node are utilized simultaneously, giving me 60 cores in total. Is it possible to do that?

I have looked into parSapply from the snow package as well, but I think it ultimately depends on the makeCluster function. I tried with

cl <- makeCluster(num_nodes, type = "SOCK", explicit = TRUE,
                  outfile = "", nodes = c(#3 specific node names input here#),
                  cpus = cores_per_node)

but this just utilized 3 cores in total.
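From what I can tell (an untested guess on my part), a SOCK/PSOCK cluster starts one worker per entry in the host list, so each node name would presumably need to be repeated once per core, something like:

node_names <- c("node1", "node2", "node3")   # placeholder hostnames
cores_per_node <- 20
# one worker is launched per element, so repeat each hostname 20 times
cl <- parallel::makePSOCKcluster(rep(node_names, each = cores_per_node))
doParallel::registerDoParallel(cl)

but I have not been able to verify this across nodes.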

  • Are you working on a cluster with a Slurm workload manager? Do you have a submission shell script to submit via sbatch? Commented Apr 26 at 2:52
  • @GeorgeOstrouchov Yes! I have tried different combinations of --ntasks, --cpus-per-task and --nodes, but it all runs on a single node utilizing only 20 cores. I am also looking into OpenMPI if that helps; so far I have not been able to utilize it. Commented Apr 26 at 12:53

1 Answer


Much of parallel computing development in R is aimed at laptops and at making Windows, Macs, and Unix look the same (by doing different things under the covers). This leads to confusion and often inefficiency when transitioning to a Unix cluster. The package pbdMPI was developed with clusters and their standard MPI practices specifically in mind. Here is a rewrite of your code with pbdMPI. Some explanations are commented in the code.

The main thing to realize is that clusters are designed to run batch jobs, and your script.R is a generalization of a serial code. Several instances of this identical code are started asynchronously by mpirun in your submission shell script, and the codes differ only by their assigned rank. Most rank management is automated, even for some complex interactions between the ranks. The Rscript R sessions are called tasks in Slurm language and ranks in MPI language (usually!).

library(pbdMPI)
library(Matrix)

# Function to perform matrix multiplication and inversion
matrix_mult_inv = function() {
  
  # Generate random matrices
  mat = matrix(rnorm(10000), nrow = 100)
  
  # Perform matrix multiplication
  result = mat %*% mat
  
  # Compute the inverse of the result matrix
  inv_result = solve(result)
  
  return(inv_result)
}

n_mat = 300
my_mats = comm.chunk(n_mat, form = "vector") # split the work

size = comm.size()
rank = comm.rank()
comm.cat("Rank:", rank, "working on matrices:", my_mats, "\n", all.rank = TRUE)

mat_list = vector("list", length(my_mats))  # pre-allocate one slot per matrix
for(i in seq_along(my_mats)) {
  mat_list[[i]] = matrix_mult_inv()
  ## write.table(mat_list[[i]], paste("iteration_", my_mats[i], ".txt", sep = ""))
}

## It is probably faster to combine to rank 0 in memory (if not too big)
## you can also combine to all ranks with allgather() - just as fast
my_cbind_mats = do.call(cbind, mat_list)
all_mats_r0 = gather(my_cbind_mats) # only rank 0 gets result, others get NULL
if(rank == 0) {
  all_mats_r0 = do.call(cbind, all_mats_r0)
  cat("Total dim:", dim(all_mats_r0), "\n")
  ## Here you can do further computation with all_mats_r0
}
## Or if you used allgather(), you'd have all_mats on all ranks (memory permitting)

## if you want to use parallel::mclapply() within ranks, you'll need
num_cores = as.integer(Sys.getenv("SLURM_CPUS_PER_TASK"))  # integer, e.g. for mc.cores
comm.cat("my_cores:", num_cores, "\n", all.rank = TRUE)

finalize() # required! for graceful exit

To run this code on a Slurm cluster, save it in script.R, save the following script in script.sh and submit on a login node with sbatch script.sh.

#!/bin/bash
#SBATCH --job-name myjob
#SBATCH --account=<your-account>
#SBATCH --partition=<your-cpu-partition>
#SBATCH --mem=64g
#SBATCH --nodes=2
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=8
#SBATCH --time 00:20:00
#SBATCH -e ./myjob.e
#SBATCH -o ./myjob.o

pwd
module load r  ## this may differ on your cluster
module list

time mpirun -np 16 Rscript script.R

All of the <your-...> tokens above need to be changed to match your cluster's requirements. I set this up for 2 nodes, 8 R sessions per node, and 1 core per session, so you are running 16 identical R codes, which decide what to do based on their rank. For example, comm.chunk() returns a different result to each rank, so that each rank works on different data.
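To make that concrete, here is a rough sketch (assuming the 300 matrices and 16 ranks configured above) of how comm.chunk() divides the work; the exact chunk boundaries depend on how it balances the remainder:

# Sketch only: with n_mat = 300 and 16 ranks, comm.chunk(300, form = "vector")
# hands each rank a contiguous block of 18 or 19 indices, e.g. roughly
#   rank 0  -> 1:19
#   rank 1  -> 20:38
#   ...
#   rank 15 -> 283:300
my_mats = comm.chunk(300, form = "vector")
comm.cat("rank", comm.rank(), "has", length(my_mats), "matrices\n", all.rank = TRUE)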

I've added a comm.cat() call to show how many Rscript instances are running and which matrices each one handles. Some interleaved printing is still possible here, as printing is handed off between several processes on a cluster.
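If the interleaving gets distracting, a minimal option (a sketch, reusing the objects defined above) is to print from a single rank only, which is the default behaviour of both functions:

# Sketch: all.rank = FALSE is the default, so these print from rank 0 only
# and avoid interleaved output from the other ranks.
comm.print(length(my_mats))
comm.cat("rank 0 block dimensions:", dim(my_cbind_mats), "\n")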

Timing of the code is best done in the shell script (prepending time to mpirun) because the ranks run asynchronously and any internal timing may not reflect the collective performance. The timing goes to the error output in file myjob.e; your regular output goes to myjob.o. See the pbdMPI package documentation for more information on, for example, the management of printing with comm.print() and comm.cat().
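That said, if you do want an in-code timing that reflects all ranks together, a hedged sketch is to synchronize before and after the work and take the slowest rank:

# Sketch: collective timing inside script.R. barrier() synchronizes the ranks;
# the maximum over ranks approximates the wall time of the whole block.
barrier()
t0 = Sys.time()
## ... the for() loop over my_mats goes here ...
barrier()
elapsed = as.numeric(difftime(Sys.time(), t0, units = "secs"))
slowest = allreduce(elapsed, op = "max")   # same value returned on every rank
comm.cat("Slowest rank took", slowest, "seconds\n")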

I commented out your write.table(), but it can be used as-is because each rank writes to different files. On the other hand, you may want to do further parallel processing with the matrices before writing some results.
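As a small sketch of those two output options (the combined file name is just illustrative):

# (a) each rank writes its own files, one per matrix index it owns:
for(i in seq_along(my_mats)) {
  write.table(mat_list[[i]], paste("iteration_", my_mats[i], ".txt", sep = ""))
}
# (b) or, after the gather(), write one combined file from rank 0 only
#     ("all_iterations.txt" is a hypothetical name):
if(rank == 0) {
  write.table(all_mats_r0, "all_iterations.txt")
}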

All of these should be configured to your needs. The 1 core per session can be increased if you use some other multithreaded code inside each rank (such as mclapply() for shared-memory advantages). Just be aware of the total cores on a node and do not oversubscribe.
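A hedged sketch of that hybrid, assuming you raise --cpus-per-task above 1 in the sbatch script, is to let each rank fan its own chunk out to forked workers:

# Sketch: within-rank fork parallelism on top of MPI. mc.cores comes from
# Slurm; with --cpus-per-task=1 this degrades gracefully to serial.
num_cores = as.integer(Sys.getenv("SLURM_CPUS_PER_TASK", unset = "1"))
mat_list = parallel::mclapply(seq_along(my_mats),
                              function(i) matrix_mult_inv(),
                              mc.cores = num_cores)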

When using MPI (via pbdMPI) and fork (via mclapply()) on your 3 nodes of 20 cores each, you can have 60-way parallelism. MPI allows you to run anywhere from 3 ranks (with 20 cores available for mclapply() within each rank) to 60 ranks (with only 1 core available within each rank). This is because MPI instances can run on shared or distributed memory systems, but they never share memory.

The advantage to letting mclapply() handle some parallelism is that it does share memory, providing a smaller memory footprint by not duplicating its data. There are some more nuances to this because MPI does parallel aggregations (like allreduce() and allgather()), whereas mclapply() does not. So if memory is not an issue, sometimes it is worthwhile to push MPI into the nodes.
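For concreteness, a tiny sketch of the kind of MPI-side aggregation meant here, reusing objects from the code above:

# Sketch: parallel aggregations across ranks with pbdMPI.
n_done   = length(my_mats)
total    = allreduce(n_done, op = "sum")   # total matrices, known to all ranks
all_mats = allgather(my_cbind_mats)        # list with every rank's block,
                                           # replicated on all ranks
comm.cat("Matrices processed in total:", total, "\n")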

  • I am trying to install the pbdMPI package in R on the HPC systems. I am getting the error: ``` *** caught bus error *** address -#some numbers and letters#-, cause 'non-existent physical address' ``` Any suggestions on why this is happening or how to overcome it? Commented Apr 29 at 13:16
  • @ArnoneelSinha You mentioned OpenMPI earlier. Do you need to module load openmpi? What do you get for module list? Commented Apr 29 at 16:57
  • I loaded the module before installing the package in R. I am assuming this could be related to not having elevated privileges; I have asked support whether they can solve it. PS - The only reason I haven't marked your solution or upvoted it yet is that I have yet to reproduce the code. Once I do that, I will definitely do both. Commented Apr 29 at 19:14
  • @ArnoneelSinha Any luck with support for the install? On the web, some bus errors result from a compiler or compiler-options mismatch: possibly the OpenMPI library was compiled with a different compiler than R. Seeing the result of your module list might identify the compilers used for OpenMPI and R. Commented May 3 at 18:09
  • If your 10,000 iterations are independent, then yes, you can parallelize them with any combination of ranks and forked processes. What decides how to allocate between the two is the memory needed and the types of aggregations needed. I added a couple of paragraphs on this in the answer above. Commented May 15 at 3:24
