一、什么是RPC?
说起RPC,就不能不提到分布式,这个促使RPC诞生的领域。
-
假设你有一个计算器接口,Calculator,以及它的实现类CalculatorImpl,那么在系统还是单体应用时,你要调用Calculator的add方法来执行一个加运算,直接new一个CalculatorImpl,然后调用add方法就行了,这其实就是非常普通的本地函数调用,因为在同一个地址空间,或者说在同一块内存,所以通过方法栈和参数栈就可以实现。
-
现在,基于高性能和高可靠等因素的考虑,你决定将系统改造为分布式应用,将很多可以共享的功能都单独拎出来,比如上面说到的计算器,你单独把它放到一个服务里头,让别的服务去调用它。
这下问题来了,服务A里头并没有CalculatorImpl这个类,那它要怎样调用服务B的CalculatorImpl的add方法呢?
有些人会说,可以模仿B/S架构的调用方式呀,在B服务暴露一个Restful接口,然后A服务通过调用这个Restful接口来间接调用CalculatorImpl的add方法。
很好,这已经很接近RPC了,不过如果是这样,那每次调用时,是不是都需要写一串发起http请求的代码呢?比如httpClient.sendRequest…之类的;能不能像本地调用一样,去发起远程调用,让使用者感知不到远程调用的过程呢,像这样:
@Reference private Calculator calculator; calculator.add(1,2);
这时候,又有人会说,用代理模式呀!而且最好是结合Spring IoC一起使用,通过Spring注入calculator对象,注入时,如果扫描的对象加了@Reference注解,那么就给它生成一个代理对象,将这个代理对象放进容器中。而这个代理对象的内部,就是通过httpClient来实现RPC远程过程调用的。可能这段描述比较抽象,不过这就是很多RPC框架要解决的问题和解决的思路,比如阿里的Dubbo。
总结一下,RPC要解决的两个问题:
-
解决分布式系统中,服务之间的调用问题。
-
远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。
二、如何实现一个RPC?
实际情况下,RPC很少用到http协议来进行数据传输,毕竟我只是想传输一下数据而已,何必动用到一个文本传输的应用层协议呢,我为什么不直接使用二进制传输?比如直接用Java的Socket协议进行传输?
不管你用何种协议进行数据传输,一个完整的RPC过程,都可以用下面这张图来描述:
以左边的Client客户端为例,Rpc-client就是rpc的调用方,proxy就是我们上面说到的代理对象,也就是那个看起来像是Calculator的实现类,其实内部是通过rpc方式来进行远程调用的代理对象,至于Client Run-time Library,则是实现远程调用的工具包,比如jdk的Socket,最后通过底层网络实现实现数据的传输。
这个过程中最重要的就是序列化和反序列化了(message protocol),因为数据传输的数据包必须是二进制的,你直接丢一个Java对象过去,人家可不认识,你必须把Java对象序列化为二进制格式,传给Rpc-Server端,Server端接收到之后,再反序列化为Java对象。
三、RPC没那么简单:
要实现一个RPC不算难,难的是实现一个高性能高可靠的RPC框架。
比如,既然是分布式了,那么一个服务可能有多个实例,你在调用时,要如何获取这些实例的地址呢?
这时候就需要一个服务注册中心,比如在Dubbo里头,就可以使用Zookeeper作为注册中心,在调用时,从Zookeeper获取服务的实例列表,再从中选择一个进行调用。
那么选哪个调用好呢?这时候就需要负载均衡了,于是你又得考虑如何实现复杂均衡,比如Dubbo就提供了好几种负载均衡策略。
这还没完,总不能每次调用时都去注册中心查询实例列表吧,这样效率多低呀,于是又有了缓存,有了缓存,就要考虑缓存的更新问题,blablabla……
你以为就这样结束了,没呢,还有这些:
-
客户端总不能每次调用完都干等着服务端返回数据吧,于是就要支持异步调用;
-
服务端的接口修改了,老的接口还有人在用,怎么办?总不能让他们都改了吧?这就需要版本控制了;
-
服务端总不能每次接到请求都马上启动一个线程去处理吧?于是就需要线程池;
-
服务端关闭时,还没处理完的请求怎么办?是直接结束呢,还是等全部请求处理完再关闭呢?
……
如此种种,都是一个优秀的RPC框架需要考虑的问题,当然我们只能手写一个简单的RPC框架意思一下:
四、手写RPC框架实战:
1、首先创建两个项目:rpc-server和rpc-client
2、我们先从rpc-server服务端下手:
2.1、创建一个实现了序列化的类User:
@Data public class User implements Serializable { private static final long serialVersionUID = 4997099209348987343L; //相当于类的指纹 private String name; }
2.2、创建一个简单的业务处理类HelloServiceImpl及其对应的接口:
public interface HelloService { String sayHello(String content); String saveUser(User user); }
/** * 创建一个简单的业务处理类 * @author jigq * @create 2019-12-16 20:44 */ public class HelloServiceImpl implements HelloService { @Override public String sayHello(String content) { return "Hello world:" + content; } @Override public String saveUser(User user) { System.out.println("user->" + user); return "success"; } }
2.3、想想我们上面的那张图,相当于消息的处理message protocol已经有了,我们就还需要一个代理proxy,那我们就创建一个代理RpcServerProxy:
/** * 这算是一个静态代理: * 代理的作用是,在客户端调用某个服务的时候,它不需要去关心底层的封装和协议,只需要关心它调用的逻辑就OK了 * @author jigq * @create 2019-12-16 20:51 */ public class RpcServerProxy { //创建一个无限核心线程数的线程池,只做演示,实际生产不这么用 ExecutorService executorService = Executors.newCachedThreadPool(); /** * 这是一个发布服务的方法,将某个服务发布到某个端口上,想想SpringBoot * @param service 具体的服务 * @param port 服务发布到的端口号 */ public void publisher(Object service, int port){ ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); //打开ServerSocket的一个服务监听 while (true){ //使用 while (true) 进行持续监听 Socket socket = serverSocket.accept(); //接收一个请求 //**这里需要降到一个BIO模型,其实就是阻塞式IO————通过多线程去实现一个伪异步** executorService.execute(new ProcessorHandler(socket, service)); //执行实现了Rnnuable接口的线程 } } catch (Exception e){ e.printStackTrace(); } finally { if (serverSocket != null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
2.4、其中ProcessorHandler是我们自定义的实现了Runable接口的实际业务处理类:
/** * 这个就是用来处理Socket请求的,可以理解为服务器上真正处理业务的类(支持多线程) * @author jigq * @create 2019-12-16 21:04 */ public class ProcessorHandler implements Runnable { Socket socket; Object service; public ProcessorHandler() { } public ProcessorHandler(Socket socket, Object service) { this.socket = socket; this.service = service; } @Override public void run() { //在这里处理真正的业务 System.out.println("开始处理客户端请求"); ObjectInputStream inputStream = null; try { inputStream = new ObjectInputStream(socket.getInputStream()); //获取网络Socket输入流 //处理逻辑待完善 } catch (IOException e) { e.printStackTrace(); } } }
2.5、此时我们需要思考,客户端在调用我们服务端的时候,需要传递哪些东西,至少:
-
我们要调用哪个类?
-
类中的哪个方法?
-
带有哪些参数?
既然这样,那我们就创建一个实体类RPCRequest,来接收请求:
/** * 用来封装请求信息数据 * @author jigq * @create 2019-12-16 21:18 */ @Data public class RPCRequest implements Serializable { private static final long serialVersionUID = 8555289438487145186L; private String className; private String methodName; private Object[] parameters; }
2.6、这时候我们再来完善刚刚的Processorhandler处理类:
/** * 这个就是用来处理Socket请求的,可以理解为服务器上真正处理业务的类(支持多线程) * @author jigq * @create 2019-12-16 21:04 */ public class ProcessorHandler implements Runnable { Socket socket; Object service; public ProcessorHandler() { } public ProcessorHandler(Socket socket, Object service) { this.socket = socket; this.service = service; } @Override public void run() { //在这里处理真正的业务 System.out.println("开始处理客户端请求"); ObjectInputStream inputStream = null; try { inputStream = new ObjectInputStream(socket.getInputStream()); //获取网络Socket输入流 RPCRequest rpcRequest = (RPCRequest) inputStream.readObject(); //readObject()就是java原生反序列化的过程 Object result = invoke(rpcRequest); //将这个结果result返回给客户端 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); //建立一个socket通信管道 objectOutputStream.writeObject(result); //将结果写进去 objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } finally { if (objectOutputStream != null){ try { objectOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (inputStream != null){ try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 更具体的业务处理方法,获取到类名-方法名-参数,使用反射来处理请求 * @param request * @return */ private Object invoke(RPCRequest request){ Object[] args = request.getParameters(); //因为在调用方法的时候,我们需要知道参数的类型,因为在调用方法的时候需要传递类型 //使用Class[]数组来接收参数的类型 Class<?>[] types = new Class[args.length]; for(int i=0; i< args.length; i++){ types[i] = args[i].getClass(); } try { Method method = service.getClass().getMethod(request.getMethodName(), types); //其实service我们也是可以从request.getClassName()后,通过反射得到处理类对应的对象 Object result = method.invoke(service, args); //invoke执行方法 return result; } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { e.printStackTrace(); } return null; } }
2.7、至此,我们的rpc-server端的处理逻辑就全部完成,这时候我们需要在入口中启动这个服务;
/** * 项目启动类 */ public class App { public static void main( String[] args ) { HelloService helloService = new HelloServiceImpl(); RpcServerProxy rpcServerProxy = new RpcServerProxy(); rpcServerProxy.publisher(helloService, 8080); } }
3、终于轮到客户端rpc-client啦:
3.1、将三个和服务端rpc-server都需要使用的类拷贝过来,实际开发中他们是在一个公共的jar包中依赖进来:User+RPCRequest+HelloService:
3.2、编写客户端的代理类(RPCClientProxy):
/** * 这里我们将使用动态代理的方式来实现此客户端代理,需要用到泛型: * 动态代理的实现有很多方法,比如: * --java原生的proxy * --cglib * --javasist等; * @author jiguiquan * @create 2019-12-17 21:13 */ public class RpcClientProxy { /** * 动态代理的实现,需要用到泛型: * @param interfaceCls 相当于告诉需要代理哪个类,最后的结果将会返回远程类的代理,如代理的是HelloService类,那么就会返回HelloService供使用 * @param host * @param port * @param <T> * @return */ public <T> T clientProxy(Class<T> interfaceCls, String host, int port){ return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port)); } }
3.3、从这里我们需要一个专门处理远程调用的处理类RemoteInvocationHandler,注意需要实现InvocationHandler接口:
InvocationHandler接口是proxy代理实例的调用处理程序实现的一个接口,每一个proxy代理实例都有一个关联的调用处理程序;在代理实例调用方法时,方法调用被编码分派到调用处理程序的invoke方法。
下一篇就好好讲讲Java动态代理InvocationHandler和Proxy吧:
/** * 专门处理客户端请求的处理类 * @author jiguiquan * @create 2019-12-17 21:19 */ public class RemoteInvocationHandler implements InvocationHandler { String host; int port; public RemoteInvocationHandler() { } public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //组装一个RPCRequest对象 RPCRequest rpcRequest = new RPCRequest(); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); RPCNetTransport rpcNetTransport = new RPCNetTransport(host, port); return rpcNetTransport.sendRequest(rpcRequest); } }
3.4、在处理程序RemoteInvocationHandler处理过程中,需要执行远程请求时,我们可以自己封装出一个RPCNetTransport类来专门处理远程调用传输:
/** * 专门用来处理网络通信的transpost类 * @author jiguiquan * @create 2019-12-17 21:25 */ public class RPCNetTransport { String host; int port; public RPCNetTransport() { } public RPCNetTransport(String host, int port) { this.host = host; this.port = port; } private Socket newSocket(){ System.out.println("创建一个新的socket连接"); Socket socket; try { socket = new Socket(host, port); } catch (IOException e) { throw new RuntimeException("建立连接失败"); } return socket; } public Object sendRequest(RPCRequest request){ Socket socket = null; try { socket = newSocket(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(request); objectOutputStream.flush(); //再构建一个输入流,用来读取返回结果用的 ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); Object result = objectInputStream.readObject(); objectInputStream.close(); objectOutputStream.close(); return result; } catch (Exception e){ throw new RuntimeException("发送数据异常:"+e); } finally { if (socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
3.5、至此我们的客户端代码也就已经写完了,我们在主入口main方法中测试执行一下:
public class App { public static void main( String[] args ) { RpcClientProxy proxy = new RpcClientProxy(); HelloService helloService = proxy.clientProxy(HelloService.class, "localhost", 9090); System.out.println(helloService.sayHello("JiGuiquan")); } }
3.6、在先启动rpc-server的前提下,启动rpc-client,在控制台可以看到以下内容:
很显然,输出结果是经过rpc-server处理后的结果,证明RPC调用成功;
3.7、我们再试一次saveUser的方法:
public class App { public static void main( String[] args ) { RpcClientProxy proxy = new RpcClientProxy(); HelloService helloService = proxy.clientProxy(HelloService.class, "localhost", 9090); User user = new User(); user.setName("吉桂权"); System.out.println(helloService.saveUser(user)); } }
启动后,控制台输出结果如下:
rpc-server对应的控制台输出如下:
之所以能顺利处理,User的序列化和反序列化功不可没;
至此,手写简单RPC框架的实现已经全部完成,并测试可用!
最后,附一个rpc-server和rpc-client的demo项目代码结构截图: