Description
Hi,
I am dealing with a large CyTOF dataset (>100 million cells in well over 1500 files) that requires a lot of computational power to process. To cope with such dataset, I am trying to parallelise the prepData() function. Has this been done before? Maybe it could be of help for others as datasets are just growing bigger these days.
I am working in a linux terminal with 252Gb RAM, 32 CPU and I managed to improve the reading of my .fcs files, doing so in batches building a flowset_list as an output (code below). I am not a bioinformatician so please feel free to pitch in with useful feedback and good coding practices I may be missing.
### Parallelise reading of .fcs files (read.flowSet() calls read.FCS() internally)
library(flowCore)
library(parallel)
fcs_files <- meta$FCSFilePath # List of all file paths
batch_size <- 60 # 1500 divided by 25 batches (example)
file_batches <- split(fcs_files, ceiling(seq_along(fcs_files) / batch_size))
### Set up parallel workers
n_cores <- detectCores() - 1 # Leave one core for the OS
cl <- makeCluster(n_cores)
### Load flowCore on each worker; parLapply() ships each batch of file paths
### to the workers itself, so no clusterExport() is needed
clusterEvalQ(cl, library(flowCore))
### Parallelize reading of batches
flowset_list <- parLapply(cl, file_batches, function(batch) { # parLapply works here; mclapply is simpler on Linux (see sketch below)
  read.flowSet(batch, transformation = FALSE, truncate_max_range = FALSE)
})
lapply(flowset_list, sampleNames) # List sample names from each batch
### Stop the cluster
stopCluster(cl)
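As noted in the comment above, a fork-based mclapply() call would be a simpler alternative on Linux; a minimal sketch with the same read.flowSet() arguments (no cluster set-up, exports or stopCluster() needed, since forked workers inherit the session):
### Fork-based alternative to the cluster above (Linux/macOS only)
flowset_list <- mclapply(file_batches, function(batch) {
  read.flowSet(batch, transformation = FALSE, truncate_max_range = FALSE)
}, mc.cores = n_cores)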
The flowset_list then goes into a modified NewPrepData(flowset_list, meta, md_col...), which keeps the same structure as the original prepData() but tries to speed up generation of the exprs matrix:
NewPrepData <- function(flowset_list, panel = NULL, md = NULL,
                        features = NULL, transform = TRUE, cofactor = 5,
                        panel_cols = list(channel = "fcs_colname",
                                          antigen = "antigen",
                                          class = "marker_class"),
                        md_cols = list(file = "file_name", id = "sample_id",
                                       factors = c("condition", "patient_id")),
                        by_time = TRUE, FACS = FALSE) {
cat("PrepData: reading flowSet", "\n")
### generate the fs object required for sanity checks in the original code
fs <- Reduce(rbind2, flowset_list)
if (is.null(fs) || length(fs) == 0)
stop("No valid flowSets could be read.")
cat("PrepData: flowSet read successfully", "\n")
### original code untouched up until generating expression matrix:
# Get exprs. using parallelization
num_cores <- detectCores() - 1 # Use all available cores minus one
### Convert the flowSet to a list of flowFrames
fs_list <- flowSet_to_list(fs)
###Extract expression matrices in parallel
es_list <- mclapply(fs_list, function(frame) {
exprs(frame) # Extract the expression matrix
}, mc.cores = num_cores)
### Combine the individual matrices into a single matrix
es <- do.call(cbind, lapply(es_list, t))
rm(fs_list, es_list)
cat("PrepData: exprs matrix successfully generated", "\n")
### No changes further below
This code has worked nicely for 40 million cells (a downsampled test used to debug the code), but for well over 100 million cells and more files I am not sure this is the most efficient use of memory and CPUs (the process gets slower and starts using swap memory on the Linux machine). Does anyone have ideas to improve the efficiency of the current code, or more generally to optimise prepData() for bigger datasets?
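For context, here is a rough sketch of the kind of change I have been considering for the es-building step inside NewPrepData() (not tested at full scale, and it assumes all flowFrames share the same channels in the same order): pre-allocating the channels x cells matrix and filling it one frame at a time, so es_list and the combined es never need to be held in memory together.
### Sketch: pre-allocate the channels x cells matrix and fill it frame by frame
### (assumes identical channels, in the same order, across all flowFrames)
chs     <- colnames(exprs(fs_list[[1]]))
n_cells <- vapply(fs_list, nrow, numeric(1))
es      <- matrix(NA_real_, nrow = length(chs), ncol = sum(n_cells),
                  dimnames = list(chs, NULL))
offset  <- 0
for (i in seq_along(fs_list)) {
  es[, offset + seq_len(n_cells[i])] <- t(exprs(fs_list[[i]]))
  offset <- offset + n_cells[i]
}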