数据通过定期执行SQL查询并为结果集中的每行创建输出记录来进行加载。默认情况下,数据库中的所有表都被复制,每个表都有自己的输出topic。connector会对数据库进行监视,对新表和删除的表自动进行调整。
特性
connector可以获取到数据表中的新增数据和变化数据,通过以下的增量查询查询模式。
增量查询模式
- Incremental column
跟踪一个自增的column,只能检测到新记录,已有数据的更新不能被检测到,所以只能用于不变数据。比如流式的事实表。 - Timestamp column
跟踪一个时间戳列。因为时间戳不是独一无二的,这种模式不能保证所有更新的数据都被交付。 - Incremental and timestamp column
最具鲁棒性,将递增列与时间戳列结合在一起,每个元组(id, 时间戳)唯一的标识行更新。即使更新在部分完成后失败,未处理的更新依然会在系统恢复时被正确的监测。 - Custom query
可以使用custom query而不是拷贝整张表。
Schema进化
当启用了Avro converter时,JDBC connector支持schema进化。如果数据库表的schema发生了变化,JDBC connector可以检测到这个变化,并创建一个新的Kafka Connect schema并尝试在Schema Registry中注册新的Avro schema。能否成功注册schema取决于Schema Registry的兼容性级别,默认情况下是backward。
例如,如果从表中删除一列,更改是向后兼容的,相应的Avro模式可以在模式注册表中成功注册。如果您修改数据库表模式以更改列类型或添加列,那么当将Avro模式注册到模式注册中心时,它将被拒绝,因为更改不向后兼容。
可以修改Schema Registry的兼容级别,有两种方法:
(1)使用PUT /config/(string: subject)设置主题兼容性级别。主题包含topic-key和topic-value,topic由topic.prefix和表名组合而成。
(2)可以在Schema Registry的avro.compatibility.level中设置兼容性级别。这是一个全局设定。
然而,由于JDBC API的限制,一些兼容的schema更改可能被视为不兼容的更改。例如,添加具有默认值的列是向后兼容的更改。然而,JDBC API的限制使得将其映射到Kafka连接模式中正确类型的默认值非常困难,因此默认值目前被忽略了。其含义是,即使数据库表模式的某些更改是向后兼容的,在模式注册中心注册的模式也不是向后兼容的,因为它不包含默认值。
如果JDBC连接器和HDFS连接器一起使用,那么模式兼容性也会受到一些限制。启用Hive集成时,模式兼容性要求backforward、forward和full,以确保Hive模式能够查询主题下的全部数据。由于某些兼容的模式更改将被视为不兼容的模式更改,因此这些更改将不起作用,因为生成的Hive模式将无法查询某个主题的整个数据。