mssql-kafka-connector

本文章主要是对开源的kafka-connect-cdc-mssql进行编译并集成到confluent平台中,鉴于还不太熟悉部分平台的功能,仅简单介绍此次实施的步骤。因
1.安装sql-server的docker image
安装步骤参考
注意版本以及ubuntu的docker需要root用户
安装结束后使用如下命令进入sql server客户端操作
root@xh:# sqlcmd -S localhost -U SA -P '密码'  //注意密码必须是大小写加特殊字符,否则无法创建dcoker镜像

  1. 下载并编译源码
    使用maven进行编译,你需要提前下载JDBC driver,可自行搜索,下载地址,注意自己的sql-server的JDBC driver版本即可
    之后maven insatll:
    cd ~/Projects/IdeaProjects/kafka-connect-cdc-mssql //x项目所在位置
    $ mvn install:install-file -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=6.0.7130 -Dpackaging=jar -Dfile=<path to the download> //注意你自己的sqljdbc版本即可
    然后编译,因为test环境不同,易出错,可使用如下命令:
    $ mvn clean package -Dmaven.test.skip=true
    $ cp ./target/kafka-connect-cdc-mssql-0.0.1-SNAPSHOT.jar ~/workspace/confluent-3.3.0/share/java/kafka-connect-mssql //把生成的jar包放到confluent下的/share/java/自己创建的文件夹下
  2. 启动kafka
    首先启动zookeeper
    cd ~/worksapce/kafka/zookeeper-3.3.6/ //zookeeper的目录
    bin/zkServer.sh start
    启动kafka
    cd .. //接上,kafka的目录
    sudo bin/kafka-server-start.sh config/server.properties

以下命令因不涉及到schema-registry,可暂时忽略不考虑
cd ~/workspace/confluent-3.3.0
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

  1. 配置sqlserver.properties
    之后配置mssql-kafka-connector的相关文件,并在confluent下的etc中创建目录sqlserver,且在该目录下创建sqlserver.properties,其内容如下:
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.mssql.MsSqlSourceConnector
#connector.class=/home/xh/workspace/confluent-3.3.0/share/java/kafka-connect-mssql/kafka-connect-cdc-mssql-0.0.1-SNAPSHOT.jar

# Set these required values
initial.database=Test
server.name=10.19.138.199
password=Xyj123456.
server.port=1433
username=SA
#topicFormat.format=kafka-mssql
change.tracking.tables=dbo.Inventory2    //注意dbo是schemaName,不要写成Test(DatabaseName)
  1. 启动mssql-kafka-connector
    $ cd ~/workspace/confluent-3.3.0
    $ bin/connect-standalone etc/kafka/connect-standalone.properties etc/sqlserver/sqlserver.properties
    根据错误提示到相应的行去找相应的maven依赖(~/.m2/repository/下) 并把该依赖jar包copy到~/workspace/confluent-3.3.0/share/java/kafka-connect-mssql/下,分别如下:

《mssql-kafka-connector》 需要的jar包.png

注意其依赖的父类kafka-connector-cdc也需要下载下来且使用maven编译好,同在该作者的github上。

6.在sql-server中允许database和table的change tracking

参考网址

命令如下:

1>ALTER DATABASE 数据库名 SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)

2>go

1>ALTER TABLE 表名 ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)

2>go

另外会出现一个提示要求你允许数据库snapshot隔离,命令如下:

1>ALTER DATABASE 数据库名 SET ALLOW_SNAPSHOT_ISOLATION ON

2>go

  1. 使用该mssql-kafka-connector
    在sql server客户端插入一条数据,另开一个kafka消费者的端口,可以看到相应的binlog输出

《mssql-kafka-connector》 kafka消费者端内容.png

接下来要去测试吞吐量和时延了~Come on!

    原文作者:九七学姐
    原文地址: https://www.jianshu.com/p/b6f45d9984e2
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞