[RabbitMQ #5] Queue & Message 보존 설정 및 Fair Dispatch
Delivery Acknowledgement Timeout
RabbitMQ Server가 Publisher가 보낸 데이터를 가지고 있고, 해당 데이터를 수신한 Consumer가 하나도 없다고 가정 한다.
이런 경우, 새로운 Consumer가 start_consuming()을 수행 하면, 새로운 Consumer는 Queue에 Binding 된 데이터를 모두 수신 할 수 있다.
만약 Consumer가 수행 되지 않아 RabbitMQ Server가 데이터를 계속 가지고 있다면, RabbitMQ Server는 Delivery Acknowledgement Timeout(Default : 30초) 시간 동안 보관 하고 이후 데이터를 ACK 받은 것으로 처리 한다.
Delivery Acknowledgement Timeout 값은 RabbitMQ Server의 [rabbitmq.conf] 파일에서 수정 할 수 있다.
# one hour in milliseconds
consumer_timeout = 3600000
만약 해당 Timeout을 Disable 하려면 [advanced.config] 파일을 아래 처럼 수정 한다.
%% advanced.config
[
{rabbit, [
{consumer_timeout, undefined}
]}
].
Message Durability
RabbitMQ 서버가 종료 후 재기동 하면, 기본적으로 Queue는 모두 제거 된다. Queue를 생성 할 때 Durable 옵션을 True로 설정하면 Queue가 제거 되는 것을 막을 수 있다.
channel.queue_declare(queue='hello', durable=True)
주의 : RabbitMQ에 이미 hello queue가 durable=False로 만들어져 있는 상태에서 위 코드를 수행하면, hello Queue의 durable 상태는 True로 변경되지 않는다. 이런 경우, hello queue를 지우거나, 다른 이름의 queue를 생성 해야 한다.
Publisher 입장에서는 메시지를 보낼 때, 아래와 같이 PERSISTENT_DELIVERY_MODE를 설정하면 RabbitMQ 서버가 죽더라도 메시지를 보존 할 수 있다.
주의 : PERSISTENT_DELIVERY_MODE로 설정 해도 메시지가 유실되는 것을 100% 방지 할 수 없다.
만약 강하게 메시지를 보장 하려면 publisher confirms를 사용 해야 한다.
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
))
Message dispatch
RabbitMQ의 dispatch 방법은 기본적으로 round robin 방식이며, Message Queue에 담은 순서대로 Worker들에게 전달 한다.
균등한 메시지 처리가 필요한 상황에선 위 방식으로 충분 할 수 있으나, Worker들이 메시지 중 특정 순서로 오랜 처리 시간이 필요한 상황에는 맞지 않을 수 있다.
예를 들어 C1은 시간이 오래 걸리는 작업을 수행하고 C2은 비교적 가벼운 작업을 수행 한다고 가정하면, RabbitMQ로 부터 순차적으로 메시지를 제공 받더라도, 아래 영상 처럼 동작 될 수 있다.
#publisher.py
import pika
connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()
#channel.queue_declare(queue='hello')
for i in range(10):
channel.basic_publish(exchange='', routing_key='hello', body=str(i))
print('# 메시지를 보냈습니다!' + str(i))
connection.close()
# consumer.py ( 메시지 수신 후 1초 sleep )
import pika
import time
connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
#time.sleep(body.count(b'.'))
time.sleep(1)
print(" [x] Done")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print("메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.")
channel.start_consuming()
# consumer2.py ( 메시지 수신 후 3초 sleep )
import pika
import time
connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
#time.sleep(body.count(b'.'))
time.sleep(3)
print(" [x] Done")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback)
print("메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.")
channel.start_consuming()
Fair dispatch
Prefetch Count는 Consumer에게 보내는 메시지 수를 지정하는데 사용하는 옵션이며, 요청을 처리했음을 의미하는 Ack가 RabbitMQ Server에 전달 되기 전까지 consumer가 전달 받을 수 있는 Message의 개수 이다.
consumer 코드를 아래와 같이 수정하면, Fair Dispatch를 해서 원하는 동작을 수행 할 수 있다.
- channel.basic_consume(auto_ack=False). auto_ack 설정 삭제
- auto_ack = True로 설정된 경우, Callback 함수가 수행되기 전 consumer는 ACK를 RabbitMQ 서버에 보내기 때문에 기대하는 동작을 수행 할 수 없음
- channel.basic_qos(prefetch_count=1)로 설정
# consumer2.py
import pika
import time
connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
#time.sleep(body.count(b'.'))
time.sleep(3)
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback)
#channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print("메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.")
channel.start_consuming()
Prefetch Count에 따른 성능 조정
Prefetch Count = 1인 경우 ( 작을 수록 Fair Dispatch 함 )
- 하나의 메시지가 처리되기 전에는 새로운 메시지를 받지 않으므로, 여러 Worker들에게 동시에 작업을 분산 할 수 있다. 하지만 여러 worker가 있으나 각 요청이 빨리 처리되는 상황에서 각 worker의 다음 작업까지 대기 시간이 증가 할 수 있다.
- Worker가 많거나 한 작업 단위의 처리 시간이 긴 경우, 모든 worker에게 균등하게 나눠지도록 값을 작게 설정하는 것이 좋다.
Prefetch Count 값을 크게 설정 할 경우
- 메시지 큐에서 다량의 메시지를 하나의 worker에게 전달하여 buffer에 요청을 쌓고 계속 처리 할 수 있음.
- 각 worker의 대기 시간을 감소 시킬 수 있지만, 특정 요청의 처리 시간이 긴 경우에는 다른 worker들이 일을 하지 않고 대기 하는 상황이 발생 할 수 있음.
- Woker의 수가 적고 한 작업 단위의 처리 시간이 짧은 경우 값을 크게 설정 하는 것이 유리 함.
- 하나의 Worker만 사용하는 경우 Prefetch Count 값을 크게 하는 것이 유리 함.
참고자료
https://www.rabbitmq.com/tutorials/tutorial-two-python.html
https://teragoon.wordpress.com/2012/01/27/fair-dispatchprefetch-%EC%84%A4%EC%A0%95/
출처: https://kaizen8501.tistory.com/220?category=958645 [Life4IoT]
'Spring Cloud > RabbitMQ' 카테고리의 다른 글
[RabbitMQ #99] Server Connection 방법 (0) | 2022.05.24 |
---|---|
[RabbitMQ #7] Routing (0) | 2022.05.24 |
[RabbitMQ #6] Publish/Subscribe (0) | 2022.05.24 |
[RabbitMQ #2] Simple Send/Receive (0) | 2022.05.24 |
[RabbitMQ #4] 경쟁 소비자 패턴(Competing Consumer Pattern) (0) | 2022.05.24 |
[RabbitMQ #3] RabbitMQ Server API 이용 Connection List 확인 하기 (0) | 2022.05.24 |
[RabbitMQ #1] Rabbit MQ 설치 (0) | 2022.05.24 |
[AMQP] AMQP(Advanced Message Queuing Protocol) (0) | 2022.05.24 |