[RabbitMQ #6] Publish/Subscribe
앞에 [RabbitMQ #2]Simple Send/Receive 예제에서는 Queue에 직접 메시지를 보내고 Queue에서 직접 메시지를 받았다. 본 글에서는 Exchanges를 사용해 Pub/Sub 메시지 구조를 구현하는 방법을 설명 한다.
Exchange type을 fanout으로 설정 하면, Exchange에 Binding 되어 있는 모든 Queue에게 메시지를 전달 한다.
이 기능을 이용해 Pub/Sub 메시지 구조를 구현 할 수 있다.
- Subscriber #1/#2에서 "logs" Exchange를 Binding 하면 Pub/Sub 구조에서 Subscribe 하는 동작과 유사하게 동작 한다.
- Publisher가 "info: Hello World!" 메시지를 "logs" Exchange에 보내면, Exchange는 binding 되어 있는 모든 Queue에게 데이터를 보낸다.
emit_log.py ( Publisher Code )
- "logs" Exchange를 "fanout" 모드로 설정하고 "info: Hello World" 메시지를 보낸다.
# emit_log.py
import pika
import sys
connection = pika.BlockingConnection(
pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
receive_logs.py
- channel.queue_declare 시, queue 이름을 지정하지 않으면 Random으로 Queue이 이름을 생성한다.
- result.method.queue로 Random으로 생성된 Queue 이름을 확인 할 수 있다.
- 생성된 Queue를 "logs" Exchange에 Bind 하고 데이터 수신을 기다린다.
- exclusive=True 옵션 : 생성된 channel이 close 되면 Queue를 삭제 한다.
- 다른 프로세스에서 사용할 수 없는, 해당 channel 만 사용 할 수 있는 Queue라는 의미
- auto_ack = True 옵션 : receive_log.py 프로세스의 channel에 데이터가 수신 되면 RabbitMQ Server에게 자동으로 ACK를 보낸다. 참고로 pika 라이브러는 callback 함수( def callback() )가 호출 되기 전에 ACK를 RabbitMQ Server에게 보낸다.
# receive_logs.py
import pika
connection = pika.BlockingConnection(
pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
코드 실행 화면
random으로 생성된 Queue의 이름 확인
참고자료
https://www.rabbitmq.com/confirms.html
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
출처: https://kaizen8501.tistory.com/222?category=958645 [Life4IoT]
'Spring Cloud > RabbitMQ' 카테고리의 다른 글
[RabbitMQ] 다른 메시지 큐 간 차이 (MQTT, ZeroMQ, Kafka) (0) | 2022.05.24 |
---|---|
[RabbitMQ #99] Exchanges Type (0) | 2022.05.24 |
[RabbitMQ #99] Server Connection 방법 (0) | 2022.05.24 |
[RabbitMQ #7] Routing (0) | 2022.05.24 |
[RabbitMQ #2] Simple Send/Receive (0) | 2022.05.24 |
[RabbitMQ #5] Queue & Message 보존 설정 및 Fair Dispatch (0) | 2022.05.24 |
[RabbitMQ #4] 경쟁 소비자 패턴(Competing Consumer Pattern) (0) | 2022.05.24 |
[RabbitMQ #3] RabbitMQ Server API 이용 Connection List 확인 하기 (0) | 2022.05.24 |