手写RPC框架

一、什么是RPC?

说起RPC,就不能不提到分布式,这个促使RPC诞生的领域。

  • 假设你有一个计算器接口,Calculator,以及它的实现类CalculatorImpl,那么在系统还是单体应用时,你要调用Calculator的add方法来执行一个加运算,直接new一个CalculatorImpl,然后调用add方法就行了,这其实就是非常普通的本地函数调用,因为在同一个地址空间,或者说在同一块内存,所以通过方法栈和参数栈就可以实现。

  • 现在,基于高性能和高可靠等因素的考虑,你决定将系统改造为分布式应用,将很多可以共享的功能都单独拎出来,比如上面说到的计算器,你单独把它放到一个服务里头,让别的服务去调用它。

这下问题来了,服务A里头并没有CalculatorImpl这个类,那它要怎样调用服务B的CalculatorImpl的add方法呢?

有些人会说,可以模仿B/S架构的调用方式呀,在B服务暴露一个Restful接口,然后A服务通过调用这个Restful接口来间接调用CalculatorImpl的add方法。

很好,这已经很接近RPC了,不过如果是这样,那每次调用时,是不是都需要写一串发起http请求的代码呢?比如httpClient.sendRequest…之类的;能不能像本地调用一样,去发起远程调用,让使用者感知不到远程调用的过程呢,像这样:

@Reference
private Calculator calculator;

calculator.add(1,2);

这时候,又有人会说,用代理模式呀!而且最好是结合Spring IoC一起使用,通过Spring注入calculator对象,注入时,如果扫描的对象加了@Reference注解,那么就给它生成一个代理对象,将这个代理对象放进容器中。而这个代理对象的内部,就是通过httpClient来实现RPC远程过程调用的。可能这段描述比较抽象,不过这就是很多RPC框架要解决的问题和解决的思路,比如阿里的Dubbo。

总结一下,RPC要解决的两个问题:

  • 解决分布式系统中,服务之间的调用问题。

  • 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。


二、如何实现一个RPC?

实际情况下,RPC很少用到http协议来进行数据传输,毕竟我只是想传输一下数据而已,何必动用到一个文本传输的应用层协议呢,我为什么不直接使用二进制传输?比如直接用Java的Socket协议进行传输?

不管你用何种协议进行数据传输,一个完整的RPC过程,都可以用下面这张图来描述:

RPC.jpg

以左边的Client客户端为例,Rpc-client就是rpc的调用方,proxy就是我们上面说到的代理对象,也就是那个看起来像是Calculator的实现类,其实内部是通过rpc方式来进行远程调用的代理对象,至于Client Run-time Library,则是实现远程调用的工具包,比如jdk的Socket,最后通过底层网络实现实现数据的传输。

这个过程中最重要的就是序列化和反序列化了(message protocol),因为数据传输的数据包必须是二进制的,你直接丢一个Java对象过去,人家可不认识,你必须把Java对象序列化为二进制格式,传给Rpc-Server端,Server端接收到之后,再反序列化为Java对象。


三、RPC没那么简单:

要实现一个RPC不算难,难的是实现一个高性能高可靠的RPC框架。

比如,既然是分布式了,那么一个服务可能有多个实例,你在调用时,要如何获取这些实例的地址呢?

这时候就需要一个服务注册中心,比如在Dubbo里头,就可以使用Zookeeper作为注册中心,在调用时,从Zookeeper获取服务的实例列表,再从中选择一个进行调用。

那么选哪个调用好呢?这时候就需要负载均衡了,于是你又得考虑如何实现复杂均衡,比如Dubbo就提供了好几种负载均衡策略。

这还没完,总不能每次调用时都去注册中心查询实例列表吧,这样效率多低呀,于是又有了缓存,有了缓存,就要考虑缓存的更新问题,blablabla……

你以为就这样结束了,没呢,还有这些:

    • 客户端总不能每次调用完都干等着服务端返回数据吧,于是就要支持异步调用;

    • 服务端的接口修改了,老的接口还有人在用,怎么办?总不能让他们都改了吧?这就需要版本控制了;

    • 服务端总不能每次接到请求都马上启动一个线程去处理吧?于是就需要线程池;

    • 服务端关闭时,还没处理完的请求怎么办?是直接结束呢,还是等全部请求处理完再关闭呢?

    ……

如此种种,都是一个优秀的RPC框架需要考虑的问题,当然我们只能手写一个简单的RPC框架意思一下:


四、手写RPC框架实战:

1、首先创建两个项目:rpc-server和rpc-client

image.png

2、我们先从rpc-server服务端下手:

2.1、创建一个实现了序列化的类User:

@Data
public class User implements Serializable {
    private static final long serialVersionUID = 4997099209348987343L;   //相当于类的指纹
    private String name;
}

2.2、创建一个简单的业务处理类HelloServiceImpl及其对应的接口:

public interface HelloService {
    String sayHello(String content);
    String saveUser(User user);
}
/**
 * 创建一个简单的业务处理类
 * @author jigq
 * @create 2019-12-16 20:44
 */
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String content) {
        return "Hello world:" + content;
    }
    @Override
    public String saveUser(User user) {
        System.out.println("user->" + user);
        return "success";
    }
}

2.3、想想我们上面的那张图,相当于消息的处理message protocol已经有了,我们就还需要一个代理proxy,那我们就创建一个代理RpcServerProxy:

/**
 * 这算是一个静态代理:
 * 代理的作用是,在客户端调用某个服务的时候,它不需要去关心底层的封装和协议,只需要关心它调用的逻辑就OK了
 * @author jigq
 * @create 2019-12-16 20:51
 */
public class RpcServerProxy {
    //创建一个无限核心线程数的线程池,只做演示,实际生产不这么用
    ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * 这是一个发布服务的方法,将某个服务发布到某个端口上,想想SpringBoot
     * @param service 具体的服务
     * @param port 服务发布到的端口号
     */
    public void publisher(Object service, int port){

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);  //打开ServerSocket的一个服务监听
            while (true){  //使用 while (true) 进行持续监听
                Socket socket = serverSocket.accept();  //接收一个请求
                //**这里需要降到一个BIO模型,其实就是阻塞式IO————通过多线程去实现一个伪异步**
                executorService.execute(new ProcessorHandler(socket, service));  //执行实现了Rnnuable接口的线程
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            if (serverSocket != null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

2.4、其中ProcessorHandler是我们自定义的实现了Runable接口的实际业务处理类:

/**
 * 这个就是用来处理Socket请求的,可以理解为服务器上真正处理业务的类(支持多线程)
 * @author jigq
 * @create 2019-12-16 21:04
 */
public class ProcessorHandler implements Runnable {

    Socket socket;
    Object service;

    public ProcessorHandler() {
    }

    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {  //在这里处理真正的业务
        System.out.println("开始处理客户端请求");
        ObjectInputStream inputStream = null;
        try {
            inputStream = new ObjectInputStream(socket.getInputStream());  //获取网络Socket输入流
            //处理逻辑待完善
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.5、此时我们需要思考,客户端在调用我们服务端的时候,需要传递哪些东西,至少:

image.png

    • 我们要调用哪个类?

    • 类中的哪个方法?

    • 带有哪些参数?

    既然这样,那我们就创建一个实体类RPCRequest,来接收请求:

    /**
     * 用来封装请求信息数据
     * @author jigq
     * @create 2019-12-16 21:18
     */
    @Data
    public class RPCRequest implements Serializable {
        private static final long serialVersionUID = 8555289438487145186L;
        private String className;
        private String methodName;
        private Object[] parameters;
    }

    2.6、这时候我们再来完善刚刚的Processorhandler处理类:

    /**
     * 这个就是用来处理Socket请求的,可以理解为服务器上真正处理业务的类(支持多线程)
     * @author jigq
     * @create 2019-12-16 21:04
     */
    public class ProcessorHandler implements Runnable {
    
        Socket socket;
        Object service;
    
        public ProcessorHandler() {
        }
    
        public ProcessorHandler(Socket socket, Object service) {
            this.socket = socket;
            this.service = service;
        }
    
        @Override
        public void run() {  //在这里处理真正的业务
            System.out.println("开始处理客户端请求");
            ObjectInputStream inputStream = null;
            try {
                inputStream = new ObjectInputStream(socket.getInputStream());  //获取网络Socket输入流
                RPCRequest rpcRequest = (RPCRequest) inputStream.readObject();  //readObject()就是java原生反序列化的过程
                Object result = invoke(rpcRequest);
                //将这个结果result返回给客户端
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());  //建立一个socket通信管道
                objectOutputStream.writeObject(result);  //将结果写进去
                objectOutputStream.flush();
            } catch (IOException | ClassNotFoundException e) {
                e.printStackTrace();
            } finally {
                if (objectOutputStream != null){
                    try {
                        objectOutputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (inputStream != null){
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 更具体的业务处理方法,获取到类名-方法名-参数,使用反射来处理请求
         * @param request
         * @return
         */
        private Object invoke(RPCRequest request){
            Object[] args = request.getParameters();
            //因为在调用方法的时候,我们需要知道参数的类型,因为在调用方法的时候需要传递类型
            //使用Class[]数组来接收参数的类型
            Class<?>[] types = new Class[args.length];
            for(int i=0; i< args.length; i++){
                types[i] = args[i].getClass();
            }
            try {
                Method method = service.getClass().getMethod(request.getMethodName(), types);
                //其实service我们也是可以从request.getClassName()后,通过反射得到处理类对应的对象
                Object result = method.invoke(service, args);  //invoke执行方法
                return result;
            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    2.7、至此,我们的rpc-server端的处理逻辑就全部完成,这时候我们需要在入口中启动这个服务;

    /**
     * 项目启动类
     */
    public class App 
    {
        public static void main( String[] args )
        {
            HelloService helloService = new HelloServiceImpl();
            RpcServerProxy rpcServerProxy = new RpcServerProxy();
            rpcServerProxy.publisher(helloService, 8080);
        }
    }

    3、终于轮到客户端rpc-client啦:

    3.1、将三个和服务端rpc-server都需要使用的类拷贝过来,实际开发中他们是在一个公共的jar包中依赖进来:User+RPCRequest+HelloService:

    3.jpg

    3.2、编写客户端的代理类(RPCClientProxy):

    /**
     * 这里我们将使用动态代理的方式来实现此客户端代理,需要用到泛型:
     * 动态代理的实现有很多方法,比如:
     *      --java原生的proxy
     *      --cglib
     *      --javasist等;
     * @author jiguiquan
     * @create 2019-12-17 21:13
     */
    public class RpcClientProxy {
        /**
         * 动态代理的实现,需要用到泛型:
         * @param interfaceCls 相当于告诉需要代理哪个类,最后的结果将会返回远程类的代理,如代理的是HelloService类,那么就会返回HelloService供使用
         * @param host
         * @param port
         * @param <T>
         * @return
         */
        public <T> T clientProxy(Class<T> interfaceCls, String host, int port){
            return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(),
                    new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
        }
    }

    3.3、从这里我们需要一个专门处理远程调用的处理类RemoteInvocationHandler,注意需要实现InvocationHandler接口:

    InvocationHandler接口是proxy代理实例的调用处理程序实现的一个接口,每一个proxy代理实例都有一个关联的调用处理程序;在代理实例调用方法时,方法调用被编码分派到调用处理程序的invoke方法。

    下一篇就好好讲讲Java动态代理InvocationHandler和Proxy吧:

    /**
     * 专门处理客户端请求的处理类
     * @author jiguiquan
     * @create 2019-12-17 21:19
     */
    public class RemoteInvocationHandler implements InvocationHandler {
        String host;
        int port;
    
        public RemoteInvocationHandler() {
        }
    
        public RemoteInvocationHandler(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //组装一个RPCRequest对象
            RPCRequest rpcRequest = new RPCRequest();
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameters(args);
            RPCNetTransport rpcNetTransport = new RPCNetTransport(host, port);
            return rpcNetTransport.sendRequest(rpcRequest);
        }
    }

    3.4、在处理程序RemoteInvocationHandler处理过程中,需要执行远程请求时,我们可以自己封装出一个RPCNetTransport类来专门处理远程调用传输:

    /**
     * 专门用来处理网络通信的transpost类
     * @author jiguiquan
     * @create 2019-12-17 21:25
     */
    public class RPCNetTransport {
        String host;
        int port;
    
        public RPCNetTransport() {
        }
    
        public RPCNetTransport(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        private Socket newSocket(){
            System.out.println("创建一个新的socket连接");
    
            Socket socket;
            try {
                socket = new Socket(host, port);
            } catch (IOException e) {
                throw new RuntimeException("建立连接失败");
            }
            return socket;
        }
    
        public Object sendRequest(RPCRequest request){
            Socket socket = null;
            try {
                socket = newSocket();
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                objectOutputStream.writeObject(request);
                objectOutputStream.flush();
    
                //再构建一个输入流,用来读取返回结果用的
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                Object result = objectInputStream.readObject();
                objectInputStream.close();
                objectOutputStream.close();
                return result;
            } catch (Exception e){
                throw new RuntimeException("发送数据异常:"+e);
            } finally {
                if (socket != null){
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    3.5、至此我们的客户端代码也就已经写完了,我们在主入口main方法中测试执行一下:

    public class App 
    {
        public static void main( String[] args )
        {
            RpcClientProxy proxy = new RpcClientProxy();
            HelloService helloService = proxy.clientProxy(HelloService.class, "localhost", 9090);
            System.out.println(helloService.sayHello("JiGuiquan"));
        }
    }

    3.6、在先启动rpc-server的前提下,启动rpc-client,在控制台可以看到以下内容:

    image.png

    很显然,输出结果是经过rpc-server处理后的结果,证明RPC调用成功;

    3.7、我们再试一次saveUser的方法:

    public class App 
    {
        public static void main( String[] args )
        {
            RpcClientProxy proxy = new RpcClientProxy();
            HelloService helloService = proxy.clientProxy(HelloService.class, "localhost", 9090);
            User user = new User();
            user.setName("吉桂权");
            System.out.println(helloService.saveUser(user));
        }
    }

    启动后,控制台输出结果如下:

    image.png

    rpc-server对应的控制台输出如下:

    image.png

    之所以能顺利处理,User的序列化和反序列化功不可没;

至此,手写简单RPC框架的实现已经全部完成,并测试可用!

最后,附一个rpc-server和rpc-client的demo项目代码结构截图:

4.jpg

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐