Flink支持哪些數據類型?

一、支持的數據類型

Flink 對可以在 DataSet 或 DataStream 中的元素類型進行瞭一些限制。這樣做的原因是系統會分析類型以確定有效的執行策略。

1.Java Tuple 和 Scala Case類;

2.Java POJO;

3.基本類型;

4.通用類;

5.值;

6.Hadoop Writables;

7.特殊類型

二、Flink之Tuple類型

Tuple類型  Tuple 是flink 一個很特殊的類型 (元組類型),是一個抽象類,共26個Tuple子類繼承Tuple 他們是 Tuple0一直到Tuple25

package org.apache.flink.api.java.tuple;
​
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.types.NullFieldException;
​
@Public
public abstract class Tuple implements Serializable {
    private static final long serialVersionUID = 1L;
    public static final int MAX_ARITY = 25;
    private static final Class<?>[] CLASSES = new Class[]{Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class};
​
    public Tuple() {
    }
​
    public abstract <T> T getField(int var1);
​
    public <T> T getFieldNotNull(int pos) {
        T field = this.getField(pos);
        if (field != null) {
            return field;
        } else {
            throw new NullFieldException(pos);
        }
    }
​
    public abstract <T> void setField(T var1, int var2);
​
    public abstract int getArity();
​
    public abstract <T extends Tuple> T copy();
​
    public static Class<? extends Tuple> getTupleClass(int arity) {
        if (arity >= 0 && arity <= 25) {
            return CLASSES[arity];
        } else {
            throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
        }
    }
​
    public static Tuple newInstance(int arity) {
        switch(arity) {
        case 0:
            return Tuple0.INSTANCE;
        case 1:
            return new Tuple1();
        case 2:
            return new Tuple2();
        case 3:
            return new Tuple3();
        case 4:
            return new Tuple4();
        case 5:
            return new Tuple5();
        case 6:
            return new Tuple6();
        case 7:
            return new Tuple7();
        case 8:
            return new Tuple8();
        case 9:
            return new Tuple9();
        case 10:
            return new Tuple10();
        case 11:
            return new Tuple11();
        case 12:
            return new Tuple12();
        case 13:
            return new Tuple13();
        case 14:
            return new Tuple14();
        case 15:
            return new Tuple15();
        case 16:
            return new Tuple16();
        case 17:
            return new Tuple17();
        case 18:
            return new Tuple18();
        case 19:
            return new Tuple19();
        case 20:
            return new Tuple20();
        case 21:
            return new Tuple21();
        case 22:
            return new Tuple22();
        case 23:
            return new Tuple23();
        case 24:
            return new Tuple24();
        case 25:
            return new Tuple25();
        default:
            throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
        }
    }
}

查看源碼我們看到Tuple0一直到Tuple25

我們看flink為我們為我們構造好瞭0-25個字段的模板類

ackage org.apache.flink.api.java.tuple;
​
import java.io.ObjectStreamException;
import org.apache.flink.annotation.Public;
​
@Public
public class Tuple0 extends Tuple {
    private static final long serialVersionUID = 1L;
    public static final Tuple0 INSTANCE = new Tuple0();
​
    public Tuple0() {
    }
​
    public int getArity() {
        return 0;
    }
​
    public <T> T getField(int pos) {
        throw new IndexOutOfBoundsException(String.valueOf(pos));
    }
​
    public <T> void setField(T value, int pos) {
        throw new IndexOutOfBoundsException(String.valueOf(pos));
    }
​
    public Tuple0 copy() {
        return new Tuple0();
    }
​
    public String toString() {
        return "()";
    }
​
    public boolean equals(Object o) {
        return this == o || o instanceof Tuple0;
    }
​
    public int hashCode() {
        return 0;
    }
​
    private Object readResolve() throws ObjectStreamException {
        return INSTANCE;
    }
}

三、Tuple的使用

方式一:初始化元組

可使用靜態方法 newInstance進行元組構造 指定元組空間大小;

ex: 1 則元組隻有一個空間,則實際使用的Tuple1 字段隻有f0

ex: 12 則元組隻有兩個空間,則實際使用的Tuple2 字段隻有f0,f1

指定  Tuple元組空間大小 (可理解為字段個數)

Tuple tuple = Tuple.newInstance(1);

方式一:構造元組 

使用Tuple.newInstance(xx),指定元組空間大小的話,這樣存取雖然能夠實現,但會存在存儲索引位置使用不正確的情況,可能由於失誤操作編寫出索引越界異常,而且使用不太方便,使用Tuplex.of(數據)方法構造Tuple元組

Tuple3<String, String, String> tuple3 = Tuple3.of("test0", "test1", "test2");
System.out.println(tuple3.f0); // test0
System.out.println(tuple3.f1); // test1
System.out.println(tuple3.f2); // test2

四、Flink之POJO類型

Java和Scala的類在滿足下列條件時,將會被Flink視作特殊的POJO數據類型專門進行處理:

1.是公共類;

2.無參構造是公共的;

3.所有的屬性都是可獲得的(聲明為公共的,或提供get,set方法);

4.字段的類型必須是Flink支持的。Flink會用Avro來序列化任意的對象。

Flink會分析POJO類型的結構獲知POJO的字段。POJO類型要比一般類型好用。此外,Flink訪問POJO要比一般類型更高效。

public class WordWithCount {
    public String word;
    public int count;
    public WordWithCount() {}
    public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
    DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));
    wordCounts.keyBy("word");

五、Flink之基本類型

Flink支持Java和Scala所有的基本數據類型,比如 Integer,String,和Double。

六、Flink之通用類型

Flink支持大多數的Java,Scala類(API和自定義)。包含不能序列化字段的類在增加一些限制後也可支持。遵循Java Bean規范的類一般都可以使用。

所有不能視為POJO的類Flink都會當做一般類處理。這些數據類型被視作黑箱,其內容是不可見的。通用類使用Kryo進行序列/反序列化。

七、Flink之值類型Values

通過實現org.apache.flinktypes.Value接口的read和write方法提供自定義代碼來進行序列化/反序列化,而不是使用通用的序列化框架。

Flink預定義的值類型與原生數據類型是一一對應的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。這些值類型作為原生數據類型的可變變體,他們的值是可以改變的,允許程序重用對象從而緩解GC的壓力。

八、Flink之Hadoop的Writable類

它實現org.apache.hadoop.Writable接口的類型,該類型的序列化邏輯在write()和readFields()方法中實現。

九、Flink之特殊類型

Flink比較特殊的類型有以下兩種:

1.Scala的 Either、Option和Try。

2.Java ApI有自己的Either實現。

Java Api 與 Scala 的 類似Either,它表示兩種可能類型的值,LeftRightEither對於錯誤處理或需要輸出兩種不同類型的記錄的運算符很有用。

類型擦除和類型推理

Java編譯器在編譯之後會丟棄很多泛型類型信息。這在Java中稱為類型擦除。這意味著在運行時,對象的實例不再知道其泛型類型。

例如,在JVM中,DataStream<String>和DataStream<Long>的實例看起來是相同的。

List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass());

泛型:一種較為準確的說法就是為瞭參數化類型,或者說可以將類型當作參數傳遞給一個類或者是方法。Flink 的Java API會試圖去重建(可以做類型推理)這些被丟棄的類型信息,並將它們明確地存儲在數據集以及操作中。你可以通過DataStream.getType()方法來獲取類型,這個方法將返回一個TypeInformation的實例,這個實例是Flink內部表示類型的方式。

到此這篇關於Flink支持哪些數據類型?的文章就介紹到這瞭,更多相關Flink的數據類型內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: