spark dataframe全局排序id與分組後保留最大值行

正文

作為一個算法工程師,日常學習和工作中,不光要 訓練模型關註效果 ,更多的 時間 是在 準備樣本數據與分析數據 等,而這些過程 都與 大數據 spark和hadoop生態 的若幹工具息息相關。

今天我們就不在更新 機器學習算法模型 相關的內容,分享兩個 spark函數 吧,以前也在某種場景中使用過但沒有保存收藏,哎!! 事前不搜藏,臨時抱佛腳 的感覺 真是 痛苦,太耽誤幹活瞭

so,把這 兩個函數 記在這裡 以備不時 之需~

(1) 得到 spark dataframe 全局排序ID

這個函數的 應用場景 就是:根據某一列的數值對 spark 的 dataframe 進行排序, 得到全局多分區排序的全局有序ID,新增一列保存這個rank id ,並且保留別的列的數據無變化

有用戶會說,這不是很容易嗎 ,直接用 orderBy 不就可以瞭嗎,但是難點是:orderBy完記錄下全局ID 並且 保持原來全部列的DF數據

多說無益,遇到這個場景 直接copy 用起來 就知道 有多爽 瞭,同類問題 我們可以 用下面 這個函數 解決 ~

scala 寫的 spark 版本代碼:

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String ="rank_id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  )
}

函數調用我們可以用這行代碼調用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接復制過去就可以~

python寫的 pyspark 版本代碼:

from pyspark.sql.types import LongType, StructField, StructType
def dfZipWithIndex (df, offset=1, colName="rank_id"):
    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )
    zipped_rdd = df.rdd.zipWithIndex()
    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))
    return spark.createDataFrame(new_rdd, new_schema)

調用 同理 , 這裡我就不在進行贅述瞭。

(2)分組後保留最大值行

這個函數的 應用場景 就是: 當我們使用 spark 或則 sparkSQL 查找某個 dataframe 數據的時候,在某一天裡,任意一個用戶可能有多條記錄,我們需要 對每一個用戶,保留dataframe 中 某列值最大 的那行數據

其中的 關鍵點 在於:一次性求出對每個用戶分組後,求得每個用戶的多行記錄中,某個值最大的行進行數據保留

當然,經過 簡單修改代碼,不一定是最大,最小也是可以的,平均都ok

scala 寫的 spark 版本代碼:

// 得到一天內一個用戶多個記錄裡面時間最大的那行用戶的記錄
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
val w = Window.partitionBy("user_id")
val result_df = raw_df
    .withColumn("max_time",functions.max("time").over(w))
    .where($"time" === $"max_time")
    .drop($"max_time")

python寫的 pyspark 版本代碼:

# pyspark dataframe 某列值最大的元素所在的那一行 
# GroupBy 列並過濾 Pyspark 中某列值最大的行 
# 創建一個Window 以按A列進行分區,並使用它來計算每個組的最大值。然後過濾出行,使 B 列中的值等於最大值 
from pyspark.sql import Window
w = Window.partitionBy('user_id')
result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\
    .where(fun.col('time') == fun.col('time'))
    .drop('max_time')

我們可以看到: 這個函數的關鍵就是運用瞭 spark 的 window 函數 ,靈活運用 威力無窮 哦 !

到這裡,spark利器2函數之dataframe全局排序id與分組後保留最大值行 的全文 就寫完瞭 ,更多關於spark dataframe全局排序的資料請關註WalkonNet其它相關文章!

推薦閱讀: