5.3 数据序列化
数据序列化就是Java对象和二进制数据流之间的相互转换,前文提到的TypeInfomation#createSerializer接口负责创建每种类型的序列化器,进行数据的序列化。
5.3.1 数据序列化/反序列化
前文提到过TypeInfomation是数据物理类型的描述。在需要将数据进行序列化的时候,使用TypeInfomation的生成序列化器接口创建出TypeSerializer,TypeSerializer提供了序列化和反序列化能力。TypeSerializer是抽象类,所有的常见数据类型都实现了自己的序列化器TypeSerializer实现。
数据序列化、反序列化的概要过程如图5-9所示。
图5-9 序列化和反序列化过程
TypeSerializer的实现类比较多,感兴趣的读者可以查看官方的Java API文档或者源代码了解更多细节。
对于嵌套类型的数据结构,从最内层的原子字段开始进行序列化,外层的TypeSerializer负责将内层的序列化结果组装到一起。
下边通过一个实例来了解嵌套复杂结构的序列化和反序列化,如图5-10所示。
一个Tuple3<Integer,Double,Person>对象,其中前两个字段为基本类型,Person是一个自定义的Pojo类,Person类中包含了两个基本类型的字段,Tuple3<Interger,Double,Person>的类型信息描述使用TupleTypeInfo进行保存。
序列化的时候,从TupleTypeInfo中获取对应的序列化器,注意对于Tuple这种嵌套类型,Tuple的序列化器会遍历Tuple中所有字段,并根据字段的数据类型生成子序列化器,如果Tuple嵌套了其他复杂类型,则会递归地生成子序列化器。按照顺序将Tuple中的字段序列化为二进制数据,依次写入连续的内存片段中。
反序列化的时候,则会依次调用Tuple中字段对应的序列化器,从内存中顺序读取二进制数据,反序列化为Java对象,重新组装成Tuple对象。
图5-10 嵌套类型序列化示例
注意:反序列化的时候,Tuple中的每个子序列化器能够自动识别应该读取多少字节的数据,如对于int类型,读取32字节,对于String类型,则会首先读取长度部分,根据长度的数值计算出字符串的起始内存地址和应该读取的字节长度。
5.3.2 String序列化过程示例
StringSerializer中实现了serialize和deserialize方法,调用StringValue.class实现了数据的序列化和反序列化,如代码清单5-5所示。
代码清单5-5 StringSerializer序列化字符串
最终的实际序列化的动作交给StringValue.class执行,写入String的长度和String的值到java.io.DataOutput,实际上就是写入MemorySegment中,如代码清单5-6所示。
代码清单5-6 String类型数据实际序列化过程
Flink中的序列化的类实际上都调用了DataOutputView接口,DataOutputView接口继承自DataOutput接口。
反序列化的逻辑是相反的,将二进制数据流转换为UTF8编码的字符串,如代码清单5-7所示。
代码清单5-7 String类型数据反序列化过程
5.3.3 作业序列化
Flink作业在执行时,需要进行数据序列化,执行前还要先将作业的UDF代码等进行序列化,所有的UDF都需要实现Serializable接口。
这里需要特别提一下内部类序列化,在Flink中编写UDF的时候,经常会使用匿名内部类的实现方式,如代码清单5-8所示。
代码清单5-8 使用匿名内部类实现MapFunction
上边代码中的MapFunction使用了匿名内部类的方式实现,默认内部类会持有一个外部对象的引用this$0,如果外部对象不实现序列化接口,内部类的序列化会失败,所在Flink中使用ASM操作字节码将匿名内部类中的this$0设置为null。在Flink DataStreamp的map、filter、keyBy等接口中都使用了ClosureCleaner#clean方法来设置this$0。
5.3.4 Kryo序列化
Kryo的JavaSerializer在Flink下存在Bug,可能导致ClassNotFound异常。所以Flink自行维护了org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer替代Kryo的JavaSerializer。