[RabbitMQ #6] Publish/Subscribe

2022. 5. 24. 11:45 Spring Cloud/RabbitMQ

앞에 [RabbitMQ #2]Simple Sen​d/Receive 예제에서는 Queue에 직접 메시지를 보내고 Queue에서 직접 메시지를 받았다. 본 글에서는 Exchanges를 사용해 Pub/Sub 메시지 구조를 구현하는 방법을 설명 한다.

 

Exchange type을 fanout으로 설정 하면, Exchange에 Binding 되어 있는 모든 Queue에게 메시지를 전달 한다.

이 기능을 이용해 Pub/Sub 메시지 구조를 구현 할 수 있다. 

- Subscriber #1/#2에서 "logs" Exchange를 Binding 하면 Pub/Sub 구조에서 Subscribe 하는 동작과 유사하게 동작 한다.

- Publisher가 "info: Hello World!" 메시지를 "logs" Exchange에 보내면, Exchange는 binding 되어 있는 모든 Queue에게 데이터를 보낸다.

 

 

emit_log.py ( Publisher Code )

  • "logs" Exchange를 "fanout" 모드로 설정하고 "info: Hello World" 메시지를 보낸다.
# emit_log.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

receive_logs.py

  • channel.queue_declare 시, queue 이름을 지정하지 않으면 Random으로 Queue이 이름을 생성한다.
    • result.method.queue로 Random으로 생성된 Queue 이름을 확인 할 수 있다.
  • 생성된 Queue를 "logs" Exchange에 Bind 하고 데이터 수신을 기다린다.
  • exclusive=True 옵션 : 생성된 channel이 close 되면 Queue를 삭제 한다.
    • 다른 프로세스에서 사용할 수 없는, 해당 channel 만 사용 할 수 있는 Queue라는 의미
  • auto_ack = True 옵션 : receive_log.py 프로세스의 channel에 데이터가 수신 되면 RabbitMQ Server에게 자동으로 ACK를 보낸다. 참고로 pika 라이브러는 callback 함수( def callback() )가 호출 되기 전에 ACK를 RabbitMQ Server에게 보낸다.
# receive_logs.py

import pika

connection = pika.BlockingConnection(
    pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

 

코드 실행 화면

 

random으로 생성된 Queue의 이름 확인

 

참고자료

https://www.rabbitmq.com/confirms.html

https://www.rabbitmq.com/tutorials/tutorial-three-python.html



출처: https://kaizen8501.tistory.com/222?category=958645 [Life4IoT]