SOFA 源码分析 — 连接管理器

前言

RPC 框架需要维护客户端和服务端的连接,通常是一个客户端对应多个服务端,而客户端看到的是接口,并不是服务端的地址,服务端地址对于客户端来讲是透明的。

那么,如何实现这样一个 RPC 框架的网络连接呢?

我们从 SOFA 中寻找答案。

连接管理器介绍

先从一个小 demo 开始看:

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
    .setInterfaceId(HelloService.class.getName()) // 指定接口
    .setProtocol("bolt") // 指定协议
    .setDirectUrl("bolt://127.0.0.1:9696"); // 指定直连地址

HelloService helloService = consumerConfig.refer();

while (true) {
  System.out.println(helloService.sayHello("world"));
  try {
    Thread.sleep(2000);
  } catch (Exception e) {
  }
}

上面的代码中,一个 ConsumerConfig 对应一个接口服务,并指定了直连地址。

然后调用 ref 方法。每个 ConsumerConfig 绑定了一个 ConsumerBootstrap,这是一个非单例的类。

而每个 ConsumerBootstrap 又绑定了一个 Cluster,这是真正的客户端。该类包含了一个客户端所有的关键信息,例如:

  1. Router 路由链
  2. loadBalance 负载均衡
  3. addressHolder 地址管理器
  4. connectionHolder 连接管理器
  5. filterChain 过滤器链

这 5 个实例是 Cluster 的核心。一个客户端的正常使用绝对离不开这 5 个元素。

我们之前分析了 5 个中的 4 个,今天分析最后一个 —— 连接管理器。

他可以说是 RPC 网络通信的核心。

地址管理器代表的是:一个客户端可以拥有多个接口。
连接管理器代表的是:一个客户端可以拥有多个 TCP 连接。

很明显,地址管理器的数据肯定比连接管理器要多。因为通常一个 TCP 连接(Server 端)可以含有多个接口。

那么 SOFA 是如何实现连接管理器的呢?

从 AbstractCluster 的 init 方法中,我们知道,该方法初始化了 Cluster。同时也初始化了 connectionHolder。

具体代码如下:

// 连接管理器
connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);

使用了 SPI 的方式进行的初始化。目前 RPC 框架的具体实现类只有一个 AllConnectConnectionHolder。即长连接管理器。

该类需要一个 ConsumerConfig 才能初始化。

该类中包含很多和连接相关的属性,有 4 个 Map,未初始化的 Map,存活的节点列表,存活但亚健康的列表,失败待重试的列表。这些 Map 的元素都会随着服务的网络变化而变化。

而这些 Map 中的元素则是:ConcurrentHashMap<ProviderInfo, ClientTransport> 。

即每个服务者的信息对应一个客户端传输。那么这个 ClientTransport 是什么呢?看过之前文章的都知道,这个一个 RPC 和 Bolt 的胶水类。该类的默认实现 BoltClientTransport 包含了一个 RpcClient 属性,注意,该属性是个静态的。也就是说,是所有实例公用的。并且,BoltClientTransport 包含一个 ProviderInfo 属性。还有一个 Url 属性,Connection 属性(网络连接)。

我们理一下:一个 ConsumerConfig 绑定一个 Cluster,一个 Cluster 绑定一个 connectionHolder,一个 connectionHolder 绑定多个 ProviderInfo 和 ClientTransport。

因为一个客户端可以和多个服务进行通信。

代码如何实现?

在 Cluster 中,会对 connectionHolder 进行初始化,在 Cluster 从注册中心得到服务端列表后,会建立长连接。

从这里开始,地址管理器开始运作。

Cluster 的 updateAllProviders 方法是源头。该方法会将服务列表添加到 connectionHolder 中。即调用 connectionHolder.updateAllProviders(providerGroups) 方法。该方法会全量更新服务端列表。

如果更新的时候,发现有新的服务,便会建立长连接。具体代码如下:

if (!needAdd.isEmpty()) {
    addNode(needAdd);
}

addNode 方法就是添加新的节点。该方法会多线程建立 TCP 连接。

首先会根据 ProviderInfo 信息创建一个 ClientTransport,然后向线程池提交一个任务,任务内容是 initClientTransport(),即初始化客户端传输。

该方法代码如下(精简过了):

private void initClientTransport(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {
        transport.connect();
        if (doubleCheck(interfaceId, providerInfo, transport)) {
            printSuccess(interfaceId, providerInfo, transport);
            addAlive(providerInfo, transport);
        } else {
            printFailure(interfaceId, providerInfo, transport);
            addRetry(providerInfo, transport);
        }
}

其中关键是调用 transport 的 connect 方法建立连接。

该方法的默认实现在 BoltClientTransport 中,符合我们的预期。我们知道, BoltClientTransport 有一个 RpcClient 的静态实例。这个实例在类加载的时候,就会在静态块中初始化。初始化内容则是初始化他的一些属性,例如地址解析器,连接管理器,连接监控等等。

我们再看 BoltClientTransport 的 connect 方法,该方法主要逻辑是初始化连接。方式则是通过 RpcClient 的 getConnection 方法来获取,具体代码如下:

 connection = RPC_CLIENT.getConnection(url, url.getConnectTimeout());

传入一个 URL 和超时时间。 RpcClient 则是调用连接管理器的 getAndCreateIfAbsent 方法获取,同样传入 Url,这个方法的名字很好,根据 URL 获取连接,如果没有,就创建一个。

有必要看看具体代码:

public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
    // get and create a connection pool with initialized connections.
    ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),
        new ConnectionPoolCall(url));
    if (null != pool) {
        return pool.get();
    } else {
        logger.error("[NOTIFYME] bug detected! pool here must not be null!");
        return null;
    }
}

该方法会继续调用自身的 getConnectionPoolAndCreateIfAbsent 方法,传入 URL 的唯一标识,和一个 ConnectionPoolCall 对象(实现了 Callable)。

然后阻塞等待返回连接。

我们看看这个 ConnectionPoolCall 的 call 方法实现。该方法调用了连接管理器的 doCreate 方法。传入了 URL 和一个连接池。然后 call 方法返回连接池。

doCreate 方法中,重点就是 create 方法,传入了一个 url,返回一个 Connection,并放入连接池。默认池中只有一个长连接。

而 create 方法则是调用连接工厂的 createConnection 方法。然后调用 doCreateConnection 方法。该方法内部给了我们明确的答案:调用 Netty 的 Bootstrap 的 connect 方法。

代码如下:

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));

熟悉 Netty 的同学一眼便看出来了。这是一个连接服务端的操作。而这个 BootStrap 的初始化则是在 RpcClient 初始化的时候进行的。注意:BootStrap 是可以共享的。

可以看到, ConnectionPoolCall 的 call 方法就是用来创建 Netty 连接的。回到 getAndCreateIfAbsent 方法里,继续看 getConnectionPoolAndCreateIfAbsent 方法的实现。

该方法内部将 Callable 包装成一个 FutureTask,目的应该是为了以后的异步运行吧,总之,最后还是同步调用了 run 方法。然后调用 get 方法阻塞等待,等待刚刚 call 方法返回的连接池。然后返回。

得到连接池,连接池调用 get 方法,从池中根据策略选取一个连接返回。目前只有一个随机选取的策略。

这个 Connection 连接实例会保存在 BoltClientTransport 中。

在客户端进行调用的时候, RpcClient 会根据 URL 找到对应的连接,然后,获取这个连接对应的 Channel ,向服务端发送数据。具体代码如下:

conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        if (!f.isSuccess()) {
            conn.removeInvokeFuture(request.getId());
            future.putResponse(commandFactory.createSendFailedResponse(
                conn.getRemoteAddress(), f.cause()));
            logger.error("Invoke send failed, id={}", request.getId(), f.cause());
        }
    }
});

以上,就是 SOFA 的连接的原理和设计。

总结

连接管理器是我们分析 SOFA—RPC Cluster 中的最后一个模块,他管理着一个客户端对应的所有服务网络连接。

connectionHolder 内部包含多个 Map,Map 中的 key 是 Provider,value 是 ClientTransport,ClientTransport 是 RpcClient 和 SOFA 的胶水类,通常一个 Provider 对应一个 ClientTransport。ClientTransport 其实就是一个连接的包装。

ClientTransport 获取连接的方式则是通过 RpcClient 的 连接管理器获取的。该连接管理器内部包含一个连接工厂,会根据 URL 创建连接。创建连接的凡是则是通过 Netty 的 BootStrap 来创建。

当我们使用 Provider 对应的 ClientTransport 中的 RpcClient 发送数据的时候,则会根据 URL 找到对应 Connection,并获取他的 Channel ,向服务端发送数据。

好了,以上就是 SOFA—RPC 连接管理的分析。

篇幅有限,如有错误,还请指正。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 228,936评论 6 535
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 98,744评论 3 421
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 176,879评论 0 381
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,181评论 1 315
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,935评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,325评论 1 324
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,384评论 3 443
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,534评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,084评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,892评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,067评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,623评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,322评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,735评论 0 27
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 35,990评论 1 289
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,800评论 3 395
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,084评论 2 375

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,796评论 18 139
  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,482评论 8 265
  • 点击查看原文 Web SDK 开发手册 SDK 概述 网易云信 SDK 为 Web 应用提供一个完善的 IM 系统...
    layjoy阅读 13,839评论 0 15
  • 3109103-陈求忠总结,《2017年12月11号》(连续总结第10天)辅导员-海燕姐 一:今天完成目标 1迷宫...
    进阶阶段阅读 136评论 0 0
  • 耳畔总回响那句耳熟能详的鸡汤:“上帝为你关上了扇门,就会打开一扇窗。”不,不能等待,是请自己去打开! 生活和工作时...
    楚一凡阅读 352评论 3 1