1. 등장 배경
오늘날 IT 시스템은 점점 더 분산화되고 복잡해지고 있습니다.
소위 MSA라는 단일 애플리케이션이 아닌 여러 개의 독립적인 서비스로 구성되는 마이크로서비스 아키텍처가 널리 사용되고 있죠.
이러한 분산 시스템 환경에서는 서비스 간의 통신과 데이터 교환이 매우 중요해졌습니다.
예를 들어, 전자상거래 웹사이트를 생각해봅시다.
주문 처리 시스템, 재고 관리 시스템, 배송 시스템, 결제 시스템 등 여러 개의 독립적인 서비스로 구성되어 있습니다.
이러한 서비스들은 서로 연계되어 작동해야 하며, 데이터와 이벤트를 효율적으로 전달하고 조율해야 합니다.
과거에는 서비스 간 통신을 위해 직접 연결(Point-to-Point) 방식을 사용했습니다. 예를 들어, 주문 처리 시스템이 직접 재고 관리 시스템과 연결을 맺고, 주문 정보를 메시지로 전송합니다. 재고 관리 시스템은 이 메시지를 수신하여 처리합니다.
하지만 이 방식은 서비스 간 결합도가 높아져 유지보수가 어렵고, 확장성이 부족하다는 단점이 있었습니다.
이러한 문제를 해결하기 위해 메시지 브로커(Message Broker)가 등장했습니다.
메시지 브로커는 Publisher(송신자)로부터 전달받은 메시지를 Subscriber(수신자)로 전달해주는 중간 역할을 합니다.
이를 통해 유연성과 확장성이 향상되고, 서비스 간의 상호 작용이 원할해집니다.
RabbitMQ는 이러한 메시지 브로커 솔루션 중 하나로, 오픈 소스 프로젝트입니다.
분산 애플리케이션 간의 메시징 통신을 중개하는 역할을 하며, 비동기 메시징 처리, 메시지 전달 보장, 부하 분산, 메시지 버퍼링 등의 기능을 제공합니다.
예를 들어, 온라인 쇼핑몰에서 주문이 발생하면 주문 처리 시스템은 주문 정보를 RabbitMQ로 보냅니다.
그러면 RabbitMQ는 이 메시지를 재고 관리 시스템, 결제 시스템, 배송 시스템 등 관련 서비스로 라우팅합니다.
각 서비스는 메시지를 수신하고 처리한 후 다음 단계로 진행할 수 있습니다.
2. RabbitMQ 중요 개념
[ Producer ]
- 메시지를 생성하고 발송하는 주체
- 메시지를 생성하고 Exchange로 보냄
[ Broker ]
- 서버 자체
- 프로듀서와 컨슈머 사이에서 메시지를 전달하고 관리하는 메시지 브로커 역할
[ Exchange ]
- 메시지를 받아서 라우팅 규칙에 따라 적절한 Queue로 전달하는 역할
- 유형 : Direct, Fanout, Topic, Headers
[ Queues ]
- 메시지를 임시로 저장하는 버퍼 역할
- 컨슈머가 메시지를 처리할 때 까지 Queue에 메시지 대기
[ Binding ]
- Exchange와 Queue 간의 연결 규칙
- Binding 키 (Routing Key)를 사용하여 메시지가 어떤 Queue에 전달될지 결정
[ Consumer ]
- 메시지를 수신하고 처리하는 애플리케이션 또는 시스템
- Queue에서 메시지를 가져옴
[ 메시지 흐름 ]
- 프로듀서가 메시지를 Exchange로 보냅니다.
- Exchange는 Binding 규칙에 따라 메시지를 적절한 Queue로 라우팅합니다.
- 컨슈머는 자신이 연결한 Queue에서 메시지를 가져와 처리합니다.
3. RabbitMQ 사용 케이스
- 비동기 메시징 처리가 필요한 경우
- 예시 : 대용량 트래픽 처리 - 대량의 요청을 처리할 때, 그러한 요청을 큐에 넣고 비동기적으로 처리
- 분산 시스템 통합
- 예시 : 온라인 쇼핑몰에서 주문, 재고, 결제, 배송 등의 서비스가분리되어 있을 때, 서비스 간 메시징 처리하면서 연계
- 메시징 전달 보장
- 예시 : 결제 시스템 등 중요한 데이터 처리중 서버가 다운되어 실패해도, 서버가 다시 연결되면 재실행
- 부하 분산
- 예시 : 급격한 트래픽 증가로 요청을 처리하지 못할 때, 큐의 요청을 다수의 서비스 또는 쓰레드로 분산
4. RabbitMQ Exchange
프로듀서로부터 메시지를 받아 Binding 규칙에 따라 해당 메시지를 적절한 Queue로 라우팅하는 역할
[ Direct Exchange ]
- Routing Key와 정확히 일치하는 Queue에만 메시지 라우팅
- 예시 : 로그 수집 시스템에서 "error"로드는 에러 큐로, "warning" 로그는 경고 큐로 라우팅
[ Fanout Exchange ]
- Exchange에 등록된 모든 Queue에 메시지 라우팅 (브로드 캐스트)
- 예시 : 서버 정보와 같은 모든 서비스에 필요한 정보를 동시에 전파
[ Topic Exchange ]
- Routing Key의 패턴 매칭을 통해 메시지 라우팅
- 예시 : "loatodo.characters.todo" 라우팅 키를 사용하여 "loatodo.characters.*"로 바인딩된 Queue에는 모두 메시지가 전달
[ Headers Exchange ]
- 메시지 헤더의 속성을 기반으로 라우팅
- 예시: 메시지 헤더에 "format=pdf"가 포함된 경우 특정 Queue로 라우팅 가능
Exchange 생성 화면
- Name: Exchange 이름
- Type: 메시지 전달 방식
- Direct
- Fanout
- Topic
- Headers
- Durability: 브로커가 재시작될 때 남아있는지 여부
- Durable: 브로커가 재시작되어도 디스크에 저장되어 남아있음
- Transient: 브로커가 재시작되면 사라짐
- Auto-delete: 마지막 Queue 연결이 해제되면 삭제
- Internal : 브로커 내부에서만 사용되는 Exchange, 메시지를 받지 않고 다른 Exchange로부터 라우팅된 메시지만 처리
- Arguments: 추가적인 옵션 (키-값 쌍으로 구성된 딕셔너리 형식)
- x-max-length : 최대 길이
- x-expires : 비어있을 때 삭제될 시간
- x-dead-letter-exchange : 처리할 수 없는 메시지 다른 Exchange로 라우팅
5. RabbitMQ Queue
- Name: Queue 이름
- Durability: 브로커가 재시작될 때 남아있는지 여부
- Durable: 브로커가 재시작되어도 디스크에 저장되어 남아있음
- Transient: 브로커가 재시작되면 사라짐
- Auto delete: 마지막 Consumer가 consume을 끝낼 경우 자동 삭제
- Argument: 메시지 TTL, Max Length 같은 추가 옵션
6. 실습
1) Docker RabbitMQ 설치
RabbitMQ 서버는 도커를 이용하여 쉽게 세팅할 수 있습니다.
https://hub.docker.com/_/rabbitmq
docker pull rabbitmq
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq
추가로 RabbitMQ 관리자 페이지를 접속위해 플러그인을 추가로 설치한다.
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
이제 http://localhost:15672 포트를 통해서 관리자 페이지에 접속할 수 있다.
- Username: guest
- Password: guest
2) Exchange, Queue 생성
rabbitMQ 관리자 페이지에서 Exchange와 Queue를 생성할 수 있습니다.
Exchange 생성
Queue 생성
Exchange에 queue 바인딩
3) Spring boot 서버
여기서는 spring boot를 사용했지만, rabbitMQ는 spring 뿐만 아니라 다양한 클라이언트, 서버 환경에서 지원합니다.
https://github.com/minhyeok2487/RabbitMQ_TEST
1초 걸리는 Rest API가 있고 그러한 api를 100번 호출 할때
@PostMapping("/api")
fun delay(@RequestBody message: String): String {
TimeUnit.SECONDS.sleep(1) // 로직이 1초 걸림
println("done = $message")
return "ok"
}
[ api 요청 ]
for i in {1..100}
do
curl -X POST \
http://localhost:8080/api \
-H 'Content-Type: text/plain' \
-d "$i"
done
클라이언트가 요청을 보내고 1초 동안 서버에서 처리 한 결과를 클라이언트에서 확인한 후 다음 요청을 보낸다.
RabbitMQ를 사용하면
@PostMapping("/rabbit")
fun sendMessage(@RequestBody message: String): String {
messageSenderService.sendMessage(message)
return "ok"
}
fun sendMessage(message: String) {
rabbitTemplate.convertAndSend("test-exchange", "test-routing-key", message)
}
클라이언트에서 받은 메시지를 Exchange를 통해 Queue에 쌓는다.
그후 서버는 Consumer는 Queue에 쌓인 메시지를 하나씩 처리한다.
@RabbitListener(queues = ["test-queue"])
fun receiveMessage(message: String) {
TimeUnit.SECONDS.sleep(1) // 로직이 1초 걸림
println("done = $message")
}
또한 중간에 Consumer 서버가 다운이 되어도 Queue에 쌓인 데이터가 지워지지않고
재연결시 다시 처리한다.