RabbitMQ 远程过程调用RPC

    xiaoxiao2021-03-25  78

    所谓RPC,就是应用程序提供参数,远程调用另一个应用程序的算法,得到响应结果返回原程序;

    使用RabbitMQ可以实现RPC远程过程调用;

    客户端程序,通过消息队列给服务端程序发送消息,包括算法需要的参数,请求的唯一标识和回调队列。

    请求的唯一标识可以使得应用程序是哪次请求的响应,服务端程序,在响应消息时候,会把请求唯一标识原样返回。回调队列是服务器端程序给客户端程序响应消息的时候使用的消息队列。

    package rpc; import java.io.IOException; import java.util.UUID; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; public class RPCClient { private static final String RPC_QUEUE_NAME = "rpc_queue"; //用于客户端发送消息到服务器端的队列 public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String responseQueue = channel.queueDeclare().getQueue(); //回调队列,服务器端给客户端返回响应的队列 String correlationId = UUID.randomUUID().toString() ;//每个请求的唯一标志,请求ID,让客户端区分得到的响应属于哪次请求 BasicProperties props = new BasicProperties.Builder() //给服务端指定回调队列名称和请求ID .replyTo(responseQueue) .correlationId(correlationId) .build(); String msg="RPC"; channel.basicPublish("", RPC_QUEUE_NAME, props, msg.getBytes()); System.out.println(" [client] Sent '"+msg+"'"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(responseQueue, consumer); //客户端在回调队列上监听消息 while (true) { Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId() .equals(correlationId)) { String result = new String(delivery.getBody()); System.out.println(" [client] Got '"+result+"'"); } } } } package rpc; import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; //用于客户端发送消息到服务器端的队列 public static String sayHello(String name){ return "hello "+name; } public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ; QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ; while(true){ System.out.println("服务器端等待接收消息..."); Delivery delivery = consumer.nextDelivery(); String result=new String(delivery.getBody()); System.out.println("服务器端成功接收到消息:"+result); BasicProperties props = delivery.getProperties(); BasicProperties responseProps = new BasicProperties.Builder() //给服务端指定回调队列名称和请求ID .correlationId(props.getCorrelationId()) //请求ID .build(); //将结果返回到客户端监听的Queue channel.basicPublish("", props.getReplyTo() , responseProps , sayHello(result).getBytes() ) ; System.out.println(" [SERVER] SENT '"+sayHello(result)+"'"); } } }

    转载请注明原文地址: https://ju.6miu.com/read-20351.html

    最新回复(0)