본문 바로가기
FrameWork/Spring Cloud

마이크로서비스간에 데이터 동기화 - 3

by 태윤2 2021. 9. 8.

데이터 동기화 Orders -> Catalogs

  • Orders Service에 요청 된 주문의 수량 정보를 Catalogs Service에 반영
  • Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer
  • Catalogs Service에서 Kafka Topic에 전송 된 메시지 취득 -> Consumer

  • 현재 각 서비스는 독립적인 DB를 가지고 있음
  • 독립적인 DB들의 동기화를 위해 Kafka 를 이용

  • Kafka topic에 메시지를 보내 동기화작업을 진행
  • Order(Producer) 설정 , Catalog(Consumer) 설정 

Catalog-Service

  • build.gradle - spring-kafka 추가

  • Message Queuing 과 관련된 패키지, 클래스 생성

  • KafkaConsumerConfig
package com.example.catalogservice.messagequeue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // 접속 할 수 있는 Kafka 정보가 들어 있는 빈
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // key, value Json Type
        HashMap<String, Object> properties = new HashMap<>();
        // Server Address Value
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Group ID
        // Kafka 에서 topic 에 쌓여 있는 메시지를 가져가는 Consumer 들을 Group 화 할 수 있음
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        // Key,Value 의 역직렬화
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    // 접속 정보를 이용하는 실제 Listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        // 등록한 설정 정보를 등록
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }

}
  • KafkaConsumer
package com.example.catalogservice.messagequeue;

import com.example.catalogservice.domain.CatalogEntity;
import com.example.catalogservice.repository.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j

public class KafkaConsumer {
    CatalogRepository catalogRepository;

    @Autowired
    public KafkaConsumer(CatalogRepository catalogRepository) {
        this.catalogRepository = catalogRepository;
    }


    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message: ->" + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();

        ObjectMapper mapper = new ObjectMapper();

        try {
            // String 으로 들어오는 Message 를 json 타입으로 변환 하는 코드
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        // Object 형태로 가져오는걸 String 다운캐스팅 후 파라미터로 전달
        CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));


        if (entity != null) {
            // Object 형태로 가져오는 데이터를 Integer 로 변환후 뺀 후 저장
            entity.setStock(entity.getStock() - (Integer) map.get("qty"));
            catalogRepository.save(entity);
        }
    }
}

Order-Service

  • build.gradle - spring-kafka 추가

  • Message Queuing 과 관련된 패키지, 클래스 생성

  • KafkaProducerConfig
package com.example.orderservice.messagequeue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    // 접속 할 수 있는 Kafka 정보가 들어 있는 빈
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // key, value Json Type
        HashMap<String, Object> properties = new HashMap<>();
        // Server Address Value
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // Key,Value 의 직렬화
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    // 데이터 전달 하는 용도의 인스턴스
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
  • KafkaProducer
package com.example.orderservice.messagequeue;

import com.example.orderservice.dto.OrderDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }


    // 데이터를 send 하는 메서드
    public OrderDto send(String topic, OrderDto orderDto) {

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";

        try {
            // orderDto 인스턴스 내용값을 Json 포맷으로 변경
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer send data from Order microservice: " + orderDto);

        return orderDto;

    }
}
  • OrderController에서 send 호출

topic 이름은 Consumer에 KafkaListener에서 정한 topic name 사용

 

 

Kafka를 활용한 동기화 TEST

  • zookeeper, kafka server 기동
  • 현재 들어있는 데이터의 stock을 수정 해보자

  • 주문하기

  • 결과확인

1번 아이템의 stock이 20개 감소한걸 확인할 수 있다

 

  • log 확인

order-service
catalog-service

 

Mulitple Orders Service 에서의 데이터 동기화

  • Orders Service 2개 기동
    • Users의 요청 분산 처리
    • Orders 데이터도 분산 저장 -> 동기화 문제

  • order-service 한개더 기동(terminal에서 코드 실행)
java -jar ./build/libs/order-service-0.0.1-SNAPSHOT.jar
  • 데이터가 분산되서 저장됨

 

데이터 동기화 - Multiple Orders Service

  • Orders Service에 요청 된 주문 정보를 DB가 아니라 kafka Topic으로 전송
  • Kafka Topic에 설정 된 kafka Sink Connect를 사용해 단일 DB에 저장 -> 데이터 동기화

  • Orders Service의 JPA 데이터베이스 교체
    • H2 DB -> MariaDB
       
    • Table 생성
create table orders (id int auto_increment primary key,

user_id varchar(50) not null,

product_id varchar(20) not null,

order_id varchar(50) not null,

qty int default 0,

unit_price int default 0,

total_price int default 0,

created_at datetime default now()

)
  • order-service/ application.yml 수정

  • 주문 후 mariaDB에 데이터가 저장되는지 확인

  • Orders Service의 Producer에서 발생하기 위한 메시지 등록(데이터 포맷을 지켜야함)

qty_id -> qty 로 해야함

  • Class 구조

Order-Service 코드 수정

  • Controller

  • Dto
    • KafkaOrderDto
    • Schema
    • Field
    • Payload
  • OrderProducer

  • kafka message 부분을 Class를 통해 만들어서 전달해줌

 

  • Postman을 통해 sink-connect 생성

 

 

Multiple Orders Service Test

  • order-serivce 서버를 2개 기동 후 order 컬럼 생성

 

 

하나의 DB에 저장되는 것을 확인!