[RabbitMQ #2] Simple Send/Receive

2022. 5. 24. 11:42 Spring Cloud/RabbitMQ

RabbitMQ를 이용하여 아래 그림과 같이 Publisher가 보낸 데이터를 Consumer가 수신 하는 예제를 구현한다.

 

RabbitMQ User 설정

설치 된 RabbitMQ 웹 페이지에 접속 한다. (ex, 192.168.0.3:15672)

Admin - Add a user - Username, Password 입력 - Add user 버튼 클릭

 

User 등록. 예제에서는 admin/admin으로 설정

virtual hosts 권한 설정 필요

 아래 그림과 같이 admin에 virtual hosts 권한을 설정 한다.

 

pika 설치

pika : RabbitMQ/AMQP 관련 파이썬 플러그인

$ pip install pika

 

Rabbit MQ Publisher ( publisher.py )

# Manager Sample
import pika

connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
channel = connection.channel()

for i in range(10):
    channel.basic_publish(exchange='', routing_key='hello', body=str(i))
    print('# 메시지를 보냈습니다!' + str(i))

connection.close()
  • 192.168.0.3 RabbitMQ Broker에 접속 ( ID : admin, Password : admin )
  • 모든 exchange에 routing_key='hello'로 데이터를 송신

 

Rabbit MQ Consumer ( consumer.py )

import pika

connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))
    
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("# 메시지를 받았습니다: %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print("메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.")
channel.start_consuming()
  • 192.168.0.3 RabbitMQ Broker에 접속 ( ID : admin, Password : admin )
  • hello 라는 이름의 Queue 생성
  • hello라는 이름의 routing_key를 수신하면 수행하기 위한 callback 함수 등록
  • start_consuming() 함수를 통해 broker로 부터 데이터 수신을 기다림
  • auto_ack = True : Consumer가 데이터를 수신하면 자동으로 Broker에서 ACK를 보냄. 
    • basick_consume의 auto_ack의 Default 값은 False임. 즉 프로그램에서 데이터를 수신해도 ACK를 보내지 않음. 이런 경우, 프로그램을 재 시작 했을 때, Broker 입장에서는 Publisher가 보낸 데이터를 받은 Consumer가 없다고 판단 하여 재 시작한 Consumer에게 이전 데이터를 또 보낸다.

실행 결과

$ python consumer.py
메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.
# 메시지를 받았습니다: b'0'
# 메시지를 받았습니다: b'1'
# 메시지를 받았습니다: b'2'
# 메시지를 받았습니다: b'3'
# 메시지를 받았습니다: b'4'
# 메시지를 받았습니다: b'5'
# 메시지를 받았습니다: b'6'
# 메시지를 받았습니다: b'7'
# 메시지를 받았습니다: b'8'
# 메시지를 받았습니다: b'9'
$ python publisher.py
# 메시지를 보냈습니다!0
# 메시지를 보냈습니다!1
# 메시지를 보냈습니다!2
# 메시지를 보냈습니다!3
# 메시지를 보냈습니다!4
# 메시지를 보냈습니다!5
# 메시지를 보냈습니다!6
# 메시지를 보냈습니다!7
# 메시지를 보냈습니다!8
# 메시지를 보냈습니다!9

 

트러블슈팅

처음에 ConnectionParameters 함수를 이용하여 Broker에 연결하려고 하였으나, 아래와 같이 에러 코드를 발생하며 연결에 실패 했다. 

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.0.3"))

pika.exceptions.ProbableAuthenticationError: ConnectionClosedByBroker: (403) 'ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.'

 

위 문제는 RabbitMQ Broker에 등록된 User 정보가 없어서 발생 한 것으로 보이며, 아래와 같이 URLParameters 함수를 이용하여 접속하니 해결 되었다.

connection = pika.BlockingConnection(pika.URLParameters('amqp://admin:admin@192.168.0.3:5672/'))

 

참고

https://www.rabbitmq.com/tutorials/amqp-concepts.html



출처: https://kaizen8501.tistory.com/195?category=958645 [Life4IoT]