[RabbitMQ] Message Order & Multiple Message Type
이번 글에서는 RabbitMQ의 Message Order와 Multiple Message Type을 다루는 방법에 대해 알아보도록 하겠습니다.
1. Message Order
Message Order는 Queue에 들어온 메시지가 처리되는 순서를 의미합니다.
만약 순서대로 처리되어야 되는 메시지가 Queue에 들어올 경우 Message Order는 반드시 지켜져야 합니다. 😅
1-1) One Queue + One Consumer
기본적으로 하나의 Queue에 하나의 Consumer만 존재할 경우에는, RabbitMQ는 메세지 처리순서를 보장합니다.
1-2) One Queue + Multiple Consumer
만약 하나의 Queue에 여러 Consumer가 존재하고, 각각의 Consumer들의 메시지 처리 속도가 다를 경우에는 어떻게 될까요?
위와 같은 경우에는 RabbitMQ는 메시지 처리 순서를 보장하지 않습니다.
2. Multiple Message Type
Message Type은 Queue에 들어온 메세지의 TypeId를 의미합니다.
RabbitMQ에는 하나의 Queue에 각기 다른 Message Type의 메세지를 허용할 수 있습니다.
각각의 메세지가 아래처럼 Queue안에 순서대로 쌓이게 됩니다.
사용자는 각 메세지 Type 별로 Queue를 운영할 수도 있고, 하나의 Queue안에 Multiple Message Type을 허용해 Strucure를 구성할 수 있습니다.
만약 각 Message Type 별로 Queue를 운영할 경우 Consumer 입장에서는 쉬운 코드로 구현할 수 있어 좋지만, 운영 관점에서보면 많은 수의 Queue를 관리해야 합니다.
만약 적은 수의 Queue에 Multiple Message Type을 허용해 구성한다면, Queue를 관리하는데에는 이점이 있으나, Consumer 입장에서는 각 메세지별로 filtering 기능을 구현해야 합니다.
2-1) One Queue + Multiple Message Type
만약 하나의 Queue에 Multiple Message Type을 허용하도록 구현한다면, Consumer에서는 아래와 같이 코드를 작성할 수 있습니다.
@Service
@RabbitListener(queues = "q.invoice")
public class InvoiceConsumer {
private static final Logger log = LoggerFactory.getLogger(InvoiceConsumer.class);
@RabbitHandler
public void handleInvoiceCreated(InvoiceCreatedMessage message){
log.info("invoice created : {} ", message);
}
@RabbitHandler
public void handleInvoicePaid(InvoicePaidMessage message){
log.info("invoice paid : {} ", message);
}
@RabbitHandler(isDefault = true)
public void handleDefault(Object message){
log.info("invoice paid : {} ", message);
}
}
이전과 달라진 것은 @RabbitListener가 class명 위로 올라갔으며, 새롭게 @RabbitHandler라는 어노테이션이 추가되었습니다. @RabbitHandler는 "q.invoice"에서 메세지를 받으면 해당 Message Type을 파라미터 값으로 받는 메서드에게 메세지를 적절히 전달합니다.
또한 isDefualt가 선언된 @RabbitHandler에서는 적절한 Message Type 파라미터 값을 찾지 못한 메세지들을 Object 클래스로 전달해 메서드를 실행합니다.
2-2) Consistent Hash Exchange
만약 동일 Type의 메세지는 동일 Queue에서 처리되도록 구현하고 싶으면 Consistent Hash Exchange를 사용하면됩니다.
예를 들어 아래와 같이 X/Y Type의 메세지가 있다고 가정해보겠습니다. 이 메시지는 NEW -> PAID 순서로 처리되어야 합니다.
위와 같은 상황에서 만약 2개의 Consumer가 있고, 각 Consumer의 처리속도가 다르다면 아래와 같이 Y(NEW)가 Y(PAID)보다 나중에 처리되는 상황이 발생할 수 있습니다. 분명 Exchange에는 Y(NEW) -> Y(PAID) 순서로 도착했지만, Consumer의 처리능력 차이로 인해 Message Order가 꼬여버린 상황이 되었습니다.
이와 같은 상황을 개선하기위해서 Consistent Hash Exchange를 사용할 수 있습니다. Consistent Hash Exchange는 동일한 Routing Key를 가진 메세지들은 동일한 Queue에 전달합니다. 따라서 아래 그림과 같이 X타입은 X타입끼리 순서대로 처리되며 Y타입도 동일하게 Y타입끼리 순서대로 처리될 수 있습니다.
Consistent Hash Exchange는 내부적으로 Routing Key의 Hash 값을 사용해 메세지를 분배합니다.
주의점으로는 이미 Queue에 메시지를 분배한 Consistent Hash Exchange에 추가적으로 Queue를 연결할 경우 메시지의 분배 규칙이 깨져버리게 됩니다. 이를 방지하기위해서는 추가적으로 Queue를 연결하기전에는 분배한 메시지를 모두 소비해야합니다.
3. Consistent Hash Exchange 생성하기
이제 Consistent Hash Exchange 생성해보겠습니다.
먼저 아래와 같은 명령어를 이용해 consistent hash exchange plugin을 설치합니다.
rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
먼저 Type에서 "x-consistent-hash"를 선택한뒤 Name을 입력해 exchange를 생성합니다.
다음으로는 생성한 q를 exchange에 연결하면 됩니다. 이때 이전의 exchange와 다른점은 routing key를 distribution ratio로 사용한다는 것 입니다. 😉 Queue를 bind 할때 입력한 routing key 값에 비례해서 consistent hash exchanges는 메세지를 분배합니다. 따라서 consistent hash exchange에 binding 할때 routing key에는 numeric 값만 입력할 수 있습니다.
만약 binding 할때 routing key에 각각 10과 5를 입력했다면 2:1의 비율로 메시지를 분배하며, 동일한 routing key를 소유한 메세지는 동일 Queue에 전달합니다.
Queue를 exchange에 binding 할 때 아래와 같이 Routing key에 Numeric 값을 입력해주면 됩니다.
ONE
TWO
이제 모든 Setting을 끝마쳤습니다. 이제 아래와 같이 Producer에서 routing key 값을 넣어 메시지를 전달하면 됩니다. 이후에는 consistent hash exchange가 invoice number가 같은 메세지들은 동일 Queue에 메세지를 전달할 것 입니다.
public void sendInvoiceCreated(InvoiceCreatedMessage message) {
rabbitTemplate.convertAndSend(EXCHANGE, message.getInvoiceNumber(), message);
}
참고 자료 : https://www.udemy.com/course/rabbitmq-java-spring-boot-for-system-integration/
출처 : https://minholee93.tistory.com/entry/RabbitMQ-Message-Order-Multiple-Message-Type
'Spring Cloud > RabbitMQ' 카테고리의 다른 글
[RabbitMQ #3] RabbitMQ Server API 이용 Connection List 확인 하기 (0) | 2022.05.24 |
---|---|
[RabbitMQ #1] Rabbit MQ 설치 (0) | 2022.05.24 |
[AMQP] AMQP(Advanced Message Queuing Protocol) (0) | 2022.05.24 |
[RabbitMQ] Request / Reply with Spring Boot (0) | 2021.11.03 |
[RabbitMQ] Scheduling Consumer ON/OFF (0) | 2021.11.03 |
[RabbitMQ] Jackson2JsonMessageConvertor (0) | 2021.11.03 |
[RabbitMQ] RabbitMQ Structure 생성하기 (0) | 2021.11.03 |
[RabbitMQ] RabbitMQ vs Kafka (0) | 2021.11.03 |