连接器开发指引
该指引介绍开发者如何为Kafka Connect写一个新连接器,来在Kafka和其它系统间移动数据。先简要的回顾一些概念,然后介绍如果创建一个简单连接
核心概念和API
连接器和任务
为了在Kafka和其它系统间复制数据,用户为该系统创建了一个连接器,然后他们想从系统拉取数据或从向系统推送数据。连接器分两种类型:SourceConnectors从其它系统导入数据(例如:JDBCSourceConnector 用来导入关系统数据库到Kafka)和SinkConnectors导出数据(例如:HDFSSinkConnector 用来导出Kafka主题的内容给HDFS文件)。
连接器自已不执行任务数据的复制:它的配置描述数据的复制,连接器负责将任务分解为一组任务,这些任务可以分配给worker。这些任务也分两种类型:SourceTask和SinkTask.
处理分配任务时,每个任务必须复制数据的子集到Kafka或写入数据子集到Kafka.在Kafka Connect中,始终可以将这些分配框架化为一组输入和输出流,这些流由具有一致模式的记录组成。某些时候这些对应是明显的:在一组日志文件中的每个文件都可以被认为是一个流,每个被解析的行使用相同的模式形成一条记录,并且存储的偏移量为文件中的字节的偏移量。在其它情况下,可能需要更多的努力不映射这种模式:一个JDBC连接器可以将每个表映射成一个流,但是它的偏移量不太清楚。一个可能映射是使用timestamp字段去成生增量查询,以返回最新的数据,且最后查询到的timestamp可以当作偏移量。
流和记录
每个流应该是一个key-value记录的序列。key和value都可以有复合结构体-已经提供了话多原始类型,但是数组、对象和嵌套数据结构也可以表示。运行时数据格式不设定任何序列化格式;这个转换由框架内部处理。
此外对于key和value,记录(由源生成的记录和交付给接收的记录)还具有相关联流的ID和偏移量。框架使用他们定期提交那些已经被处理过的数据的偏移量,以便在发生故障时,可以从最后提交的的偏移量恢复处理,避免不必要事件再处理和重复。
动态连接器
不是所有的任务都是静态的,因此Connector的实现也负责监控外部系统的任何可能需要重新配置的改变。例如:在JDBCSourceConnector的例子中,Connector可能为每个任务分配了一组表。当一个新的表被创建时,这个表必须被发现,以便通过更新配置将新创建的表分配给其中一个任务。当它注意到需要重新配置的更改,它通知框架,同时框架更新任何相应的任务。
连接器示例
我们将用一个简单示例来介绍SourceConnector.SinkConnector的实现非常类似。首先创建SourceConnector的继承类,添加两个用于保存解析过的配置的字段(要读取的文件的名称和要发送数据的主题)
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
最简单的方法是taskClass(),它定义了应该在工作进程中被实例化的用来实际读取数据类
@Override
public Class<? extends Task> taskClass() {
return FileStreamSourceTask.class;
}
下面我们将要定义FileStreamSourceTask类,然后,我们添加两个标准的生命周期方法,start()和stop():
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
}
@Override
public void stop() {
// Nothing to do since no background monitoring is required.
}
最后,真正的核心实现是在taskConfigs()方法中,在这种情况下,我们仅仅处理一个文
件,所以,即使我们被允许根据maxTask参数生成更多的任务,我们返回只有一个实例的列表:
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
虽然在这个例子中没有使用到SourceTask,但SourceTask仍然提供了两个API来提交源系统的偏移量:commit和commitRecord.这些API是为具有对消息确认证机制源系统提供的。重写这些方法,允许源连接器在将源系统中的消息写入Kafka后,以批量或单独的方式确认他们。Commit API将偏移量保存在源系统中,直到poll返回的的偏移量止。该API的实现应该阻塞至提交完成。CommitRecord API在每条SourceReocrd被写入Kafka之后将期偏移量保存到源系统中。由于Kafka Connect会自动记录偏移量,因此,SourceTask不必实现它们。连接器确实需要确认源系统中消息的情况下,通常只需要一个API.
即使有多个任务,该方法的实现通常也非常简单。它只需要确认输入任务的数量,这可能需要它从其中提取数据的远程服务联系,然后对其进行分配。由于在任务之间拆分工作的一些模式非常常见,因此ConnectorUtils中提供了一些实用程序来简化这些情况。
注意,这个简单例子没有包括动太输入。有关如何触发任务配置更新的信息,请参阅下一节中的讨论。
任务示例-源任务
接下来我们要描述对应的SourceTask的实现。这的实现非常简短,无法在本指南中完全涵盖。我们将用伪代码来介绍大部分实现,但是你可以参考这些源代码获得完整的示例。
像连接器一样,我需要创建一个从Task继承的类。它也有一些标准的生命周期方法:
public class FileStreamSourceTask extends SourceTask {
String filename;
InputStream stream;
String topic;
@Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
stream = openOrThrowError(filename);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
}
@Override
public synchronized void stop() {
stream.close();
}
这些是稍微简化的版本,但是表明这些方法应该相对简单,并且它唯一要执行的工作是分配或释放资源。关于该实现,有两个点需要注意:首先,start()方法没有处理上个偏移量的恢复,这将在后面的小节中讨论。第二,stop()方法是同步的。这是有必须要的,因为SourceTasks被赋予了专有的线程,这样它可以无限期阻塞,所以它必须由Worker中的其它线程调用来停止它们。
接下来,我们实现任务的主函数,poll方法从输入系统获取事件,并且返回一个List<SourceRecord>
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
ArrayList<SourceRecord> records = new ArrayList<>();
while (streamValid(stream) && records.isEmpty()) {
LineAndOffset line = readToNextLine(stream);
if (line != null) {
Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
} else {
Thread.sleep(1);
}
}
return records;
} catch (IOException e) {
// Underlying stream was killed, probably as a result of calling stop. Allow to return
// null, and driving thread will handle any shutdown if necessary.
}
return null;
}
我们再次忽略了一些细节,但是我们可以看到重要步骤,poll方法被重复的调用,同时每调用都将循环尝试从文件中读取记录。对于它读取的每一行,它还会跟踪文件的偏移量。它使用这些信息来创建一个拥有四条信息的输出SourceRecord:源分区(只有一个分区,单个文件被读取),源偏移量(文件中字节偏移量),输出主题的名称和输出的值(行,包括了一个模式表明这个值将永远是一个字符串)。SourceRecord的其它构造函数的也可以包括一个指定的输出分区,一个键和头。
注意,该实现使用普通的Java ImputStream接口,并且如果数据不可用可能会休眠。这是可以被接受的,因为Kafka Connect为每个任务提供了一个专用的线程。虽然任务实现需要复合基本的poll()接口,但是它们在如何实现方面有非常多的灵活性。在这种情况下,一个基于NIO的实现可能更有效,但是这种简单的方法可以工作,实现起来很快,并且较老版本java兼容。
接收任务
前面的章节描述了如何实现一个简单的SourceTask.不像SourceConnector和SinkConnector,SourceTask和SinkTask拥有非常不一样的接口,因为SourceTask用一个拉的接口,而SinkTask使用一个推的接口。两个共享共同的生命周期方法,但是SinkTask的接口有非常大的不同:
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void put(Collection<SinkRecord> records);
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}
SinkTask文件包含有完整的细节,但是该接口几乎与SourceTask一样简单。put()方法应该包含大部分的实现,接收SinkRecords的集合,执行任何必要的转换,同时将它们保存在目的系统中。该方法不需要在返回前确保数据被完全写入目的系统。实际上,在很多情况下内部缓存将会非常有用,这样可以一次性发送一批记录,从而减少了向下游数据存储插入事件的开销。SinkRecords包含与SourceReocrds相同的信息:Kafka主题,分区,偏移量,事件的键和值,和可选的头信息。
flush()方法在提交偏移量的过程中使用,它允许任务从失败中恢复和并从安全点恢复,这样就不会丢失事件。该方法应该将任何未完成的数据推送到目标系统,并且阻塞至写入被确认,offsets通常可以被忽略,但是在某些情况下非常有用,比如,现实想要将偏移量信息保存在目标系统中,以提供刚好一次交付。例如:一个HDFS连接器可以这样做,使用原子移动操作来确保flush()操作原子的将数据和偏移量提交到HDFS的一个最终位置。
从前一个偏移量恢复
SourceTask实现包每个记录的流ID(输入文件名)和偏移量(在文件中的位置)。框架定期使用这些信息来提交偏移量,这样在任务失几的情况下,该任务可以恢复,同时最少化重复处理或复制事件(或者从最新的偏移量恢复,如果Kafka Connect优雅的停止,另外,在独立模式或工作重新配置)。该提交过程完全通过框架自动执行,但是只有连接器知道如何在输入流中寻找正确的位置,以便从该位置恢复。
要在启动时正确的恢复,该任务可以使用被传入initialize()方法中的SourceContext来访问偏移量数据。在initialize()方法中,我们需要添加更多代码来读取偏移量(如果存在)并寻找到那个位置。
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Long lastRecordedOffset = (Long) offset.get("position");
if (lastRecordedOffset != null)
seekToOffset(stream, lastRecordedOffset);
}
当然,您可能需要为每个输入流读取许多键。OffsetStorageReader接口还允许您发出批量读取来有效地加载所有偏移量,然后通过将每个输入流查找到适当的位置来应用它们。
动态输入/输出流
Kafka连接器原本打算定义批量数据复制作业,比如复制整个数据库,而不是创建许多作业来分别复制每个表。这样设计的一个后果是,对于一个连器的输入输出流的集合可以随着时间变化。
源连接器必须监控源系统的更改,例如:数据库中表的增加和删除。当它接收到更改时,它应该通过ConnectorContext对象通知框架需要重新配置。例如,在一个SourceConnector中:
if (inputsChanged())
this.context.requestTaskReconfiguration();
框架将立刻请求新的配置信息,同时更新任务,并且在重新配置前允许它们优雅的提交它们的处理。请注意,在SourceConnector中,此监视目前由连接器实现决定。如果额外的线程被要求执行该监视,连接器必须自行分配。
现想情况下该监视变化的代码将会与Connector隔离,并且任务不必当心它们。尽管,变更也可以影响到任务,最常见的是当输入系经中的某个输入流被销毁时,例如:如果一个表被从数据库中删掉。如果该任务在连接器之前需到了这个问题,这将很普遍如果该连接器需要轮询更改,该任务将需要处理后续错误。幸运的是,这通常可以通过捕获和处理适当的异常来简单地处理。
SinkConnectors通常只需要处理添加的流,这可能转换为输出中的新条目(例如,一个新的数据库表)。框架管理着Kafka输入的任何变更,比如,因为订阅正则表达式变化而导致的入主题集变化。SinkTasks应该期待新的输入流,这可能要求在下游系统中创建新的资源,比如数据库中的一张新表。在这些情况下,要处理的最棘手的情况可能是多个SinkTasks在第一次看到一个新的输入流和同时尝试创建新资源之间的冲突。另一方面,SinkConnectors通常不需要特殊的代码来处理一组动态流。
连接器配置校验
Kafka Connect允许你在提交一个连接器去执行前校验连接器配置,并返回错误信息和建议内容。要使用这个优点,连接器开发者必须config()方法的实现,来暴定义的配置给框架。
下面在FileStreamSourceConnector中的代码定义了配置,并将其暴露给框架。
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
public ConfigDef config() {
return CONFIG_DEF;
}
ConfigDef这个类用于确定期待的配置集合。对于每个配置,你可以确定名称,类型,默认值,文档,组信息,在组内的顺序,配置值的范围以及在UI中显示的名称。另外,你可以通过重写Validator类为单个配置校验提供特殊的校验逻辑。此外,由于配置之间还可能存在依赖关系,例如:有效值和配置的可见性可能跟随其它配置值而改变。为了处理这个问题,ConfigDe允许你指定配置的依赖,并且提供一个Recommender的实现来获取有效的值和给当前配置值设置可见性配置。
此外,连接器中的validate()方法提供了一个默认的验证实现,该实现返回一个允许的配置列表以及每个配置的配置错误和推荐值。但是,它不使用建议的值进行配置验证。你可以重写默认实现来定制配置校验,这样你可以使用推荐配置。
Working with Schemas
文件流连接器是一个很好的例子,因为它简单,但是它也只有简单的数据构构-每行只是一个字符串。几乎所有的实际连接器都需要具有更复杂的数据格式的模式。
创建更复杂的数据,你将需要用到Kafka Connect的data API.除了基本类型以外,大部分结构化的记录都需要与两个类交互:Schema和Struct.
API文档提供了完整的说明,但是这里有一个创建Schema和Struct的简单例子:
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.build();
Struct struct = new Struct(schema)
.put("name", "Barbara Liskov")
.put("age", 75);
如果你实现了一个源连接器,你需要决定何时以及如何创建schemas.要可能的情况,你应该尽可以的避免重新计算。例如:如果你的连接器保证获取一个固定的schema,静态的创建它同时复用单个实例。
然而,许多连接器需要动态的schema.一个单的例子是数据库连接器。即使只考虑单个表,也不会为整个连接器预定义模式(因为它随表而异)。同样由于用户可能执行一个ALTER TABLER命令,在连接器的的生命周期内,单个表也可能不是固定的。连接器必须能检测到这些变更,并且做出适当的应对。
Sink Connector通常比较简单,因为它们消费数据,因此不必创建schema.然则,但是它必须尽可能的小心校验接收到的schema是否为期望的格式。当schema不匹配时-通常说明上游的生产正在生成不能被正确转换到目标系统的无效数据-sink connector应该抛出一个异常,以向系统指标此错误。