Springboot - Rabbitmq를 이용한 비동기 메시징 서비스(리액티브 마이크로서비스)
Spring - Rabbitmq를 이용한 비동기 메시징 서비스
-리액티브 마이크로서비스
Mac OS 환경에서 작성되었습니다.
오늘은 간단히 Spring boot + Rabbitmq를 이용한 비동기 메시징 서비스를 구현해볼 것이다.
일단 이 포스팅을 진행하는 이유는 요즘 시대에는 일체형 애플리케이션이 작은 서비스 단위인 마이크로서비스 단위로 나누어
서비스 단위로 배포하는 아키텍쳐가 대세인듯하다.
이 말은 즉슨, 아주 큰 애플리케이션이 작은 서비스 단위(마이크로서비스)로 나뉘어 각각 단독적으로 독립적으로 실행가능한 상태로 배포가 된다.
이런 경우 마이크로서비스끼리의 통신은 RESTful한 통신도 있지만 메시지 큐와 같은 서비스를 이용하여 비동기적으로 통신하기도 한다.
그리고 이 구조를 발행구독구조라고 한다.
위의 사진과 같이 두 마이크로서비스 애플리케이션이 외부의 큐로 연결되는 구조인 것이다.
간단히 설명하면 Sender라는 하나의 애플리케이션에서 큐로 발신하면
Receiver라는 애플리케이션이 해당 큐에 대한 이벤트를 수신하여
로직을 처리하게 되는 것이다.
외부의 큐도 어찌됬는 하나의 데몬으로 떠있는 애플리케이션 같은 것이다.
Rabbitmq를 다운로드한다.
▶︎▶︎▶︎래빗엠큐다운로드
Standalone MacOS binary를 클릭하여 받는다.
Rabbitmq 관리자 페이지 플러그인을 활성화하는 명령이다.
관리자 페이지에 들어갈 계정을 만들어준다.
만약 계정을 만드는 데 밑의 에러가 발생한다면
Error:
{:undef, [{:crypto, :hash, [:sha256, <<94, 223, 167, 31, 97, 108, 105, 118, 101>>], []}, {:rabbit_password, :hash, 2, [file: 'src/rabbit_password.erl', line: 34]}, {:rabbit_auth_backend_internal, :add_user_sans_validation, 3, [file: 'src/rabbit_auth_backend_internal.erl', line: 252]}, {:rpc, :"-handle_call_call/6-fun-0-", 5, [file: 'rpc.erl', line: 197]}]}
>brew install openssl 로 인스톨하면 된다.
Rabbitmq를 실행시켜 준다. 그리고 localhost:15672로 접속한 후에 방금 전에 만들었던 계정으로
로그인한다.
위의 사진에 Set permisstion 버튼을 클릭하여 해당 계정으로 큐에서 읽거나 쓰는 권한을 부여한다.
여기까지 간단한 Rabbitmq의 설정이 완료되었다. 이것보다 더 많은 설정 요소들이 존재하겠지만,
예제로 간단하게 이정도로만 설정한다.
참고: 관리자페이지 포트 : 15672
애플리케이션(amqp) :5672
클러스터링 : 25672
소스 예제 회원가입 이후에 외부 큐에 가입 메시지를 날리면(Sender) 수신애플리케이션에서 가입 회원의 이메일을 커맨드에 뿌려준다.(Receiver)
<Sender>
package org.rvslab.chapter3;
import java.util.Optional;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@SpringBootApplication
@EnableSwagger2
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
CommandLineRunner init(CustomerRespository customerRepository) {
return (evt) -> {
customerRepository.save(new Customer("Adam","adam@boot.com"));
customerRepository.save(new Customer("John","john@boot.com"));
customerRepository.save(new Customer("Smith","smith@boot.com"));
customerRepository.save(new Customer("Edgar","edgar@boot.com"));
customerRepository.save(new Customer("Martin","martin@boot.com"));
customerRepository.save(new Customer("Tom","tom@boot.com"));
customerRepository.save(new Customer("Sean","sean@boot.com"));
};
}
}
@RestController
class CustomerController{
CustomerRegistrar customerRegistrar;
@Autowired
CustomerController(CustomerRegistrar customerRegistrar){
this.customerRegistrar = customerRegistrar;
}
@RequestMapping( path="/register", method = RequestMethod.POST)
Mono<Customer> register(@RequestBody Customer customer){
return customerRegistrar.register(customer);
}
}
@Component
@Lazy
class CustomerRegistrar {
CustomerRespository customerRespository;
Sender sender;
@Autowired
CustomerRegistrar(CustomerRespository customerRespository, Sender sender){
this.customerRespository = customerRespository;
this.sender = sender;
}
public Mono<Customer> registerMono(Mono<Customer> monoCustomer){
monoCustomer.doOnNext(customer -> {
if(customerRespository.findByName(customer.getName()).isPresent())
System.out.println("Duplicate Customer");
else {
customerRespository.save(customer);
//sender.send(customer.getEmail());
}
}).subscribe();
return monoCustomer;
}
// ideally repository will return a Mono object
public Mono<Customer> register(Customer customer){
if(customerRespository.findByName(customer.getName()).isPresent())
System.out.println("Duplicate Customer. No Action required");
else {
customerRespository.save(customer);
//외부 큐에게 메시지전송
sender.send(customer.getEmail());
System.out.println("Rabbitmq send :::: "+customer.toString());
}
return Mono.just(customer);
}
}
//외부 큐와 연결하기 위한 Sender를 빈으로 등록한다.
//또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다.
//RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다.
@Component
@Lazy
class Sender {
RabbitMessagingTemplate template;
@Autowired
Sender(RabbitMessagingTemplate template){
this.template = template;
}
@Bean
Queue queue() {
return new Queue("CustomerQ", false);
}
public void send(String message){
template.convertAndSend("CustomerQ", message);
System.out.println("Ready to send message but suppressed "+ message);
}
}
//repository does not support Reactive. Ideally this should use reactive repository
@RepositoryRestResource
@Lazy
interface CustomerRespository extends JpaRepository <Customer,Long>{
Optional<Customer> findByName(@Param("name") String name);
}
//Entity class
@Entity
class Customer{
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long id;
private String name;
private String email;
public Customer (){}
public Customer(String name, String email) {
super();
this.name = name;
this.email = email;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public String toString() {
return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]";
}
}
management.security.enabled=false
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=yeoseong
spring.rabbitmq.password=yeoseong
<Receiver>
package org.rvslab.chapter3;
import java.util.Optional;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@SpringBootApplication
@EnableSwagger2
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
CommandLineRunner init(CustomerRespository customerRepository) {
return (evt) -> {
customerRepository.save(new Customer("Adam","adam@boot.com"));
customerRepository.save(new Customer("John","john@boot.com"));
customerRepository.save(new Customer("Smith","smith@boot.com"));
customerRepository.save(new Customer("Edgar","edgar@boot.com"));
customerRepository.save(new Customer("Martin","martin@boot.com"));
customerRepository.save(new Customer("Tom","tom@boot.com"));
customerRepository.save(new Customer("Sean","sean@boot.com"));
};
}
}
@RestController
class CustomerController{
CustomerRegistrar customerRegistrar;
@Autowired
CustomerController(CustomerRegistrar customerRegistrar){
this.customerRegistrar = customerRegistrar;
}
@RequestMapping( path="/register", method = RequestMethod.POST)
Mono<Customer> register(@RequestBody Customer customer){
return customerRegistrar.register(customer);
}
}
@Component
@Lazy
class CustomerRegistrar {
CustomerRespository customerRespository;
Sender sender;
@Autowired
CustomerRegistrar(CustomerRespository customerRespository, Sender sender){
this.customerRespository = customerRespository;
this.sender = sender;
}
public Mono<Customer> registerMono(Mono<Customer> monoCustomer){
monoCustomer.doOnNext(customer -> {
if(customerRespository.findByName(customer.getName()).isPresent())
System.out.println("Duplicate Customer");
else {
customerRespository.save(customer);
//sender.send(customer.getEmail());
}
}).subscribe();
return monoCustomer;
}
// ideally repository will return a Mono object
public Mono<Customer> register(Customer customer){
if(customerRespository.findByName(customer.getName()).isPresent())
System.out.println("Duplicate Customer. No Action required");
else {
customerRespository.save(customer);
//외부 큐에게 메시지전송
sender.send(customer.getEmail());
System.out.println("Rabbitmq send :::: "+customer.toString());
}
return Mono.just(customer);
}
}
//외부 큐와 연결하기 위한 Sender를 빈으로 등록한다.
//또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다.
//RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다.
@Component
@Lazy
class Sender {
RabbitMessagingTemplate template;
@Autowired
Sender(RabbitMessagingTemplate template){
this.template = template;
}
@Bean
Queue queue() {
return new Queue("CustomerQ", false);
}
public void send(String message){
template.convertAndSend("CustomerQ", message);
System.out.println("Ready to send message but suppressed "+ message);
}
}
//repository does not support Reactive. Ideally this should use reactive repository
@RepositoryRestResource
@Lazy
interface CustomerRespository extends JpaRepository <Customer,Long>{
Optional<Customer> findByName(@Param("name") String name);
}
//Entity class
@Entity
class Customer{
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long id;
private String name;
private String email;
public Customer (){}
public Customer(String name, String email) {
super();
this.name = name;
this.email = email;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public String toString() {
return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]";
}
}
server.port=8090
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=yeoseong
spring.rabbitmq.password=yeoseong
spring.mail.host=localhost
spring.mail.port=2525
부트 프로젝트생성할 때 의존성 설정 부분에서 I/O>AMQP를 선택해준다.
출처: https://coding-start.tistory.com/116?category=790331 [코딩스타트]
'Spring Cloud > RabbitMQ' 카테고리의 다른 글
[RabbitMQ] JSON Message Format 사용하기 - 2 (0) | 2021.11.01 |
---|---|
[RabbitMQ] JSON Message Format 사용하기 - 1 (0) | 2021.11.01 |
[RabbitMQ] Scheduling 사용하기 (0) | 2021.11.01 |
[RabbitMQ] Spring boot 기본 사용법 (0) | 2021.11.01 |
[RabbitMQ] Message System을 사용하는 이유 (0) | 2021.11.01 |
[RabbitMQ] Multiple Consumer 사용하기 (0) | 2021.11.01 |
[RabbitMQ] Prefetch (0) | 2021.10.25 |
RabbitMQ - 레빗엠큐 개념 및 동작방식, 실습 (0) | 2021.04.24 |