FrameWork/Spring Cloud

마이크로서비스간에 데이터 동기화 - 2

태윤2 2021. 9. 8. 02:09

Apache Kafka

  • Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
    • Open Source Message broker Project
  • 링크드인(Linked-in)에서 개발, 2011년 오픈 소스화
    • 2014년 11월 링크드인에서 Kafka를 개발하던 엔지니어들이 Kafka개발에 집중하기 위해 Confluent라는 회사 창립
  • 실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공
  • Apple, Netflix, Shopify, Yelp, Kakao, New York Times, Cisco, Ebay, Paypal, Hyperledger Fabric, Uber,Salesforce.com 등이 사용

 

Kafka가 개발되기 이전의 시스템

  • 모든 시스템으로 데이터를 실시간으로 전송하여 처리할 수 있는 시스템
  • 데이터가 많아지더라도 확장이 용이한 시스템

 

Kafka 데이터 처리 흐름

  • Producer(보내는쪽)/Consumer(받는쪽) 분리
  • 메시지를 여러 Consumer에게 허용
  • 높은 처리량을 위한 메시지 최적화
  • Scale-out 가능
  • Eco-system

 

Kafka Broker(서버)

  • 실행 된 Kafka 애플리케이션 서버
  • 3대 이상의 Broker Cluster 구성
  • Zookeeper(코디네이터) 연동
    • 역할: 메타데이터 (Broker ID, Controller ID 등) 저장
    • Controller 정보 저장
    • Broker 관리
  • n개 Broker 중 1대는 Controller 기능 수행
    • Controller 역할
    • 각 Broker에게 담당 파티션 할당 수행
    • Broker 정상 동작 모니터링 관리

 

설치

 

Echosystem 1 - Kafka Client

 

Kafka 서버 기동

  • Zookeeper 및 Kafka 서버 구동
    • $KAFKA_HOME/bin/zookeeper-server-start.sh, $KAFKA_HOME/config/zookeeper.properties
    • $KAFKA_HOME/bin/kafka-server-start.sh(bat), $KAFKA_HOME/config/server.properties
  • Topic 생성
    • Producer, Consumer 커맨드로 테스트 할 수 샘플 콘솔 프로그램 제공
    • $KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:XXXX --partitions 1
  • Topic 목록 확인
    • $KAFKA_HOME/bin/kfka-topics.sh --bootstrap-server localhost:XXXX --list
  • Topic 정보 확인
    • $KAFKA_HOME/bin/kfka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:XXXX

 

window 서버 기동

  • .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

zookeeper 2181 포트에서 실행

  • $ ./bin/windows/kafka-server-start.bat ./config/server.properties

9092 포트에서 실행

  • .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
  • topic 리스트 확인

 

  • .\bin\kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
  • topic 생성
  • .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic quickstart-events
  • topic 정보 확인

 

  • Producer 와 Consumer 테스트
    • .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic quickstart-events
    • .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

Echosystem 2 - Kafka Connect

  • Kafka Connect를 통해 Data를 Import/Export 가능
  • 코드 없이 Configuration으로 데이터를 이동
  • Standalone mode, Distribution(분산) mode 지원
    • RESTful API 통해 지원
    • Stream 또는 Batch 형태로 데이터 전송 가능
    • 커스텀 Connector를 통한 다양한 Plugin 제공(File, S3, Hive, Mysql, etc ...)
  • KafkaCluster - Server
  • KafkaConnect Source - 가져오는 쪽
  • KafkaConnect Sink - 보내는 쪽
  • Source System - Kafka Cluster에 저장할 때 관여 함 (Hive, jdbc ...)
  • Target System - DB,S3(스토리지) ...

MariaDB 설치 

  • MacOS 

https://mariadb.org/ 

 

MariaDB Foundation - MariaDB.org

… Continue reading "MariaDB Foundation"

mariadb.org

  • Window
  • zip 으로 다운

 

Order-Servcie에 Maria DB 등록

  • build.gradle 에 Maria DB jdbc 주입

  • h2-console 테스트

  • create table
create table users(
 id int auto_increment primary key,
 user_id varchar(20),
 pwd varchar(20),
 name varchar(20),
 created_at datetime default NOW()
);

Kafka Connect 설치

  • Windows
     
  • log4j 관련 에러가 뜨는경우 경우
    • connect-distributed.bat 파일의 config 파일 경로를 etc/kfaka/tools-log4j.properties로 변경
  • JDBC Connector 설정
    • ./etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
    • JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사
      • Gradle은 ${USER.HOME}\.m2 폴더에 저장이 안되 maven 프로젝트를 사용해 miaradb-java-client-2.7.2 라이브러리를 받아서 miaradb-java-client-2.7.2.jar 파일을 kafka-connect 폴더의 .\share\java\kafka\로 복사함

connect 기동

  • zookeeper와 kafka-server 기동후 connect 실행
.\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
  • topic을 확인해보면 connect 관련 topic을 생성 함

Kafka Source Connect 테스트

echo '
{
	"name" : "my-source-connect",
	"config" : {
		"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
		"connection.url":"jdbc:mysql://localhost:3307/mydb",
		"connection.user":"root",
		"connection.password":"1234",
		"mode": "incrementing",
		"incrementing.column.name" : "id",
		"table.whitelist":"users",
		"topic.prefix" : "my_topic_",
		"tasks.max" : "1"
	}

}

' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
  • name - connector 이름
  • connector.class - JdbcSourceConnector 사용
  • connection.url - mariaDB 포트번호 3307로 사용중
  • mode - 데이터를 자동 증가 시켜주는 모드
  • incrementing.column.name - 자동으로 증가 시켜줄 컬럼은 id
  • table.whitelist - MariaDB에 특정한 값이 변경 되었을때 whitelist에 있는 값의 변경을 감지해서 topic에 저장
  • topic.prefix - my_topic_ 이란 prefix를 가진 topic에 저장

Postman을 통해 Source 등록

  • Response : 201 Created

등록된 source 확인

  • 데이터 저장

  • topic 확인

  • 데이터 추가 저장 후 topic 확인

Kafka Sink Connect 테스트

echo '
	{
	"name":"my-sink-connect",
	"config":{
		"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
		"connection.url":"jdbc:mysql://localhost:3307/mydb",
		"connection.user":"root",
		"connection.password":"1234",
		"auto.create":"true",
		"auto.evolve":"true",
		"delete.enabled":"false",
		"tasks.max":"1",
		"topics":"my_topic_users"
	}
}
'| curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

Topic이 전달받은 데이터를 활용해서 사용하는 역할

  • auto.create - topic 과 같은 이름을 가진 테이블을 생성 하겠다는 설정
  • topics - auto.create 할때 필요한 topic 이름

 

Postman을 통해 Source 등록

  • my_topic_users 테이블 생성됨 (기존에 테이블에 갖고있던 컬럼을 그대로 복사)

sink-connect에 의해서 usres에 저장한 데이터가 my_topic_users에도 같이 저장됨

Kafka-Producer를 이용해서 Kafka Topic에 데이터 직접 전송

  • Kafka-console-producer에서 데이터 전송 -> Topic에 추가 -> MariaDB에 추가

producer