데이터 동기화 Orders -> Catalogs
- Orders Service에 요청 된 주문의 수량 정보를 Catalogs Service에 반영
- Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer
- Catalogs Service에서 Kafka Topic에 전송 된 메시지 취득 -> Consumer
- 현재 각 서비스는 독립적인 DB를 가지고 있음
- 독립적인 DB들의 동기화를 위해 Kafka 를 이용
- Kafka topic에 메시지를 보내 동기화작업을 진행
- Order(Producer) 설정 , Catalog(Consumer) 설정
Catalog-Service
- build.gradle - spring-kafka 추가
- Message Queuing 과 관련된 패키지, 클래스 생성
- KafkaConsumerConfig
package com.example.catalogservice.messagequeue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
// 접속 할 수 있는 Kafka 정보가 들어 있는 빈
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// key, value Json Type
HashMap<String, Object> properties = new HashMap<>();
// Server Address Value
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// Group ID
// Kafka 에서 topic 에 쌓여 있는 메시지를 가져가는 Consumer 들을 Group 화 할 수 있음
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
// Key,Value 의 역직렬화
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
// 접속 정보를 이용하는 실제 Listener
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
// 등록한 설정 정보를 등록
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
- KafkaConsumer
package com.example.catalogservice.messagequeue;
import com.example.catalogservice.domain.CatalogEntity;
import com.example.catalogservice.repository.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository catalogRepository;
@Autowired
public KafkaConsumer(CatalogRepository catalogRepository) {
this.catalogRepository = catalogRepository;
}
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
// String 으로 들어오는 Message 를 json 타입으로 변환 하는 코드
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// Object 형태로 가져오는걸 String 다운캐스팅 후 파라미터로 전달
CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));
if (entity != null) {
// Object 형태로 가져오는 데이터를 Integer 로 변환후 뺀 후 저장
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
catalogRepository.save(entity);
}
}
}
Order-Service
- build.gradle - spring-kafka 추가
- Message Queuing 과 관련된 패키지, 클래스 생성
- KafkaProducerConfig
package com.example.orderservice.messagequeue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
// 접속 할 수 있는 Kafka 정보가 들어 있는 빈
@Bean
public ProducerFactory<String, String> producerFactory() {
// key, value Json Type
HashMap<String, Object> properties = new HashMap<>();
// Server Address Value
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// Key,Value 의 직렬화
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
// 데이터 전달 하는 용도의 인스턴스
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- KafkaProducer
package com.example.orderservice.messagequeue;
import com.example.orderservice.dto.OrderDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// 데이터를 send 하는 메서드
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
// orderDto 인스턴스 내용값을 Json 포맷으로 변경
jsonInString = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer send data from Order microservice: " + orderDto);
return orderDto;
}
}
- OrderController에서 send 호출
topic 이름은 Consumer에 KafkaListener에서 정한 topic name 사용
Kafka를 활용한 동기화 TEST
- zookeeper, kafka server 기동
- 현재 들어있는 데이터의 stock을 수정 해보자
- 주문하기
- 결과확인
- log 확인
Mulitple Orders Service 에서의 데이터 동기화
- Orders Service 2개 기동
- Users의 요청 분산 처리
- Orders 데이터도 분산 저장 -> 동기화 문제
- order-service 한개더 기동(terminal에서 코드 실행)
java -jar ./build/libs/order-service-0.0.1-SNAPSHOT.jar
- 데이터가 분산되서 저장됨
데이터 동기화 - Multiple Orders Service
- Orders Service에 요청 된 주문 정보를 DB가 아니라 kafka Topic으로 전송
- Kafka Topic에 설정 된 kafka Sink Connect를 사용해 단일 DB에 저장 -> 데이터 동기화
- Orders Service의 JPA 데이터베이스 교체
- H2 DB -> MariaDB
- Table 생성
create table orders (id int auto_increment primary key,
user_id varchar(50) not null,
product_id varchar(20) not null,
order_id varchar(50) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
created_at datetime default now()
)
- order-service/ application.yml 수정
- 주문 후 mariaDB에 데이터가 저장되는지 확인
- Orders Service의 Producer에서 발생하기 위한 메시지 등록(데이터 포맷을 지켜야함)
- Class 구조
Order-Service 코드 수정
- Controller
- Dto
- KafkaOrderDto
- Schema
- Field
- Payload
- OrderProducer
- kafka message 부분을 Class를 통해 만들어서 전달해줌
- Postman을 통해 sink-connect 생성
Multiple Orders Service Test
- order-serivce 서버를 2개 기동 후 order 컬럼 생성
하나의 DB에 저장되는 것을 확인!
'FrameWork > Spring Cloud' 카테고리의 다른 글
Microservice에 Circuitbreaker 적용 (0) | 2021.09.11 |
---|---|
장애 처리와 Microservice 분산 추적 - 1 (0) | 2021.09.11 |
마이크로서비스간에 데이터 동기화 - 2 (0) | 2021.09.08 |
마이크로서비스간에 데이터 동기화 - 1 (0) | 2021.09.08 |
마이크로서비스간 통신 - Feign Web Service Client (0) | 2021.09.07 |