R語言多線程運算操作(解決R循環慢的問題)

已經大半年沒有更新博客瞭。。最近都跑去寫分析報告半年沒有R

這次記錄下關於R循環(百萬級以上)死慢死慢的問題,這個問題去年就碰到過,當時也嘗試過多線程,but failed……昨天試瞭下,終於跑通瞭,而且過程還挺順利

step1

先查下自己電腦幾核的,n核貌似應該選跑n個線程,線程不是越多越好,線程個數和任務運行時間是條開口向下的拋物線,最高點預計在電腦的核數上。

detectCores( )檢查當前電腦可用核數 我的是4所以step2選的是4

library(parallel)
cl.cores <- detectCores()

step 2

多線程計算

setwd("C:\\Users\\siyuanmao\\Documents\\imdada\\0-渠道投放和新人券聯動模型\\測算")
options(scipen=3)  ##取消科學計數法
channel_ad_ios_data<-seq(0,50000,5000)
channel_ad_android_data<-seq(0,100000,10000)
library(parallel)
func <- function(n){#n=1
  result_data<-read.csv("發券方案.csv",stringsAsFactors=FALSE)
  total_coupon_solution_data<-read.csv("結果表框架.csv",stringsAsFactors=FALSE)
  coupon_solution_data<-subset(result_data,solution== paste('方案',n,sep=""))
  
  for (i in 1:11){#i=3
    coupon_solution_data$channel_ad_cost[3]<-5000*(i-1)
    
    for (j in 1:11){#j=5
      coupon_solution_data$channel_ad_cost[4]<-10000*(j-1)
      solution_mark<-paste('方案',n,i,j,sep="-")
      coupon_solution_data$solution<-solution_mark
      
      total_coupon_solution_data<-rbind(total_coupon_solution_data,coupon_solution_data)
    }
  }
  print(solution_mark)
  return(total_coupon_solution_data)
}
#func(10)
system.time({
x <- 1:7776
cl <- makeCluster(4) # 初始化四核心集群
results <- parLapply(cl,x,func) # lapply的並行版本
res.df <- do.call('rbind',results) # 整合結果
stopCluster(cl) # 關閉集群
})
df=as.data.frame(res.df)

原來非多線程的時候,我預計要跑12個小時以上,電腦發出呼呼~~的響聲,查瞭下Python循環會快點,然後改為python版(已經很久沒有用瞭,連個range都不會寫,摸索瞭大半天才改好,但是速度還是慢==),於是改成多線程,運行25分鐘就出結果瞭~~

補充:R語言 多線程

parallel包

包的安裝

install.packages("parallel")
library(parallel)

包中常用函數

detectCores() 檢查當前的可用核數

clusterExport() 配置當前環境

makeCluster() 分配核數

stopCluster() 關閉集群

parLapply() lapply()函數的並行版本

其實R語言本來就是一門向量化語言,如果是對於一個向量的操作,使用apply函數一族能獲得比較高的效率,相比於for循環,這種高效來自於:

用C實現瞭for循環

減少對於data.frame等數據結構等不必要的拷貝

但是很多時候,如果想更快的話,光apply函數一族還不足夠,這時候就能用上多線程。

R語言parallel包可以幫助實現多線程。

parLapply的簡單代碼實戰

檢查當前核數

cl.cores <- detectCores()
#結果
> cl.cores
[1] 8

啟動集群和關閉集群

cl <- makeCluster(4) # 初始化四核心集群
###並行任務
stopCluster(cl) # 關閉集群

parLapply執行多線程計算

#定義計算平方函數
square <- function(x)
{
    return(x^2)
}
#利用並行計算計算平方函數
num <- c(1:3)
cl <- makeCluster(4) # 初始化四核心集群
results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數
final <- do.call('c',results)#整合結果
stopCluster(cl) # 關閉集群
#結果
> final
[1] 1,4,9

思考:在如此小的計算方式下,開4個核計算是否比開一個核要快

答案:當然是不一定,因為涉及到調度方式等額外開銷,所以不一定快,因為真正並行起作用的地方在於大數據量的計算。

時間開銷對比

兩段對比代碼

#定義計算平方函數
square <- function(x)
{
   #########
   #一段冗餘代碼增加執行時間
    y = 2*x
    if(y <300)
    {z = y}
    else
    {z = x}
   ##########   
    return(x^2)
}
num <- c(1:10000000)
#並行計算
print(system.time({
    cl <- makeCluster(4) # 初始化四核心集群
    results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數
final <- do.call('c',results)#整合結果
stopCluster(cl) # 關閉集群
}))
#結果
用戶  系統  流逝 
 7.89  0.27 19.01  
#普通計算
print(system.time({
    results <- lapply(num,square)
    final <- do.call('c',results)#整合結果
}))
#結果
用戶  系統  流逝 
29.74  0.00 29.79

顯然在數據量比較大的時候,並行計算的時間幾乎就是於核數反比。不過,也不是多開幾個核就好,註意內存很容易超支的,每個核都分配相應的內存,所以要註意內存開銷。出現內存問題的時候,需要檢查是否代碼是否合理,R語言版本(64位會比32位分配的內存大),核分配是否合理。

上一級環境中變量的引入

R語言裡邊對於環境變量有著有趣的定義,一層套一層,這裡不做深入展開。

類似於在c語言函數中使用全局變量,R在執行並行計算的時候,如果需要計算的函數出現在全局(上一級),那麼就需要聲明引入這個變量,否則將會報錯。

#定義計算冪函數
base = 2
square <- function(x)
{
    return(x^base)
}
num <- c(1:1000000)
#利用並行計算計算冪函數
cl <- makeCluster(4) # 初始化四核心集群
results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數
final <- do.call('c',results)#整合結果
stopCluster(cl) # 關閉集群
#結果報錯
Error in checkForRemoteErrors(val) : 
  4 nodes produced errors; first error: 找不到對象'base'
#利用並行計算計算冪函數
cl <- makeCluster(4) # 初始化四核心集群
clusterExport(cl,"base",envir = environment())
results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數
final <- do.call('c',results)#整合結果
stopCluster(cl) # 關閉集群
#結果
> final
[1] 1,4,9,16,25.......

foreach包

除瞭parallel包以外,還有針對並行for循環的foreach包,foreach()的使用也與parLapply()類似,兩個功能也類似,其中遇到的問題也類似。

包的安裝

install.packages("foreach")
library(parallel)

foreach的使用

#定義計算冪函數
square <- function(x)
{
    return(x^2)
}

非並行情況的使用:

參數中的combine就是整合結果的函數,可以是c,可以是rbind,也可以是+等

results = foreach(x = c(1:3),.combine = 'c') %do% square(x)
#結果
> results
[1] 1,4,9

並行情況的使用:

註意並行情況的時候,需要與parallel包進行配合,引入library(doParallel)。同時%do%需要改成%dopar%。另外與parallel包不一樣的是,需要多加一句registerDoParallel(cl)來註冊核進行使用。

cl <- makeCluster(4)
registerDoParallel(cl)
results = foreach(x = c(1:100000),.combine = 'c') %dopar% square(x)
stopCluster(cl)

上一級環境中變量的引入

同parallel包並行計算前需要clusterExport()來引入全局變量一樣,foreach也同樣需要聲明,不同的是,foreach聲明方式直接寫在foreach()的參數export裡邊。

#定義計算冪函數
base = 2
square <- function(x)
{
    return(x^base)
}
cl <- makeCluster(4)
registerDoParallel(cl)
results = foreach(x = c(1:100000),.combine = 'c',.export ='base' ) %dopar% square(x)
stopCluster(cl)

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。