티스토리 뷰

 이제 끝이 보이고 있습니다. 물론 Back-end의 구현만 살펴보고 있지만, MSA라는 처음 보는 아키텍쳐와 함께 하면서, 여러가지를 배웠던 거 같네요. 이제 데이터 동기화를 위한 Apache Kafka의 활용법을 적용하고, 장애처리, 그리고 마지막으로 모니터링 기능까지 추가하면 끝입니다. 산더미네요... 그리고 클라이맥스로 Docker를 활용해서 모든 서비스를 하나하나 Container화를 구축할 예정이니까요.

 

 이번 시간 부터는 다시 E-Commerce Application 을 고도화 시킬 예정입니다. 실습에 앞서서 만약 구축된 E-Commer Application이 없다면 아래의 파일들을 받아주세요 


웬만하면, D: 바로 밑에 두 폴더 모두 위치시켜주시길 바랍니다. 해당 위치가 아닐 경우 

Config-Server에 Native Repo 주소를 변경해줘야 하기에 조금 번거로울 수 있습니다.

#Work Space

해당 폴더는 Work Space자체입니다.

spring_cloud.zip
0.57MB


#Repository Folder

해당 폴더는 Repository Folder입니다. 

native-file-repo.zip
0.00MB


#개요

이번 포스팅에서는 Order서비스와 Catalogs서비스를 수정하려 합니다. 이렇게 수정하는 목적은 Kafka를 활용해서, Order서비스를 Producer로 그리고 Catalogs 서비스는 Consumer로서 Order 서비스에서 주문 메시지가 전달되면, 해당 주문 메시지를 분석해 물건이 팔린만큼 재고(Col : Stock) 항목을 줄여주게 하기 위해서 입니다. 


#POM.XML ( Catalog Service )

가장 먼저 진행해야할 작업은 POM.XML에 Dependency를 추가해야합니다. 

<!-- Kafka -->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

Kafka와의 연동을 위해 Kafka를 추가해줍니다. 


#MessageQueue 패키지 생성(CatalogsService)

다음으로는 MessageQueue라는 패키지를 하나 생성해줍니다. 그리고 그 안에 KafkaConsumerConfig라는 클래스를 하나 만들어줍니다. 아래의 위치와 같습니다.

그리고 아래와 같은 코드를 작성해줍니다.


#KafkaConsumerConfig(Catalog-Service)

package com.example.catalogservice.messagequeue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(properties);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    };
}

먼저 Kafka와의 연동을 위해 @EnableKafka@Configuration 어노테이션을 해당 클래스에 등록해줍니다.

@EnableKafka
@Configuration

다음으로는 @bean을 두개를 등록 해줄 건데 처음으로 만들 Bean ConsumerFactory입니다.  각 설정값들을 먼저 선언하고, PropertiesMap에 담을 예정입니다. 그리고 설정이 완료되면 해당 값을 Return하는 Bean입니다.

@Bean
public ConsumerFactory<String,String> consumerFactory(){
    Map<String,Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupId");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(properties);
}

다음으로는 해당 값을 설정한 가져올 Bean입니다.

@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
            = new ConcurrentKafkaListenerContainerFactory<>();
    kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
    return kafkaListenerContainerFactory;
};

#KafkaConsumer(Catalog-Service)

다음으로는 핵심적으로 로직을 가지고 있는 Consumer Class입니다. 아래와 같이 작성해주세요.

package com.example.catalogservice.messagequeue;

import com.example.catalogservice.entity.CatalogEntity;
import com.example.catalogservice.entity.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository){
        this.repository = repository;
    }

    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage){
        log.info("Kafka Message: ->" + kafkaMessage);

        Map<Object,Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try{
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>(){});
            }catch(JsonProcessingException ex){
                ex.printStackTrace();
            }
        CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
        if (entity != null){
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            repository.save(entity);
        }

    }
}

 

가장 먼저 Annotation을 추가해야합니다. 

@Service임을 명시하고 @Slf4j를 통해 로그를 출력할 Class라는걸 명시해줍니다.

@Service
@Slf4j

다음은 생성자에 repository 정보를 주입하려 합니다. CatalogRepository를 생성하고, @Autowired를 등록 후 주입을 시켜줍니다.

CatalogRepository repository;

@Autowired
public KafkaConsumer(CatalogRepository repository){
    this.repository = repository;
}

다음은 핵심 로직 입니다. 각 코드 옆에 간단한 주석을 달았습니다.

/* 참조할 Topic 명을 명시합니다. */
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
	/* 로그 출력 Kafka에서 전달된 메시지를 확인*/
    log.info("Kafka Message: ->" + kafkaMessage);
	
    Map<Object,Object> map = new HashMap<>();
    ObjectMapper mapper = new ObjectMapper();
    try{
    	/* 생성한 map안에 KafkaMessage의 정보를 TypeRefrerence타입으로 저장합니다. */
        map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>(){});
        }catch(JsonProcessingException ex){
            ex.printStackTrace();
        }
    /* entity안에 ProductId를 바탕으로 DB조회 결과를 담습니다. */
    CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
    if (entity != null){
    	/* 주문 정보와 대조에 남은 수량을 조정합니다. */
        entity.setStock(entity.getStock() - (Integer)map.get("qty"));
        /* Save는 기본적으로 제공됩니다. 조정된 수량을 저장합니다. */
        repository.save(entity);
    }

}

해당 작업까지만 하면, Consumer는 제작 완료 입니다. 이제는 Producer 제작을 통해서 메시지를 보낼 수 있도록 하겠습니다.


#POM.XML(Order-Service)

이번에는 Producer를 만들도록 하겠습니다. Order-Service로 이동후 Pom.xml에 Dependency를 추가하려합니다. 

<!-- Kafka -->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

#MessageQueue 패키지 생성(CatalogsService)

마찬가지로 MessageQueue라는 패키지 생성 후 KafkaProducerConfig 라는 Class를 생성해줍니다.

그리고 다음과 같이 입력합니다


#KafkaProducerConfig

package com.example.orderservice.messagequeue;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String,String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

Consumer와 마찬가지로 Kafka와의 연동 그리고 Configuration등록을 해줍니다.

@EnableKafka
@Configuration

그리고 @Bean을 등록합니다. 

다만 Consumer와 다르게 Serializer로 되어 있는데 이는, Producer에서 보낼때는 데이터를 직렬화 된 상태로 보낼 예정이기 때문입니다.

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    return new DefaultKafkaProducerFactory<>(properties);
}

그리고 Template생성을 통해서 해당 내용을 KafkaTemplate형태로 Return을 해줍니다.

@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
}

 

#KafkaProducer

그리고 다음과 같이 Producer를 작성해줍니다.

package com.example.orderservice.messagequeue;

import com.example.orderservice.dto.OrderDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String,String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String,String> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try{
            jsonInString = mapper.writeValueAsString(orderDto);
        }catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("성공적으로 메시지를 보냈어요! 나이스하네요 정보:" + orderDto);

        return orderDto;

    }
}

 

설명 생략

@Service
@Slf4j

마찬가지로 저희가 생성한KafkaTemplate를 Autowired해줍니다. 바로 위에서 @Bean으로 등록한 항목입니다!

private KafkaTemplate<String,String> kafkaTemplate;

@Autowired
public KafkaProducer(KafkaTemplate<String,String> kafkaTemplate){
    this.kafkaTemplate = kafkaTemplate;
}

 

그리고 전달을 할때는, String으로 보내지만 형태자체는 Json을 띄우도록 하기 위해서 mapper을 활용해줍니다.

변환이 완료된 데이터를 send를 통해서 Kafka에 메시지를 전달합니다.

 

public OrderDto send(String topic, OrderDto orderDto){
    ObjectMapper mapper = new ObjectMapper();
    String jsonInString = "";
    try{
        jsonInString = mapper.writeValueAsString(orderDto);
    }catch (JsonProcessingException ex){
        ex.printStackTrace();
    }

    kafkaTemplate.send(topic, jsonInString);
    log.info("성공적으로 메시지를 보냈어요! 나이스하네요 정보:" + orderDto);

    return orderDto;

}

#OrderController.java

다음으로는 OrderController를 조금 수정해주려 합니다.

마지막에 최종코드를 첨부하니 시간이 없으시다면 바로 내리셔도 됩니다.

 

먼저 전역변수를 하나 선언할 예정인데. 아래와 같이 입력해주세요.

KafkaProducer kafkaProducer;

선언한 변수를 생성자에 주입시켜줍니다.

@Autowired
public OrderController(Environment env,OrderService orderService,KafkaProducer kafkaProducer){
    this.env = env;
    this.orderService = orderService;
    this.kafkaProducer = kafkaProducer;
}

이제 거의 끝나가네요 마지막으로 createOrder를 조금 수정하려합니다.

Service를 호출해야 아무래도 정보를 불러올거 같네요, KafkaProducer.send에 필요했던 정보를 입력해줍니다. topic명은 저희가 Consumer에서 지정했던 example-category-topic으로 지정하고, orderDto에 담겨있는 주문 정보 또한 지정해줍니다.

@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                @RequestBody RequestOrder orderDetails){
    ModelMapper mapper = new ModelMapper();
    mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
    /*jpa*/
    OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
    orderDto.setUserId(userId);
    OrderDto createdOrder = orderService.createOrder(orderDto);

    ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

    /* send this order */
    kafkaProducer.send("example-catalog-topic",orderDto);

    return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);

}

완성!


마치며...

원래는, Test Case까지 추가해 작성하려 했으나 포스팅이 너무 길어지며 루즈해질거 같아

다음 포스팅에서 조금 더 자세하게 다뤄보려 합니다.

 

감사합니다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함