pyodps中的apply用法及groupby取分組排序第一條數據

1、apply用法

apply在pandas裡非常好用的,那在pyodps裡如何去使用,還是有一些區別的,在pyodps中要對一行數據使用自定義函數,可以使用 apply 方法,axis 參數必須為 1,表示在行上操作。

apply 的自定義函數接收一個參數,為上一步 Collection 的一行數據,用戶可以通過屬性、或者偏移取得一個字段的數據。

iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
   sepaladd
0       8.6
1       7.9
2       7.9

reduce為 True 時,表示返回結果為Sequence,否則返回結果為Collection。 names和 types參數分別指定返回的Sequence或Collection的字段名和類型。 如果類型不指定,將會默認為string類型。

在 apply 的自定義函數中,reduce 為 False 時,也可以使用 yield關鍵字來返回多行結果。

iris.count()
150
def handle(row):
    yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300

我們也可以在函數上來註釋返回的字段和類型,這樣就不需要在函數調用時再指定。

 from odps.df import output

 @output(['iris_add', 'iris_sub'], ['float', 'float'])
 def handle(row):
     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
 iris.apply(handle, axis=1).count()
 300

也可以使用 map-only 的 map_reduce,和 axis=1 的apply操作是等價的。

 iris.map_reduce(mapper=handle).count()
 300

如果想調用 ODPS 上已經存在的 UDTF,則函數指定為函數名即可。

 iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])

使用 apply 對行操作,且 reduce為 False 時,可以使用 並列多行輸出 與已有的行結合,用於後續聚合等操作。

並列多行輸出:

對於 list 及 map 類型的列,explode 方法會將該列轉換為多行輸出。使用 apply 方法也可以輸出多行。 為瞭進行聚合等操作,常常需要將這些輸出和原表中的列合並。此時可以使用 DataFrame 提供的並列多行輸出功能, 寫法為將多行輸出函數生成的集合與原集合中的列名一起映射。

並列多行輸出的例子如下:

>>> df
   id         a             b
0   1  [a1, b1]  [a2, b2, c2]
1   2      [c1]      [d2, e2]
>>> df[df.id, df.a.explode(), df.b]
   id   a             b
0   1  a1  [a2, b2, c2]
1   1  b1  [a2, b2, c2]
2   2  c1      [d2, e2]
>>> df[df.id, df.a.explode(), df.b.explode()]
   id   a   b
0   1  a1  a2
1   1  a1  b2
2   1  a1  c2
3   1  b1  a2
4   1  b1  b2
5   1  b1  c2
6   2  c1  d2
7   2  c1  e2

如果多行輸出方法對某個輸入不產生任何輸出,默認輸入行將不在最終結果中出現。如果需要在結果中出現該行,可以設置 keep_nulls=True

此時,與該行並列的值將輸出為空值:

>>> df
   id         a
0   1  [a1, b1]
1   2        []
>>> df[df.id, df.a.explode()]
   id   a
0   1  a1
1   1  b1
>>> df[df.id, df.a.explode(keep_nulls=True)]
   id     a
0   1    a1
1   1    b1
2   2  None

 from odps.df import output
 @output(['iris_add', 'iris_sub'], ['float', 'float'])
 def handle(row):
     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
 iris[iris.category, iris.apply(handle, axis=1)]

pyodps中有很多本來在pandas中一個API解決的東西卻要想半天才能搞定。

pandas中在groupby後隻要用first就可以去出分組後的第一行。

例如:

# 以student_id為分組列,然後取出分組後每組的第一條數據
df_stu_frist_course = df_stu_course.groupby('student_id').first()

2、取分組排序後的第一條數據

然而pyodps中卻很坑爹,沒有什麼first,隻能自己想辦法。這裡我又添加瞭一個排序

例如:

首先用student_id進行分組,然後用student_id和gmt_create進行排序,最後用窗口函數nth_value取分組中的第一個值並改名first_course_id, 並將其他字段輸出

df_group = df.groupby('student_id')
df_inst_stu_cou = df_inst_stu_cou['student_id', df_group.sort(['student_id', 'gmt_create'], ascending=[True, False]).course_id.nth_value(0).rename('first_course_id')]

但是這是並不是取出第一行,而是將所有以student_id分為一組的數據的其他列數據都改為排序後的第一個值, 也就是說原df_inst_stu_cou還沒有分組,隻是添加瞭分組的後取出第一個值的一列,所以我們要以student_id分組去重。
所以我們隻要再以student_id分組,然後用聚合函數cat將其他所有的列按照行進行連接(這裡我的連接符選擇瞭逗號),然後在map函數中用split分割成列表取第一個即可

df = df.groupby('student_id').agg(df.first_course_id.cat(sep=',').rename('first_course_ids'))
df['first_course_name'] = df.first_course_names.map(lambda x: x.split(',')[0], 'string')

到此這篇關於pyodps中的apply用法及groupby取分組排序第一條數據的文章就介紹到這瞭,更多相關pyodps apply的用法 內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: