CDC(Change Data Capture) with Kafka
CDC(Change Data Capture) with Kafka
Kafka Connect
- Kafka Connect를 이용한 CDC(Change Data Capture)를 통해 주문팀에서 생성된 데이터가 추천상품을 위해, 패턴 분석이 필요한 마케팅팀에 동기화 되는지를 실습한다.
- Connect는 Connector를 실행시켜주는 서버로 DB동기화시, 벤더사가 만든 Connector, 또는 OSS(Debezium, Confluent) 계열의 Connector를 사용한다.
- Lab에서는 경량의 h2 DB를 사용한다.
Connector, H2 database 다운로드
- H2 DB와 Kafka Connect를 위한 JDBC 드라이브를 다운로드한다.
git clone https://github.com/acmexii/kafka-connect.git
cd kafka-connect
- h2-database 아카이브를 압축해제한다.
mkdir ./h2
unzip h2.zip ./h2/
H2 데이터베이스 실행
- bin 폴더로 이동해 h2 database를 서버모드로 실행한다.
cd ./h2/bin
chmod 755 h2.sh
./h2.sh -webPort 8087 -tcpPort 9099
- 지정한 webPort로 Client WebUI가 접근 가능하다.
- h2 database는 9099포트(default 9092)로 실행된다.
Kafka 설치 및 실행
새로운 터미널에서 kafka를 수동으로 설치한다.
cd kafka-connect
curl "https://archive.apache.org/dist/kafka/2.7.1/kafka_2.13-2.7.1.tgz" -o ./kafka-2.7.1.tgz
tar xvfz kafka-2.7.1.tgz
cd kafka_2.13-2.7.1/
bin/zookeeper-server-start.sh config/zookeeper.properties &
- 새로운 터미널에서 kafka 데몬을 실행한다.
cd kafka-connect
cd kafka_2.13-2.7.1/
bin/kafka-server-start.sh config/server.properties &
Kafka JDBC Connector 설치
- Jdbc Connector를 설치된 Kafka 서버에 등록하고 사용한다.
- Connector를 설치할 폴더를 생성한다.
cd kafka-connect/kafka_2.13-2.7.1/
export kafka_home=$PWD
mkdir connectors
cd connectors
- 다운받은 confluentinc-kafka-connect-jdbc-10.2.5.zip을 복사 후 unzip 한다.
cp ../../confluentinc-kafka-connect-jdbc-10.2.5.zip ./
unzip confluentinc-kafka-connect-jdbc-10.2.5.zip
Connect 서버에 Connector 등록
- kafka Connect에 설치한 Confluent jdbc Connector를 등록한다.
- $kafka_home/config 폴더로 이동 후 connect-distributed.properties 파일 오픈하고,
cd $kafka_home/config
vi connect-distributed.properties
- 마지막 행으로 이동하여 주석을 제거한다.
plugin.path=/workspace/kafka-cdc/kafka-connect/kafka_2.13-2.7.1/connectors
- 위와 같이 편집하고 저장종료한다.
Kafka Connect 서버 실행
- $kafka_home에서 connect를 실행한다.
cd $kafka_home
bin/connect-distributed.sh config/connect-distributed.properties &
- Kafka Connect는 default 8083 포트로 실행이 된다.
- Kafka topic을 확인해 본다.
$kafka_home/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
-
Connect를 위한 토픽이 추가되었다.
connect-configs, connect-offsets, connect-status
Source Connector 설치
- Kafka connect의 REST API를 통해 Source 및 Sink connector를 등록한다.
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "h2-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:h2:tcp://localhost:9099/./test",
"connection.user":"sa",
"connection.password":"passwd",
"mode":"incrementing",
"incrementing.column.name" : "ID",
"table.whitelist" : "ORDER_TABLE",
"topic.prefix" : "SYNC_",
"tasks.max" : "1"
}
}'
Connector 등록시, 'No suitable driver' 오류가 발생할 경우, Classpath에 h2 driver를 설정해 준다. h2/bin에 있는 JDBC 드라이브를 $kafka_home/lib에 복사하고 다시 Connect를 실행한다.
- 등록한 Connector를 확인한다.
http localhost:8083/connectors
Order 마이크로서비스 설정
- 주문 서비스를 h2 Database에 연결한다.
- Order의 application.yml을 열어 default profile의 datasource를 확인한다.
datasource:
url: jdbc:h2:tcp://localhost:9099/./test
username: sa
password: passwd
driverClassName: org.h2.Driver
소스 테이블에 Data 입력
- order 마이크로서비스를 기동하고 소스 테이블에 데이터를 생성한다.
cd order
mvn spring-boot:run
http POST :8081/orders productId=1 qty=10 customerId=1000 price=10000
- Kafka topic을 확인해 본다.
$kafka_home/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
-
'SYNC_ORDER_TABLE' 토픽이 추가되어 목록에 나타난다.
Kafka Connect는 테이블 단위로 토픽이 생성되어 Provider와 Consumer간 데이터를 Sync합니다.
$kafka_home/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic SYNC_ORDER_TABLE --from-beginning
Sink Connector 설치
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "h2-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:h2:tcp://localhost:9099/./test",
"connection.user":"sa",
"connection.password":"passwd",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"SYNC_ORDER_TABLE"
}
}'
marketing 마이크로서비스 설정 및 실행
- 마케팅 서비스를 h2 Database에 연결한다.
- marketing 서비스의 application.yml을 열어 default profile의 datasource를 확인한다.
datasource:
url: jdbc:h2:tcp://localhost:9099/./test
username: sa
password: passwd
driverClassName: org.h2.Driver
Sink Connector를 통해 주문서비스에서 입력한 정보가 CDC를 통해 마케팅 테이블(SYNC_ORDER_TABLE)에 복제된 데이터가 조회된다.
- 다시한번 Orders 테이블에 데이터를 입력하고 마케팅팀에 주문 데이터 동기화가 되는지 확인해 본다.
http POST :8081/orders productId=1 qty=10 customerId=1000 price=10000
http GET :8082/syncOrders
이기종간 DBMS 연계
- Sink Connector의 JDBC Url만 다른 DB정보로 설정하여 Connect Server에 등록하면 이기종 DB간에도 데이터가 동기화가 가능해진다.