Kafka Stream via Debezium from SQL with CDC Architecture

Naveen D
Nov 4, 2020

We will create a Kafka cluster on Windows and provide setup details for standalone mode:

Prerequisites:

Enable CDC on SQL Server:

Setting up SQL Server

Before using the SQL Server connector to monitor the changes committed on SQL Server, first enable CDC on the monitored database. Please bear in mind that CDC cannot be enabled for the master database.

-- ====
-- Enable Database for CDC template
-- ====
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
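As a quick sanity check (a standard SQL Server catalog view, using the example database name from above), confirm that the flag was set:

-- ====
-- Confirm CDC is enabled on the database
-- ====
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'MyDB'
GO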

Then enable CDC for each table that you plan to monitor.

-- ====
-- Enable a Table Specifying Filegroup Option Template
-- ====
USE MyDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
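Likewise, a quick check that the table is now tracked (a standard catalog view, using the example names from above):

-- ====
-- Confirm CDC is enabled on the table
-- ====
USE MyDB
GO
SELECT name, is_tracked_by_cdc
FROM sys.tables
WHERE name = 'MyTable'
GO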

Verify that the user has access to the CDC table.

-- ====
-- Verify that the connector user has access; this query should not return an empty result
-- ====
EXEC sys.sp_cdc_help_change_data_capture
GO

If the result is empty, please make sure that the user has privileges to access both the capture instance and the CDC tables.
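If access is missing, here is a minimal sketch of the grants, assuming the connector connects as the kafkaread user from the connector config later in this article and uses the MyRole gating role from the enable-table step above (adjust the names to your environment):

-- ====
-- Grant the connector user access via the CDC gating role (example names)
-- ====
USE MyDB
GO
-- Skip CREATE USER if the database user already exists
CREATE USER kafkaread FOR LOGIN kafkaread
GO
ALTER ROLE MyRole ADD MEMBER kafkaread
GO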

Install Java on the machine:

https://www.java.com/en/download/

Download Zookeeper from below:

https://zookeeper.apache.org/releases.html

Download Kafka from below:

https://kafka.apache.org/downloads

Download only the .tar.gz files.

Download Debezium SQL connector:

https://debezium.io/releases/

Uncompress or unzip the Kafka/Zookeeper/Debezium files.

Name the folders as below: [Kafka/Zookeeper]

Inside the Kafka folder, create a new folder for the SQL connector:

connect-plugins

Copy the 4 files extracted from the Debezium tar.gz file into it (a PowerShell sketch follows the list):

debezium-api-1.3.0.Final

debezium-connector-sqlserver-1.3.0.Final

debezium-core-1.3.0.Final

mssql-jdbc-7.2.2.jre8
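A minimal PowerShell sketch of this step, using hypothetical paths (the D:\ drive and folder names are assumptions; adjust the drive, folder names and versions to your setup):

# Hypothetical paths: adjust to where you extracted Kafka and the Debezium connector
New-Item -ItemType Directory -Path "D:\kafka\connect-plugins" -Force
Copy-Item -Path "D:\debezium-connector-sqlserver-1.3.0.Final\*.jar" -Destination "D:\kafka\connect-plugins"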

To update the broker listener IP, go to Kafka/config and edit the server.properties file to include the line below:

listeners = PLAINTEXT://10.53.56.140:9092

To set the plugin path for Kafka Connect, go to Kafka/config and edit the connect-standalone.properties file to include the line below; for distributed mode, do the same in connect-distributed.properties.

plugin.path=<Drive name>:\kafka\connect-plugins

Create a SQL config file, which contains the SQL connection configuration.

Create the file sqlConfig1.3.properties; to avoid deadlocks, use read_uncommitted isolation on SQL Server:

name=SIPShipperProducer
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=101.55.55.155
database.port=1433
database.user=kafkaread
database.password=kafkaread
database.dbname=SIP
database.server.name=SIP
table.include.list=SIP.Shipper
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=SIPShipper
snapshot.isolation.mode=read_uncommitted

For more details on the configuration properties, refer to the document linked below. Two key properties:

snapshot.mode (default: initial)
A mode for taking an initial snapshot of the structure and optionally data of captured tables. Once the snapshot is complete, the connector will continue reading change events from the database’s redo logs.

Supported values are:
initial: Takes a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables.
initial_only: Takes a snapshot of structure and data like initial, but does not transition into streaming changes once the snapshot has completed.
schema_only: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics.

snapshot.isolation.mode (default: repeatable_read)
Mode to control which transaction isolation level is used and how long the connector locks the monitored tables. There are five possible values: read_uncommitted, read_committed, repeatable_read, snapshot, and exclusive (in fact, exclusive mode uses the repeatable read isolation level; however, it takes an exclusive lock on all tables to be read).
It is worth documenting that snapshot, read_committed and read_uncommitted modes do not prevent other transactions from updating table rows during the initial snapshot, while exclusive and repeatable_read do.
Another aspect is data consistency. Only exclusive and snapshot modes guarantee full consistency, that is, the initial snapshot and streaming logs constitute a linear history. In the case of repeatable_read and read_committed modes, it might happen that, for instance, an added record appears twice: once in the initial snapshot and once in the streaming phase. Nonetheless, that consistency level should do for data mirroring. For read_uncommitted there are no data consistency guarantees at all (some data might be lost or corrupted).
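Note that before using snapshot.isolation.mode=snapshot, snapshot isolation must be allowed on the database; a minimal sketch, using the example database name from above (this is a standard SQL Server setting, not a Debezium command):

-- ====
-- Allow snapshot isolation (required for snapshot.isolation.mode=snapshot)
-- ====
ALTER DATABASE MyDB
SET ALLOW_SNAPSHOT_ISOLATION ON
GO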

https://debezium.io/documentation/reference/1.3/connectors/sqlserver.html

For any queries on Debezium, reach out at: https://gitter.im/debezium/user?at=5ceb63caecdf942b4c50d8c0

To capture only record changes from the day of setup onwards, set snapshot.mode to schema_only: it takes a snapshot of the structure of the captured tables only, so only changes happening from now onwards are propagated to topics. Update sqlConfig1.3.properties as below:

name=SIPShipperProducer
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=101.55.55.155
database.port=1433
database.user=kafkaread
database.password=kafkaread
database.dbname=SIP
database.server.name=SIP
table.include.list=SIP.Shipper
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=SIPShipper
snapshot.isolation.mode=read_uncommitted
snapshot.mode=schema_only

Basic Setup Completed

Testing:

Start Zookeeper from PowerShell: run the command below from Kafka/bin/windows

./zookeeper-server-start.bat ../../config/zookeeper.properties

Start Kafka from PowerShell: run the command below from Kafka/bin/windows

./kafka-server-start.bat ../../config/server.properties

Run the command below from the Kafka directory to start the connector, which creates the topics:

.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\sqlConfig1.3.properties

After the above command, 3 topics will be created for that table in the tmp folder (\tmp\kafka-logs):

Topic 1: SIP.SIP.Shipper-0

Topic 2: SIP-0

Topic 3: SIPShipper-0

To get the list of topics:

./kafka-topics.bat --list --bootstrap-server localhost:9092

Consumer testing: run the command below from the Kafka/bin/windows folder; it retrieves all the records. Make sure the connector is always running, else you cannot see streaming data.

./kafka-console-consumer.bat --topic SIP.SIP.Shipper --from-beginning --bootstrap-server localhost:9092
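Each message on the topic is a Debezium change-event envelope. Below is a heavily trimmed, hypothetical value for an update on the Shipper table: the column names and values are made up for illustration, and the real payload also carries the full schema and additional source metadata.

{
  "before": { "ShipperId": 1, "ShipperName": "Old Name" },
  "after":  { "ShipperId": 1, "ShipperName": "New Name" },
  "source": { "db": "SIP", "table": "Shipper" },
  "op": "u",
  "ts_ms": 1604480000000
}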

*As this is Windows-based Kafka, all the commands use .bat; on Linux, use .sh in place of .bat.

We can create a similar setup in Azure Databricks without a Windows or on-premise setup, but to do that we need to create a single-node Azure Databricks Spark cluster.
