RPC采用请求/应答(确认)模式:client发送消息至server并等待消息回传,server接到消息并处理后将结果回传给client,双方通过Correlation Id将回复与对应的请求关联起来。基于AMQP协议的实现代码如下:
Tut6Config.java
/**
 * Bean definitions for the RPC tutorial. The outer class carries the
 * {@code "tut6"} / {@code "rpc"} profiles; the two nested classes carry the
 * {@code "client"} and {@code "server"} profiles respectively and declare the
 * beans each side needs.
 */
@Profile({"tut6","rpc"})
@Configuration
public class Tut6Config {

    /** Name of the direct exchange declared by both the client and the server side. */
    private static final String RPC_EXCHANGE = "tut.rpc";

    /** Name of the queue the server side declares for incoming requests. */
    private static final String REQUEST_QUEUE = "tut.rpc.requests";

    /** Routing key used to bind the request queue to the exchange. */
    private static final String ROUTING_KEY = "rpc";

    /** Client-side beans: the exchange and the {@link Tut6Client} itself. */
    @Profile("client")
    private static class ClientConfig {

        /** @return a direct exchange named {@code tut.rpc} */
        @Bean
        public DirectExchange exchange() {
            DirectExchange rpcExchange = new DirectExchange(RPC_EXCHANGE);
            return rpcExchange;
        }

        /** @return a new client instance */
        @Bean
        public Tut6Client client() {
            Tut6Client rpcClient = new Tut6Client();
            return rpcClient;
        }
    }

    /** Server-side beans: request queue, exchange, their binding, and the {@link Tut6Server}. */
    @Profile("server")
    private static class ServerConfig {

        /** @return a queue named {@code tut.rpc.requests} */
        @Bean
        public Queue queue() {
            Queue requestQueue = new Queue(REQUEST_QUEUE);
            return requestQueue;
        }

        /** @return a direct exchange named {@code tut.rpc} */
        @Bean
        public DirectExchange exchange() {
            DirectExchange rpcExchange = new DirectExchange(RPC_EXCHANGE);
            return rpcExchange;
        }

        /**
         * Binds the given queue to the given exchange with routing key {@code rpc}.
         *
         * @param exchange the exchange to bind to
         * @param queue the queue being bound
         * @return the resulting binding
         */
        @Bean
        public Binding binding(DirectExchange exchange,
                               Queue queue) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
        }

        /** @return a new server instance */
        @Bean
        public Tut6Server server() {
            Tut6Server rpcServer = new Tut6Server();
            return rpcServer;
        }
    }
}
Tut6Client.java
需要注意的是,client发送消息后会阻塞等待回复;若未收到返回,调用可能抛出异常(例如连接中断),也可能在等待超时后返回null,因此可用try-catch捕获异常,并对null结果单独处理。
/**
 * RPC client that periodically asks the server for the next Fibonacci number.
 *
 * <p>Each scheduled run sends the current counter value to the injected
 * exchange with routing key {@code rpc}, waits for the reply, and prints the
 * outcome. The counter advances on every attempt, whether or not a reply
 * arrives.
 */
public class Tut6Client {
    /** Template used to send the request and wait for the reply. */
    @Autowired
    private RabbitTemplate template;

    /** Exchange the requests are published to. */
    @Autowired
    private DirectExchange exchange;

    /** Argument of the next request; incremented on every send attempt. */
    int start = 0;

    /**
     * Sends one {@code fib(n)} request and prints the result.
     *
     * <p>A failed request is reported as a failure; no placeholder value is
     * printed as if it had been received from the server.
     */
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        // Capture the argument once so the log lines and the request agree.
        final int n = start++;
        System.out.println(" [x] Requesting fib(" + n + ")");

        final Object reply;
        try {
            reply = template.convertSendAndReceive(exchange.getName(), "rpc", n);
        } catch (Exception e) {
            // The request could not be completed: report it and stop here
            // instead of printing a made-up "Got '0'" line.
            System.out.println("服务端未响应或连接已中断");
            e.printStackTrace();
            return;
        }

        if (reply == null) {
            // RabbitTemplate returns null when no reply arrives before its
            // reply timeout elapses; report that distinctly from a real answer.
            System.out.println(" [.] No reply received for fib(" + n + ")");
            return;
        }
        System.out.println(" [.] Got '" + reply + "'");
    }
}
Tut6Server.java
/**
 * RPC server that answers Fibonacci requests arriving on the
 * {@code tut.rpc.requests} queue.
 */
public class Tut6Server {
    /**
     * Handles one request: logs it, computes {@code fib(n)}, logs and returns the result.
     *
     * @param n index of the Fibonacci number requested
     * @return the value of {@link #fib(int)} for {@code n}
     * @throws IllegalArgumentException if {@code n} is negative
     */
    @RabbitListener(queues = "tut.rpc.requests")
    // @SendTo("tut.rpc.replies") used when the
    // client doesn't set replyTo.
    public int fibonacci(int n) {
        System.out.println(" [x] Received request for " + n);
        int result = fib(n);
        System.out.println(" [.] Returned " + result);
        return result;
    }

    /**
     * Computes the n-th Fibonacci number ({@code fib(0) = 0}, {@code fib(1) = 1})
     * iteratively in O(n) time.
     *
     * <p>The result is an {@code int}, so values beyond {@code fib(46)} wrap
     * around silently, exactly as plain {@code int} addition does.
     *
     * @param n index of the Fibonacci number; must be non-negative
     * @return the n-th Fibonacci number, modulo int overflow
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public int fib(int n) {
        if (n < 0) {
            // The recursive definition never terminates for negative input.
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        int previous = 0;
        int current = 1;
        for (int i = 0; i < n; i++) {
            int next = previous + current;
            previous = current;
            current = next;
        }
        return previous;
    }
}