티스토리 뷰

#개요

이제 고도화 작업은 거의 끝나 갑니다. 이번 시간에는 Order를 위한 Sink Connection을 만들고, 주문 입력 시 Topic에 해당 정보를 전달하고, 다시 해당 정보를 DB에 저장하는 과정을 구현하려합니다. 예전에 언급 드린적있듯이, Kafka를 통해서 어느 포트에서 오든 메시지를 저장하고 하나의 단일 DB에 저장하기 위한 과정을 구현하려 하는 거죠! 

+ 이번 포스팅은 많이 길어질 것 같습니다. 아무래도 마무리 단계이기도 하고 중요한 부분도 있어서 많은 부분을 적어야 할       것 같네요. 지루하시다면 마지막에 최종코드를 첨부할테니 확인해주세요!


#OrderController.java(Order-Service)

#완성 코드

package com.example.orderservice.controller;

import com.example.orderservice.dto.OrderDto;
import com.example.orderservice.jpa.OrderEntity;
import com.example.orderservice.messagequeue.KafkaProducer;
import com.example.orderservice.messagequeue.OrderProducer;
import com.example.orderservice.service.OrderService;
import com.example.orderservice.vo.RequestOrder;
import com.example.orderservice.vo.ResponseOrder;
import org.modelmapper.ModelMapper;
import org.modelmapper.convention.MatchingStrategies;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@RestController
@RequestMapping("/order-service")
public class OrderController {
    Environment env;
    OrderService orderService;
    KafkaProducer kafkaProducer;

    OrderProducer orderProducer;

    @Autowired
    public OrderController(Environment env,
                           OrderService orderService,
                           KafkaProducer kafkaProducer,
                           OrderProducer orderProducer){
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
        this.orderProducer = orderProducer;
    }
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                    @RequestBody RequestOrder orderDetails){
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);

        /*kafka*/
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());

        /* send this order */
        kafkaProducer.send("example-catalog-topic",orderDto);
        orderProducer.send("orders", orderDto);


        ResponseOrder responseOrder = mapper.map(orderDto,ResponseOrder.class);
        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);

    }
    @GetMapping("/{userId}/orders")
    public ResponseEntity<List<ResponseOrder>> getOrder(@PathVariable("userId") String userId){
        Iterable<OrderEntity> orderList = orderService.getOrdersByUserId(userId);

        List<ResponseOrder> result = new ArrayList<>();
        orderList.forEach(v->{
            result.add(new ModelMapper().map(v,ResponseOrder.class));
        });

        return ResponseEntity.status(HttpStatus.OK).body(result);
    }

}

 

먼저 위에 코드에서 핵심 적인 부분을 먼저 말씀드리려 합니다.

@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                @RequestBody RequestOrder orderDetails){
    ModelMapper mapper = new ModelMapper();
    mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

    OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
    orderDto.setUserId(userId);

    /*kafka*/
    orderDto.setOrderId(UUID.randomUUID().toString());
    orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());

    /* send this order */
    kafkaProducer.send("example-catalog-topic",orderDto);
    orderProducer.send("orders", orderDto);


    ResponseOrder responseOrder = mapper.map(orderDto,ResponseOrder.class);
    return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);

}

아마 코드를 확인해보시면 JPA라고 주석 처리된 부분이 사라진 것을 알 수 있습니다. 이제 더 이상 JPA를 활용해서 DB와 연동되는 것이 아니기 때문에 해당 부분을 저는 삭제했습니다. Controller에서는 크게 달라진 것은 없지만, 핵심적인 것은 orderProducer를 통해서 Topic에 Data를 전달하는 부분입니다. 그래서 orderProducer를 통해 Send를 해줬습니다.

생성자를 통해 OrderProducer를 생성하는 것을 잊지말아주세요. 아직 해당 클래스가 없기 때문에 오류 나는 것은 정상입니다.

    Environment env;
    OrderService orderService;
    KafkaProducer kafkaProducer;

    OrderProducer orderProducer;

    @Autowired
    public OrderController(Environment env,
                           OrderService orderService,
                           KafkaProducer kafkaProducer,
                           OrderProducer orderProducer){
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
        this.orderProducer = orderProducer;
    }

 


#DTO Class 생성

다음으로는 Class를 4개정도 추가하려합니다. 위치는 DTO Package 밑에 생성해주세요.

Order-service Dto package

해당 코드를 작성하기전 왜 이렇게 작성해야하는 건지 말씀드리고 가겠습니다.


해당 Dto들을 작성하기전 Topic의 구조에 대해 먼저 이해를 해야합니다. Kafka에서 topic안에 있는 정보를 아무거나 하나 보게되면 아래와 같은 구조를 확인해볼 수 있습니다.

Kafka 설치 폴더로 이동 후 아래의 Command 를 통해서 확인 할 수 있습니다.

! topic은 현재 Kafka서버에 있는 메시지가 있느 topic이면 어느것이든 상관 없습니다. 파란색 부분이 Topic명을 지정하는 부분입니다.

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

-해당 이미지는 Deserializer로 보기 쉽게 바꾼 항목입니다. 원래는 직렬화되어있습니다.-

그래서 진짜 짧게 설명하면 Dto Class를 만드는 이유는 해당 Topic을 운용하기 위한 Bean은 만드는 것이기에, 해당 형식에 맞춰서 Dto를 만드는 것입니다. 실제 코드는 아래와 같으니 대조하면서 확인해보세요.


#Field

package com.example.orderservice.dto;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Field {
    private String type;
    private boolean optional;
    private String field;
}

Field값을 받기 위한 부분입니다. 위에 이미지와 겹치는 부분이 있습니다.

조금의 설명을 덧붙히자면, Field는 저장할 데이터의 타입을 지정하는 공간이라고 생각하시면 편할 것 같습니다. 

굉장히 유사하죠!?

#Payload

package com.example.orderservice.dto;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class Payload {
    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}

 

Pay Load도 마찬가지입니다 ㅋㅋ 누워서 떡먹기네요.다만 아래의 Payload는 아무 topic에서 가져왔기에 위에 Dto와는 조금 다릅니다. 아래의 Topic은 아마도 회원 정보와 관련된 Payload 같네요. 이처럼 Payload는 실질적으로 저장할 데이터를 가지고 있습니다. 그러니 Topic마다 다른 Payload를 가지고 있습니다!

#Schema

package com.example.orderservice.dto;

import lombok.Builder;
import lombok.Data;

import java.util.List;

@Data
@Builder // 필요한 필드를 직관적으로 지정 함
public class Schema {
    private String type;
    private List<Field> fields;
    private Boolean optional;
    private String name;


}

 

Fields는 Schema에 포함되어 Json을 보시면 Shcema.fields형태로 되어 있습니다. 그렇기에 List를 통해서 Field정보를 받받아 오는 과정을 거쳐야 합니다. 

Fileds는 Schema에 포함

#KafkaOrderDto

package com.example.orderservice.dto;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
    private Schema schema;
    private Payload payload;
}

 

따로 설명드릴 건... 없는 것 같네요 후에 정보를 담는 그릇 정도로 생각하시면 좋을 것 같습니다.


#OrderProducer

다음으로는 Producer를 하나 만들어주려 합니다. 저희는 이제 주문에 대한 Producer가 필요하니 messagequeue ClassOrderProducer라는 Class를 추가해주려합니다.

#OrderProducer.java

package com.example.orderservice.messagequeue;

import com.example.orderservice.dto.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.List;

@Service
@Slf4j
public class OrderProducer {
    private KafkaTemplate<String,String> kafkaTemplate;

    List<Field> fields =Arrays.asList(new Field("string",true,"order_id"),
            new Field("string",true,"user_id"),
            new Field("string",true,"product_id"),
            new Field("int32",true,"qty"),
            new Field("int32",true,"unit_price"),
            new Field("int32",true,"total_price"));

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    @Autowired
    public OrderProducer(KafkaTemplate<String,String> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto){
        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try{
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        }catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("성공적으로 메시지를 보냈어요! 나이스하네요 정보:" + kafkaOrderDto);

        return orderDto;

    }
}

 

 

이 코드에 대해서 설명을 해야하는데 너무기네요... 하지만.... 절대로 하겠습니다.

 

자 처음으로 확인해야 할 부분은 Fields 부분입니다. 해당 부분은 따로 데이터를 받아오는게 아닌 Field 설정같은 부분이에요. 그래서 직접 컬럼 정보를 기입해줘야합니다.

List<Field> fields =Arrays.asList(new Field("string",true,"order_id"),
        new Field("string",true,"user_id"),
        new Field("string",true,"product_id"),
        new Field("int32",true,"qty"),
        new Field("int32",true,"unit_price"),
        new Field("int32",true,"total_price"));

Field클래스에서는 @AllArgsConstructor Annotation을 통해 이미 생성자가 생성되어 있습니다. 그렇기에 

new Field("string",true,"user_id"),

다음과 같은 형태로 바로 Filed를 채워 줄 수 있습니다. 그럼 fields에는 아마 저희가 생각한 컬럼 정보가 이제 모두 담기게 됩니다. 

다음으로는 Schema입니다. 

Schema schema = Schema.builder()
        .type("struct")
        .fields(fields)
        .optional(false)
        .name("orders")
        .build();

Schema에서는 @Builder(Lombok) Annotation을 등록했는데 사용용도는 @AllArgsConstructor 이랑 거의 비슷하지만, 사용 방법이 조금더 직관적인게 특징입니다. 보시는 것처럼 해당클래스에 .builder()를 통해서 build를 시작하고 마지막에 build()를 통해서 생성을 합니다. 중간에 들어가있는 정보는 schema안에 있는 인자들입니다. 저런식으로 직관적으로 주입이 가능해요! 

.fields(fields)

그리고 저희가 설정한 fields 정보는 기입을 해줍니다.

그리고 실질적으로 저장할 메시지가 담겨잇는 Payload는 send method안에 있어야 합니다. orderDto의 정보가 필요하니까요! 마찬가지로 Payload도 .builder()를 통해서 시작합니다. 그리고 orderDto가 가지고 있는 정보를 각각 담아줍니다.

Payload payload = Payload.builder()
        .order_id(orderDto.getOrderId())
        .user_id(orderDto.getUserId())
        .product_id(orderDto.getProductId())
        .qty(orderDto.getQty())
        .unit_price(orderDto.getUnitPrice())
        .total_price(orderDto.getTotalPrice())
        .build();

 

그리고 kafkaOrderDto에 해당 내용을 담아줍니다.

KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);

 

그리고 MessageString 형태로 보내야 하니 형 변환 한 뒤 KafkaTemplate를 통해 send하는 건 저번에도 했던 과정입니다! 

ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
    jsonInString = mapper.writeValueAsString(kafkaOrderDto);
}catch (JsonProcessingException ex){
    ex.printStackTrace();
}

kafkaTemplate.send(topic, jsonInString);

1차적으로 수정은 여기까지면 완료입니다.


#Create Sink Connect

다음으로는 Kafka Sink Connector를 수정하려 합니다. 

Zookeeper, Kafka, Connect Server를 모두 실행해줍니다. 

127.0.0.1:8083/connectors URL을 POST타입으로 바꾼뒤 다음과 같은 Body 를 넣어줍니다. 

{
    "name":"my-order-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://127.0.0.1:3307/mydb",
        "connection.user":"root",
        "connection.password":"test1357",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"orders"
    }
}

 

정상적으로 생성되었을 때 화면

해당 connect 정상 동작중인지 확인.


#마치며

이번 포스팅이 너무 길어진 관계로, Test는 다음 포스트에서 다룰려 합니다. 이제 해당 실습만 마친다면,  기능적인 부분은 거의 완료가 된 것 입니다. 보시면서, 문제가 있는 부분은 알려주시면 수정 하도록 하겠습니다.

 

 감사합니다.

-참고 강의-

 

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) - 인프런 | 강의

Spring framework의 Spring Cloud 제품군을 이용하여 마이크로서비스 애플리케이션을 개발해 보는 과정입니다. Cloud Native Application으로써의 Spring Cloud를 어떻게 사용하는지, 구성을 어떻게 하는지에 대해

www.inflearn.com

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함