#' ---
#' title: "Parallel Computing with R"
#' ---
#'
#'
#'
## ----cache=F, message=F,warning=FALSE------------------------------------
library(knitr)
library(raster)
library(rasterVis)
library(dplyr)
library(ggplot2)
## New Packages
library(foreach)
library(doParallel)
library(arm)
library(coda)
library(ggmcmc)
library(fields)
library(snow)
#'
#' If you don't have the packages above, install them in the package manager or by running `install.packages("doParallel")`.
#'
#' # Introduction
#'
#' ## Serial Computing
#' Most (legacy) software is written for serial computation:
#'
#' * Problem broken into discrete set of instructions
#' * Instructions executed sequentially on a single processor
#'
#' 
#' Figure from [here](https://computing.llnl.gov/tutorials/parallel_comp/)
#'
#' ## Parallel computation
#'
#' Parallel computing is the simultaneous use of multiple compute resources:
#'
#' * Problem divided into discrete parts that can be solved concurrently
#' * Instructions from each part execute simultaneously on different processors
#' * An overall control/coordination mechanism is employed
#'
#' 
#' Figure from [here](https://computing.llnl.gov/tutorials/parallel_comp/)
#'
#'
#'
#' ## Flynn's taxonomy
#' A classification of computer architectures ([Flynn, 1972](http://dx.doi.org/10.1109/TC.1972.5009071))
#'
#' * *Single Instruction, Single Data (SISD)*
#' * No parallelization
#' * *Single Instruction, Multiple Data (SIMD)*
#' * Run the same code/analysis on different datasets
#' * Examples:
#' * different species in species distribution model
#' * same species under different climates
#' * different MCMC chains from a Bayesian Model
#' * *Multiple Instruction, Single Data (MISD)*
#' * Run different code/analyses on the same data
#' * Examples:
#' * One species, multiple models
#' * *Multiple Instruction, Multiple Data streams (MIMD)*
#' * Run different code/analyses on different data
#' * Examples:
#' * Different species & different models
#'
#' 
#' Figure from [here](http://en.wikipedia.org/wiki/Flynn%27s_taxonomy)
#'
#' ## Our focus: *Single Instruction, Multiple Data (SIMD)*
#' 1. Parallel functions within an R script
#' * starts on single processor
#' * runs looped elements on multiple 'slave' processors
#' * returns results of all iterations to the original instance
#' * foreach, multicore, plyr, raster
#' 2. Alternative: run many separate instances of R in parallel with `Rscript`
#' * need another operation to combine the results
#' * preferable for long, complex jobs
#' * NOT planning to discuss in this session
#'
#' ### R Packages
#' There are many R packages for parallelization, check out the CRAN Task View on [High-Performance and Parallel Computing](http://cran.r-project.org/web/views/HighPerformanceComputing.html) for an overview. For example:
#'
#' * [Rmpi](http://cran.r-project.org/web/packages/Rmpi/index.html): Built on MPI (Message Passing Interface), a de facto standard in parallel computing.
#' * [snow](http://cran.r-project.org/web/packages/snow/index.html): Simple Network of Workstations can use several standards (PVM, MPI, NWS)
#' * [parallel](https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf) Built in R package (since v2.14.0).
#'
#' ---------------
#'
#' ## Foreach Package
#' In this session we'll focus on the foreach package, which has numerous advantages including:
#'
#' * intuitive `for()` loop-like syntax
#' * flexibility of choosing a parallel 'backend' for laptops through to supercomputers (using multicore, parallel, snow, Rmpi, etc.)
#' * nice options for combining output from parallelized jobs
#'
#' ### Documentation for foreach:
#' - [foreach manual](http://cran.r-project.org/web/packages/foreach/foreach.pdf)
#' - [foreach vignette](http://cran.r-project.org/web/packages/foreach/vignettes/foreach.pdf)
#' - [Nested Loops](http://cran.r-project.org/web/packages/foreach/vignettes/nested.pdf)
#'
#'
#' ### Foreach _backends_
#' - [doParallel](http://cran.r-project.org/web/packages/doParallel/index.html) best for use on multicore machines (uses `fork` on linux/mac and `snow` on windows).
#' - [doMPI](http://cran.r-project.org/web/packages/doMPI/vignettes/doMPI.pdf): Interface to MPI (Message-Passing Interface)
#' - [doSNOW](http://cran.r-project.org/web/packages/doSNOW/doSNOW.pdf): Simple Network of Workstations
#'
#'
#' # Simple examples
#'
#' ## _Sequential_ `for()` loop
## ------------------------------------------------------------------------
x=vector()
for(i in 1:3) x[i]=i^2
x
#'
#'
#'
#' ## _Sequential_ `foreach()` loop
## ------------------------------------------------------------------------
x <- foreach(i=1:3) %do% i^2
x
#'
#' Note that `x` is a list with one element for each iterator variable (`i`). You can also specify a function to use to combine the outputs with `.combine`. Let's concatenate the results into a vector with `c`.
#'
#' ## _Sequential_ `foreach()` loop with `.combine`
## ------------------------------------------------------------------------
x <- foreach(i=1:3,.combine='c') %do% i^2
x
#'
#' Tells `foreach()` to first calculate each iteration, then `.combine` them with a `c(...)`
#'
#' ## _Sequential_ `foreach()` loop with `.combine`
## ------------------------------------------------------------------------
x <- foreach(i=1:3,.combine='rbind') %do% i^2
x
#'
#' ## Your Turn
#'
#' Write a `foreach()` loop that:
#'
#' * generates 10 sets of 100 random values from a normal distribution (`rnorm()`)
#' * column-binds (`cbind`) them.
#'
#'
#'
#'
#'
#' ## _Parallel_ `foreach()` loop
#' So far we've only used `%do%` which only uses a single processor.
#'
#' Before running `foreach()` in parallel, you have to register a _parallel backend_ with one of the `do` functions such as `doParallel()`. On most multicore systems, the easiest backend is typically `doParallel()`. On linux and mac, it uses `fork` system call and on Windows machines it uses `snow` backend. The nice thing is it chooses automatically for the system.
#'
## ----cache=F,message=FALSE-----------------------------------------------
# register specified number of workers
registerDoParallel(3)
# or, reserve all all available cores
#registerDoParallel()
# check how many cores (workers) are registered
getDoParWorkers()
#'
#' > _NOTE_ It may be a good idea to use n-1 cores for processing (so you can still use your computer to do other things while the analysis is running)
#'
#' To run in parallel, simply change the `%do%` to `%dopar%`. Wasn't that easy?
#'
## ------------------------------------------------------------------------
## run the loop
x <- foreach(i=1:3, .combine='c') %dopar% i^2
x
#'
#'
#' ## A slightly more complicated example
#'
#' In this section we will:
#'
#' 1. Generate data with known parameters
#' 2. Fit a set of regression models using subsets of the complete dataset (e.g. bootstrapping)
#' 3. Compare processing times for sequential vs. parallel execution
#'
#' For more on motivations to bootstrap, see [here](http://www.sagepub.com/sites/default/files/upm-binaries/21122_Chapter_21.pdf).
#'
#' Make up some data:
## ------------------------------------------------------------------------
n <- 100000 # number of data points
x1 <- rnorm (n) # make up x1 covariate
b0 <- 1.8 # set intercept (beta0)
b1 <- -1.5 # set beta1
p = invlogit(b0+b1*x1)
y <- rbinom (n, 1, p) # simulate data with noise
data=cbind.data.frame(y=y,x1=x1,p=p)
#'
#' Let's look at the data:
## ------------------------------------------------------------------------
kable(head(data),row.names = F,digits = 2)
#'
#'
## ------------------------------------------------------------------------
ggplot(data,aes(y=x1,x=as.factor(y)))+
geom_boxplot()+
coord_flip()+
geom_line(aes(x=p+1,y=x1),col="red",size=2,alpha=.5)+
xlab("Binary Response")+
ylab("Covariate")
#'
#' ### Sampling from a dataset with `sample_n()`
#'
## ------------------------------------------------------------------------
size=5
sample_n(data,size,replace=T)
#'
#'
#' ### Simple Generalized Linear Model
#'
#' This is the formal definition of the model we'll use:
#'
#' $y_i \sim Bernoulli(p_i)$
#'
#' $logit(p_i) = \beta_0 + \beta_1 X_i$
#'
#' The index $i$ identifies each grid cell (data point). $\beta_0$ - $\beta_1$ are model coefficients (intercept and slope), and $y_i$ is the simulated observation from cell $i$.
#'
## ---- eval=T-------------------------------------------------------------
trials = 10000
tsize = 100
#'
## ------------------------------------------------------------------------
ptime <- system.time({
result <- foreach(i=1:trials,
.combine = rbind.data.frame) %dopar% {
tdata=sample_n(data,tsize,replace=TRUE)
M1=glm(y ~ x1, data=tdata, family=binomial(link="logit"))
## return parameter estimates
cbind.data.frame(trial=i,t(coefficients(M1)))
}
})
ptime
#'
#'
#' Look at `results` object containing slope and aspect from subsampled models. There is one row per sample with columns for the estimated intercept and slope for that sample.
#'
## ------------------------------------------------------------------------
kable(head(result),digits = 2)
#'
## ------------------------------------------------------------------------
ggplot(dplyr::select(result,everything(),Intercept=contains("Intercept")))+
geom_density(aes(x=Intercept),fill="black",alpha=.2)+
geom_vline(aes(xintercept=b0),size=2)+
geom_density(aes(x=x1),fill="red",alpha=.2)+
geom_vline(aes(xintercept=b1),col="red",size=2)+
xlim(c(-5,5))+
ylab("Parameter Value")+
xlab("Density")
#'
#'
#' So we were able to perform `r trials` separate model fits in `r #ptime[3]` seconds. Let's see how long it would have taken in sequence.
#'
## ------------------------------------------------------------------------
stime <- system.time({
result <- foreach(i=1:trials,
.combine = rbind.data.frame) %do% {
tdata=sample_n(data,tsize,replace=TRUE)
M1=glm(y ~ x1, data=tdata,family=binomial(link="logit"))
## return parameter estimates
cbind.data.frame(trial=i,t(coefficients(M1)))
}
})
stime
#'
#' So we were able to run `r trials` separate model fits in `r #ptime[3]` seconds when using `r foreach::getDoParWorkers()` CPUs and `r #stime[3]` seconds on one CPU. That's `r #round(stime[3]/ptime[3],1)`X faster for this simple example.
#'
#' ## Your Turn
#'
#' * Generate some random as follows:
#'
## ------------------------------------------------------------------------
n <- 10000 # number of data points
x1 <- rnorm (n) # make up x1 covariate
b0 <- 25 # set intercept (beta0)
b1 <- -15 # set beta1
y <- rnorm (n, b0+b1*x1,10) # simulate data with noise
data2=cbind.data.frame(y=y,x1=x1)
#'
#' Write a parallel `foreach()` loop that:
#'
#' * selects a sample (as above) with `sample_n()`
#' * fits a 'normal' linear model with `lm()`
#' * Compiles the coefficient of determination (R^2) from each model
#'
#' Hint: see `?summary.lm`.
#'
#'
#'
#'
#'
#' ### Writing data to disk
#' For long-running processes, you may want to consider writing results to disk _as-you-go_ rather than waiting until the end in case of a problem (power failure, single job failure, etc.).
#'
## ------------------------------------------------------------------------
## assign target directory
td=tempdir()
foreach(i=1:trials,
.combine = rbind.data.frame) %dopar% {
tdata=sample_n(data,tsize,replace=TRUE)
M1=glm(y ~ x1, data=tdata, family=binomial(link="logit"))
## return parameter estimates
results=cbind.data.frame(trial=i,t(coefficients(M1)))
## write results to disk
file=paste0(td,"/results_",i,".csv")
write.csv(results,file=file)
return(NULL)
}
#'
#' That will save the result of each subprocess to disk (be careful about duplicated file names!):
## ------------------------------------------------------------------------
list.files(td,pattern="results")%>%head()
#'
#' ### Other useful `foreach` parameters
#'
#' * `.inorder` (true/false) results combined in the same order that they were submitted?
#' * `.errorhandling` (stop/remove/pass)
#' * `.packages` packages to made available to sub-processes
#' * `.export` variables to export to sub-processes
#'
#'
#' # Spatial example
#' In this section we will:
#'
#' 1. Generate some _spatial_ data
#' 2. Tile the region to facilitate processing the data in parallel.
#' 2. Perform a moving window mean for the full area
#' 3. Compare processing times for sequential vs. parallel execution
#'
#' ## Generate Spatial Data
#'
#' A function to generate `raster` object with spatial autocorrelation.
## ------------------------------------------------------------------------
simrast=function(nx=60,ny=60,theta=10,seed=1234){
## create a random raster with some spatial structure
## Theta is the scale of an exponential decay function.
## This controls degree of autocorrelation,
## values close to 1 are close to random while values near nx/4 have high autocorrelation
r=raster(nrows=ny, ncols=nx,vals=1,xmn=-nx/2, xmx=nx/2, ymn=-ny/2, ymx=ny/2)
names(r)="z"
# Simulate a Gaussian random field with an exponential covariance function
set.seed(seed) #set a seed so everyone's maps are the same
grid=list(x=seq(xmin(r),xmax(r)-1,by=res(r)[1]),y=seq(ymin(r),ymax(r)-1,res(r)[2]))
obj<-Exp.image.cov(grid=grid, theta=theta, setup=TRUE)
look<- sim.rf( obj)
values(r)=t(look)*10
return(r)
}
#'
#' Generate a raster using `simrast`.
## ------------------------------------------------------------------------
r=simrast(nx=3000,ny=1000,theta = 100)
r
#'
#' Plot the raster showing the grid.
## ----fig.height=3--------------------------------------------------------
gplot(r)+
geom_raster(aes(fill = value))+
scale_fill_gradient(low = 'white', high = 'blue')+
coord_equal()+ylab("Y")+xlab("X")
#'
#'
#' ## "Tile" the region
#'
#' To parallelize spatial data, you often need to _tile_ the data and process each tile separately. Here is a function that will take a bounding box, tile size and generate a tiling system. If given an `overlap` term, it will also add buffers to the tiles to reduce/eliminate edge effects, though this depends on what algorithm/model you are using.
#'
## ------------------------------------------------------------------------
tilebuilder=function(raster,size=10,overlap=NULL){
## get raster extents
xmin=xmin(raster)
xmax=xmax(raster)
ymin=ymin(raster)
ymax=ymax(raster)
xmins=c(seq(xmin,xmax-size,by=size))
ymins=c(seq(ymin,ymax-size,by=size))
exts=expand.grid(xmin=xmins,ymin=ymins)
exts$ymax=exts$ymin+size
exts$xmax=exts$xmin+size
if(!is.null(overlap)){
#if overlapped tiles are requested, create new columns with buffered extents
exts$yminb=exts$ymin
exts$xminb=exts$xmin
exts$ymaxb=exts$ymax
exts$xmaxb=exts$xmax
t1=(exts$ymin-overlap)>=ymin
exts$yminb[t1]=exts$ymin[t1]-overlap
t2=exts$xmin-overlap>=xmin
exts$xminb[t2]=exts$xmin[t2]-overlap
t3=exts$ymax+overlap<=ymax
exts$ymaxb[t3]=exts$ymax[t3]+overlap
t4=exts$xmax+overlap<=xmax
exts$xmaxb[t4]=exts$xmax[t4]+overlap
}
exts$tile=1:nrow(exts)
return(exts)
}
#'
#' Generate a tiling system for that raster. Here will use only three tiles (feel free to play with this).
#'
## ------------------------------------------------------------------------
jobs=tilebuilder(r,size=1000,overlap=80)
kable(jobs,row.names = F,digits = 2)
#'
#'
#' Plot the raster showing the grid.
## ----fig.height=4,fig.width=9--------------------------------------------
ggplot(jobs)+
geom_raster(aes(x=coordinates(r)[,1],y=coordinates(r)[,2],fill = values(r)))+
scale_fill_gradient(low = 'white', high = 'blue')+
geom_rect(mapping=aes(xmin=xmin,xmax=xmax,ymin=ymin,ymax=ymax),
fill="transparent",lty="dashed",col="darkgreen")+
geom_rect(aes(xmin=xminb,xmax=xmaxb,ymin=yminb,ymax=ymaxb),
fill="transparent",col="black")+
geom_text(aes(x=(xminb+xmax)/2,y=(yminb+ymax)/2,label=tile),size=10)+
coord_equal()+ylab("Y")+xlab("X")
#'
#' ## Run a simple spatial analysis: `focal` moving window
#' Use the `focal` function from the raster package to calculate a 3x3 moving window mean over the raster.
## ---- fig.height=4,fig.width=9-------------------------------------------
stime2=system.time({
r_focal1=focal(r,w=matrix(1,101,101),mean,pad=T)
})
stime2
## plot it
gplot(r_focal1)+
geom_raster(aes(fill = value))+
scale_fill_gradient(low = 'white', high = 'blue')+
coord_equal()+ylab("Y")+xlab("X")
#'
#' That works great (and pretty fast) for this little example, but as the data (or the size of the window) get larger, it can become prohibitive.
#'
#' ## Repeat the analysis, but parallelize using the tile system.
#'
#' First write a function that breaks up the original raster, computes the focal mean, then puts it back together. You could also put this directly in the `foreach()` loop.
#'
## ------------------------------------------------------------------------
focal_par=function(i,raster,jobs,w=matrix(1,101,101)){
## identify which row in jobs to process
t_ext=jobs[i,]
## crop original raster to (buffered) tile
r2=crop(raster,extent(t_ext$xminb,t_ext$xmaxb,t_ext$yminb,t_ext$ymaxb))
## run moving window mean over tile
rf=focal(r2,w=w,mean,pad=T)
## crop to tile
rf2=crop(rf,extent(t_ext$xmin,t_ext$xmax,t_ext$ymin,t_ext$ymax))
## return the object - could also write the file to disk and aggregate later outside of foreach()
return(rf2)
}
#'
#' Run the parallelized version.
## ------------------------------------------------------------------------
registerDoParallel(3)
ptime2=system.time({
r_focal=foreach(i=1:nrow(jobs),.combine=merge,.packages=c("raster")) %dopar% focal_par(i,r,jobs)
})
#'
#' Are the outputs the same?
## ------------------------------------------------------------------------
identical(r_focal,r_focal1)
#'
#' So we were able to process the data in `r #ptime2[3]` seconds when using `r foreach::getDoParWorkers()` CPUs and `r #stime2[3]` seconds on one CPU. That's `r #round(stime2[3]/ptime2[3],1)`X faster for this simple example.
#'
#'
#' ## Parallelized Raster functions
#' Some functions in raster package also easy to parallelize.
#'
## ------------------------------------------------------------------------
ncores=2
beginCluster(ncores)
fn=function(x) x^3
system.time(fn(r))
system.time(clusterR(r, fn, verbose=T))
endCluster()
#'
#' Does _not_ work with:
#'
#' * merge
#' * crop
#' * mosaic
#' * (dis)aggregate
#' * resample
#' * projectRaster
#' * focal
#' * distance
#' * buffer
#' * direction
#'
#'
#' # High Performance Computers (HPC)
#' _aka_ *supercomputers*, for example, check out the [University at Buffalo HPC](https://www.buffalo.edu/ccr.html)
#'
#' 
#'
#' Working on a cluster can be quite different from a laptop/workstation. The most important difference is the existence of _scheduler_ that manages large numbers of individual tasks.
#'
#' ## SLURM and R
#'
#' You typically don't run the script _interactively_, so you need to edit your script to 'behave' like a normal `#!` (linux command line) script. This is easy with [getopt](http://cran.r-project.org/web/packages/getopt/index.html) package.
#'
#'
## ------------------------------------------------------------------------
cat(paste("
library(getopt)
## get options
opta <- getopt(
matrix(c(
'date', 'd', 1, 'character'
), ncol=4, byrow=TRUE))
## extract value
date=as.Date(opta$date)
## Now your script using date as an input
print(date+1)
q(\"no\")
"
),file=paste("script.R",sep=""))
#'
#' Then you can run this script from the command line like this:
## ----eval=F--------------------------------------------------------------
## Rscript script.R --date 2013-11-05
#' You will need the complete path if `Rscript` is not on your system path. For example, on OS X, it might be at `/Library/Frameworks/R.framework/Versions/3.2/Resources/Rscript`.
#'
#'
#' Or even from within R like this:
## ---- eval=F-------------------------------------------------------------
## system("Rscript script.R --date 2013-11-05",intern=T)
#'
#' ### Driving cluster from R
#'
#' Possible to drive the cluster from within R via SLURM. First, define the jobs and write that file to disk:
## ------------------------------------------------------------------------
script="script.R"
dates=seq(as.Date("2000-01-01"),as.Date("2000-12-31"),by=60)
pjobs=data.frame(jobs=paste(script,"--date",dates))
write.table(pjobs,
file="process.txt",
row.names=F,col.names=F,quote=F)
#'
#' This table has one row per task:
## ----results='markup'----------------------------------------------------
pjobs
#'
#' Now identify other parameters for SLURM.
## ----eval=FALSE----------------------------------------------------------
## ### Set up submission script
## nodes=2
## walltime=5
#'
#' ### Write the SLURM script
#'
#' [More information here](https://www.buffalo.edu/ccr.html)
#'
## ----eval=F--------------------------------------------------------------
## ### write SLURM script to disk from R
##
## cat(paste("#!/bin/sh
## #SBATCH --partition=general-compute
## #SBATCH --time=00:",walltime,":00
## #SBATCH --nodes=",nodes,"
## #SBATCH --ntasks-per-node=8
## #SBATCH --constraint=IB
## #SBATCH --mem=300
## # Memory per node specification is in MB. It is optional.
## # The default limit is 3000MB per core.
## #SBATCH --job-name=\"date_test\"
## #SBATCH --output=date_test-srun.out
## #SBATCH --mail-user=adamw@buffalo.edu
## #SBATCH --mail-type=ALL
## ##SBATCH --requeue
## #Specifies that the job will be requeued after a node failure.
## #The default is that the job will not be requeued.
##
## ## Load necessary modules
## module load openmpi/gcc-4.8.3/1.8.4
## module load R
##
## IDIR=~
## WORKLIST=$IDIR/process.txt
## EXE=Rscript
## LOGSTDOUT=$IDIR/log/stdout
## LOGSTDERR=$IDIR/log/stderr
##
## ### use mpiexec to parallelize across lines in process.txt
## mpiexec -np $CORES xargs -a $WORKLIST -p $EXE 1> $LOGSTDOUT 2> $LOGSTDERR
## ",sep=""),file=paste("slurm_script.txt",sep=""))
#'
#' Now we have a list of jobs and a qsub script that points at those jobs with the necessary PBS settings.
## ----eval=FALSE----------------------------------------------------------
## ## run it!
## system("sbatch slurm_script.txt")
## ## Check status with squeue
## system("squeue -u adamw")
#'
#' For more detailed information about the UB HPC, [see the CCR Userguide](https://www.buffalo.edu/ccr/support/UserGuide.html).
#'
#' # Summary
#' > Each task should involve computationally-intensive work. If the tasks are very small, it can take _longer_ to run in parallel.
#'
#'
#' ## Choose your method
#' 1. Run from master process (e.g. `foreach`)
#' - easier to implement and collect results
#' - fragile (one failure can kill it and lose results)
#' - clumsy for *big* jobs
#' 2. Run as separate R processes
#' - see [`getopt`](http://cran.r-project.org/web/packages/getopt/index.html) library
#' - safer for big jobs: each job completely independent
#' - update job list to re-run incomplete submissions
#' - compatible with slurm / cluster computing
#' - forces you to have a clean processing script
#'
#'
#' ## Further Reading
#'
#' * [CRAN Task View: High-Performance and Parallel Computing with R](http://cran.r-project.org/web/views/HighPerformanceComputing.html)
#' * [Simple Parallel Statistical Computing in R](www.stat.uiowa.edu/~luke/talks/uiowa03.pdf)
#' * [Parallel Computing with the R Language in a Supercomputing Environment](http://download.springer.com/static/pdf/832/chp%253A10.1007%252F978-3-642-13872-0_64.pdf?auth66=1415215123_43bf0cbf5ae8f5143b7ee309ff5e3556&ext=.pdf)