Dubbo源码解析之客户端Consumer

前面我们学习了Dubbo源码解析之服务端Provider。对服务提供方进行思路上的讲解,我们知道以下知识点。本篇文章主要对消费方进行讲解。废话不多说请看下文。

  • 如何将对象方法生成Invoker
  • 如何将Invoker注册到注册地中心
  • 如何处理客户端的请求
  • 二进制数据转java数据协议
  • 协议中包含的序列化知识
  • 一、启动一个客户端Consumer

    1. 定义一个接口

    注意这里其实是引用的前文中的接口。生产中是服务提供方打一个jar包给客户端用。

    @Testpublic void consumerTest() {    // 当前应用配置    ApplicationConfig application = new ApplicationConfig();    application.setName("consumerTest");    // 连接注册中心配置    RegistryConfig registry = new RegistryConfig();    registry.setAddress("zookeeper://127.0.0.1:2181");    // 注意:ReferenceConfig为重对象,内部封装了与注册中心的连接,以及与服务提供方的连接    // 引用远程服务    ReferenceConfig<UserService> reference = new ReferenceConfig<UserService>(); // 此实例很重,封装了与注册中心的连接以及与提供者的连接,请自行缓存,否则可能造成内存和连接泄漏    reference.setApplication(application);    reference.setRegistry(registry); // 多个注册中心可以用setRegistries()    reference.setInterface(UserService.class);    reference.setVersion("1.0.0");    UserService userService = reference.get();    userService.say("hello");}

    2. 生成本地服务

    @Testpublic void consumerTest() {    // 当前应用配置    ApplicationConfig application = new ApplicationConfig();    application.setName("consumerTest");    // 连接注册中心配置    RegistryConfig registry = new RegistryConfig();    registry.setAddress("zookeeper://127.0.0.1:2181");    // 注意:ReferenceConfig为重对象,内部封装了与注册中心的连接,以及与服务提供方的连接    // 引用远程服务    ReferenceConfig<UserService> reference = new ReferenceConfig<UserService>(); // 此实例很重,封装了与注册中心的连接以及与提供者的连接,请自行缓存,否则可能造成内存和连接泄漏    reference.setApplication(application);    reference.setRegistry(registry); // 多个注册中心可以用setRegistries()    reference.setInterface(UserService.class);    reference.setVersion("1.0.0");    UserService userService = reference.get();    userService.say("hello");}

    3. 原理分析

    首先客户端只有接口的,那么可以根据这个接口生成一个代理。而代理对象的逻辑就是,从zk中找到服务端地址。
    然后通过netty客户端去请求服务端的数据。然后返回

    二、源码分析

    带着我们猜测的逻辑一起来看下ReferenceConfig的实现原理。

    public synchronized T get() {      if (destroyed){          throw new IllegalStateException("Already destroyed!");      }  	if (ref == null) {  	    //逻辑就在init里面  		init();  	}  	return ref;  }

    init先做写检查信息,如这个方法是否存在接口中
    createProxy#loadRegistries

    1. 集群容错策略

    注意只有多个服务提供方才会有这里,只有一个服务提供方,没办法容错处理哈。

    可以看到一共有9种策略。

    当时服务端是多个的时候,才会生成集群策略。另外既然是集群就要选择到底使用哪个来执行。这就是
    负载均衡或者说叫路由策略。

    LoadBalance负载均衡

  • directory中获取所有的invoker
  • 如果有多个invoker就去看配置的负载均衡策略
  • 根据负载均衡策略找到一个Inoker
  • public abstract class AbstractClusterInvoker<T> implements Invoker<T> {    public Result invoke(final Invocation invocation) throws RpcException {        checkWheatherDestoried();        LoadBalance loadbalance;        //获取所有的invoker        List<Invoker<T>> invokers = list(invocation);        //如果有多个invoker就去看配置的负载均衡策略        if (invokers != null && invokers.size() > 0) {            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));        } else {            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);        }        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);        //根据策略选一个        return doInvoke(invocation, invokers, loadbalance);    }          protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {    	List<Invoker<T>> invokers = directory.list(invocation);    	return invokers;     }}

    2. invoker生成代理对象

    代理的知识点不用说了。

    3. 客户端的invoker逻辑

    Protocol#refer

    主要看DubboProtocol的逻辑

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {      // create rpc invoker.      DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);      invokers.add(invoker);      return invoker;}

    DubboInvoker

    底层调用netty通信api发送数据到客户端。然后读取数据。

    客户端doInvoke时候会生成ExchangeClient就是NettyClient。public class DubboInvoker<T> extends AbstractInvoker<T> {    @Override    protected Result doInvoke(final Invocation invocation) throws Throwable {        RpcInvocation inv = (RpcInvocation) invocation;        final String methodName = RpcUtils.getMethodName(invocation);        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());        inv.setAttachment(Constants.VERSION_KEY, version);        ExchangeClient currentClient;        if (clients.length == 1) {            currentClient = clients[0];        } else {            currentClient = clients[index.getAndIncrement() % clients.length];        }        try {            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);            if (isOneway) {            	boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);                currentClient.send(inv, isSent);                RpcContext.getContext().setFuture(null);                return new RpcResult();            } else if (isAsync) {            	ResponseFuture future = currentClient.request(inv, timeout) ;                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));                return new RpcResult();            } else {            	RpcContext.getContext().setFuture(null);                return (Result) currentClient.request(inv, timeout).get();            }        } catch (TimeoutException e) {            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);        } catch (RemotingException e) {            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);        }    }    @Override    public boolean isAvailable() {        if (!super.isAvailable())            return false;        for (ExchangeClient client : clients){            if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){                //cannot write == not Available ?                return true ;            }        }        return false;    }  }

    三、总结

    在前文的基础上,客户端的代码算是比较简单的。

  • 主要是集群容错
  • 负载均衡、路由。
  • 客户端如何发送数据DubboInvoker
  • 主要是利用代理来实现的。

    最后求关注,求订阅,谢谢你的阅读!

    声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

    上一篇 2020年8月2日
    下一篇 2020年8月2日

    相关推荐