We will create a Kafka cluster on Windows and provide setup details for standalone mode:
Prerequisites:
Enable CDC on SQL Server:
Setting up SQL Server
Before using the SQL Server connector to monitor changes committed on SQL Server, first enable CDC on the monitored database. Please bear in mind that CDC cannot be enabled for the master
database.
-- ====
-- Enable Database for CDC template
-- ====
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
Then enable CDC for each table that you plan to monitor.
-- ====
-- Enable a Table Specifying Filegroup Option Template
-- ====
USE MyDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
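Optionally, you can confirm that CDC is now active at both the database and table level. A minimal check (MyDB and MyTable are the example names from above):
-- ====
-- Optional check: confirm CDC is enabled at database and table level
-- ====
USE MyDB
GO
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'MyDB'
GO
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'MyTable'
GO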
Verify that the user has access to the CDC table.
-- ====
-- Verify that the connector user has access; this query should not return an empty result
-- ====
EXEC sys.sp_cdc_help_change_data_capture
GO
If the result is empty, make sure that the user has privileges to access both the capture instance and the CDC tables.
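If access is missing, one way to grant it is to add the connector's database user to the gating role named in @role_name when the table was enabled (MyRole above). A minimal sketch, assuming a SQL login named kafkaread, matching the user in the connector config later in this setup:
-- ====
-- Grant the connector user access via the CDC gating role (sketch)
-- ====
USE MyDB
GO
CREATE USER kafkaread FOR LOGIN kafkaread -- assumes the login kafkaread already exists
GO
ALTER ROLE MyRole ADD MEMBER kafkaread
GO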
Install Java on the machine:
https://www.java.com/en/download/
Download Zookeeper from below:
https://zookeeper.apache.org/releases.html
Download Kafka from below:
https://kafka.apache.org/downloads
Download only the .tar.gz files.
Download the Debezium SQL Server connector:
Uncompress (unzip) the Kafka, Zookeeper, and Debezium files.
Name the folders as below: [Kafka/Zookeeper]
Inside the Kafka folder, create a new folder for the SQL connector:
connect-plugins
Copy the 4 files extracted from the Debezium tar.gz file into it:
debezium-api-1.3.0.Final.jar
debezium-connector-sqlserver-1.3.0.Final.jar
debezium-core-1.3.0.Final.jar
mssql-jdbc-7.2.2.jre8.jar
To update the IP listener for the broker, go to Kafka/config and edit the server.properties file to include the info below; for distributed mode we need to do the same:
listeners = PLAINTEXT://10.53.56.140:9092
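If consumers will connect from other machines, it may also help to set advertised.listeners to the same address in server.properties. A sketch, reusing the example host above:
listeners=PLAINTEXT://10.53.56.140:9092
advertised.listeners=PLAINTEXT://10.53.56.140:9092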
To update the plugin path for Connect, go to Kafka/config and edit the connect-standalone.properties file to include the info below; for distributed mode, do the same in connect-distributed.properties:
plugin.path=<Drive name>:\kafka\connect-plugins
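For context, the relevant lines of connect-standalone.properties would look roughly like the sketch below; localhost:9092 and drive D: are assumptions, so adjust both to your setup:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=D:\kafka\connect-plugins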
Create a SQL config file that contains the SQL connection configuration. Create the file sqlConfig1.3.properties; to avoid deadlocks, use read_uncommitted on the SQL side.
name=SIPShipperProducer
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=101.55.55.155
database.port=1433
database.user=kafkaread
database.password=kafkaread
database.dbname=SIP
database.server.name=SIP
table.include.list=SIP.Shipper
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=SIPShipper
snapshot.isolation.mode=read_uncommitted
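For reference, in distributed mode the same connector would be registered through Connect's REST API instead of a properties file. A sketch, assuming the Connect worker runs locally on its default port 8083 and curl is available:
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "SIPShipperProducer",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "101.55.55.155",
    "database.port": "1433",
    "database.user": "kafkaread",
    "database.password": "kafkaread",
    "database.dbname": "SIP",
    "database.server.name": "SIP",
    "table.include.list": "SIP.Shipper",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "SIPShipper",
    "snapshot.isolation.mode": "read_uncommitted"
  }
}'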
For more details on the configuration properties, refer to the document below:
snapshot.mode (default: initial)
A mode for taking an initial snapshot of the structure and optionally data of captured tables. Once the snapshot is complete, the connector will continue reading change events from the database's redo logs. Supported values are:
initial: Takes a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables.
initial_only: Takes a snapshot of structure and data like initial, but does not transition into streaming changes once the snapshot has completed.
schema_only: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics.
snapshot.isolation.mode (default: repeatable_read)
Mode to control which transaction isolation level is used and how long the connector locks the monitored tables. There are five possible values: read_uncommitted, read_committed, repeatable_read, snapshot, and exclusive (in fact, exclusive mode uses the repeatable read isolation level; however, it takes the exclusive lock on all tables to be read).
It is worth documenting that snapshot, read_committed and read_uncommitted modes do not prevent other transactions from updating table rows during the initial snapshot, while exclusive and repeatable_read do.
Another aspect is data consistency. Only exclusive and snapshot modes guarantee full consistency, that is, initial snapshot and streaming logs constitute a linear history. In case of repeatable_read and read_committed modes, it might happen that, for instance, a record added appears twice: once in the initial snapshot and once in the streaming phase. Nonetheless, that consistency level should do for data mirroring. For read_uncommitted there are no data consistency guarantees at all (some data might be lost or corrupted).
https://debezium.io/documentation/reference/1.3/connectors/sqlserver.html
For any queries on Debezium, reach out at: https://gitter.im/debezium/user?at=5ceb63caecdf942b4c50d8c0
To get only record changes from the day of setup onwards, use the schema_only snapshot mode:
schema_only: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics.
name=SIPShipperProducer
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=101.55.55.155
database.port=1433
database.user=kafkaread
database.password=kafkaread
database.dbname=SIP
database.server.name=SIP
table.include.list=SIP.Shipper
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=SIPShipper
snapshot.isolation.mode=read_uncommitted
snapshot.mode=schema_only
Update the sqlConfig1.3.properties file with the configuration above.
Basic Setup Completed
Testing:
Start Zookeeper from PowerShell: run the command below from Kafka/bin/windows
./zookeeper-server-start.bat ../../config/zookeeper.properties
Start Kafka from PowerShell: run the command below from Kafka/bin/windows
./kafka-server-start.bat ../../config/server.properties
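To sanity-check that the broker is reachable before wiring up Connect, one option is the API-versions tool that ships with Kafka (a sketch, assuming the broker listens on localhost:9092):
./kafka-broker-api-versions.bat --bootstrap-server localhost:9092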
Run the command below from the Kafka directory to start the standalone connector and create the topics:
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\sqlConfig1.3.properties
After the above command, 3 topics will be created for that table in the tmp folder: tmp\kafka-logs
Topic 1: SIP.SIP.Shipper-0
Topic 2: SIP-0
Topic 3: SIPShipper-0
To get the list of topics:
./kafka-topics.bat --list --bootstrap-server localhost:9092
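To inspect a single topic (partitions, replication), the same tool can describe it; for example, for the table topic created above:
./kafka-topics.bat --describe --topic SIP.SIP.Shipper --bootstrap-server localhost:9092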
Consumer testing: run the command below from the Kafka/bin/windows folder to retrieve all the records. Make sure the connector (CDC) is always running; otherwise you will not see streaming data.
./kafka-console-consumer.bat --topic SIP.SIP.Shipper --from-beginning --bootstrap-server localhost:9092
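Each message on the table topic is a Debezium change event. An abridged sketch of what the consumer prints (the surrounding schema block is omitted; the columns ShipperID and CompanyName are hypothetical; op is c/u/d for create/update/delete and r for snapshot reads):
{
  "payload": {
    "before": null,
    "after": { "ShipperID": 1, "CompanyName": "Speedy Express" },
    "source": { "db": "SIP", "schema": "SIP", "table": "Shipper" },
    "op": "c",
    "ts_ms": 1604325517000
  }
}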
*As this is Windows-based Kafka, all the commands use .bat; on Linux, use .sh in place of .bat.
We can create a similar setup in Azure Databricks without Windows or an on-premise setup, but to do that we need to create a single-node Azure Databricks Spark cluster.
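On such a cluster, the topic can be read with Spark Structured Streaming. A minimal PySpark sketch, assuming the cluster can reach the broker at 10.53.56.140:9092 (the example listener above) and that the Kafka source for Spark is available on the cluster:
# Read the Debezium topic as a stream; runs inside a Databricks notebook,
# where the `spark` session is predefined.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.53.56.140:9092")
      .option("subscribe", "SIP.SIP.Shipper")
      .option("startingOffsets", "earliest")
      .load())

# Kafka values arrive as bytes; cast to string to see the Debezium JSON
events = df.selectExpr("CAST(value AS STRING) AS json")
query = events.writeStream.format("console").start()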