티스토리 뷰

 저번 시간에는 Kafka Source Connect에 대해서 알아봤습니다.

Sink 는 Source와 다르게 메시지를 소비하는 역할을 하는데, 사실 저희가 Kafka를 시작하면서 배웠던 Producer와 Consumer의 개념 이랑 비슷합니다. 이제 가지고 있는 메시지를 소비하는 역할를 하는게 Sink라고 생각하시면 될 것 같습니다.

 


#개요

실제로 Kafka Sink Connect를 구축하고, Kafka에서 DB로 저장하는 로직을 구현하려 합니다.


#Sink Connect 생성

Sink Connect를 생성하는 방식은 저번에 진행했던 Source Connect의 생성방법과 크게 다르지 않습니다.

먼저 Connect 서버를 실행합니다. (기본적으로 Zookeeper와 Kafka가 실행된 상태에서 실행해주세요.) 

만약 connect가 활성화 되어 있지 않다면 다음과 같이 재실행을 시켜줍니다.
.\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties

 

먼저 Post-Man을 켜서 Body를 입력해줍니다. 

{
    "name":"my-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"test1357",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"my_topic_users"
    }
}

auto.create를 통해서 테이블을 자동으로 생성할 예정입니다. 

그리고 Topic명은 저희가 저번시간에 생성했던 Topic으로 골라줍니다. 이제 해당 Topic명을 바탕으로 테이블을 참조해서, 데이터를 보내겠지만, 새로 생성할 필요는 없습니다. 앞서 말씀드린 것처럼 auto.create를 통해 자동으로 생성이 되니까요! 그리고 해당 Body정보를 바탕으로  아래의 URL에 POST방식으로 요청을 보내면 됩니다.

localhost:8083/connectors

응답이 다음과 같이 온다면 성공입니다.


#Table Check

그리고 다시 돌아와 Table 정보를 살펴보면 다음과 같이 바뀌어 있습니다.

my_topic_users가 추가된 것을 확인 할 수 있습니다.

그리고 여기서... Select를 하면 무슨일이 일어날까요?

음.. 딱히 my_topic_users에 Insert한 일이 없으니 비어있겠죠?

하지만 결과를 보면 아래와 같습니다. 

꺄아아아아아아아아악!!

놀랍게도 다음과 같이 user에 있는 유저 정보와 똑같은 값이 이미 입력되어 있는 것을 확인 하실 수 있습니다. 이는 Sink Connect를 만들게 되면, Topic에 내용을 바탕으로 Insert가 된다는 것을 알 수 있습니다. 정말 그런지 확인해 볼까요?

Insert문을 이용해서 간단한 회원정보를 등록해 주세요. 저희는 분명.. Users에 데이터를 입력했습니다.

테이블에 제대로 입력이 됐는지 Select문을 활용해 확인해줍니다. 

마찬가지로 Sink Connect를 통해 연결 되어 있는 DB도 확인해볼까요?...

꺄아아아아아앙아아아악!!!

정말 이럴수가... Sink Connect를 통해서 굳이 DB에 Insert를 하지않아도, Topic의 정보를 이용해서 DB에 Insert를 하고 있습니다. 정말 신기하네요. 그리고 여러분들이 사용하면서 느끼셨겠지만, 코딩이라고 생각되는 부분은 하나도 없었을 겁니다. 이렇듯 Connect를 사용하면 코딩을 하지않아도 능동적인 DB연동이 될 뿐 아니라, Source와 Sink를 사용하여, DB적재도 훨씬 간편하게 할 수 있습니다. 

 

그럼 무조건 Source가 있어야 하냐 물어보시면, 그건 아닙니다. Sink만을 가지고 Data를 적재하고 싶을 때 아래와 같이 Topic의 정보를 먼저 가져와주세요.

 

{schema 로 시작하는 항목부터 복사를 해줍니다. 지금 아마 토픽에 2개의 메시지가 있으니 schema로 시작하는 부분이 2개일텐데 둘중 아무거나 복사해 하셔도 상관업습니다. 

그 뒤에 맨 뒤에 id,user_id,pwd,name을 변경해줍니다. 아래에 예시를 보여드리겠습니다. 여러분은 Payload부분만 수정해주시면 됩니다. 

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":3,"user_id":"change","pwd":"change","name":"parkganggyun_changed","created_at":1651498425000}}

맨 오른쪽에 있어요 !! 영차영차

id는 현재 저는 DB에 2까지 있어서 3으로 했고 뒤에 있는 user_id,pwd,name은 임의로 원하는 값으로 넣어주세요.

그리고 Producer를 실행시킵니다.

cd [Kafka 설치 폴더]

./bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic my_topic_users

 

마지막으로 수정한 값을 복사후 해당 메시지에 넣어줍니다. 오류가 없이 실행돼셨나요? 그럼 DB를 확인해 보겠습니다.

먼저 저희는 Sink에서 바로 주입했기 때문에 당연하게도 원본 테이블인 User에는 해당 정보가 입력되어 있지 않습니다.

하지만 Sink Connect 되어 있는 Table은 아래와 같이 Change항목이 업데이트 된 것을 확인 할 수 있습니다. 개인적으로 Producer는 Source의 개념으로 Consumer는 Sink의 개념으로 기억하면, 나중에 활용할 때 큰 도움이 될 것 같았습니다. 이것으로 Apache Kafka의 기본적인 활용 방법은 마무리입니다. 

와아아아아아아아아아ㅏㅏㅏㅏ!!!

하지만 아직 활용을 안해봤죠? 그래서 이제 기존에 만든 Ecommerce-Application에 Apache Kafka를 적용 시킬 예정입니다.

 

감사합니다..

 

 

 

 

 

 

 

 

 

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함