본문 바로가기

도서/스프링으로하는 마이크로서비스 구축

Messaging System - Apache kafka

이번 포스팅은 저번 포스팅에 이어서 두 번째 주제인 Event Driven Sysmtem 중 두 번째 주제인 메시징 시스템과 kafka 대해 정리해보려고 합니다.

 

이번 포스팅을 위해 많은 글과 영상 등을 보면서 중구난방 했던 정보들을 체계적으로 정리하기 위해서 많은 노력이 있었던 것 같습니다. 일반적으로 kafka가 뭐지?라는 식으로 구글링을 하면 대뜸 토픽, 소비자/생성자, 브로커, 클러스터 등 개념에 대한 설명이 난무했습니다. 저 단어들이 핵심 단어이라는 것은 나중에서야 알게 되었습니다. 그 이유는 먼저 메시징 시스템이라는 전반적인 이해가 필요했기 때문입니다.

 

대부분의 포스팅 글 들은 카프카가 뭔지, 어떻게 동작하는지에 대한 설명들이 조금 아쉬운 경향이 있었다고 생각합니다. (이렇게 생각하는 이유는 단지 저를 이해시키지 못했다고 때문입니다.) 이번 포스팅에서는 저는 이 어려운 카프카에 대해 어떻게 이해해 나갔는지 차근차근 단계를 거쳐 학습한 내용을 정리해 볼 예정입니다.

 

Overview of a Messaging System

메시징 시스템이라는 것은 말 그대로 메시지를 주고 받는 시스템을 의미합니다. 익숙한 RESTful API와 가장 큰 차이점은 메시징 시스템은 비동기적 특징이 강하다는 것입니다. RESTful API는 클라이언트의 요청에 대해 서버가 작업을 처리하고 응답을 보내는 일련의 과정이 동기적으로 처리됩니다. 반면 메시징 시스템은 메시지를 전송하는 쪽에서 응답을 기다리지 않습니다. 

 

이메일은 메시징 시스템을 보여주는 가장 보편적인 예시입니다. 이메일을 작성하고 전송을 완료한 이후에 우리는 메일의 회신이 올 때까지 대기하지 않습니다. 만약 그 메일을 처리하고 응답하는 것까지 대기해야만 한다면 우리는 이메일을 사용하지 않을 것입니다. 메시지는 처리될 것으로 기대하기에 우리는 전송을 완료하고 곧바로 다른 일을 수행할 수 있습니다. 

 

이처럼 메시징 시스템은 비동기적 성격을 강하게 띠기 때문에 기존의 RESTful API에 익숙한 개발을 해왔다면 메시징 시스템이 조금 낯설게 느껴질 수 있을 것 같습니다. 이제부터는 메시징 시스템의 핵심 개념들에 대해서 하나씩 학습한 내용을 정리해보려고 합니다. 

 

대체로 모든 메시징 시스템은 세 가지 구성 요소가 관련되어 있고 이런 구성 요소는 인터넷에서 다양한 용어로 지칭되고 있는것 같아 보였습니다. 세 가지 구성 요소는 다음과 같습니다.

  • Message 
  • Endpoint
  • Channel

먼저 메시지는 문자열, 배열, 숫자, 객체와 같은 데이터 구조 형태들로 표현될 수 있습니다. 메시지는 작업을 요청하거나 특정 상태를 알리는 역할을 담당합니다. 만약 메시지가 작업의 처리의 성격이 강하다면, 메시지 내에 작업의 처리를 위한 추가 정보를 포함하기에 메시지의 크기가 커질 수도 있고, 메시지가 사건이나 상태 변화 알림의 성격이 강하다면 단순히 '주문이 생성되었다!'라는 식의 가벼운 정보만 포함되어 메시지의 크기가 작을 수도 있습니다. 

 

메시지의 형태는 json이 될 수도 있고 클래스 객체일 수도 있습니다. 단순 문자열이나 숫자, 배열도 가능하고 쓰임에 추상화만 잘되어 있다면 어떤 형태로 메시지를 생성하고 전송하는 것은 크게 중요하지 않아 보이는 것 같습니다. 중요한 건 메시지를 주고받는 주체들이 메시지를 어떻게 읽고 사용하는지에 대한 프로토콜이 잘 설계되어 있는 것이 중요하다고 생각합니다. 

 

두 번째는 엔드포인트 입니다. 엔드포인트는 생산자/소비자, 게시자/구독자, 발신자/수신자 등 다양한 용어로 지칭되어 사용되고 있는 것 같습니다. 명명되는 이름이 전부 다르지만, 결국 메시지를 만들고 소비하는 주체들을 어떻게 부르는지의 차이만 있을 뿐이었습니다. 가장 많이 쓰는 용어는 생산자/소비자로 메시지를 생산하는 쪽이 생산자, 메시지를 수신하는 쪽에서는 메시지를 단순히 소비해서 소비자라는 이름이 붙은 것 같습니다. 

 

마지막은 채널입니다. 메시지 소비자는 등록된 채널에서 메시지를 선택합니다. 즉, 소비자가 등록하지 않은 채널로 들어온 메시지는 소비하지 않다는 것을 의미하기도 합니다. 채널은 일반적으로 Topic 이나 Queue로 지칭되기도 하는데 Kafak에선 Topic, RabbitMQ에서는 Queue라는 용어를 선택해서 사용합니다. 

 

메시지 채널은 크게 두가지 유형이 있습니다. 

  • point to point Channel
  • publish subscribe Channel

point to point channel pattern

 

먼저 point to point 채널 패턴은 큐로 지칭됩니다. 주로 메시지의 만들고 소비하는 주체들이 각각 한 곳이며 1:1 대응적이라는 성격을 띠고 있습니다. 즉, 각 메시지는 단 하나의 수신자에 의해서만 처리되는 것을 보장하고 있습니다. 

보통 라라벨에서 작업 큐에 처리할 작업을 선언하고 대기 큐에서 작업을 하나씩 순서대로 처리하는 로직을 떠올릴 수 있다면 P2P 패턴은 이 경우에 해당됩니다. 

publish-subscribe channel pattern

두 번째는 publish-subscribe 패턴입니다. 이 패턴에서 채널은 보통 토픽(Topic)이라고 불립니다. 토픽에는 메시지가 도착하기를 기다리는 많은 구독자가 있을 가능성이 높습니다. 가장 쉽게 떠올릴 수 있는 예시는 유튜브로 유튜브의 채널을 여러 사용자가 구독하고 있고 해당 채널에 영상이 업로드되면 그 채널을 구독하고 있는 사용자들에게 영상이 업로드 됐음을 일괄적으로 알립니다. 대표적인 publish-subscribe 패턴이라고 할 수 있습니다.

메시지가 토픽에 배치되면 추가 처리를 위해 모든 소비자에게 전송됩니다. 해당 토픽을 구독하고 있던 소비자들은 메시지를 받지 않는 한 메시지는 성공적으로 소비되지 않은 것으로 간주합니다. 이 패턴의 경우 단일 토픽에 대해 여러 소비자가 메시지를 소비하고 각 소비자마다 다른 처리를 한다고 기대할 수 있습니다. 물론 한 소비자가 여러 토픽을 구독해 여러 메시지를 수신받는 것 또한 가능합니다.

 

메시징 시스템은 메시지가 전송 중간에 손실되지 않도록 보장합니다. 데이터 지속성, 전송 매체, 복구 메커니즘등과 같이 메시지를 성공적으로 전송하는 것을 궁극적으로 제공하는 시스템을 메시징 시스템이라고 합니다. 

 

메시징 시스템은 브로커(Broker), 메시지 저장소(Message Store), 채널(channel)과 같은 모든 다른 구성 요소들을 통합하고 사용자에게 모니터링 및 관리 기능을 제공합니다. 대부분의 포스팅에서는 메시지 브로커 자체를 메시징 시스템으로 간주하는 경향이 있는 것 같지만 브로커는 메시징 시스템의 구성요소입니다. 

 

이제 메시징 시스템의 각 중요 구성 요소들에 대해 더 살펴 보려고 합니다. 

  • Produer/Consumer
  • Broker & Cluster
  • Message Store
  • Topic & Partition

먼저 Producer와 Consumer에 대해 설명하고자 합니다. 이전에서 언급한것 처럼 메시지를 만들고 소비하는 주체들을 의미합니다. 메시지를 만든다면 생산자라고 부르고 생성된 메시지를 소비한다면 소비자라고 부릅니다. 

 

중요한 점은 생산자가 어떻게 토픽에 메시지를 발행하고 발행된 메시지를 소비자가 어떻게 소비하는지입니다. 그 부분은 밑에서 메시징 시스템의 각 구성요소에 대한 간단한 개념 설명 이후에 하도록 하겠습니다. 

 

다음은 Broker와 Cluster입니다. 브로커 라는 이름의 의미에서 추측할 수 있듯이 중개자 역할을 담당하고 있습니다. 중개자는 2명 이상의 대상의 거래나 커뮤니케이션에서 중간에 위치하여 상호작용을 돕는 특징이 있습니다. 메시징 시스템에서 브로커는 생산자와 소비자 사이에서 중개자 역할을 담당합니다. 생산자가 발행한 메시지를 올바른 소비자에게 전달되도록 보장하는 역할을 담당합니다. 

메시징 시스템에서 CPU 같은 역할을 하며 실제 메시지 작업을 관리하는 핵심 주체라고 이해하면 될 것 같습니다. 메시징 시스템에서는 브로커의 수를 늘려서 시스템의 부하를 분산시킬 수도 있습니다. 이렇게 브로커들의 집합을 형성할때 메시징 시스템에서는 이 집합을 클러스터라 지칭합니다. 

 

브로커는 클라우드의 인스턴스, 도커 컨테이너 일 수도 있습니다. 중요한 건 파티션 집합을 호스팅 하고 파티션에 새 메시지를 넣어주거나 지우는 작업들은 처리하는 역할을 담당하고 있습니다. 

 

다음은 Message Store 입니다. 메시지 스토어는 메시징 시스템에서 메시지를 안전하게 저장하고 관리하는 저장소 역할을 합니다. 메시지 스토어는 메시지가 수신자에게 도달할 때까지나, 지정된 기간 동안 메시지를 저장하는 역할을 담당합니다. 헷갈리기 쉬운 지점은 토픽과 메시지 스토어는 어떻게 다른가입니다. 이전의 설명에서는 생산자가 발행한 메시지는 토픽에 저장되어 해당 토픽을 구독한 소비자가 토픽으로부터 메시지를 소비한다고 설명했었습니다. 대부분의 블로그 설명등은 이와 같이 표현되고 있어서 토픽이 메시지를 저장하는 물리적 단위라고 오해하기 쉽습니다. 

 

토픽과 메시지 스토어의 차이는 메시지를 논리적 또는 물리적으로 보느냐의 차이입니다. 토픽은 메시지를 논리적으로 분류하는 개념에 가깝습니다. 이후에 설명하겠지만, 토픽 내에서는 파티션이라는 단위로 메시지를 분류하여 동일한 성격을 띠는 메시지들을 별도로 구별하고 있습니다. 메시지 스토어는 실제 메시지들이 저장되는 저장소를 의미합니다. 

 

예를 들면 토픽은 유튜브 채널과 같고 메시지 스토어는 유튜브의 데이터 베이스 역할을 담당합니다. 유튜브 채널로 영상들이 분류되어 있지만 결국 같은 데이터베이스 안에서 저장하여 관리한다는 점에서 적절한 예시인 것 같습니다.

 

마지막은 Topic 과 Partition 입니다. 토픽은 방금 말한 것처럼 메시지들을 비슷한 특징을 갖는 것들끼리 분류하는 것을 의미합니다. 토픽은 파티션이라는 단위로 나눠서 메시지를 관리합니다. 실제로 특정 토픽을 구독한 소비자는 브로커에 의해 특정 토픽의 특정 파티션에서 베시지를 읽을 수 있게 되는 것입니다. 

 

토픽과 파티션의 관계

위 이미지처럼 하나의 토픽 안에서는 3개의 파티션으로 분리해서 메시지를 받아오거나 내보내는 작업을 수행합니다. 아래에서 더 자세히 보겠지만, 토픽 내에서도 어떤 파티션에 메시지를 쓸지에 대해서 관리하는 것이 가능합니다. 

 

일반적으로 메시지에 키가 없을 경우 즉, 어떤 파티션으로 갈지 정해진 프로토콜이 없는 경우에는 파티션 간에 Round-Robin 방식으로 메시지를 할당합니다. 때문에 메시지 들간에 처리 순서는 예측할 수 없습니다. 다만, 메시지에 키를 부여하여 특정 파티션을 지정할 경우에는 파티션 간의 순서는 보장되는 것이 가능합니다. (파티션의 자료구조는 입니다.)

 


Apache Kafka

카프카는 LinkedIn에서 개발되고 Apache Software Foundation에서 오픈소스로 관리되는 분산 스트리밍 플랫폼입니다. 메시징 시스템의 핵심 개념들을 모두 구현하고 있고 RabbitMQ 와 가장 많이 비교되고 있습니다. 

 

카프카와 RabbitMQ은 둘다 메시징 시스템으로써 기능을 제공하지만, 각각의 장단점이 분명하기 때문에 필요에 맞게 선택할 수 있습니다. 

RabbitMQ의 가장 큰 장점은 메시지 전달의 우선순위를 지정하는 메시지 브로커로 설계되어 메시지의 전송 순서가 보장된다는 것입니다. 다만 그 특징 때문에 카프카에 비해 처리되는 속도가 늦다는 단점이 있습니다. 반면에서 카프카는 메시지간 우선순위를 보장하지 않지만, 매우 빠른 속도로 메시지를 처리할 수 있기 때문에 보통 분산 이벤트 스트리밍 플랫폼이나, 실시간 데이터를 처리를 원한다면 가장 좋은 선택지가 될 것입니다. 

 

카프카는 메시징 시스템의 한 종류이기 때문에 시스템의 전반적인 프로세스는 메시징 시스템의 프로세스와 거의 일치합니다. 즉, 생산자가 메시지를 발행해서 토픽의 각 파티션으로 들어가고 해당 토픽을 구독하는 소비자는 메시지가 토픽에 새롭게 들으면 브로커에 의해 소비하게 됩니다. 메시징 시스템의 전반적인 틀은 크게 다르지 않다고 할 수 있습니다. 

 

그래서 저는 카프카의 고가용성에 대해 학습한 내용을 정리하고 넘어가려고 합니다. 고가용성이란, 어플리케이션의 중단을 최소화하고 지속적으로 작동할 수 있는 능력을 의미합니다. 앞서 메시징 시스템을 정의할 때 언급했지만, 메시징 시스템은 메시지가 전송 중간에 손실되지 않도록 보장합니다. 즉, 메시지를 성공적으로 전송하는 것이 가장 중요하기 때문에 카프카는 어떻게 고가용성을 달성할 수 있는지 학습한 내용을 공유하려고 합니다. 

 

 

 

카프카의 고가용성을 이해하기 위해서는 카프카의 특징들에 대해 먼저 이해하고 있어야 합니다. 다음 목록은 위 이미지를 이해하기 위한 카프카의 특징들을 정리해 봤습니다. 이 이미지를 완전히 이해해 봅시다. 

 

먼저, 파티셔닝 전략 입니다. 앞서 설명한 것처럼 메시징 시스템에서는 토픽에 특정 메시지가 발행되면, 토픽의 파티션 중 하나로 전달됩니다. 이때 어떤 파티션으로 메시지가 전달될지는 파티셔닝 전략에 따라 결정됩니다. 대표적인 파티셔닝 전략으로는 라운드로빈, 키 기반 파티셔닝 등이 있습니다. 이 중 키 기반 파티셔닝은 메시지에 키 값을 부여하여 같은 키를 갖는 메시지는 항상 같은 파티션에 저장되도록 하는 전략입니다. 

 

다음은 카프카 클러스터와 브로커 입니다. 카프카 클러스터는 기본적으로 3개의 카프카 브로커로 구성되어 있습니다. 앞서 설명한 것처럼 브로커는 클라우드 인스턴스나 도커 컨테이너처럼 서버로써 동작합니다. 때문에 각 브로커는 고유 IP를 갖고 있고 브로커 간의 네트워크 통신도 가능합니다. 클러스터 내에서 단일 브로커가 아닌 멀티 브로커로 설계한 이유는 메시지를 분산해서 저장하기 위해서입니다. 

 

다음은 파티션의 리더-팔로워 구조입니다. 앞서 언급한 것처럼 카프카 클러스터는 여러 대의 브로커를 구성해 메시지를 분산저장합니다. 분산 저장할 수 있는 이유가 바로 파티션의 리더-팔로워 구조 덕분입니다. 리더의 역할은 모든 읽기와 쓰기 요청을 처리하는 역할을 담당합니다. 즉, 생산자가 발행한 메시지를 받고 소비자가 메시지를 사용할 수 있도록 소통을 담당합니다. 반면, 팔로워는 리더의 데이터를 복제하는 역할을 담당합니다. 파티션은 기본적으로 독립적으로 메시지를 저장하고 있습니다. 때문에 팔로워는 리더의 데이터를 주기적으로 복사하여 리더와 동일한 데이터를 유지합니다. 

 

마지막은 컨슈머 그룹 입니다. 컨슈머 그룹은 동일한 토픽을 구독하는 컨슈머 인스턴스들을 하나로 묶는 역할을 합니다. 메시지가 여러 컨슈머에서 처리되는 것을 원치 않을 수 있습니다. 만약 아래 그림에서 보는 것처럼 컨슈머 그룹 A 없이 컨슈머 인스턴스 A1, A2, A3 만 있었다면 토픽 A로 들어오는 메시지는 각 인스턴스에서 3번 처리될 것입니다. 카프카에서는 이런 상황을 막기 위해서 컨슈머 그룹을 사용해 메시지가 중복으로 처리되는 것을 방지할 수 있습니다. 그리고 각 컨슈머 인스턴스에게 특정 파티션과 1:1 대응을 시켜서 특정 파티션으로 들어오는 메시지만 처리하도록 설정하는 것도 가능합니다. 

 

이제 아래 이미지를 이해하기 위한 사전 준비가 어느 정도 된 것 같습니다. 

 

이미지에서 가장 먼저 보이는 특징 중 하나는 특정 토픽에 대해 파티션이 3개로 분리되어 있고 카프카 클러스터의 수는 3개라는 점입니다. 즉 이미지를 표로 만들면 다음과 같습니다. 이해를 돕기 위한 예시를 들어보겠습니다.

브로커 1 브로커 2 브로커 3
파티션 0 (리더) 파티션 0 (팔로워) 파티션 0 (팔로워)
파티션 1 (팔로워) 파티션 1 (리더) 파티션 1 (팔로워)
파티션 2 (팔로워) 파티션 2 (팔로워) 파티션 2 (리더)

 

예를 들어 이 카프카에서는 파티셔닝 전략으로 키 기반 파티셔닝 전략을 사용한고 있습니다. Producer1이 발행한 메시지 x는 키 번호 x001을 갖습니다. 키 x001는 항상 파티션 1로 가야 합니다. (약속이라고 가정) 리터 파티션 1은 브로커 2에 있기 때문에 메시지 x는 브로커 2의 파티션 1로 갑니다. 이후 Consumer A1는 파티션에 메시지가 있는 경우 그 메시지를 처리할 수 있습니다. 

Producer2에서 생성된 메시지 y는 파티션 2로 가야 할 때 어느 브로커로 가야 할까요? 잘 알고 계시듯 브로커 1로 가게 됩니다. 

 

이처럼 파티션의 리더 팔로워 구조에서 리더 파티션은 메시지를 직접 처리하고 팔로워는 리더의 데이터를 항상 복제하여 저장하는 역할을 담당합니다. 만약 리더 브로커에서 장애가 발생할 경우 카프카는 팔로워 브로커 중 하나를 골라 리더로 승격시킵니다. 이 작업은 카프카 컨트롤러가 담당합니다. 카프카 컨트롤러는 카프카 클러스터에서 메타데이터를 관리하는 특수한 브로커입니다. 파티션의 리더를 선출하고 브로커 장애를 감지하는 등의 역할을 담당하고 있습니다. 

 

정리하자면, 파티션의 리더-팔로워 구조는 덕분에 카프카 클러스터는 고가용성을 유지하고 장애가 발생했을 때 데이터 손실을 최소화할 수 있습니다. 


 

Spring  for Apache Kafka (Github)

이 포스팅의 마지막은 Spring 프레임워크에서 Apache Kafka를 사용하는 예제를 끝으로 마무리하려고 합니다. Spring 진영에서  Apache Kafka를 사용하는 방법은 두 가지 방법이 있습니다.(제가 알고 있는 방법은 두 가지이지만 다른 방법이 더 있을 수도 있습니다.)

  • Spring Cloud Strems
  • Spring for Apache Kafka

저는 이번 포스팅에서 두 번째 방법인 Spring for Apache Kafka를 사용한 예시를 소개하려고 합니다. [스프링으로 하는 마이크로서비스 구축] 교재에서는 Spring Cloud Streams를 사용해서 카프카를 다루고 있지만, 제가 두 번째 방법을 선택한 이유는 두 방식의 명확한 차이점이 있기 때문입니다. 

 

Spring Cloud Streams는 마이크로서비스 어플리케이션에서 메시지 기반의 통신을 쉽게 구현할 수 있도록 메시징 시스템을 추상화한 프레임워크입니다. 때문에 카프카라는 특정 메시징 시스템에 종속되지 않고 RabbitMQ, ActiveMQ 등 다양한 메시징 시스템 오픈소스를 선택해서 애플리케이션을 개발하고자 할 때 유용하게 쓰일 수 있습니다. 

 

Spring for Apache Kafka카프카 사용에 최적화된 방법입니다. 카프카의 세부 설정 기능을 직접 제어하는 등 카프카에 특화된 기능들을 지원할 수 있다는 장점이 있습니다. 무엇보다 Spring Cloud Strems는 내부적으로 Spring for Apache Kafka를 사용해서 카프카와의 통합을 지원하고 있다고 합니다. 이제부터 본격적으로 Spring 진영에서 카프카를 사용하는 방법에 대해 정리해 보겠습니다. (관련된 소스코드는 제 개인 깃헙에 올려두었습니다.)

 

실습을 위한 기본 설계는 다음과 같습니다. 

  • 실시간 데이터 처리를 위해서 Upbit, Bithumb 등 코인거래소 오픈 API 를 통해 데이터를 가져옵니다. 
  • 실시간으로 들어오는 정보들을 카프카를 통해 처리합니다. 
  • 카프카를 모니터링 하기 위해 Kafka-UI라는 오픈소스를 사용했습니다. 
  • 도커 환경에서 카프카 클러스터를 구성합니다. 

먼저 도커 컴포즈를 사용해서 카프카 클러스터를 구성합니다. 위에서 언급한 것처럼 카프카 클러스터는 고가용성을 위해 기본적으로 3개의 카프카 브로커로 클러스터를 이루도록 설계했습니다. 

version: '3'

services:
  ...
  
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - 2181:2181
    networks:
      - app-tier
  
  kafka1:
    image: confluentinc/cp-kafka
    ports:
      - 9092:9092
    container_name: kafka1
    restart: on-failure
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ALLOW_AUTO_CREATE_TOPICS: 'false'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      - app-tier

  kafka2:
    image: confluentinc/cp-kafka
    ports:
      - 9093:9092
    container_name: kafka2
    restart: on-failure
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ALLOW_AUTO_CREATE_TOPICS: 'false'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      - app-tier

  kafka3:
    image: confluentinc/cp-kafka
    ports:
      - 9094:9092
    container_name: kafka3
    restart: on-failure
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ALLOW_AUTO_CREATE_TOPICS: 'false'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      - app-tier

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8989:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
    networks:
      - app-tier

networks:
  app-tier:
    driver: bridge

 

모든 설정 정보에 대해 소개 하는 것보다는 핵심적인 내용 몇 가지만 보고 넘어가는 게 좋을 듯합니다. 먼저 주목할 점은 브로커마다 각각의 도커 컨테이너 환경에서 동작한다는 점입니다. 때문에 각각 외부와 연결할 포트를 전부 다르게 설정되어 있습니다. 다만 같은 도커 네트워크 환경으로 설정되어 있습니다. 다음은 zookeeper라는 도커 컨테이너를 만들었음을 확인할 수 있습니다. 

 

주키퍼는 분산 시스템의 코디네이션 서비스로 카프카 같은 분산서비스에서 분산 설정 정보를 중앙에서 관리하는 역할을 담당합니다. 주키퍼는 카프카 클러스터의 브로커, 토픽, 파티션 등의 메타 데이터를 저장하고 관리하는 역할을 담당하며 클러스터 내의 브로커가 추가되거나 제거될 때 이를 반영하는 기능도 수행합니다. 주키퍼의 주요 역할 중 파티션의 리더-팔로워 구조에서 파티션 리더를 선정하는 역할을 담당합니다. 

 

다음은 각 카프카 브로커의 환경변수 설정중 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092'에서 PLAINTEXT://0.0.0.0:9092는 텍스트 메시지를 수신받겠다는 의미입니다. 해당 실습에서는 메시지의 형태로 문자열을 사용했습니다. 

 

마지막으로 kafka-ui 입니다. 이 오픈소스는 카프카 클러스터를 모니터링할 수 있습니다. 즉, 토픽이 생성되고 메시지가 컨슈머에 전달되는 과정들을 모니터링할 수 있도록 도와주는 역할을 합니다. 

 

도커 환경 세팅 다음은 어플리케이션 환경에서 카프카 관련 의존성을 추가하는 것입니다. build.gradle에 다음 종속성을 추가합니다.

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

 

그리고 application.yml 파일에서 Producer/Consumer에 대한 설정 정보를 추가합니다. 

 

spring:
  kafka:
    consumer:
      bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
      group-id: coin
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

    producer:
      bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    topic:
      upbit: topic-upbit
      bithumb: topic-bithumb

 

여기서 bootstrap-servers는 카프카 컨슈머와 프로듀서가 참조하는 카프카 브로커에 대한 서버 주소입니다. group-id는 'coin'이라는 소비자 그룹을 지정한다는 것을 의미합니다. 즉, 동일한 그룹 ID를 갖는 컨슈머들은 서로 메시지를 공유하지 못하도록 해서 메시지가 중복으로 처리되는 것을 방지할 수 있습니다. 마지막으로 upbit, bithumb이라는 토픽을 지정했음을 알 수 있습니다. 

 

다음은 Producer를 지정합니다. 

@Service
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, HashMap<String, Object>> kafkaTemplate;

    public void sendMessage(String topicName, HashMap<String, Object> data){
        Message<HashMap<String, Object>> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topicName)
                .build();

        kafkaTemplate.send(message);
    }
}

 

key(string): value(HashMap) 형식의 메시지를 카프카 템플릿을 통해 발행합니다. KafkaTemplate은 카프카 브로커에 메시지를 보내기 위한 API를 제공합니다. 토픽, 파티션, 키 등을 지정하여 메시지를 전송하거나 메시지의 헤더도 설정하는 기능을 지원하고 있습니다. 

 

다음은 Consumer를 지정합니다. 

@Service
@RequiredArgsConstructor
public class BithumbKafkaConsumer {
    private final CoinManagerIF coinManagerInterface;
    @KafkaListener(topics = "${spring.kafka.topic.bithumb}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(HashMap<String, Object> data){
        coinManagerInterface.save(Exchange.BITHUMB, data);
    }
}

 

카프카 컨슈머는 @KafkaListener 어노테이션을 사용해서 구현할 수 있습니다. @KafkaListener는 카프카 토픽에서 메시지를 비동기적으로 수신하는 리스너를 정의할 수 있으며 토픽의 이름과 컨슈머 그룹의 아이디를 지정해서 토픽의 메시지를 수신할 수 있도록 지원합니다. 

 

이렇게 카프카 클러스터 내에서 토픽이 생성되고 토픽으로 메시지가 발행되고 소비되는 일련의 과정은 모두 Kafka-UI를 통해서 모니터링할 수 있습니다. 

kafka-ui로 모니터링


이번 포스팅에서는 메시징 시스템에 대한 전반적인 이해와 카프카에 대해 학습한 내용을 공유 했습니다. 내용이 워낙 많고 어려워서 제가 놓치거나 잘못 전달한 부분이 있을 수 있다고 생각합니다. 혹시 잘못된 정보를 전달했다면 댓글을 통해서 수정요청해 주시면 감사드리겠습니다.

반응형