手把手教你手写一个RPC

文章目录

介绍

SPI

如何使用SPI?

服务端

ZK注册中心

客户端

负载均衡

服务端处理数据

如何使用

结尾

在之前的文章中对实现Rpc以及dubbo中的SPI机制有了一些了解,为了更进一步了解rpc调用的整个过程,在本篇中我们会自己实现简单的SPI机制、实现zookeeper注册中心、负载均衡等功能。源码的链接我会放在文章的末尾,如果有需要的话可以clone下来,debug整个项目,这样也可以对整个rpc调用过程更加清楚。OK,废话不多说啦,接下来就是展示它的功能。

介绍

为了方便对功能进行管理,这里进行了多模块划分:



这个见名字就知道模块是负责什么的了,比如api模块就是专门负责存放接口的,cluster就是容错部分,这里就是负载均衡那块啦。

在上一篇文章中,我们就知道了在dubbo中,SPI可以说是它的核心部分了,所以我们也先从SPI部分说起。

SPI

如何使用SPI?

这个与dubbo中的基本一样,在META-INF下建一个extensions包,以接口名为文件名。在文件中,使用key-value的形式来设置它的实现类:



SPI的具体实现类为ExtensionLoader,他与dubbo基本上是一样,可以说是dubbo中弱化版的实现。相信看了上一篇文章的话,再看这个SPI的实现,简直就是小菜一碟。



服务端

在dubbo中有xml形式、注解形式来提供服务,这篇文章中以注解的方式来实现提供服务。所以我们要先定义一个服务端的注解,以便将这个注解添加到实现类上:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface Service {
    String interfaceName() default "";
    String version() default "";
    String group() default "";
}

将这个注解添加到实现类上就可以了,我们启动下服务端:

public class AnnotationProvider {
    public static void main(String[] args) throws IOException {
        new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        //启动netty
        NettyRpcServer nettyRpcServer=new NettyRpcServer();
        nettyRpcServer.start();
    }
    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.provider.impl"})
    static public class ProviderComponentScan{
    }
}

仅仅将注解添加到实现类上是不行的,因为它没有进入到spring容器,我们的spring也就无法处理它,所以接下来就是要spring扫描到这些类:

@Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        CustomScanner rpcServiceScanner=new CustomScanner(beanDefinitionRegistry, Service.class);
        CustomScanner springBeanScanner=new CustomScanner(beanDefinitionRegistry, Component.class);
        if (resourceLoader!=null){
            rpcServiceScanner.setResourceLoader(resourceLoader);
            springBeanScanner.setResourceLoader(resourceLoader);
        }
        String[] packageToScan=getPackageToScan(annotationMetadata);
        //其实就是扫描包下面有这些注解的类,将其加入到容器中后才可以使用。
        springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);
        rpcServiceScanner.scan(packageToScan);

    }


    /**
     * 获取到需要扫描的内容
     * */
    private String[] getPackageToScan(AnnotationMetadata annotationMetadata) {
        String[] packageToScan=new String[0];
        //可见DubboComponentScanRegistrar的getPackagesToScan0方法
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                annotationMetadata.getAnnotationAttributes(annotationClass.getName()));

        if (attributes!=null){
            packageToScan=attributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);
        }
        //说明是没有扫描的
        if (packageToScan.length==0){
            packageToScan=new String[]{((StandardAnnotationMetadata)annotationMetadata).getIntrospectedClass().getPackage().getName()};
        }
        return packageToScan;
    }

这一步就是让spring扫描到@Service标记的文件,关于这一部分dubbo中实现的更加深入,如果想了解更多,可以去看DubboComponentScanRegistrar的getPackagesToScan0。

这样spring在扫描到文件中后,就可以在容器加载完毕时对其进行一些处理。通过实现ApplicationListener来重写onApplicationEvent方法:

@SneakyThrows
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (Objects.isNull(event.getApplicationContext().getParent())){
            ApplicationContext context = event.getApplicationContext();
            //处理提供端/生产者
            handlerProvider(context);
            //处理消费端
            handlerConsumer(context);
        }
    }
    @SneakyThrows
    private void handlerProvider(ApplicationContext context) throws UnknownHostException {
        Map<String, Object> beans = context.getBeansWithAnnotation(Service.class);
        String host = InetAddress.getLocalHost().getHostAddress();
        if (!beans.isEmpty()){
            for (Object bean:beans.values()){
                Service service = bean.getClass().getAnnotation(Service.class);
                RpcService rpcServiceConfig=new RpcService(host,PORT,bean,service.version(),service.group());
                serviceProvider.register(rpcServiceConfig);
            }
        }
    }

因为是服务端嘛,所以这个信息是必须要注册到注册中心的。dubbo是以zookeeper为默认的注册中心的,这里我们就也已zookeeper为注册中心。






ZK注册中心

使用zk为注册中心,大部分功能可以直接使用org.apache.curator里面的方法:

@Override
    public void register(RpcService rpcService) {
        this.addService(rpcService);
        InetSocketAddress address = new InetSocketAddress(rpcService.getHost(), rpcService.getPort());
        String servicePath= RpcConstants.ZK_REGISTER_ROOT_PATH+"/"+rpcService.getServiceName()+rpcService.getGroup()+rpcService.getVersion()+address;
        ZookeeperUtils.createPersistentNode(zkClient,servicePath);
    }


public static void createPersistentNode(CuratorFramework zkClient, String path) {
        try {
            if (!(REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null)) {
                zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
            }
            REGISTERED_PATH_SET.add(path);
        } catch (Exception e) {
            log.error("create persistent node for path [{}] fail", path);
        }
    }
我们可以通过ZooInspector这个软件来看zookeeper注册的信息:



zookeeper有了服务信息,我们才可以为客户端提供服务。

客户端

服务端有自定义的注解,那么客户端也要定义一个注解,这样才能够知道哪些地方需要服务:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface Reference {
    String interfaceName() default "";
    String version() default "";
    String group() default "";
}
public class AnnotationConsumer {
    public static void main(String[] args) throws IOException {
        AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        annotationConfigApplicationContext.start();
        final HelloController helloController = (HelloController) annotationConfigApplicationContext.getBean("helloController");
        helloController.test();
        System.in.read();
    }
    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.consumer"})
    static public class ProviderComponentScan{
    }
}

在上面的将服务端@Service注解的类添加进spring扫描的地方,我们将客户端@Reference也添加进去了。所以只需要在spring加载完毕后对@Reference相关类进行一些处理。

@SneakyThrows
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (Objects.isNull(event.getApplicationContext().getParent())){
            ApplicationContext context = event.getApplicationContext();
            //处理提供端/生产者
            handlerProvider(context);
            //处理消费端
            handlerConsumer(context);
        }
    }

private void handlerConsumer(ApplicationContext context) {
        Map<String, Object> beans = context.getBeansWithAnnotation(Component.class);
        if (!beans.isEmpty()){
            for (Object bean:beans.values()){
                Class<?> targetClass = bean.getClass();
                Field[] declaredFields =targetClass.getDeclaredFields();
                for (Field declaredField:declaredFields){
                    Reference rpcReference = declaredField.getAnnotation(Reference.class);
                    if (rpcReference!=null){
                        RpcService rpcServiceConfig=new RpcService(rpcReference.version(),rpcReference.group());
                        RpcClientProxy rpcClientProxy=new RpcClientProxy(rpcClient,rpcServiceConfig);
                        Object clientProxy=rpcClientProxy.getProxy(declaredField.getType());
                        declaredField.setAccessible(true);
                        try {
                            declaredField.set(bean,clientProxy);
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
我们获得的是一个接口,如果才能调用对象的方法呢?没错,就是生成一个代理类,这样就可以通过调用代理类来执行它的方法:

public <T> T getProxy(Class<?> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz},new ConsumerInvocationHandler());
    }

执行代理类时的操作,通过使用CompletableFuture,保证最后一定可以获得该方法的结果。当然也可以使用countDownLatch来保证最后一定获得执行结果,两种方式都可以,无非是实现过程不一样:

private class ConsumerInvocationHandler implements InvocationHandler{
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RpcRequest rpcRequest=new RpcRequest(method,args,rpcServiceConfig);
            RpcResponse<Object> rpcResponse=null;
            CompletableFuture<RpcResponse<Object>> completableFuture= (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
            rpcResponse=completableFuture.get();
            return rpcResponse.getData();
        }
    }
接下来就是要获取注册中心中的信息,然后根据负载均衡选出一个合适的ip,开启netty进行访问:

@Override
    public Object sendRpcRequest(RpcRequest rpcRequest) throws Exception {
        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
        InetSocketAddress inetSocketAddress = serviceConsumer.getIpAndPort(rpcRequest);//获取到ip与地址。
        Channel channel=getChannel(inetSocketAddress);

        if (channel.isActive()){
            NettyClientHandler.COMPLETABLE_CLIENT.put(rpcRequest.getRequestId(),resultFuture);
            RpcMessage rpcMessage=this.createRpcMessage(rpcRequest);
            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future->{
                if (!future.isSuccess()){
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                }
            });
        }else{
            throw new IllegalStateException();
        }
        return resultFuture;
    }
负载均衡

在dubbo中负载均衡算法有很多,比如像随机、权重轮询、一致性hash等等,这里我们只是实现一个简单的随机。






将负载均衡作为一个SPI接口:

@SPI
public interface LoadBalance {
    String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest);
}
public abstract class AbstractLoadBalance implements LoadBalance {
    @Override
    public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
        if (serviceAddresses==null || serviceAddresses.isEmpty()){
            return null;
        }
        if (serviceAddresses.size()==1){
            return serviceAddresses.get(0);
        }
        return doSelect(serviceAddresses,rpcRequest);
    }
    protected abstract String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) ;
}

public class RandomLoadBalance extends AbstractLoadBalance {
    @Override
    protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
        Random random=new Random();
        return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
    }
}

因为有SPI机制,如果想扩展负载均衡算法也是极为方便的。

有了接口名就可以以它为查询条件,在zk中找到有哪些ip提供这个服务,根据这些提供服务的地址呢,可以坐下负载均衡,得到最后要执行的ip。

 public InetSocketAddress getIpAndPort(RpcRequest rpcRequest) {
        String rpcServiceName = rpcRequest.getRpcServiceName();
        CuratorFramework zkClient = ZookeeperUtils.getZkClient();
        List<String> serviceUrlList = ZookeeperUtils.getChildrenNodes(zkClient,rpcServiceName);
        if (serviceUrlList==null || serviceUrlList.size()==0){
            throw new RpcException("未找到服务,该服务为:"+rpcServiceName);
        }
        //做下负载均衡
        LoadBalance random = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
//        List<String> list=new ArrayList<>();
//        for (int i = 0; i < 15; i++) {
//            list.add();
//        }
//        String targetServiceUrl = list.get(0);
        String targetServiceUrl = random.selectServiceAddress(serviceUrlList, rpcRequest);
        String[] socketAddressArray = targetServiceUrl.split(":");
        String host = socketAddressArray[0];
        int port = Integer.parseInt(socketAddressArray[1]);
        return new InetSocketAddress(host,port);
    }

在netty传递数据之前,需要对数据进行编码、序列化等操作。序列化方式有很多,比如像java、hession2、kryo等,在实际效率中还是kryo的效率更高,因此我们就采用这个kryo来进行序列化。

同样为了方便扩展,也是将它设置为SPI接口:

 @Override
    public byte[] serialize(Object obj) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             Output output = new Output(byteArrayOutputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        } catch (Exception e) {
            throw new RpcException("序列化失败");
        }
    }


    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
             Input input = new Input(byteArrayInputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            Object o = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return clazz.cast(o);
        } catch (Exception e) {
            throw new RpcException("反序列化失败");
        }
    }
服务端处理数据

因为是采用Netty来作为通信框架,所以可以继承ChannelInboundHandlerAdapter来重写channelRead方法,这样就可以获取到传递的数据了:

if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
                    rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
                    rpcMessage.setData(RpcConstants.PONG);
                } else {
                    RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();
                    log.info("服务端接收一条新消息:请求id为"+rpcRequest.getRequestId()+",接口为"+rpcRequest.getInterfaceName()+",方法为"+rpcRequest.getMethodName());
                    Object result = this.handlerRequest(rpcRequest);
                    rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
                    if (ctx.channel().isActive() && ctx.channel().isWritable()) {
                        rpcResponse= RpcResponse.success(result, rpcRequest.getRequestId());
                    } else {
                        rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
                    }
                    rpcMessage.setData(rpcResponse);
                }
                ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

通过反射来执行对应的方法:

public Object handlerRequest(RpcRequest invocation) {
        Object service = serviceProvider.getService(invocation.getRpcServiceName());
        Object result;
        try {
            Method method = service.getClass().getMethod(invocation.getMethodName(), invocation.getParamTypes());
            result = method.invoke(service, invocation.getParameters());
        } catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
            throw new RpcException(e.getMessage(), e);
        }
        return result;
    }

至此就完成了一整条的RPC调用过程。

如何使用

那么下载完项目后如何启动?

在启动之前需要先做一件事,那就是启动zookeeper。在代码中我设置zk的ip为本地,如果是使用云服务器的话,那么是需要修改下ip地址的。

首先要启动服务端的类,切记要先启动服务端的,因为你不提供注册中心数据,人家客户端怎么拉数据嘛;

public class AnnotationProvider {
    public static void main(String[] args) throws IOException {
        new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        //启动netty
        NettyRpcServer nettyRpcServer=new NettyRpcServer();
        nettyRpcServer.start();
    }
    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.provider.impl"})
    static public class ProviderComponentScan{
    }
}

等待服务端启动完毕后,然后再启动客户端的类:

public class AnnotationConsumer {
    public static void main(String[] args) throws IOException {
        AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(ProviderComponentScan.class);
        annotationConfigApplicationContext.start();
        final HelloController helloController = (HelloController) annotationConfigApplicationContext.getBean("helloController");
        helloController.test();
        System.in.read();
    }

    @Configuration
    @RpcComponentScan(basePackages = {"com.xlfc.consumer"})
    static public class ProviderComponentScan{
    }
}

结尾

虽然RPC调用过程不复杂,但是实际上要写出来的话,还是有些坑的,毕竟纸上得来终觉浅,绝知此事要躬行。为此也是参考了dubbo源码以及github上其他人写的的rpc例子。以上就是本文对于这个RPC的介绍了,下面附上它的gitee链接。

https://gitee.com/stylesmile/java_write_frame/tree/master/myRpc

netty实现tomcat(简易版)

消息队列技术点梳理(思维导图版)

关于一个简单接口的高并发测试与优化记录

Java实现线程间的通信的五种方式

手写简单版SpringMVC

使用java手写一个简单的web服务器

idea 插件激活(该方法为备用,建议优先使用我们提供的正版激活)

作者:java知路


欢迎关注微信公众号 :java知路