Tutorial RabbitMQ com Spring Boot

Nesse texto pretendo deixar um exemplo de como publicar e consumir mensagens em uma fila utilizando o RabbitMQ, Java e Spring Boot.

Antes de começar a programar devemos esclarecer alguns pontos:

O que é RabbitMQ?

RabbitMQ é um servidor de mensageria de código aberto (open source), que faz uso do protocolo AMQP (Advanced Message Queuing Protocol). O rabbit é compatível com muitas linguagens de programação e permite lidar com o tráfego de mensagens de forma simples e confiável. Vale falar que também possui uma interface de administração nativa e é multiplataforma.

Como vamos usar localmente?

Há duas opções para usar o rabbit localmente: instalar ou usar um container docker. Nesse exemplo vamos usar o docker.

Iniciando uma fila

O próprio site do RabbitMQ mostra isso, se liga:

Vamos pegar esse comando e rodar no prompt:

O resultado no final é algo parecido com isso:

Pronto, o rabbit está funcionando. Caso queira, olhe os containeres no docker desktop, deve aparecer esse aqui:

Como falei existe uma interface e ela é acessível pela porta 15672 por default, então acessando http://localhost:15672/ é possível ter acesso a ela.

Para logar nós usaremos guest como usuário e guest como senha.

Essa é a tela de overview do rabbit:

O próximo passo é criar a fila que utilizaremos no nosso exemplo. Mas relaxa, não é difícil. Vamos clicar em “Queues” e logo em seguida em “Add a new queue”:

Feito isso, vamos preencher os dados dessa fila (eu coloquei apenas o nome e deixei o restante como default):

O nome da minha fila será teste e após a criação podemos vê-la na página de queues:

Criando um consumidor:

Em um projeto Spring Boot com maven, vamos colocar a seguinte dependência:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

Na verdade só precisamos da primeira, mas é sempre bom ter as de teste para execução de testes unitários, de integração etc.

Após o maven baixar as dependências vamos fazer o código.

Essa é a classe responsável por criar um consumidor:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;


@Component
public class QueueConsumer {

@RabbitListener(queues = {"${queue.name}"})
public void receive(@Payload String fileBody) {
System.out.println("Message " + fileBody);
}

}

Não estranhe, é só isso de código mesmo.

No application.properties vamos configurar as seguintes propriedades:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

queue.name=teste

Por fim, a classe main recebe a anotação @EnableRabbit, segue exemplo:

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableRabbit
@SpringBootApplication
public class AppApplication {

public static void main(String[] args) {
SpringApplication.run(AppApplication.class, args);
}

}

Essa anotação poderia ser colocada em uma classe de configuração também, sem problemas. Nesse exemplo eu apenas não criei uma classe com essa finalidade

Agora é só iniciar a aplicação, após ela ter iniciado vamos voltar a tela de filas do rabbit e consultar esse consumidor. Para isso, clique no nome da fila:

A seguinte tela será exibida:

Olha o consumidor ali, está conectado. Vamos enviar uma mensagem de teste:

Clique em “Publish Message”

Lá na console da ide podemos ver o system.out.println que deixamos na classe:

O consumo está funcionando, vamos fazer a publicação.

Criando um publicador

Chegou a hora de produzir mensagens.

O primeiro passo é colocar as dependências e preencher o application.properties igual ao que foi feito no consumidor. Não vou repetir porque é exatamente a mesma coisa.

A seguir devemos criar uma classe de configuração:

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SenderConfig {

@Value("${queue.name}")
private String message;

@Bean
public Queue queue() {
return new Queue(message, true);
}

}

Essa classe cria um bean de Queue que será utilizado no envio das mensagens.

Logo depois uma classe que de fato faz o envio das mensagens:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class QueueSender {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private Queue queue;

public void send(String order) {
rabbitTemplate.convertAndSend(this.queue.getName(), order);
}
}

E na classe main a anotação @EnableRabbit deve ser utilizada também:

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableRabbit
@SpringBootApplication
public class AppApplication {

public static void main(String[] args) {
SpringApplication.run(AppApplication.class, args);
}

}

Para facilitar os testes, criei uma controller com um método rest GET para iniciar o envio das mensagens:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/teste")
public class TesteController {

@Autowired
private QueueSender queueSender;

@GetMapping
public String send(){
queueSender.send("test message");
return "ok. done";
}

}

Feito isso, vamos iniciar a aplicação e abrir essa url em algum navegador:

O resultado é esse mostrado acima. No consumidor a mensagem foi recebida e processada:

O ponto até aqui é: estamos enviando algo diretamente para a fila, mas com rabbitmq o aconselhável é enviar para um exchange e esse exchange enviar para a fila.

Ficando assim:

A exchange é responsável por avaliar e entregar a mensagem para a queue ligada a ela.

Tipos de exchange:

  • Direct: entrega a mensagem para as filas que estão ligadas a ela, permitindo o uso de routing keys.
  • Fanout: esse tipo é mais utilizado como broadcast, todos os interessados vão receber as mensagens sem qualquer filtro. Não aceita routing key.
  • Topic: Parece uma mistura entre as duas primeiras, permite o envio para várias filas e também permite o uso de routing keys, podendo fazer o binding das mensagens de acordo com o valor enviado na routing key.

O que é routing key?

Routing key: É uma chave enviada junto a mensagem que o exchange usa para decidir para onde vai rotear a mensagem.

Vamos configurar uma exchange? Os passos são bem simples:

Agora vamos colocar uma routing key e fazer o bind para uma fila? Também é muito simples: é só clicar no nome da exchange criada e preencher os campos conforme esse exemplo:

Depois é só clicar em bind e está feito:

Agora é a hora de escrever um código que publique uma mensagem no exchange e o consumidor criado lá em cima receba isso.

A primeira coisa é criar uma classe de configuração para o rabbitmq:

@Configuration
public class RabbitMQConfig {


@Bean
public Queue testeQueue() {
return new Queue("teste", true);
}

@Bean
DirectExchange exchange() {
return new DirectExchange("direct-exchange");
}

@Bean
Binding testeBinding(Queue testeQueue, DirectExchange exchange) {
return BindingBuilder.bind(testeQueue).to(exchange).with("teste-routing-key");
}

}

Repare que nessa classe eu defini uma fila, um exchange e o binding do exchange para a fila.

Logo após criei uma controller apenas para ficar fácil de testar:

@RestController
@RequestMapping("/teste")
public class TesteController {

public TesteController(AmqpTemplate queueSender) {
this.queueSender = queueSender;
}

private final AmqpTemplate queueSender;

@GetMapping
public String send(){
queueSender.convertAndSend("teste-exchange", "routing-key-teste", "test message");
return "ok. done";
}

}

voilá ao enviar a mensagem será recebida lá naquela classe que escuta a fila, a qual criamos no começo desse texto.

Colocando headers:

Mais um detalhe importante: é possível atribuir headers nas mensagens que enviarmos, se liga nesse exemplo:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/teste")
public class TesteController {

public TesteController(AmqpTemplate queueSender) {
this.queueSender = queueSender;
}

private final AmqpTemplate queueSender;

@GetMapping
public String send(){

String mensagem = "test message";

MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("ultima", "sim");
Message message = new Message(mensagem.getBytes(), messageProperties);

queueSender.convertAndSend("teste-exchange", "routing-key-teste", message);
return "ok. done";
}

}

Nada de achar o código feio hein, é apenas uma prova de conceito.

Lá no listener podemos ver esse header:

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *