R語言多線程運算操作(解決R循環慢的問題)
已經大半年沒有更新博客瞭。。最近都跑去寫分析報告半年沒有R
這次記錄下關於R循環(百萬級以上)死慢死慢的問題,這個問題去年就碰到過,當時也嘗試過多線程,but failed……昨天試瞭下,終於跑通瞭,而且過程還挺順利
step1
先查下自己電腦幾核的,n核貌似應該選跑n個線程,線程不是越多越好,線程個數和任務運行時間是條開口向下的拋物線,最高點預計在電腦的核數上。
detectCores( )檢查當前電腦可用核數 我的是4所以step2選的是4
library(parallel) cl.cores <- detectCores()
step 2
多線程計算
setwd("C:\\Users\\siyuanmao\\Documents\\imdada\\0-渠道投放和新人券聯動模型\\測算") options(scipen=3) ##取消科學計數法 channel_ad_ios_data<-seq(0,50000,5000) channel_ad_android_data<-seq(0,100000,10000) library(parallel) func <- function(n){#n=1 result_data<-read.csv("發券方案.csv",stringsAsFactors=FALSE) total_coupon_solution_data<-read.csv("結果表框架.csv",stringsAsFactors=FALSE) coupon_solution_data<-subset(result_data,solution== paste('方案',n,sep="")) for (i in 1:11){#i=3 coupon_solution_data$channel_ad_cost[3]<-5000*(i-1) for (j in 1:11){#j=5 coupon_solution_data$channel_ad_cost[4]<-10000*(j-1) solution_mark<-paste('方案',n,i,j,sep="-") coupon_solution_data$solution<-solution_mark total_coupon_solution_data<-rbind(total_coupon_solution_data,coupon_solution_data) } } print(solution_mark) return(total_coupon_solution_data) } #func(10) system.time({ x <- 1:7776 cl <- makeCluster(4) # 初始化四核心集群 results <- parLapply(cl,x,func) # lapply的並行版本 res.df <- do.call('rbind',results) # 整合結果 stopCluster(cl) # 關閉集群 }) df=as.data.frame(res.df)
原來非多線程的時候,我預計要跑12個小時以上,電腦發出呼呼~~的響聲,查瞭下Python循環會快點,然後改為python版(已經很久沒有用瞭,連個range都不會寫,摸索瞭大半天才改好,但是速度還是慢==),於是改成多線程,運行25分鐘就出結果瞭~~
補充:R語言 多線程
parallel包
包的安裝
install.packages("parallel") library(parallel)
包中常用函數
detectCores() 檢查當前的可用核數
clusterExport() 配置當前環境
makeCluster() 分配核數
stopCluster() 關閉集群
parLapply() lapply()函數的並行版本
其實R語言本來就是一門向量化語言,如果是對於一個向量的操作,使用apply函數一族能獲得比較高的效率,相比於for循環,這種高效來自於:
用C實現瞭for循環
減少對於data.frame等數據結構等不必要的拷貝
但是很多時候,如果想更快的話,光apply函數一族還不足夠,這時候就能用上多線程。
R語言parallel包可以幫助實現多線程。
parLapply的簡單代碼實戰
檢查當前核數
cl.cores <- detectCores() #結果 > cl.cores [1] 8
啟動集群和關閉集群
cl <- makeCluster(4) # 初始化四核心集群 ###並行任務 stopCluster(cl) # 關閉集群
parLapply執行多線程計算
#定義計算平方函數 square <- function(x) { return(x^2) }
#利用並行計算計算平方函數 num <- c(1:3) cl <- makeCluster(4) # 初始化四核心集群 results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數 final <- do.call('c',results)#整合結果 stopCluster(cl) # 關閉集群 #結果 > final [1] 1,4,9
思考:在如此小的計算方式下,開4個核計算是否比開一個核要快
答案:當然是不一定,因為涉及到調度方式等額外開銷,所以不一定快,因為真正並行起作用的地方在於大數據量的計算。
時間開銷對比
兩段對比代碼
#定義計算平方函數 square <- function(x) { ######### #一段冗餘代碼增加執行時間 y = 2*x if(y <300) {z = y} else {z = x} ########## return(x^2) } num <- c(1:10000000)
#並行計算 print(system.time({ cl <- makeCluster(4) # 初始化四核心集群 results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數 final <- do.call('c',results)#整合結果 stopCluster(cl) # 關閉集群 })) #結果 用戶 系統 流逝 7.89 0.27 19.01
#普通計算 print(system.time({ results <- lapply(num,square) final <- do.call('c',results)#整合結果 })) #結果 用戶 系統 流逝 29.74 0.00 29.79
顯然在數據量比較大的時候,並行計算的時間幾乎就是於核數反比。不過,也不是多開幾個核就好,註意內存很容易超支的,每個核都分配相應的內存,所以要註意內存開銷。出現內存問題的時候,需要檢查是否代碼是否合理,R語言版本(64位會比32位分配的內存大),核分配是否合理。
上一級環境中變量的引入
R語言裡邊對於環境變量有著有趣的定義,一層套一層,這裡不做深入展開。
類似於在c語言函數中使用全局變量,R在執行並行計算的時候,如果需要計算的函數出現在全局(上一級),那麼就需要聲明引入這個變量,否則將會報錯。
#定義計算冪函數 base = 2 square <- function(x) { return(x^base) } num <- c(1:1000000)
#利用並行計算計算冪函數 cl <- makeCluster(4) # 初始化四核心集群 results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數 final <- do.call('c',results)#整合結果 stopCluster(cl) # 關閉集群 #結果報錯 Error in checkForRemoteErrors(val) : 4 nodes produced errors; first error: 找不到對象'base'
#利用並行計算計算冪函數 cl <- makeCluster(4) # 初始化四核心集群 clusterExport(cl,"base",envir = environment()) results <- parLapply(cl,num,square)#調用parLapply並行計算平方函數 final <- do.call('c',results)#整合結果 stopCluster(cl) # 關閉集群 #結果 > final [1] 1,4,9,16,25.......
foreach包
除瞭parallel包以外,還有針對並行for循環的foreach包,foreach()的使用也與parLapply()類似,兩個功能也類似,其中遇到的問題也類似。
包的安裝
install.packages("foreach") library(parallel)
foreach的使用
#定義計算冪函數 square <- function(x) { return(x^2) }
非並行情況的使用:
參數中的combine就是整合結果的函數,可以是c,可以是rbind,也可以是+等
results = foreach(x = c(1:3),.combine = 'c') %do% square(x) #結果 > results [1] 1,4,9
並行情況的使用:
註意並行情況的時候,需要與parallel包進行配合,引入library(doParallel)。同時%do%需要改成%dopar%。另外與parallel包不一樣的是,需要多加一句registerDoParallel(cl)來註冊核進行使用。
cl <- makeCluster(4) registerDoParallel(cl) results = foreach(x = c(1:100000),.combine = 'c') %dopar% square(x) stopCluster(cl)
上一級環境中變量的引入
同parallel包並行計算前需要clusterExport()來引入全局變量一樣,foreach也同樣需要聲明,不同的是,foreach聲明方式直接寫在foreach()的參數export裡邊。
#定義計算冪函數 base = 2 square <- function(x) { return(x^base) } cl <- makeCluster(4) registerDoParallel(cl) results = foreach(x = c(1:100000),.combine = 'c',.export ='base' ) %dopar% square(x) stopCluster(cl)
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。
推薦閱讀:
- None Found