[RabbitMQ #5] Queue & Message 보존 설정 및 Fair Dispatch

2022. 5. 24. 11:38 Spring Cloud/RabbitMQ

Delivery Acknowledgement Timeout

RabbitMQ Server가 Publisher가 보낸 데이터를 가지고 있고, 해당 데이터를 수신한 Consumer가 하나도 없다고 가정 한다.

이런 경우, 새로운 Consumer가 start_consuming()을 수행 하면, 새로운 Consumer는 Queue에 Binding 된 데이터를 모두 수신 할 수 있다.

 

만약 Consumer가 수행 되지 않아 RabbitMQ Server가 데이터를 계속 가지고 있다면, RabbitMQ Server는 Delivery Acknowledgement Timeout(Default : 30초) 시간 동안 보관 하고 이후 데이터를 ACK 받은 것으로 처리 한다. 

 

Delivery Acknowledgement Timeout 값은 RabbitMQ Server의 [rabbitmq.conf] 파일에서 수정 할 수 있다.

# one hour in milliseconds
consumer_timeout = 3600000

만약 해당 Timeout을 Disable 하려면 [advanced.config] 파일을 아래 처럼 수정 한다.

%% advanced.config
[
  {rabbit, [
    {consumer_timeout, undefined}
  ]}
].

 

Message Durability

RabbitMQ 서버가 종료 후 재기동 하면, 기본적으로 Queue는 모두 제거 된다. Queue를 생성 할 때 Durable 옵션을 True로 설정하면 Queue가 제거 되는 것을 막을 수 있다.

channel.queue_declare(queue='hello', durable=True)

주의 : RabbitMQ에 이미 hello queue가 durable=False로 만들어져 있는 상태에서 위 코드를 수행하면, hello Queue의 durable 상태는 True로 변경되지 않는다. 이런 경우, hello queue를 지우거나, 다른 이름의 queue를 생성 해야 한다.

 

Publisher 입장에서는 메시지를 보낼 때, 아래와 같이 PERSISTENT_DELIVERY_MODE를 설정하면 RabbitMQ 서버가 죽더라도 메시지를 보존 할 수 있다.

 

주의 : PERSISTENT_DELIVERY_MODE로 설정 해도 메시지가 유실되는 것을 100% 방지 할 수 없다.

만약 강하게 메시지를 보장 하려면 publisher confirms를 사용 해야 한다.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
                      ))

 

Message dispatch

RabbitMQ의 dispatch 방법은 기본적으로 round robin 방식이며, Message Queue에 담은 순서대로 Worker들에게 전달 한다.

균등한 메시지 처리가 필요한 상황에선 위 방식으로 충분 할 수 있으나, Worker들이 메시지 중 특정 순서로 오랜 처리 시간이 필요한 상황에는 맞지 않을 수 있다.

예를 들어 C1은 시간이 오래 걸리는 작업을 수행하고 C2은 비교적 가벼운 작업을 수행 한다고 가정하면, RabbitMQ로 부터 순차적으로 메시지를 제공 받더라도, 아래 영상 처럼 동작 될 수 있다.

 

 

#publisher.py

import pika

connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()

#channel.queue_declare(queue='hello')

for i in range(10):
    channel.basic_publish(exchange='', routing_key='hello', body=str(i))
    print('# 메시지를 보냈습니다!' + str(i))

connection.close()
# consumer.py ( 메시지 수신 후 1초 sleep )

import pika
import time

connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
    
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    #time.sleep(body.count(b'.'))

    time.sleep(1)
    print(" [x] Done")

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print("메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.")
channel.start_consuming()
# consumer2.py ( 메시지 수신 후 3초 sleep )

import pika
import time

connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
    
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    #time.sleep(body.count(b'.'))

    time.sleep(3)
    print(" [x] Done")

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback)

print("메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.")
channel.start_consuming()

 

Fair dispatch

Prefetch Count는 Consumer에게 보내는 메시지 수를 지정하는데 사용하는 옵션이며, 요청을 처리했음을 의미하는 Ack가 RabbitMQ Server에 전달 되기 전까지 consumer가 전달 받을 수 있는 Message의 개수 이다.

 

consumer 코드를 아래와 같이 수정하면, Fair Dispatch를 해서 원하는 동작을 수행 할 수 있다.

  • channel.basic_consume(auto_ack=False). auto_ack 설정 삭제
    • auto_ack = True로 설정된 경우, Callback 함수가 수행되기 전 consumer는 ACK를 RabbitMQ 서버에 보내기 때문에 기대하는 동작을 수행 할 수 없음
  • channel.basic_qos(prefetch_count=1)로 설정
# consumer2.py

import pika
import time

connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
    
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    #time.sleep(body.count(b'.'))

    time.sleep(3)
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback)
#channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print("메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.")
channel.start_consuming()

 

 

Prefetch Count에 따른 성능 조정

Prefetch Count = 1인 경우 ( 작을 수록 Fair Dispatch 함 )

  • 하나의 메시지가 처리되기 전에는 새로운 메시지를 받지 않으므로, 여러 Worker들에게 동시에 작업을 분산 할 수 있다. 하지만 여러 worker가 있으나 각 요청이 빨리 처리되는 상황에서 각 worker의 다음 작업까지 대기 시간이 증가 할 수 있다.
  • Worker가 많거나 한 작업 단위의 처리 시간이 긴 경우, 모든 worker에게 균등하게 나눠지도록 값을 작게 설정하는 것이 좋다.

Prefetch Count 값을 크게 설정 할 경우

  • 메시지 큐에서 다량의 메시지를 하나의 worker에게 전달하여 buffer에 요청을 쌓고 계속 처리 할 수 있음.
  • 각 worker의 대기 시간을 감소 시킬 수 있지만, 특정 요청의 처리 시간이 긴 경우에는 다른 worker들이 일을 하지 않고 대기 하는 상황이 발생 할 수 있음.
  • Woker의 수가 적고 한 작업 단위의 처리 시간이 짧은 경우 값을 크게 설정 하는 것이 유리 함.
    • 하나의 Worker만 사용하는 경우 Prefetch Count 값을 크게 하는 것이 유리 함.

 

참고자료

https://lob-dev.tistory.com/entry/RabbitMQ-Message-Queue-%EB%B0%8F-Message-%EB%B3%B4%EC%A1%B4-%EC%84%A4%EC%A0%95%EA%B3%BC-Fair-Dispatch

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

https://teragoon.wordpress.com/2012/01/27/fair-dispatchprefetch-%EC%84%A4%EC%A0%95/



출처: https://kaizen8501.tistory.com/220?category=958645 [Life4IoT]