FrameWork/Spring Cloud
마이크로서비스간에 데이터 동기화 - 2
태윤2
2021. 9. 8. 02:09
Apache Kafka
- Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
- Open Source Message broker Project
- 링크드인(Linked-in)에서 개발, 2011년 오픈 소스화
- 2014년 11월 링크드인에서 Kafka를 개발하던 엔지니어들이 Kafka개발에 집중하기 위해 Confluent라는 회사 창립
- 실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공
- Apple, Netflix, Shopify, Yelp, Kakao, New York Times, Cisco, Ebay, Paypal, Hyperledger Fabric, Uber,Salesforce.com 등이 사용
Kafka가 개발되기 이전의 시스템
- 모든 시스템으로 데이터를 실시간으로 전송하여 처리할 수 있는 시스템
- 데이터가 많아지더라도 확장이 용이한 시스템
Kafka 데이터 처리 흐름
- Producer(보내는쪽)/Consumer(받는쪽) 분리
- 메시지를 여러 Consumer에게 허용
- 높은 처리량을 위한 메시지 최적화
- Scale-out 가능
- Eco-system
Kafka Broker(서버)
- 실행 된 Kafka 애플리케이션 서버
- 3대 이상의 Broker Cluster 구성
- Zookeeper(코디네이터) 연동
- 역할: 메타데이터 (Broker ID, Controller ID 등) 저장
- Controller 정보 저장
- Broker 관리
- n개 Broker 중 1대는 Controller 기능 수행
- Controller 역할
- 각 Broker에게 담당 파티션 할당 수행
- Broker 정상 동작 모니터링 관리
설치
- http://kafka.apache.org/downloads
- 2.13 다운로드
Echosystem 1 - Kafka Client
- Kafka와 데이터를 주고받기 위해서 사용하는 Java Library
- Producer, Consumer, Admin, Stream 등 Kafka관련 API 제공
- 다양한 3rd party library 존재: C/C++, Node.js, Python, .NET 등
Kafka 서버 기동
- Zookeeper 및 Kafka 서버 구동
- $KAFKA_HOME/bin/zookeeper-server-start.sh, $KAFKA_HOME/config/zookeeper.properties
- $KAFKA_HOME/bin/kafka-server-start.sh(bat), $KAFKA_HOME/config/server.properties
- Topic 생성
- Producer, Consumer 커맨드로 테스트 할 수 샘플 콘솔 프로그램 제공
- $KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:XXXX --partitions 1
- Topic 목록 확인
- $KAFKA_HOME/bin/kfka-topics.sh --bootstrap-server localhost:XXXX --list
- Topic 정보 확인
- $KAFKA_HOME/bin/kfka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:XXXX
window 서버 기동
- .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- $ ./bin/windows/kafka-server-start.bat ./config/server.properties
- .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
- topic 리스트 확인
- .\bin\kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
- topic 생성
- .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic quickstart-events
- topic 정보 확인
- Producer 와 Consumer 테스트
- .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic quickstart-events
- .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
Echosystem 2 - Kafka Connect
- Kafka Connect를 통해 Data를 Import/Export 가능
- 코드 없이 Configuration으로 데이터를 이동
- Standalone mode, Distribution(분산) mode 지원
- RESTful API 통해 지원
- Stream 또는 Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 Plugin 제공(File, S3, Hive, Mysql, etc ...)
- KafkaCluster - Server
- KafkaConnect Source - 가져오는 쪽
- KafkaConnect Sink - 보내는 쪽
- Source System - Kafka Cluster에 저장할 때 관여 함 (Hive, jdbc ...)
- Target System - DB,S3(스토리지) ...
MariaDB 설치
- MacOS
- Window
- zip 으로 다운
Order-Servcie에 Maria DB 등록
- build.gradle 에 Maria DB jdbc 주입
- h2-console 테스트
- create table
create table users(
id int auto_increment primary key,
user_id varchar(20),
pwd varchar(20),
name varchar(20),
created_at datetime default NOW()
);
Kafka Connect 설치
- MacOS
- JDBC Connector 설치
- https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
- Windows
- log4j 관련 에러가 뜨는경우 경우
- connect-distributed.bat 파일의 config 파일 경로를 etc/kfaka/tools-log4j.properties로 변경
- JDBC Connector 설정
- ./etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
- JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사
- Gradle은 ${USER.HOME}\.m2 폴더에 저장이 안되 maven 프로젝트를 사용해 miaradb-java-client-2.7.2 라이브러리를 받아서 miaradb-java-client-2.7.2.jar 파일을 kafka-connect 폴더의 .\share\java\kafka\로 복사함
connect 기동
- zookeeper와 kafka-server 기동후 connect 실행
.\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
- topic을 확인해보면 connect 관련 topic을 생성 함
Kafka Source Connect 테스트
echo '
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3307/mydb",
"connection.user":"root",
"connection.password":"1234",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
- name - connector 이름
- connector.class - JdbcSourceConnector 사용
- connection.url - mariaDB 포트번호 3307로 사용중
- mode - 데이터를 자동 증가 시켜주는 모드
- incrementing.column.name - 자동으로 증가 시켜줄 컬럼은 id
- table.whitelist - MariaDB에 특정한 값이 변경 되었을때 whitelist에 있는 값의 변경을 감지해서 topic에 저장
- topic.prefix - my_topic_ 이란 prefix를 가진 topic에 저장
Postman을 통해 Source 등록
- Response : 201 Created
등록된 source 확인
- 데이터 저장
- topic 확인
- 데이터 추가 저장 후 topic 확인
Kafka Sink Connect 테스트
echo '
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3307/mydb",
"connection.user":"root",
"connection.password":"1234",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
Topic이 전달받은 데이터를 활용해서 사용하는 역할
- auto.create - topic 과 같은 이름을 가진 테이블을 생성 하겠다는 설정
- topics - auto.create 할때 필요한 topic 이름
Postman을 통해 Source 등록
- my_topic_users 테이블 생성됨 (기존에 테이블에 갖고있던 컬럼을 그대로 복사)
sink-connect에 의해서 usres에 저장한 데이터가 my_topic_users에도 같이 저장됨
Kafka-Producer를 이용해서 Kafka Topic에 데이터 직접 전송
- Kafka-console-producer에서 데이터 전송 -> Topic에 추가 -> MariaDB에 추가