티스토리 뷰
#개요
이제 고도화 작업은 거의 끝나 갑니다. 이번 시간에는 Order를 위한 Sink Connection을 만들고, 주문 입력 시 Topic에 해당 정보를 전달하고, 다시 해당 정보를 DB에 저장하는 과정을 구현하려합니다. 예전에 언급 드린적있듯이, Kafka를 통해서 어느 포트에서 오든 메시지를 저장하고 하나의 단일 DB에 저장하기 위한 과정을 구현하려 하는 거죠!
+ 이번 포스팅은 많이 길어질 것 같습니다. 아무래도 마무리 단계이기도 하고 중요한 부분도 있어서 많은 부분을 적어야 할 것 같네요. 지루하시다면 마지막에 최종코드를 첨부할테니 확인해주세요!
#OrderController.java(Order-Service)
#완성 코드
package com.example.orderservice.controller;
import com.example.orderservice.dto.OrderDto;
import com.example.orderservice.jpa.OrderEntity;
import com.example.orderservice.messagequeue.KafkaProducer;
import com.example.orderservice.messagequeue.OrderProducer;
import com.example.orderservice.service.OrderService;
import com.example.orderservice.vo.RequestOrder;
import com.example.orderservice.vo.ResponseOrder;
import org.modelmapper.ModelMapper;
import org.modelmapper.convention.MatchingStrategies;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@RestController
@RequestMapping("/order-service")
public class OrderController {
Environment env;
OrderService orderService;
KafkaProducer kafkaProducer;
OrderProducer orderProducer;
@Autowired
public OrderController(Environment env,
OrderService orderService,
KafkaProducer kafkaProducer,
OrderProducer orderProducer){
this.env = env;
this.orderService = orderService;
this.kafkaProducer = kafkaProducer;
this.orderProducer = orderProducer;
}
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
/*kafka*/
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
/* send this order */
kafkaProducer.send("example-catalog-topic",orderDto);
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto,ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
@GetMapping("/{userId}/orders")
public ResponseEntity<List<ResponseOrder>> getOrder(@PathVariable("userId") String userId){
Iterable<OrderEntity> orderList = orderService.getOrdersByUserId(userId);
List<ResponseOrder> result = new ArrayList<>();
orderList.forEach(v->{
result.add(new ModelMapper().map(v,ResponseOrder.class));
});
return ResponseEntity.status(HttpStatus.OK).body(result);
}
}
먼저 위에 코드에서 핵심 적인 부분을 먼저 말씀드리려 합니다.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
/*kafka*/
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
/* send this order */
kafkaProducer.send("example-catalog-topic",orderDto);
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto,ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
아마 코드를 확인해보시면 JPA라고 주석 처리된 부분이 사라진 것을 알 수 있습니다. 이제 더 이상 JPA를 활용해서 DB와 연동되는 것이 아니기 때문에 해당 부분을 저는 삭제했습니다. Controller에서는 크게 달라진 것은 없지만, 핵심적인 것은 orderProducer를 통해서 Topic에 Data를 전달하는 부분입니다. 그래서 orderProducer를 통해 Send를 해줬습니다.
생성자를 통해 OrderProducer를 생성하는 것을 잊지말아주세요. 아직 해당 클래스가 없기 때문에 오류 나는 것은 정상입니다.
Environment env;
OrderService orderService;
KafkaProducer kafkaProducer;
OrderProducer orderProducer;
@Autowired
public OrderController(Environment env,
OrderService orderService,
KafkaProducer kafkaProducer,
OrderProducer orderProducer){
this.env = env;
this.orderService = orderService;
this.kafkaProducer = kafkaProducer;
this.orderProducer = orderProducer;
}
#DTO Class 생성
다음으로는 Class를 4개정도 추가하려합니다. 위치는 DTO Package 밑에 생성해주세요.
해당 코드를 작성하기전 왜 이렇게 작성해야하는 건지 말씀드리고 가겠습니다.
해당 Dto들을 작성하기전 Topic의 구조에 대해 먼저 이해를 해야합니다. Kafka에서 topic안에 있는 정보를 아무거나 하나 보게되면 아래와 같은 구조를 확인해볼 수 있습니다.
Kafka 설치 폴더로 이동 후 아래의 Command 를 통해서 확인 할 수 있습니다.
! topic은 현재 Kafka서버에 있는 메시지가 있느 topic이면 어느것이든 상관 없습니다. 파란색 부분이 Topic명을 지정하는 부분입니다.
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
-해당 이미지는 Deserializer로 보기 쉽게 바꾼 항목입니다. 원래는 직렬화되어있습니다.-
그래서 진짜 짧게 설명하면 Dto Class를 만드는 이유는 해당 Topic을 운용하기 위한 Bean은 만드는 것이기에, 해당 형식에 맞춰서 Dto를 만드는 것입니다. 실제 코드는 아래와 같으니 대조하면서 확인해보세요.
#Field
package com.example.orderservice.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
Field값을 받기 위한 부분입니다. 위에 이미지와 겹치는 부분이 있습니다.
조금의 설명을 덧붙히자면, Field는 저장할 데이터의 타입을 지정하는 공간이라고 생각하시면 편할 것 같습니다.
#Payload
package com.example.orderservice.dto;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
Pay Load도 마찬가지입니다 ㅋㅋ 누워서 떡먹기네요.다만 아래의 Payload는 아무 topic에서 가져왔기에 위에 Dto와는 조금 다릅니다. 아래의 Topic은 아마도 회원 정보와 관련된 Payload 같네요. 이처럼 Payload는 실질적으로 저장할 데이터를 가지고 있습니다. 그러니 Topic마다 다른 Payload를 가지고 있습니다!
#Schema
package com.example.orderservice.dto;
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder // 필요한 필드를 직관적으로 지정 함
public class Schema {
private String type;
private List<Field> fields;
private Boolean optional;
private String name;
}
Fields는 Schema에 포함되어 Json을 보시면 Shcema.fields형태로 되어 있습니다. 그렇기에 List를 통해서 Field정보를 받받아 오는 과정을 거쳐야 합니다.
#KafkaOrderDto
package com.example.orderservice.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
따로 설명드릴 건... 없는 것 같네요 후에 정보를 담는 그릇 정도로 생각하시면 좋을 것 같습니다.
#OrderProducer
다음으로는 Producer를 하나 만들어주려 합니다. 저희는 이제 주문에 대한 Producer가 필요하니 messagequeue Class에 OrderProducer라는 Class를 추가해주려합니다.
#OrderProducer.java
package com.example.orderservice.messagequeue;
import com.example.orderservice.dto.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String,String> kafkaTemplate;
List<Field> fields =Arrays.asList(new Field("string",true,"order_id"),
new Field("string",true,"user_id"),
new Field("string",true,"product_id"),
new Field("int32",true,"qty"),
new Field("int32",true,"unit_price"),
new Field("int32",true,"total_price"));
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String,String> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto){
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
}catch (JsonProcessingException ex){
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("성공적으로 메시지를 보냈어요! 나이스하네요 정보:" + kafkaOrderDto);
return orderDto;
}
}
이 코드에 대해서 설명을 해야하는데 너무기네요... 하지만.... 절대로 하겠습니다.
자 처음으로 확인해야 할 부분은 Fields 부분입니다. 해당 부분은 따로 데이터를 받아오는게 아닌 Field 설정같은 부분이에요. 그래서 직접 컬럼 정보를 기입해줘야합니다.
List<Field> fields =Arrays.asList(new Field("string",true,"order_id"),
new Field("string",true,"user_id"),
new Field("string",true,"product_id"),
new Field("int32",true,"qty"),
new Field("int32",true,"unit_price"),
new Field("int32",true,"total_price"));
Field클래스에서는 @AllArgsConstructor Annotation을 통해 이미 생성자가 생성되어 있습니다. 그렇기에
new Field("string",true,"user_id"),
다음과 같은 형태로 바로 Filed를 채워 줄 수 있습니다. 그럼 fields에는 아마 저희가 생각한 컬럼 정보가 이제 모두 담기게 됩니다.
다음으로는 Schema입니다.
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
Schema에서는 @Builder(Lombok) Annotation을 등록했는데 사용용도는 @AllArgsConstructor 이랑 거의 비슷하지만, 사용 방법이 조금더 직관적인게 특징입니다. 보시는 것처럼 해당클래스에 .builder()를 통해서 build를 시작하고 마지막에 build()를 통해서 생성을 합니다. 중간에 들어가있는 정보는 schema안에 있는 인자들입니다. 저런식으로 직관적으로 주입이 가능해요!
.fields(fields)
그리고 저희가 설정한 fields 정보는 기입을 해줍니다.
그리고 실질적으로 저장할 메시지가 담겨잇는 Payload는 send method안에 있어야 합니다. orderDto의 정보가 필요하니까요! 마찬가지로 Payload도 .builder()를 통해서 시작합니다. 그리고 orderDto가 가지고 있는 정보를 각각 담아줍니다.
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
그리고 kafkaOrderDto에 해당 내용을 담아줍니다.
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);
그리고 Message는 String 형태로 보내야 하니 형 변환 한 뒤 KafkaTemplate를 통해 send하는 건 저번에도 했던 과정입니다!
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
}catch (JsonProcessingException ex){
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
1차적으로 수정은 여기까지면 완료입니다.
#Create Sink Connect
다음으로는 Kafka Sink Connector를 수정하려 합니다.
Zookeeper, Kafka, Connect Server를 모두 실행해줍니다.
127.0.0.1:8083/connectors URL을 POST타입으로 바꾼뒤 다음과 같은 Body 를 넣어줍니다.
{
"name":"my-order-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3307/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}
해당 connect 정상 동작중인지 확인.
#마치며
이번 포스팅이 너무 길어진 관계로, Test는 다음 포스트에서 다룰려 합니다. 이제 해당 실습만 마친다면, 기능적인 부분은 거의 완료가 된 것 입니다. 보시면서, 문제가 있는 부분은 알려주시면 수정 하도록 하겠습니다.
감사합니다.
-참고 강의-
'웹 프로그래밍 > MSA 학개론' 카테고리의 다른 글
[MSA] Zipkin 개요 & 설치방법 (0) | 2022.05.03 |
---|---|
[MSA] Orders Microservice 고도화 - Order Kafka Producer TEST (0) | 2022.05.03 |
[MSA] Kafka Connect를 활용한 단일 데이터베이스 활용 (0) | 2022.05.03 |
[MSA] 잠시 쉬어가는 Mutiple Service에서의 동기화 문제 (0) | 2022.05.03 |
[MSA] OrderService , CategoryService Kafka 연동 테스트 (0) | 2022.05.02 |
- Total
- Today
- Yesterday
- zipkin
- MariaDB
- docker
- springcloud
- Feign
- consumer
- prometheus
- Spring + ELK
- config
- Kafka Connect
- 미래의나에게동기부여
- rabbitmq
- git
- MSA
- Logstash 활용
- JWT
- LoadBalancer
- Gateway
- 운동일기
- kafka
- elasticSearch
- producer
- github
- 운동
- ACTUATOR
- 오늘저녁 삼겹살
- 루틴기록
- 빅-오
- UserService
- Logstash to ElasticSearch
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |