Java Flink窗口觸發器Trigger的用法詳解
定義
Trigger確定窗口(由窗口分配器形成)何時準備好由窗口函數處理。每個WindowAssigner都帶有一個默認值Trigger。如果默認觸發器不符合您的需求,您可以使用trigger(…)。
Trigger 源碼
public abstract class Trigger<T, W extends Window> implements Serializable { /** 隻要有元素落⼊到當前窗⼝, 就會調⽤該⽅法 * @param element 收到的元素 * @param timestamp 元素抵達時間. * @param window 元素所屬的window窗口. * @param ctx ⼀個上下⽂對象,通常⽤該對象註冊 timer(ProcessingTime/EventTime) 回調. */ public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception; /** * processing-time 定時器回調函數 * * @param time 定時器觸發的時間. * @param window 定時器觸發的窗口對象. * @param ctx ⼀個上下⽂對象,通常⽤該對象註冊 timer(ProcessingTime/EventTime) 回調. */ public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * event-time 定時器回調函數 * * @param time 定時器觸發的時間. * @param window 定時器觸發的窗口對象. * @param ctx ⼀個上下⽂對象,通常⽤該對象註冊 timer(ProcessingTime/EventTime) 回調. */ public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * 當 多個窗口合並到⼀個窗⼝的時候,調用該方法法,例如系統SessionWindow * * @param window 合並後的新窗口對象 * @param ctx ⼀個上下⽂對象,通常用該對象註冊 timer(ProcessingTime/EventTime)回調以及訪問狀態 */ public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } /** * 當窗口被刪除後執⾏所需的任何操作。例如:可以清除定時器或者刪除狀態數據 */ public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception; }
TriggerResult 源碼
public enum TriggerResult { // 表示對窗口不執行任何操作。即不觸發窗口計算,也不刪除元素。 CONTINUE(false, false), // 觸發窗口計算,輸出結果,然後將窗口中的數據和窗口進行清除。 FIRE_AND_PURGE(true, true), // 觸發窗口計算,但是保留窗口元素 FIRE(true, false), // 不觸發窗口計算,丟棄窗口,並且刪除窗口的元素。 PURGE(false, true); private final boolean fire; private final boolean purge; private TriggerResult(boolean fire, boolean purge) { this.purge = purge; this.fire = fire; } public boolean isFire() { return this.fire; } public boolean isPurge() { return this.purge; } }
一旦觸發器確定窗口已準備好進行處理,就會觸發,返回狀態可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發窗口計算並保留窗口內容,而FIRE_AND_PURGE是觸發窗口計算並刪除窗口內容。默認情況下,預實現的觸發器隻是簡單地FIRE不清除窗口狀態。
Flink 預置的Trigger
- EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發窗口計算,如果EventTime大於Window EndTime則觸發,否則不觸發,窗口將繼續等待。
- ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發窗口,如果ProcessTime大於EndTime則觸發計算,否則窗口繼續等待。
- ContinuousEventTimeTrigger:根據間隔時間周期性觸發窗口或者Window的結束時間小於當前EndTime觸發窗口計算。
- ContinuousProcessingTimeTrigger:根據間隔時間周期性觸發窗口或者Window的結束時間小於當前ProcessTime觸發窗口計算。
- CountTrigger:根據接入數據量是否超過設定的闕值判斷是否觸發窗口計算。
- DeltaTrigger:根據接入數據計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發窗口計算。
- PurgingTrigger:可以將任意觸發器作為參數轉換為Purge類型的觸發器,計算完成後數據將被清理。
- NeverTrigger:任何時候都不觸發窗口計算
主要看看EventTimeTrigger和ProcessingTimeTrigger的源碼。
EventTimeTrigger源碼
public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); } }
ProcessingTimeTrigger源碼
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } public String toString() { return "ProcessingTimeTrigger()"; } public static ProcessingTimeTrigger create() { return new ProcessingTimeTrigger(); } }
在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會註冊一個ProcessingTime定時器,時間參數是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發並調用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發窗口中數據的計算,但是會保留窗口元素。
需要註意的是ProcessingTimeTrigger類隻會在窗口的最終時間到達的時候觸發窗口函數的計算,計算完成後並不會清除窗口中的數據,這些數據存儲在內存中,除非調用PURGE或FIRE_AND_PURGE,否則數據將一直存在內存中。實際上,Flink中提供的Trigger類,除瞭PurgingTrigger類,其他的都不會對窗口中的數據進行清除。
常見窗口的Trigger
滾動窗口
TumblingEventTimeWindows :EventTimeTrigger public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
TumblingProcessingTimeWindows :ProcessingTimeTrigger
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
滑動窗口
SlidingEventTimeWindows:EventTimeTrigger public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
SlidingProcessingTimeWindows :ProcessingTimeTrigger
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
會話窗口
EventTimeSessionWindows:EventTimeTrigger public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
ProcessingTimeSessionWindows:ProcessingTimeTrigger
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
全局窗口
GlobalWindows :NeverTrigger public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return new GlobalWindows.NeverTrigger(); } }
到此這篇關於Java Flink窗口觸發器Trigger的用法詳解的文章就介紹到這瞭,更多相關Java Flink窗口觸發器內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- spring schedule實現動態配置執行時間
- Redis分佈式非公平鎖的使用
- Springboot自帶定時任務實現動態配置Cron參數方式
- Java中的BaseTypeHandler自定義類型轉換器的使用
- Java中的自定義異常捕獲方式