![不仅仅是流计算:Apache Flink实践](https://wfqqreader-1252317822.image.myqcloud.com/cover/768/24045768/b_24045768.jpg)
Apache Flink类型和序列化机制简介
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0144_0001.jpg?sign=1739525673-jHh7i9tynJtGookd0wexDqx6Gh9U72rp-0-bfd551bfcaafcf8339c69f94dd16eee5)
使用Apache Flink(以下简称Flink)编写处理逻辑时,新手总是容易被林林总总的概念所混淆:
为什么Flink有那么多的类型声明方式?
BasicTypeInfo.STRING TYPE INFO、Types.STRING 、Types.STRING()有何区别?
TypeInfoFactory又是什么?
TypeInformation.of和TypeHint是如何使用的呢?
接下来本文将逐步解密Flink的类型和序列化机制。
Flink的类型分类
Flink的类型系统源码位于org.apache.flink.api.common.typeinfo包,让我们对图1深入追踪,看一下类的继承关系图:
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0001.jpg?sign=1739525673-WbZEQj51zyOVJtfdJuaj43DMzdZ8R3mu-0-db1d28ba7b0b4dd50fc755f0c19c086f)
图1:Flink类型分类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0002.jpg?sign=1739525673-k5XtK8dwxDRRP8nshk102FzH0bk3jKT4-0-dc5821c4933d81eae3c5d705be1541b2)
图2:TypeInformation类继承关系图
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0146_0001.jpg?sign=1739525673-A8feYcNyFe7FDREDzupPhgSOTi3jqoeq-0-84bf191ed42641a02fc626653ecb086e)
图3:使用.returns方法声明返回类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0001.jpg?sign=1739525673-8AI5mrYty5pvaIOe8oZRadJGZRETSPhz-0-069ba5b976b7bba9039bf7bb96203978)
图4:Flink-ML注册子类类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0002.jpg?sign=1739525673-TCsw5izEWUkD9lCYeSkZDSajz1TMNto9-0-d3e2522df7b62b5d601b6d099da0f79f)
图5:Flink允许注册自定义类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0001.jpg?sign=1739525673-jlzaT2FK3PSD0cdCqtH4IiQ19TTcTMxw-0-9df9e86d5b777b05a0dc8d6a1be49dfd)
图6:class对象作为参数
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0002.jpg?sign=1739525673-2txQrPK9ag0CT1RJB8aWm63KPAn92Ny2-0-c1932a0973e268ab87ce356ef7da105d)
图7:TypeHint作为参数,保存泛型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0003.jpg?sign=1739525673-DrffqLobTGwkwmRuuA0FUEE0CfGZ2I67-0-23bb631c0f2f6e11e990ec01bad369a2)
图8:BasicTypeInfo快捷方式
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0001.jpg?sign=1739525673-v1w2b3lfTmcrm6iLjUI4ZgLFrwBdIuvc-0-0d5a462cc81f291f041c44bebfcb6641)
图9:使用BasicTypeInfo快捷方式来声明一行(Row)每个字段的类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0002.jpg?sign=1739525673-6PJPDOxCZOSaTpQFPbV5o8rUzmlSxSvb-0-91c60c1fd0596f456358c6b505323628)
图10:Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0003.jpg?sign=1739525673-bbRwPgfmO7iBMXqYGftNIlYGkDfyzN81-0-634be174fea8c33bb796c11d4e02ddeb)
图11:flink-table模块的Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0150_0001.jpg?sign=1739525673-TD1xBlezZcfS2PGedvCScEMRgCXDLYy5-0-3c5585ec348690d2798ca8ab32f3147d)
图12:为自定义类提供类型支持(图片未展示全部字段)
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0151_0001.jpg?sign=1739525673-2q6NRO7olvJpvLZi1GzPwUpcTGigVbOt-0-2bb6fc68469de6d52957f888fdf6bee2)
图13:Flink自带的TypeSerializer子类概览
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0001.jpg?sign=1739525673-WeSaLgKoPxPhk5EiY7LNRzQ9M2Yp7R0p-0-cd8140174accfc4abe8d95fdd377896e)
图14:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0002.jpg?sign=1739525673-63l1irjHSmoGtnWSVDEh4JTjIRDHH0r5-0-e7a837d0e8884af5089a4201fe2b0544)
图15:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0001.jpg?sign=1739525673-2j8JevDTAYJsDbnqMzn21wh7FbzZnURA-0-1d5cf0b85d9f3c65c884a3b2f1fec269)
图16:类型信息到内存块
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0002.jpg?sign=1739525673-3PG2VocPtgJLhhfKsornAqtWd84Mwuu6-0-b1755fcd29ee36c3269abaa96598444b)
图17:StringSerializer类的serialize()方法
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0154_0001.jpg?sign=1739525673-PbsmLVuHPpmkk0SD57U2CFgRl70KnC72-0-ab157f4e8eff0d054040bb4837f444b0)
图18:String对象的序列化过程
可以看到,图1和图2是一一对应的,TypeInformation类是描述一切类型的公共基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随Flink的作业提交,被传递给每个执行节点。
由于Flink自己管理内存,采用了一种非常紧凑的存储格式(见官方博文),因而类型信息在整个数据处理流程中属于至关重要的元数据。
TypeExtractror类型提取
Flink内部实现了名为TypeExtractror的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(当然也可以显式声明,即本文所介绍的内容)。
然而由于Java的类型擦除,自动提取并不是总是有效。因而一些情况下(例如通过URLClassLoader动态加载的类),仍需手动处理;例如下图中对DataSet变换时,使用.returns()方法声明返回类型。
这里需要说明一下,returns()接受三种类型的参数:字符串描述的类名(例如"String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java原生Class(例如String.class)等;不过字符串形式的用法即将废弃,如果确实有必要,请使用Class.forName()等方法来解决。
下面是ExecutionEnvironment类的registerType方法,它可以向Flink注册子类信息(Flink认识父类,但不一定认识子类的一些独特特性,因而需要注册),下面是Flink-ML机器学习库代码的例子:
从下图可以看到,如果通过TypeExtractor.createTypeInfo(type)方法获取到的类型信息属于PojoTypeInfo及其子类,那么将其注册到一起;否则统一交给Kryo去处理,Flink并不过问(这种情况下性能会变差)。
声明类型信息的常见手段
通过TypeInformation.of()方法,可以简单地创建类型信息对象。
1.对于非泛型的类,直接传入Class对象即可
2.对于泛型类,需要借助TypeHint来保存泛型类型信息
TypeHint的原理是创建匿名子类,运行时TypeExtractor可以通过getGenericSuperclass(). getActualTypeArguments()方法获取保存的实际类型。
3.预定义的快捷方式
例如BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于String、Boolean、Byte、Short、Integer、Long、Float、Double、Char等基本类型的类型声明,可以直接使用。
例如下面是对Row类型各字段的类型声明,使用方法非常简明,不再需要new XxxTypeInfo<>(很多很多参数)
当然,如果觉得BasicTypeInfo还是太长,Flink还提供了完全等价的Types类(org.apache.flink.api.common.typeinfo.Types):
特别需要注意的是,flink-table模块也有一个Types类(org.apache.flink.table.api.Types),用于table模块内部的类型定义信息,用法稍有不同。使用IDE的自动import时一定要小心:
4.自定义TypeInfo和TypeInfoFactory
通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),可令存储更紧凑,运行时也更高效。
开发者在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo方法。
注意需要继承TypeInformation类,为每个字段定义类型,并覆盖元数据方法,例如是否是基本类型(isBasicType)、是否是Tuple(isTupleType)、元数(对于一维的Row类型,等于字段的个数)等等,从而为TypeExtractor提供决策依据。
更多示例,请参考Flink源码的org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
TypeSerializer
Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用:
如果不能满足,那么可以继承TypeSerializer及其子类以实现自己的序列化器。
Kryo序列化
对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfo和TypeInfoFactory),默认会交给Kryo处理。
如果Kryo仍然无法处理(例如Guava、Thrift、Protobuf等第三方库的一些类),有以下两种解决方案:
1.可以强制使用Avro来替代Kryo:
env.getConfig().enableForceAvro(); // env代表ExecutionEnvironment对象,下同
2.为Kryo增加自定义的Serializer以增强Kryo的功能:
env.getConfig().addDefaultKryoSerializer(Class<? > type, Class<? extends Serializer<? >> serializerClass
以及
env.getConfig().registerTypeWithKryoSerializer(Class<? > type, T serializer)
如果希望完全禁用Kryo(100% 使用Flink的序列化机制),则可以使用以下设置,但注意一切无法处理的类都将导致异常:
env.getConfig().disableGenericTypes();
类型机制的陷阱与缺陷
金无足赤,人无完人。Flink内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点:
1.Lambda函数的类型提取
由于Flink类型提取依赖于继承等机制,而lambda函数比较特殊,它是匿名的,也没有与之相关的类,所以其类型信息较难获取。
Eclipse的JDT编译器会把lambda函数的泛型签名等信息写入编译后的字节码中,而对于javac等常见的其他编译器,则不会这样做,因而Flink就无法获取具体类型信息了。
2.Kryo的JavaSerializer在Flink下存在Bug
推荐使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而非
com.esotericsoftware.kryo.serializers.JavaSerializer以防止与Flink不兼容。
类型机制与内存管理
下面以StringSerializer为例,来看下Flink是如何紧凑管理内存的:
下面是具体的序列化过程:
可以看到,Flink对于内存管理是非常细致的,层次分明,代码也容易理解。