티스토리 뷰
이제 끝이 보이고 있습니다. 물론 Back-end의 구현만 살펴보고 있지만, MSA라는 처음 보는 아키텍쳐와 함께 하면서, 여러가지를 배웠던 거 같네요. 이제 데이터 동기화를 위한 Apache Kafka의 활용법을 적용하고, 장애처리, 그리고 마지막으로 모니터링 기능까지 추가하면 끝입니다. 산더미네요... 그리고 클라이맥스로 Docker를 활용해서 모든 서비스를 하나하나 Container화를 구축할 예정이니까요.
이번 시간 부터는 다시 E-Commerce Application 을 고도화 시킬 예정입니다. 실습에 앞서서 만약 구축된 E-Commer Application이 없다면 아래의 파일들을 받아주세요
웬만하면, D: 바로 밑에 두 폴더 모두 위치시켜주시길 바랍니다. 해당 위치가 아닐 경우
Config-Server에 Native Repo 주소를 변경해줘야 하기에 조금 번거로울 수 있습니다.
#Work Space
해당 폴더는 Work Space자체입니다.
#Repository Folder
해당 폴더는 Repository Folder입니다.
#개요
이번 포스팅에서는 Order서비스와 Catalogs서비스를 수정하려 합니다. 이렇게 수정하는 목적은 Kafka를 활용해서, Order서비스를 Producer로 그리고 Catalogs 서비스는 Consumer로서 Order 서비스에서 주문 메시지가 전달되면, 해당 주문 메시지를 분석해 물건이 팔린만큼 재고(Col : Stock) 항목을 줄여주게 하기 위해서 입니다.
#POM.XML ( Catalog Service )
가장 먼저 진행해야할 작업은 POM.XML에 Dependency를 추가해야합니다.
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Kafka와의 연동을 위해 Kafka를 추가해줍니다.
#MessageQueue 패키지 생성(CatalogsService)
다음으로는 MessageQueue라는 패키지를 하나 생성해줍니다. 그리고 그 안에 KafkaConsumerConfig라는 클래스를 하나 만들어줍니다. 아래의 위치와 같습니다.
그리고 아래와 같은 코드를 작성해줍니다.
#KafkaConsumerConfig(Catalog-Service)
package com.example.catalogservice.messagequeue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,String> consumerFactory(){
Map<String,Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
};
}
먼저 Kafka와의 연동을 위해 @EnableKafka와 @Configuration 어노테이션을 해당 클래스에 등록해줍니다.
@EnableKafka
@Configuration
다음으로는 @bean을 두개를 등록 해줄 건데 처음으로 만들 Bean은 ConsumerFactory입니다. 각 설정값들을 먼저 선언하고, Properties란 Map에 담을 예정입니다. 그리고 설정이 완료되면 해당 값을 Return하는 Bean입니다.
@Bean
public ConsumerFactory<String,String> consumerFactory(){
Map<String,Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
다음으로는 해당 값을 설정한 가져올 Bean입니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
};
#KafkaConsumer(Catalog-Service)
다음으로는 핵심적으로 로직을 가지고 있는 Consumer Class입니다. 아래와 같이 작성해주세요.
package com.example.catalogservice.messagequeue;
import com.example.catalogservice.entity.CatalogEntity;
import com.example.catalogservice.entity.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository repository){
this.repository = repository;
}
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object,Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try{
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>(){});
}catch(JsonProcessingException ex){
ex.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
if (entity != null){
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
repository.save(entity);
}
}
}
가장 먼저 Annotation을 추가해야합니다.
@Service임을 명시하고 @Slf4j를 통해 로그를 출력할 Class라는걸 명시해줍니다.
@Service
@Slf4j
다음은 생성자에 repository 정보를 주입하려 합니다. CatalogRepository를 생성하고, @Autowired를 등록 후 주입을 시켜줍니다.
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository repository){
this.repository = repository;
}
다음은 핵심 로직 입니다. 각 코드 옆에 간단한 주석을 달았습니다.
/* 참조할 Topic 명을 명시합니다. */
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
/* 로그 출력 Kafka에서 전달된 메시지를 확인*/
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object,Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try{
/* 생성한 map안에 KafkaMessage의 정보를 TypeRefrerence타입으로 저장합니다. */
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>(){});
}catch(JsonProcessingException ex){
ex.printStackTrace();
}
/* entity안에 ProductId를 바탕으로 DB조회 결과를 담습니다. */
CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
if (entity != null){
/* 주문 정보와 대조에 남은 수량을 조정합니다. */
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
/* Save는 기본적으로 제공됩니다. 조정된 수량을 저장합니다. */
repository.save(entity);
}
}
해당 작업까지만 하면, Consumer는 제작 완료 입니다. 이제는 Producer 제작을 통해서 메시지를 보낼 수 있도록 하겠습니다.
#POM.XML(Order-Service)
이번에는 Producer를 만들도록 하겠습니다. Order-Service로 이동후 Pom.xml에 Dependency를 추가하려합니다.
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
#MessageQueue 패키지 생성(CatalogsService)
마찬가지로 MessageQueue라는 패키지 생성 후 KafkaProducerConfig 라는 Class를 생성해줍니다.
#KafkaProducerConfig
package com.example.orderservice.messagequeue;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
Consumer와 마찬가지로 Kafka와의 연동 그리고 Configuration등록을 해줍니다.
@EnableKafka
@Configuration
그리고 @Bean을 등록합니다.
다만 Consumer와 다르게 Serializer로 되어 있는데 이는, Producer에서 보낼때는 데이터를 직렬화 된 상태로 보낼 예정이기 때문입니다.
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
그리고 Template생성을 통해서 해당 내용을 KafkaTemplate형태로 Return을 해줍니다.
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
#KafkaProducer
그리고 다음과 같이 Producer를 작성해줍니다.
package com.example.orderservice.messagequeue;
import com.example.orderservice.dto.OrderDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String,String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String,String> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(orderDto);
}catch (JsonProcessingException ex){
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("성공적으로 메시지를 보냈어요! 나이스하네요 정보:" + orderDto);
return orderDto;
}
}
설명 생략
@Service
@Slf4j
마찬가지로 저희가 생성한KafkaTemplate를 Autowired해줍니다. 바로 위에서 @Bean으로 등록한 항목입니다!
private KafkaTemplate<String,String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String,String> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
그리고 전달을 할때는, String으로 보내지만 형태자체는 Json을 띄우도록 하기 위해서 mapper을 활용해줍니다.
변환이 완료된 데이터를 send를 통해서 Kafka에 메시지를 전달합니다.
public OrderDto send(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(orderDto);
}catch (JsonProcessingException ex){
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("성공적으로 메시지를 보냈어요! 나이스하네요 정보:" + orderDto);
return orderDto;
}
#OrderController.java
다음으로는 OrderController를 조금 수정해주려 합니다.
마지막에 최종코드를 첨부하니 시간이 없으시다면 바로 내리셔도 됩니다.
먼저 전역변수를 하나 선언할 예정인데. 아래와 같이 입력해주세요.
KafkaProducer kafkaProducer;
선언한 변수를 생성자에 주입시켜줍니다.
@Autowired
public OrderController(Environment env,OrderService orderService,KafkaProducer kafkaProducer){
this.env = env;
this.orderService = orderService;
this.kafkaProducer = kafkaProducer;
}
이제 거의 끝나가네요 마지막으로 createOrder를 조금 수정하려합니다.
Service를 호출해야 아무래도 정보를 불러올거 같네요, KafkaProducer.send에 필요했던 정보를 입력해줍니다. topic명은 저희가 Consumer에서 지정했던 example-category-topic으로 지정하고, orderDto에 담겨있는 주문 정보 또한 지정해줍니다.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
/*jpa*/
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createdOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* send this order */
kafkaProducer.send("example-catalog-topic",orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
완성!
마치며...
원래는, Test Case까지 추가해 작성하려 했으나 포스팅이 너무 길어지며 루즈해질거 같아
다음 포스팅에서 조금 더 자세하게 다뤄보려 합니다.
감사합니다.
'웹 프로그래밍 > MSA 학개론' 카테고리의 다른 글
[MSA] 잠시 쉬어가는 Mutiple Service에서의 동기화 문제 (0) | 2022.05.03 |
---|---|
[MSA] OrderService , CategoryService Kafka 연동 테스트 (0) | 2022.05.02 |
[MSA] Kafka Connect (Sink) 사용 (0) | 2022.05.02 |
[MSA] Kafka Connect ( Source ) 사용 (0) | 2022.05.02 |
[MSA] Kafka Connect 설치 (0) | 2022.04.28 |
- Total
- Today
- Yesterday
- Feign
- github
- rabbitmq
- 운동
- Gateway
- 루틴기록
- docker
- prometheus
- MSA
- zipkin
- 빅-오
- UserService
- config
- Logstash to ElasticSearch
- springcloud
- producer
- elasticSearch
- JWT
- Spring + ELK
- Kafka Connect
- 오늘저녁 삼겹살
- 미래의나에게동기부여
- LoadBalancer
- kafka
- ACTUATOR
- 운동일기
- consumer
- MariaDB
- git
- Logstash 활용
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |